1 /* 2 * CDDL HEADER START 3 * 4 * The contents of this file are subject to the terms of the 5 * Common Development and Distribution License (the "License"). 6 * You may not use this file except in compliance with the License. 7 * 8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 9 * or http://www.opensolaris.org/os/licensing. 10 * See the License for the specific language governing permissions 11 * and limitations under the License. 12 * 13 * When distributing Covered Code, include this CDDL HEADER in each 14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 15 * If applicable, add the following below this CDDL HEADER, with the 16 * fields enclosed by brackets "[]" replaced with your own identifying 17 * information: Portions Copyright [yyyy] [name of copyright owner] 18 * 19 * CDDL HEADER END 20 */ 21 22 /* 23 * Copyright 2008 Sun Microsystems, Inc. All rights reserved. 24 * Use is subject to license terms. 25 * Copyright 2025 MNX Cloud, Inc. 26 */ 27 28 #include "lint.h" 29 #include "thr_uberdata.h" 30 #include "libc.h" 31 #include "asyncio.h" 32 #include <atomic.h> 33 #include <sys/param.h> 34 #include <sys/file.h> 35 #include <sys/port.h> 36 37 static int _aio_hash_insert(aio_result_t *, aio_req_t *); 38 static aio_req_t *_aio_req_get(aio_worker_t *); 39 static void _aio_req_add(aio_req_t *, aio_worker_t **, int); 40 static void _aio_req_del(aio_worker_t *, aio_req_t *, int); 41 static void _aio_work_done(aio_worker_t *); 42 static void _aio_enq_doneq(aio_req_t *); 43 44 extern void _aio_lio_free(aio_lio_t *); 45 46 extern int __fcntl(int, int, ...); 47 extern int _port_dispatch(int, int, int, int, uintptr_t, void *); 48 49 static int _aio_fsync_del(aio_worker_t *, aio_req_t *); 50 static void _aiodone(aio_req_t *, ssize_t, int); 51 static void _aio_cancel_work(aio_worker_t *, int, int *, int *); 52 static void _aio_finish_request(aio_worker_t *, ssize_t, int); 53 54 /* 55 * switch for kernel async I/O 56 */ 57 int _kaio_ok = 0; /* 0 = disabled, 1 = on, -1 = error */ 58 59 /* 60 * Key for thread-specific data 61 */ 62 pthread_key_t _aio_key; 63 64 /* 65 * Array for determining whether or not a file supports kaio. 66 * Initialized in _kaio_init(). 67 */ 68 uint32_t *_kaio_supported = NULL; 69 70 /* 71 * workers for read/write requests 72 * (__aio_mutex lock protects circular linked list of workers) 73 */ 74 aio_worker_t *__workers_rw; /* circular list of AIO workers */ 75 aio_worker_t *__nextworker_rw; /* next worker in list of workers */ 76 int __rw_workerscnt; /* number of read/write workers */ 77 78 /* 79 * worker for notification requests. 
80 */ 81 aio_worker_t *__workers_no; /* circular list of AIO workers */ 82 aio_worker_t *__nextworker_no; /* next worker in list of workers */ 83 int __no_workerscnt; /* number of write workers */ 84 85 aio_req_t *_aio_done_tail; /* list of done requests */ 86 aio_req_t *_aio_done_head; 87 88 mutex_t __aio_initlock = DEFAULTMUTEX; /* makes aio initialization atomic */ 89 cond_t __aio_initcv = DEFAULTCV; 90 int __aio_initbusy = 0; 91 92 mutex_t __aio_mutex = DEFAULTMUTEX; /* protects counts, and linked lists */ 93 cond_t _aio_iowait_cv = DEFAULTCV; /* wait for userland I/Os */ 94 95 pid_t __pid = (pid_t)-1; /* initialize as invalid pid */ 96 int _sigio_enabled = 0; /* when set, send SIGIO signal */ 97 98 aio_hash_t *_aio_hash; 99 100 aio_req_t *_aio_doneq; /* double linked done queue list */ 101 102 int _aio_donecnt = 0; 103 int _aio_waitncnt = 0; /* # of requests for aio_waitn */ 104 int _aio_doneq_cnt = 0; 105 int _aio_outstand_cnt = 0; /* # of outstanding requests */ 106 int _kaio_outstand_cnt = 0; /* # of outstanding kaio requests */ 107 int _aio_req_done_cnt = 0; /* req. done but not in "done queue" */ 108 int _aio_kernel_suspend = 0; /* active kernel kaio calls */ 109 int _aio_suscv_cnt = 0; /* aio_suspend calls waiting on cv's */ 110 111 int _max_workers = 256; /* max number of workers permitted */ 112 int _min_workers = 4; /* min number of workers */ 113 int _minworkload = 2; /* min number of request in q */ 114 int _aio_worker_cnt = 0; /* number of workers to do requests */ 115 int __uaio_ok = 0; /* AIO has been enabled */ 116 sigset_t _worker_set; /* worker's signal mask */ 117 118 int _aiowait_flag = 0; /* when set, aiowait() is inprogress */ 119 int _aio_flags = 0; /* see asyncio.h defines for */ 120 121 aio_worker_t *_kaiowp = NULL; /* points to kaio cleanup thread */ 122 123 int hz; /* clock ticks per second */ 124 125 static int 126 _kaio_supported_init(void) 127 { 128 void *ptr; 129 size_t size; 130 131 if (_kaio_supported != NULL) /* already initialized */ 132 return (0); 133 134 size = MAX_KAIO_FDARRAY_SIZE * sizeof (uint32_t); 135 ptr = mmap(NULL, size, PROT_READ | PROT_WRITE, 136 MAP_PRIVATE | MAP_ANON, -1, (off_t)0); 137 if (ptr == MAP_FAILED) 138 return (-1); 139 _kaio_supported = ptr; 140 return (0); 141 } 142 143 /* 144 * The aio subsystem is initialized when an AIO request is made. 145 * Constants are initialized like the max number of workers that 146 * the subsystem can create, and the minimum number of workers 147 * permitted before imposing some restrictions. Also, some 148 * workers are created. 149 */ 150 int 151 __uaio_init(void) 152 { 153 int ret = -1; 154 int i; 155 int cancel_state; 156 157 lmutex_lock(&__aio_initlock); 158 (void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancel_state); 159 while (__aio_initbusy) 160 (void) cond_wait(&__aio_initcv, &__aio_initlock); 161 (void) pthread_setcancelstate(cancel_state, NULL); 162 if (__uaio_ok) { /* already initialized */ 163 lmutex_unlock(&__aio_initlock); 164 return (0); 165 } 166 __aio_initbusy = 1; 167 lmutex_unlock(&__aio_initlock); 168 169 hz = (int)sysconf(_SC_CLK_TCK); 170 __pid = getpid(); 171 172 setup_cancelsig(SIGAIOCANCEL); 173 174 if (_kaio_supported_init() != 0) 175 goto out; 176 177 /* 178 * Allocate and initialize the hash table. 179 * Do this only once, even if __uaio_init() is called twice. 
180 */ 181 if (_aio_hash == NULL) { 182 /* LINTED pointer cast */ 183 _aio_hash = (aio_hash_t *)mmap(NULL, 184 HASHSZ * sizeof (aio_hash_t), PROT_READ | PROT_WRITE, 185 MAP_PRIVATE | MAP_ANON, -1, (off_t)0); 186 if ((void *)_aio_hash == MAP_FAILED) { 187 _aio_hash = NULL; 188 goto out; 189 } 190 for (i = 0; i < HASHSZ; i++) 191 (void) mutex_init(&_aio_hash[i].hash_lock, 192 USYNC_THREAD, NULL); 193 } 194 195 /* 196 * Initialize worker's signal mask to only catch SIGAIOCANCEL. 197 */ 198 (void) sigfillset(&_worker_set); 199 (void) sigdelset(&_worker_set, SIGAIOCANCEL); 200 201 /* 202 * Create one worker to send asynchronous notifications. 203 * Do this only once, even if __uaio_init() is called twice. 204 */ 205 if (__no_workerscnt == 0 && 206 (_aio_create_worker(NULL, AIONOTIFY) != 0)) { 207 errno = EAGAIN; 208 goto out; 209 } 210 211 /* 212 * Create the minimum number of read/write workers. 213 * And later check whether atleast one worker is created; 214 * lwp_create() calls could fail because of segkp exhaustion. 215 */ 216 for (i = 0; i < _min_workers; i++) 217 (void) _aio_create_worker(NULL, AIOREAD); 218 if (__rw_workerscnt == 0) { 219 errno = EAGAIN; 220 goto out; 221 } 222 223 ret = 0; 224 out: 225 lmutex_lock(&__aio_initlock); 226 if (ret == 0) 227 __uaio_ok = 1; 228 __aio_initbusy = 0; 229 (void) cond_broadcast(&__aio_initcv); 230 lmutex_unlock(&__aio_initlock); 231 return (ret); 232 } 233 234 /* 235 * Called from close() before actually performing the real _close(). 236 */ 237 void 238 _aio_close(int fd) 239 { 240 if (fd < 0) /* avoid cancelling everything */ 241 return; 242 /* 243 * Cancel all outstanding aio requests for this file descriptor. 244 */ 245 if (__uaio_ok) 246 (void) aiocancel_all(fd); 247 /* 248 * If we have allocated the bit array, clear the bit for this file. 249 * The next open may re-use this file descriptor and the new file 250 * may have different kaio() behaviour. 251 */ 252 if (_kaio_supported != NULL) 253 CLEAR_KAIO_SUPPORTED(fd); 254 } 255 256 /* 257 * special kaio cleanup thread sits in a loop in the 258 * kernel waiting for pending kaio requests to complete. 259 */ 260 void * 261 _kaio_cleanup_thread(void *arg) 262 { 263 if (pthread_setspecific(_aio_key, arg) != 0) 264 aio_panic("_kaio_cleanup_thread, pthread_setspecific()"); 265 (void) _kaio(AIOSTART); 266 return (arg); 267 } 268 269 /* 270 * initialize kaio. 
271 */ 272 void 273 _kaio_init() 274 { 275 int error; 276 sigset_t oset; 277 int cancel_state; 278 279 lmutex_lock(&__aio_initlock); 280 (void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancel_state); 281 while (__aio_initbusy) 282 (void) cond_wait(&__aio_initcv, &__aio_initlock); 283 (void) pthread_setcancelstate(cancel_state, NULL); 284 if (_kaio_ok) { /* already initialized */ 285 lmutex_unlock(&__aio_initlock); 286 return; 287 } 288 __aio_initbusy = 1; 289 lmutex_unlock(&__aio_initlock); 290 291 if (_kaio_supported_init() != 0) 292 error = ENOMEM; 293 else if ((_kaiowp = _aio_worker_alloc()) == NULL) 294 error = ENOMEM; 295 else if ((error = (int)_kaio(AIOINIT)) == 0) { 296 (void) pthread_sigmask(SIG_SETMASK, &maskset, &oset); 297 error = thr_create(NULL, AIOSTKSIZE, _kaio_cleanup_thread, 298 _kaiowp, THR_DAEMON, &_kaiowp->work_tid); 299 (void) pthread_sigmask(SIG_SETMASK, &oset, NULL); 300 } 301 if (error && _kaiowp != NULL) { 302 _aio_worker_free(_kaiowp); 303 _kaiowp = NULL; 304 } 305 306 lmutex_lock(&__aio_initlock); 307 if (error) 308 _kaio_ok = -1; 309 else 310 _kaio_ok = 1; 311 __aio_initbusy = 0; 312 (void) cond_broadcast(&__aio_initcv); 313 lmutex_unlock(&__aio_initlock); 314 } 315 316 int 317 aioread(int fd, caddr_t buf, int bufsz, off_t offset, int whence, 318 aio_result_t *resultp) 319 { 320 return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOREAD)); 321 } 322 323 int 324 aiowrite(int fd, caddr_t buf, int bufsz, off_t offset, int whence, 325 aio_result_t *resultp) 326 { 327 return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOWRITE)); 328 } 329 330 #if !defined(_LP64) 331 int 332 aioread64(int fd, caddr_t buf, int bufsz, off64_t offset, int whence, 333 aio_result_t *resultp) 334 { 335 return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOAREAD64)); 336 } 337 338 int 339 aiowrite64(int fd, caddr_t buf, int bufsz, off64_t offset, int whence, 340 aio_result_t *resultp) 341 { 342 return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOAWRITE64)); 343 } 344 #endif /* !defined(_LP64) */ 345 346 int 347 _aiorw(int fd, caddr_t buf, int bufsz, offset_t offset, int whence, 348 aio_result_t *resultp, int mode) 349 { 350 aio_req_t *reqp; 351 aio_args_t *ap; 352 offset_t loffset; 353 struct stat64 stat64; 354 int error = 0; 355 int kerr; 356 int umode; 357 358 switch (whence) { 359 360 case SEEK_SET: 361 loffset = offset; 362 break; 363 case SEEK_CUR: 364 if ((loffset = llseek(fd, 0, SEEK_CUR)) == -1) 365 error = -1; 366 else 367 loffset += offset; 368 break; 369 case SEEK_END: 370 if (fstat64(fd, &stat64) == -1) 371 error = -1; 372 else 373 loffset = offset + stat64.st_size; 374 break; 375 default: 376 errno = EINVAL; 377 error = -1; 378 } 379 380 if (error) 381 return (error); 382 383 /* initialize kaio */ 384 if (!_kaio_ok) 385 _kaio_init(); 386 387 /* 388 * _aio_do_request() needs the original request code (mode) to be able 389 * to choose the appropiate 32/64 bit function. All other functions 390 * only require the difference between READ and WRITE (umode). 391 */ 392 if (mode == AIOAREAD64 || mode == AIOAWRITE64) 393 umode = mode - AIOAREAD64; 394 else 395 umode = mode; 396 397 /* 398 * Try kernel aio first. 399 * If errno is ENOTSUP/EBADFD, fall back to the thread implementation. 400 */ 401 if (_kaio_ok > 0 && KAIO_SUPPORTED(fd)) { 402 resultp->aio_errno = 0; 403 sig_mutex_lock(&__aio_mutex); 404 _kaio_outstand_cnt++; 405 sig_mutex_unlock(&__aio_mutex); 406 kerr = (int)_kaio(((resultp->aio_return == AIO_INPROGRESS) ? 
407 (umode | AIO_POLL_BIT) : umode), 408 fd, buf, bufsz, loffset, resultp); 409 if (kerr == 0) { 410 return (0); 411 } 412 sig_mutex_lock(&__aio_mutex); 413 _kaio_outstand_cnt--; 414 sig_mutex_unlock(&__aio_mutex); 415 if (errno != ENOTSUP && errno != EBADFD) 416 return (-1); 417 if (errno == EBADFD) 418 SET_KAIO_NOT_SUPPORTED(fd); 419 } 420 421 if (!__uaio_ok && __uaio_init() == -1) 422 return (-1); 423 424 if ((reqp = _aio_req_alloc()) == NULL) { 425 errno = EAGAIN; 426 return (-1); 427 } 428 429 /* 430 * _aio_do_request() checks reqp->req_op to differentiate 431 * between 32 and 64 bit access. 432 */ 433 reqp->req_op = mode; 434 reqp->req_resultp = resultp; 435 ap = &reqp->req_args; 436 ap->fd = fd; 437 ap->buf = buf; 438 ap->bufsz = bufsz; 439 ap->offset = loffset; 440 441 if (_aio_hash_insert(resultp, reqp) != 0) { 442 _aio_req_free(reqp); 443 errno = EINVAL; 444 return (-1); 445 } 446 /* 447 * _aio_req_add() only needs the difference between READ and 448 * WRITE to choose the right worker queue. 449 */ 450 _aio_req_add(reqp, &__nextworker_rw, umode); 451 return (0); 452 } 453 454 int 455 aiocancel(aio_result_t *resultp) 456 { 457 aio_req_t *reqp; 458 aio_worker_t *aiowp; 459 int ret; 460 int done = 0; 461 int canceled = 0; 462 463 if (!__uaio_ok) { 464 errno = EINVAL; 465 return (-1); 466 } 467 468 sig_mutex_lock(&__aio_mutex); 469 reqp = _aio_hash_find(resultp); 470 if (reqp == NULL) { 471 if (_aio_outstand_cnt == _aio_req_done_cnt) 472 errno = EINVAL; 473 else 474 errno = EACCES; 475 ret = -1; 476 } else { 477 aiowp = reqp->req_worker; 478 sig_mutex_lock(&aiowp->work_qlock1); 479 (void) _aio_cancel_req(aiowp, reqp, &canceled, &done); 480 sig_mutex_unlock(&aiowp->work_qlock1); 481 482 if (canceled) { 483 ret = 0; 484 } else { 485 if (_aio_outstand_cnt == 0 || 486 _aio_outstand_cnt == _aio_req_done_cnt) 487 errno = EINVAL; 488 else 489 errno = EACCES; 490 ret = -1; 491 } 492 } 493 sig_mutex_unlock(&__aio_mutex); 494 return (ret); 495 } 496 497 static void 498 _aiowait_cleanup(void *arg __unused) 499 { 500 sig_mutex_lock(&__aio_mutex); 501 _aiowait_flag--; 502 sig_mutex_unlock(&__aio_mutex); 503 } 504 505 /* 506 * This must be asynch safe and cancel safe 507 */ 508 aio_result_t * 509 aiowait(struct timeval *uwait) 510 { 511 aio_result_t *uresultp; 512 aio_result_t *kresultp; 513 aio_result_t *resultp; 514 int dontblock; 515 int timedwait = 0; 516 int kaio_errno = 0; 517 struct timeval twait; 518 struct timeval *wait = NULL; 519 hrtime_t hrtend; 520 hrtime_t hres; 521 522 if (uwait) { 523 /* 524 * Check for a valid specified wait time. 525 * If it is invalid, fail the call right away. 
526 */ 527 if (uwait->tv_sec < 0 || uwait->tv_usec < 0 || 528 uwait->tv_usec >= MICROSEC) { 529 errno = EINVAL; 530 return ((aio_result_t *)-1); 531 } 532 533 if (uwait->tv_sec > 0 || uwait->tv_usec > 0) { 534 hrtend = gethrtime() + 535 (hrtime_t)uwait->tv_sec * NANOSEC + 536 (hrtime_t)uwait->tv_usec * (NANOSEC / MICROSEC); 537 twait = *uwait; 538 wait = &twait; 539 timedwait++; 540 } else { 541 /* polling */ 542 sig_mutex_lock(&__aio_mutex); 543 if (_kaio_outstand_cnt == 0) { 544 kresultp = (aio_result_t *)-1; 545 } else { 546 kresultp = (aio_result_t *)_kaio(AIOWAIT, 547 (struct timeval *)-1, 1); 548 if (kresultp != (aio_result_t *)-1 && 549 kresultp != NULL && 550 kresultp != (aio_result_t *)1) { 551 _kaio_outstand_cnt--; 552 sig_mutex_unlock(&__aio_mutex); 553 return (kresultp); 554 } 555 } 556 uresultp = _aio_req_done(); 557 sig_mutex_unlock(&__aio_mutex); 558 if (uresultp != NULL && 559 uresultp != (aio_result_t *)-1) { 560 return (uresultp); 561 } 562 if (uresultp == (aio_result_t *)-1 && 563 kresultp == (aio_result_t *)-1) { 564 errno = EINVAL; 565 return ((aio_result_t *)-1); 566 } else { 567 return (NULL); 568 } 569 } 570 } 571 572 for (;;) { 573 sig_mutex_lock(&__aio_mutex); 574 uresultp = _aio_req_done(); 575 if (uresultp != NULL && uresultp != (aio_result_t *)-1) { 576 sig_mutex_unlock(&__aio_mutex); 577 resultp = uresultp; 578 break; 579 } 580 _aiowait_flag++; 581 dontblock = (uresultp == (aio_result_t *)-1); 582 if (dontblock && _kaio_outstand_cnt == 0) { 583 kresultp = (aio_result_t *)-1; 584 kaio_errno = EINVAL; 585 } else { 586 sig_mutex_unlock(&__aio_mutex); 587 pthread_cleanup_push(_aiowait_cleanup, NULL); 588 _cancel_prologue(); 589 kresultp = (aio_result_t *)_kaio(AIOWAIT, 590 wait, dontblock); 591 _cancel_epilogue(); 592 pthread_cleanup_pop(0); 593 sig_mutex_lock(&__aio_mutex); 594 kaio_errno = errno; 595 } 596 _aiowait_flag--; 597 sig_mutex_unlock(&__aio_mutex); 598 if (kresultp == (aio_result_t *)1) { 599 /* aiowait() awakened by an aionotify() */ 600 continue; 601 } else if (kresultp != NULL && 602 kresultp != (aio_result_t *)-1) { 603 resultp = kresultp; 604 sig_mutex_lock(&__aio_mutex); 605 _kaio_outstand_cnt--; 606 sig_mutex_unlock(&__aio_mutex); 607 break; 608 } else if (kresultp == (aio_result_t *)-1 && 609 kaio_errno == EINVAL && 610 uresultp == (aio_result_t *)-1) { 611 errno = kaio_errno; 612 resultp = (aio_result_t *)-1; 613 break; 614 } else if (kresultp == (aio_result_t *)-1 && 615 kaio_errno == EINTR) { 616 errno = kaio_errno; 617 resultp = (aio_result_t *)-1; 618 break; 619 } else if (timedwait) { 620 hres = hrtend - gethrtime(); 621 if (hres <= 0) { 622 /* time is up; return */ 623 resultp = NULL; 624 break; 625 } else { 626 /* 627 * Some time left. Round up the remaining time 628 * in nanoseconds to microsec. Retry the call. 629 */ 630 hres += (NANOSEC / MICROSEC) - 1; 631 wait->tv_sec = hres / NANOSEC; 632 wait->tv_usec = 633 (hres % NANOSEC) / (NANOSEC / MICROSEC); 634 } 635 } else { 636 ASSERT(kresultp == NULL && uresultp == NULL); 637 resultp = NULL; 638 continue; 639 } 640 } 641 return (resultp); 642 } 643 644 /* 645 * _aio_get_timedelta calculates the remaining time and stores the result 646 * into timespec_t *wait. 
647 */ 648 649 int 650 _aio_get_timedelta(timespec_t *end, timespec_t *wait) 651 { 652 int ret = 0; 653 struct timeval cur; 654 timespec_t curtime; 655 656 (void) gettimeofday(&cur, NULL); 657 curtime.tv_sec = cur.tv_sec; 658 curtime.tv_nsec = cur.tv_usec * 1000; /* convert us to ns */ 659 660 if (end->tv_sec >= curtime.tv_sec) { 661 wait->tv_sec = end->tv_sec - curtime.tv_sec; 662 if (end->tv_nsec >= curtime.tv_nsec) { 663 wait->tv_nsec = end->tv_nsec - curtime.tv_nsec; 664 if (wait->tv_sec == 0 && wait->tv_nsec == 0) 665 ret = -1; /* timer expired */ 666 } else { 667 if (end->tv_sec > curtime.tv_sec) { 668 wait->tv_sec -= 1; 669 wait->tv_nsec = NANOSEC - 670 (curtime.tv_nsec - end->tv_nsec); 671 } else { 672 ret = -1; /* timer expired */ 673 } 674 } 675 } else { 676 ret = -1; 677 } 678 return (ret); 679 } 680 681 /* 682 * If closing by file descriptor: we will simply cancel all the outstanding 683 * aio`s and return. Those aio's in question will have either noticed the 684 * cancellation notice before, during, or after initiating io. 685 */ 686 int 687 aiocancel_all(int fd) 688 { 689 aio_req_t *reqp; 690 aio_req_t **reqpp, *last; 691 aio_worker_t *first; 692 aio_worker_t *next; 693 int canceled = 0; 694 int done = 0; 695 int cancelall = 0; 696 697 sig_mutex_lock(&__aio_mutex); 698 699 if (_aio_outstand_cnt == 0) { 700 sig_mutex_unlock(&__aio_mutex); 701 return (AIO_ALLDONE); 702 } 703 704 /* 705 * Cancel requests from the read/write workers' queues. 706 */ 707 first = __nextworker_rw; 708 next = first; 709 do { 710 _aio_cancel_work(next, fd, &canceled, &done); 711 } while ((next = next->work_forw) != first); 712 713 /* 714 * finally, check if there are requests on the done queue that 715 * should be canceled. 716 */ 717 if (fd < 0) 718 cancelall = 1; 719 reqpp = &_aio_done_tail; 720 last = _aio_done_tail; 721 while ((reqp = *reqpp) != NULL) { 722 if (cancelall || reqp->req_args.fd == fd) { 723 *reqpp = reqp->req_next; 724 if (last == reqp) { 725 last = reqp->req_next; 726 } 727 if (_aio_done_head == reqp) { 728 /* this should be the last req in list */ 729 _aio_done_head = last; 730 } 731 _aio_donecnt--; 732 _aio_set_result(reqp, -1, ECANCELED); 733 (void) _aio_hash_del(reqp->req_resultp); 734 _aio_req_free(reqp); 735 } else { 736 reqpp = &reqp->req_next; 737 last = reqp; 738 } 739 } 740 741 if (cancelall) { 742 ASSERT(_aio_donecnt == 0); 743 _aio_done_head = NULL; 744 } 745 sig_mutex_unlock(&__aio_mutex); 746 747 if (canceled && done == 0) 748 return (AIO_CANCELED); 749 else if (done && canceled == 0) 750 return (AIO_ALLDONE); 751 else if ((canceled + done == 0) && KAIO_SUPPORTED(fd)) 752 return ((int)_kaio(AIOCANCEL, fd, NULL)); 753 return (AIO_NOTCANCELED); 754 } 755 756 /* 757 * Cancel requests from a given work queue. If the file descriptor 758 * parameter, fd, is non-negative, then only cancel those requests 759 * in this queue that are to this file descriptor. If the fd 760 * parameter is -1, then cancel all requests. 761 */ 762 static void 763 _aio_cancel_work(aio_worker_t *aiowp, int fd, int *canceled, int *done) 764 { 765 aio_req_t *reqp; 766 767 sig_mutex_lock(&aiowp->work_qlock1); 768 /* 769 * cancel queued requests first. 770 */ 771 reqp = aiowp->work_tail1; 772 while (reqp != NULL) { 773 if (fd < 0 || reqp->req_args.fd == fd) { 774 if (_aio_cancel_req(aiowp, reqp, canceled, done)) { 775 /* 776 * Callers locks were dropped. 777 * reqp is invalid; start traversing 778 * the list from the beginning again. 
779 */ 780 reqp = aiowp->work_tail1; 781 continue; 782 } 783 } 784 reqp = reqp->req_next; 785 } 786 /* 787 * Since the queued requests have been canceled, there can 788 * only be one inprogress request that should be canceled. 789 */ 790 if ((reqp = aiowp->work_req) != NULL && 791 (fd < 0 || reqp->req_args.fd == fd)) 792 (void) _aio_cancel_req(aiowp, reqp, canceled, done); 793 sig_mutex_unlock(&aiowp->work_qlock1); 794 } 795 796 /* 797 * Cancel a request. Return 1 if the callers locks were temporarily 798 * dropped, otherwise return 0. 799 */ 800 int 801 _aio_cancel_req(aio_worker_t *aiowp, aio_req_t *reqp, int *canceled, int *done) 802 { 803 int ostate = reqp->req_state; 804 805 ASSERT(MUTEX_HELD(&__aio_mutex)); 806 ASSERT(MUTEX_HELD(&aiowp->work_qlock1)); 807 if (ostate == AIO_REQ_CANCELED) 808 return (0); 809 if (ostate == AIO_REQ_DONE && !POSIX_AIO(reqp) && 810 aiowp->work_prev1 == reqp) { 811 ASSERT(aiowp->work_done1 != 0); 812 /* 813 * If not on the done queue yet, just mark it CANCELED, 814 * _aio_work_done() will do the necessary clean up. 815 * This is required to ensure that aiocancel_all() cancels 816 * all the outstanding requests, including this one which 817 * is not yet on done queue but has been marked done. 818 */ 819 _aio_set_result(reqp, -1, ECANCELED); 820 (void) _aio_hash_del(reqp->req_resultp); 821 reqp->req_state = AIO_REQ_CANCELED; 822 (*canceled)++; 823 return (0); 824 } 825 826 if (ostate == AIO_REQ_DONE || ostate == AIO_REQ_DONEQ) { 827 (*done)++; 828 return (0); 829 } 830 if (reqp->req_op == AIOFSYNC && reqp != aiowp->work_req) { 831 ASSERT(POSIX_AIO(reqp)); 832 /* Cancel the queued aio_fsync() request */ 833 if (!reqp->req_head->lio_canned) { 834 reqp->req_head->lio_canned = 1; 835 _aio_outstand_cnt--; 836 (*canceled)++; 837 } 838 return (0); 839 } 840 reqp->req_state = AIO_REQ_CANCELED; 841 _aio_req_del(aiowp, reqp, ostate); 842 (void) _aio_hash_del(reqp->req_resultp); 843 (*canceled)++; 844 if (reqp == aiowp->work_req) { 845 ASSERT(ostate == AIO_REQ_INPROGRESS); 846 /* 847 * Set the result values now, before _aiodone() is called. 848 * We do this because the application can expect aio_return 849 * and aio_errno to be set to -1 and ECANCELED, respectively, 850 * immediately after a successful return from aiocancel() 851 * or aio_cancel(). 852 */ 853 _aio_set_result(reqp, -1, ECANCELED); 854 (void) thr_kill(aiowp->work_tid, SIGAIOCANCEL); 855 return (0); 856 } 857 if (!POSIX_AIO(reqp)) { 858 _aio_outstand_cnt--; 859 _aio_set_result(reqp, -1, ECANCELED); 860 _aio_req_free(reqp); 861 return (0); 862 } 863 sig_mutex_unlock(&aiowp->work_qlock1); 864 sig_mutex_unlock(&__aio_mutex); 865 _aiodone(reqp, -1, ECANCELED); 866 sig_mutex_lock(&__aio_mutex); 867 sig_mutex_lock(&aiowp->work_qlock1); 868 return (1); 869 } 870 871 int 872 _aio_create_worker(aio_req_t *reqp, int mode) 873 { 874 aio_worker_t *aiowp, **workers, **nextworker; 875 int *aio_workerscnt; 876 void *(*func)(void *); 877 sigset_t oset; 878 int error; 879 880 /* 881 * Put the new worker thread in the right queue. 
882 */ 883 switch (mode) { 884 case AIOREAD: 885 case AIOWRITE: 886 case AIOAREAD: 887 case AIOAWRITE: 888 #if !defined(_LP64) 889 case AIOAREAD64: 890 case AIOAWRITE64: 891 #endif 892 workers = &__workers_rw; 893 nextworker = &__nextworker_rw; 894 aio_workerscnt = &__rw_workerscnt; 895 func = _aio_do_request; 896 break; 897 case AIONOTIFY: 898 workers = &__workers_no; 899 nextworker = &__nextworker_no; 900 func = _aio_do_notify; 901 aio_workerscnt = &__no_workerscnt; 902 break; 903 default: 904 aio_panic("_aio_create_worker: invalid mode"); 905 break; 906 } 907 908 if ((aiowp = _aio_worker_alloc()) == NULL) 909 return (-1); 910 911 if (reqp) { 912 reqp->req_state = AIO_REQ_QUEUED; 913 reqp->req_worker = aiowp; 914 aiowp->work_head1 = reqp; 915 aiowp->work_tail1 = reqp; 916 aiowp->work_next1 = reqp; 917 aiowp->work_count1 = 1; 918 aiowp->work_minload1 = 1; 919 } 920 921 (void) pthread_sigmask(SIG_SETMASK, &maskset, &oset); 922 error = thr_create(NULL, AIOSTKSIZE, func, aiowp, 923 THR_DAEMON | THR_SUSPENDED, &aiowp->work_tid); 924 (void) pthread_sigmask(SIG_SETMASK, &oset, NULL); 925 if (error) { 926 if (reqp) { 927 reqp->req_state = 0; 928 reqp->req_worker = NULL; 929 } 930 _aio_worker_free(aiowp); 931 return (-1); 932 } 933 934 lmutex_lock(&__aio_mutex); 935 (*aio_workerscnt)++; 936 if (*workers == NULL) { 937 aiowp->work_forw = aiowp; 938 aiowp->work_backw = aiowp; 939 *nextworker = aiowp; 940 *workers = aiowp; 941 } else { 942 aiowp->work_backw = (*workers)->work_backw; 943 aiowp->work_forw = (*workers); 944 (*workers)->work_backw->work_forw = aiowp; 945 (*workers)->work_backw = aiowp; 946 } 947 _aio_worker_cnt++; 948 lmutex_unlock(&__aio_mutex); 949 950 (void) thr_continue(aiowp->work_tid); 951 952 return (0); 953 } 954 955 /* 956 * This is the worker's main routine. 957 * The task of this function is to execute all queued requests; 958 * once the last pending request is executed this function will block 959 * in _aio_idle(). A new incoming request must wakeup this thread to 960 * restart the work. 961 * Every worker has an own work queue. The queue lock is required 962 * to synchronize the addition of new requests for this worker or 963 * cancellation of pending/running requests. 964 * 965 * Cancellation scenarios: 966 * The cancellation of a request is being done asynchronously using 967 * _aio_cancel_req() from another thread context. 968 * A queued request can be cancelled in different manners : 969 * a) request is queued but not "in progress" or "done" (AIO_REQ_QUEUED): 970 * - lock the queue -> remove the request -> unlock the queue 971 * - this function/thread does not detect this cancellation process 972 * b) request is in progress (AIO_REQ_INPROGRESS) : 973 * - this function first allow the cancellation of the running 974 * request with the flag "work_cancel_flg=1" 975 * see _aio_req_get() -> _aio_cancel_on() 976 * During this phase, it is allowed to interrupt the worker 977 * thread running the request (this thread) using the SIGAIOCANCEL 978 * signal. 979 * Once this thread returns from the kernel (because the request 980 * is just done), then it must disable a possible cancellation 981 * and proceed to finish the request. To disable the cancellation 982 * this thread must use _aio_cancel_off() to set "work_cancel_flg=0". 
983 * c) request is already done (AIO_REQ_DONE || AIO_REQ_DONEQ): 984 * same procedure as in a) 985 * 986 * To b) 987 * This thread uses sigsetjmp() to define the position in the code, where 988 * it wish to continue working in the case that a SIGAIOCANCEL signal 989 * is detected. 990 * Normally this thread should get the cancellation signal during the 991 * kernel phase (reading or writing). In that case the signal handler 992 * aiosigcancelhndlr() is activated using the worker thread context, 993 * which again will use the siglongjmp() function to break the standard 994 * code flow and jump to the "sigsetjmp" position, provided that 995 * "work_cancel_flg" is set to "1". 996 * Because the "work_cancel_flg" is only manipulated by this worker 997 * thread and it can only run on one CPU at a given time, it is not 998 * necessary to protect that flag with the queue lock. 999 * Returning from the kernel (read or write system call) we must 1000 * first disable the use of the SIGAIOCANCEL signal and accordingly 1001 * the use of the siglongjmp() function to prevent a possible deadlock: 1002 * - It can happens that this worker thread returns from the kernel and 1003 * blocks in "work_qlock1", 1004 * - then a second thread cancels the apparently "in progress" request 1005 * and sends the SIGAIOCANCEL signal to the worker thread, 1006 * - the worker thread gets assigned the "work_qlock1" and will returns 1007 * from the kernel, 1008 * - the kernel detects the pending signal and activates the signal 1009 * handler instead, 1010 * - if the "work_cancel_flg" is still set then the signal handler 1011 * should use siglongjmp() to cancel the "in progress" request and 1012 * it would try to acquire the same work_qlock1 in _aio_req_get() 1013 * for a second time => deadlock. 1014 * To avoid that situation we disable the cancellation of the request 1015 * in progress BEFORE we try to acquire the work_qlock1. 1016 * In that case the signal handler will not call siglongjmp() and the 1017 * worker thread will continue running the standard code flow. 1018 * Then this thread must check the AIO_REQ_CANCELED flag to emulate 1019 * an eventually required siglongjmp() freeing the work_qlock1 and 1020 * avoiding a deadlock. 1021 */ 1022 void * 1023 _aio_do_request(void *arglist) 1024 { 1025 aio_worker_t *aiowp = (aio_worker_t *)arglist; 1026 ulwp_t *self = curthread; 1027 struct aio_args *arg; 1028 aio_req_t *reqp; /* current AIO request */ 1029 ssize_t retval; 1030 int append; 1031 int error; 1032 1033 if (pthread_setspecific(_aio_key, aiowp) != 0) 1034 aio_panic("_aio_do_request, pthread_setspecific()"); 1035 (void) pthread_sigmask(SIG_SETMASK, &_worker_set, NULL); 1036 ASSERT(aiowp->work_req == NULL); 1037 1038 /* 1039 * We resume here when an operation is cancelled. 1040 * On first entry, aiowp->work_req == NULL, so all 1041 * we do is block SIGAIOCANCEL. 1042 */ 1043 (void) sigsetjmp(aiowp->work_jmp_buf, 0); 1044 ASSERT(self->ul_sigdefer == 0); 1045 1046 sigoff(self); /* block SIGAIOCANCEL */ 1047 if (aiowp->work_req != NULL) 1048 _aio_finish_request(aiowp, -1, ECANCELED); 1049 1050 for (;;) { 1051 /* 1052 * Put completed requests on aio_done_list. This has 1053 * to be done as part of the main loop to ensure that 1054 * we don't artificially starve any aiowait'ers. 
1055 */ 1056 if (aiowp->work_done1) 1057 _aio_work_done(aiowp); 1058 1059 top: 1060 /* consume any deferred SIGAIOCANCEL signal here */ 1061 sigon(self); 1062 sigoff(self); 1063 1064 while ((reqp = _aio_req_get(aiowp)) == NULL) { 1065 if (_aio_idle(aiowp) != 0) 1066 goto top; 1067 } 1068 arg = &reqp->req_args; 1069 ASSERT(reqp->req_state == AIO_REQ_INPROGRESS || 1070 reqp->req_state == AIO_REQ_CANCELED); 1071 error = 0; 1072 1073 switch (reqp->req_op) { 1074 case AIOREAD: 1075 case AIOAREAD: 1076 sigon(self); /* unblock SIGAIOCANCEL */ 1077 retval = pread(arg->fd, arg->buf, 1078 arg->bufsz, arg->offset); 1079 if (retval == -1) { 1080 if (errno == ESPIPE) { 1081 retval = read(arg->fd, 1082 arg->buf, arg->bufsz); 1083 if (retval == -1) 1084 error = errno; 1085 } else { 1086 error = errno; 1087 } 1088 } 1089 sigoff(self); /* block SIGAIOCANCEL */ 1090 break; 1091 case AIOWRITE: 1092 case AIOAWRITE: 1093 /* 1094 * The SUSv3 POSIX spec for aio_write() states: 1095 * If O_APPEND is set for the file descriptor, 1096 * write operations append to the file in the 1097 * same order as the calls were made. 1098 * but, somewhat inconsistently, it requires pwrite() 1099 * to ignore the O_APPEND setting. So we have to use 1100 * fcntl() to get the open modes and call write() for 1101 * the O_APPEND case. 1102 */ 1103 append = (__fcntl(arg->fd, F_GETFL) & O_APPEND); 1104 sigon(self); /* unblock SIGAIOCANCEL */ 1105 retval = append? 1106 write(arg->fd, arg->buf, arg->bufsz) : 1107 pwrite(arg->fd, arg->buf, arg->bufsz, 1108 arg->offset); 1109 if (retval == -1) { 1110 if (errno == ESPIPE) { 1111 retval = write(arg->fd, 1112 arg->buf, arg->bufsz); 1113 if (retval == -1) 1114 error = errno; 1115 } else { 1116 error = errno; 1117 } 1118 } 1119 sigoff(self); /* block SIGAIOCANCEL */ 1120 break; 1121 #if !defined(_LP64) 1122 case AIOAREAD64: 1123 sigon(self); /* unblock SIGAIOCANCEL */ 1124 retval = pread64(arg->fd, arg->buf, 1125 arg->bufsz, arg->offset); 1126 if (retval == -1) { 1127 if (errno == ESPIPE) { 1128 retval = read(arg->fd, 1129 arg->buf, arg->bufsz); 1130 if (retval == -1) 1131 error = errno; 1132 } else { 1133 error = errno; 1134 } 1135 } 1136 sigoff(self); /* block SIGAIOCANCEL */ 1137 break; 1138 case AIOAWRITE64: 1139 /* 1140 * The SUSv3 POSIX spec for aio_write() states: 1141 * If O_APPEND is set for the file descriptor, 1142 * write operations append to the file in the 1143 * same order as the calls were made. 1144 * but, somewhat inconsistently, it requires pwrite() 1145 * to ignore the O_APPEND setting. So we have to use 1146 * fcntl() to get the open modes and call write() for 1147 * the O_APPEND case. 1148 */ 1149 append = (__fcntl(arg->fd, F_GETFL) & O_APPEND); 1150 sigon(self); /* unblock SIGAIOCANCEL */ 1151 retval = append? 1152 write(arg->fd, arg->buf, arg->bufsz) : 1153 pwrite64(arg->fd, arg->buf, arg->bufsz, 1154 arg->offset); 1155 if (retval == -1) { 1156 if (errno == ESPIPE) { 1157 retval = write(arg->fd, 1158 arg->buf, arg->bufsz); 1159 if (retval == -1) 1160 error = errno; 1161 } else { 1162 error = errno; 1163 } 1164 } 1165 sigoff(self); /* block SIGAIOCANCEL */ 1166 break; 1167 #endif /* !defined(_LP64) */ 1168 case AIOFSYNC: 1169 if (_aio_fsync_del(aiowp, reqp)) 1170 goto top; 1171 ASSERT(reqp->req_head == NULL); 1172 /* 1173 * All writes for this fsync request are now 1174 * acknowledged. Now make these writes visible 1175 * and put the final request into the hash table. 
1176 */ 1177 if (reqp->req_state == AIO_REQ_CANCELED) { 1178 /* EMPTY */; 1179 } else if (arg->offset == O_SYNC) { 1180 if ((retval = __fdsync(arg->fd, FDSYNC_FILE)) == 1181 -1) { 1182 error = errno; 1183 } 1184 } else { 1185 if ((retval = __fdsync(arg->fd, FDSYNC_DATA)) == 1186 -1) { 1187 error = errno; 1188 } 1189 } 1190 if (_aio_hash_insert(reqp->req_resultp, reqp) != 0) 1191 aio_panic("_aio_do_request(): AIOFSYNC: " 1192 "request already in hash table"); 1193 break; 1194 default: 1195 aio_panic("_aio_do_request, bad op"); 1196 } 1197 1198 _aio_finish_request(aiowp, retval, error); 1199 } 1200 /* NOTREACHED */ 1201 return (NULL); 1202 } 1203 1204 /* 1205 * Perform the tail processing for _aio_do_request(). 1206 * The in-progress request may or may not have been cancelled. 1207 */ 1208 static void 1209 _aio_finish_request(aio_worker_t *aiowp, ssize_t retval, int error) 1210 { 1211 aio_req_t *reqp; 1212 1213 sig_mutex_lock(&aiowp->work_qlock1); 1214 if ((reqp = aiowp->work_req) == NULL) 1215 sig_mutex_unlock(&aiowp->work_qlock1); 1216 else { 1217 aiowp->work_req = NULL; 1218 if (reqp->req_state == AIO_REQ_CANCELED) { 1219 retval = -1; 1220 error = ECANCELED; 1221 } 1222 if (!POSIX_AIO(reqp)) { 1223 int notify; 1224 if (reqp->req_state == AIO_REQ_INPROGRESS) { 1225 reqp->req_state = AIO_REQ_DONE; 1226 _aio_set_result(reqp, retval, error); 1227 } 1228 sig_mutex_unlock(&aiowp->work_qlock1); 1229 sig_mutex_lock(&__aio_mutex); 1230 /* 1231 * If it was canceled, this request will not be 1232 * added to done list. Just free it. 1233 */ 1234 if (error == ECANCELED) { 1235 _aio_outstand_cnt--; 1236 _aio_req_free(reqp); 1237 } else { 1238 _aio_req_done_cnt++; 1239 } 1240 /* 1241 * Notify any thread that may have blocked 1242 * because it saw an outstanding request. 1243 */ 1244 notify = 0; 1245 if (_aio_outstand_cnt == 0 && _aiowait_flag) { 1246 notify = 1; 1247 } 1248 sig_mutex_unlock(&__aio_mutex); 1249 if (notify) { 1250 (void) _kaio(AIONOTIFY); 1251 } 1252 } else { 1253 if (reqp->req_state == AIO_REQ_INPROGRESS) 1254 reqp->req_state = AIO_REQ_DONE; 1255 sig_mutex_unlock(&aiowp->work_qlock1); 1256 _aiodone(reqp, retval, error); 1257 } 1258 } 1259 } 1260 1261 void 1262 _aio_req_mark_done(aio_req_t *reqp) 1263 { 1264 #if !defined(_LP64) 1265 if (reqp->req_largefile) 1266 ((aiocb64_t *)reqp->req_aiocbp)->aio_state = USERAIO_DONE; 1267 else 1268 #endif 1269 ((aiocb_t *)reqp->req_aiocbp)->aio_state = USERAIO_DONE; 1270 } 1271 1272 /* 1273 * Sleep for 'ticks' clock ticks to give somebody else a chance to run, 1274 * hopefully to consume one of our queued signals. 1275 */ 1276 static void 1277 _aio_delay(int ticks) 1278 { 1279 (void) usleep(ticks * (MICROSEC / hz)); 1280 } 1281 1282 /* 1283 * Actually send the notifications. 1284 * We could block indefinitely here if the application 1285 * is not listening for the signal or port notifications. 
1286 */ 1287 static void 1288 send_notification(notif_param_t *npp) 1289 { 1290 extern int __sigqueue(pid_t pid, int signo, 1291 /* const union sigval */ void *value, int si_code, int block); 1292 1293 if (npp->np_signo) 1294 (void) __sigqueue(__pid, npp->np_signo, npp->np_user, 1295 SI_ASYNCIO, 1); 1296 else if (npp->np_port >= 0) 1297 (void) _port_dispatch(npp->np_port, 0, PORT_SOURCE_AIO, 1298 npp->np_event, npp->np_object, npp->np_user); 1299 1300 if (npp->np_lio_signo) 1301 (void) __sigqueue(__pid, npp->np_lio_signo, npp->np_lio_user, 1302 SI_ASYNCIO, 1); 1303 else if (npp->np_lio_port >= 0) 1304 (void) _port_dispatch(npp->np_lio_port, 0, PORT_SOURCE_AIO, 1305 npp->np_lio_event, npp->np_lio_object, npp->np_lio_user); 1306 } 1307 1308 /* 1309 * Asynchronous notification worker. 1310 */ 1311 void * 1312 _aio_do_notify(void *arg) 1313 { 1314 aio_worker_t *aiowp = (aio_worker_t *)arg; 1315 aio_req_t *reqp; 1316 1317 /* 1318 * This isn't really necessary. All signals are blocked. 1319 */ 1320 if (pthread_setspecific(_aio_key, aiowp) != 0) 1321 aio_panic("_aio_do_notify, pthread_setspecific()"); 1322 1323 /* 1324 * Notifications are never cancelled. 1325 * All signals remain blocked, forever. 1326 */ 1327 for (;;) { 1328 while ((reqp = _aio_req_get(aiowp)) == NULL) { 1329 if (_aio_idle(aiowp) != 0) 1330 aio_panic("_aio_do_notify: _aio_idle() failed"); 1331 } 1332 send_notification(&reqp->req_notify); 1333 _aio_req_free(reqp); 1334 } 1335 1336 /* NOTREACHED */ 1337 return (NULL); 1338 } 1339 1340 /* 1341 * Do the completion semantics for a request that was either canceled 1342 * by _aio_cancel_req() or was completed by _aio_do_request(). 1343 */ 1344 static void 1345 _aiodone(aio_req_t *reqp, ssize_t retval, int error) 1346 { 1347 aio_result_t *resultp = reqp->req_resultp; 1348 int notify = 0; 1349 aio_lio_t *head; 1350 int sigev_none; 1351 int sigev_signal; 1352 int sigev_thread; 1353 int sigev_port; 1354 notif_param_t np; 1355 1356 /* 1357 * We call _aiodone() only for Posix I/O. 1358 */ 1359 ASSERT(POSIX_AIO(reqp)); 1360 1361 sigev_none = 0; 1362 sigev_signal = 0; 1363 sigev_thread = 0; 1364 sigev_port = 0; 1365 np.np_signo = 0; 1366 np.np_port = -1; 1367 np.np_lio_signo = 0; 1368 np.np_lio_port = -1; 1369 1370 switch (reqp->req_sigevent.sigev_notify) { 1371 case SIGEV_NONE: 1372 sigev_none = 1; 1373 break; 1374 case SIGEV_SIGNAL: 1375 sigev_signal = 1; 1376 break; 1377 case SIGEV_THREAD: 1378 sigev_thread = 1; 1379 break; 1380 case SIGEV_PORT: 1381 sigev_port = 1; 1382 break; 1383 default: 1384 aio_panic("_aiodone: improper sigev_notify"); 1385 break; 1386 } 1387 1388 /* 1389 * Figure out the notification parameters while holding __aio_mutex. 1390 * Actually perform the notifications after dropping __aio_mutex. 1391 * This allows us to sleep for a long time (if the notifications 1392 * incur delays) without impeding other async I/O operations. 
1393 */ 1394 1395 sig_mutex_lock(&__aio_mutex); 1396 1397 if (sigev_signal) { 1398 if ((np.np_signo = reqp->req_sigevent.sigev_signo) != 0) 1399 notify = 1; 1400 np.np_user = reqp->req_sigevent.sigev_value.sival_ptr; 1401 } else if (sigev_thread | sigev_port) { 1402 if ((np.np_port = reqp->req_sigevent.sigev_signo) >= 0) 1403 notify = 1; 1404 np.np_event = reqp->req_op; 1405 if (np.np_event == AIOFSYNC && reqp->req_largefile) 1406 np.np_event = AIOFSYNC64; 1407 np.np_object = (uintptr_t)reqp->req_aiocbp; 1408 np.np_user = reqp->req_sigevent.sigev_value.sival_ptr; 1409 } 1410 1411 if (resultp->aio_errno == EINPROGRESS) 1412 _aio_set_result(reqp, retval, error); 1413 1414 _aio_outstand_cnt--; 1415 1416 head = reqp->req_head; 1417 reqp->req_head = NULL; 1418 1419 if (sigev_none) { 1420 _aio_enq_doneq(reqp); 1421 reqp = NULL; 1422 } else { 1423 (void) _aio_hash_del(resultp); 1424 _aio_req_mark_done(reqp); 1425 } 1426 1427 _aio_waitn_wakeup(); 1428 1429 /* 1430 * __aio_waitn() sets AIO_WAIT_INPROGRESS and 1431 * __aio_suspend() increments "_aio_kernel_suspend" 1432 * when they are waiting in the kernel for completed I/Os. 1433 * 1434 * _kaio(AIONOTIFY) awakes the corresponding function 1435 * in the kernel; then the corresponding __aio_waitn() or 1436 * __aio_suspend() function could reap the recently 1437 * completed I/Os (_aiodone()). 1438 */ 1439 if ((_aio_flags & AIO_WAIT_INPROGRESS) || _aio_kernel_suspend > 0) 1440 (void) _kaio(AIONOTIFY); 1441 1442 sig_mutex_unlock(&__aio_mutex); 1443 1444 if (head != NULL) { 1445 /* 1446 * If all the lio requests have completed, 1447 * prepare to notify the waiting thread. 1448 */ 1449 sig_mutex_lock(&head->lio_mutex); 1450 ASSERT(head->lio_refcnt == head->lio_nent); 1451 if (head->lio_refcnt == 1) { 1452 int waiting = 0; 1453 if (head->lio_mode == LIO_WAIT) { 1454 if ((waiting = head->lio_waiting) != 0) 1455 (void) cond_signal(&head->lio_cond_cv); 1456 } else if (head->lio_port < 0) { /* none or signal */ 1457 if ((np.np_lio_signo = head->lio_signo) != 0) 1458 notify = 1; 1459 np.np_lio_user = head->lio_sigval.sival_ptr; 1460 } else { /* thread or port */ 1461 notify = 1; 1462 np.np_lio_port = head->lio_port; 1463 np.np_lio_event = head->lio_event; 1464 np.np_lio_object = 1465 (uintptr_t)head->lio_sigevent; 1466 np.np_lio_user = head->lio_sigval.sival_ptr; 1467 } 1468 head->lio_nent = head->lio_refcnt = 0; 1469 sig_mutex_unlock(&head->lio_mutex); 1470 if (waiting == 0) 1471 _aio_lio_free(head); 1472 } else { 1473 head->lio_nent--; 1474 head->lio_refcnt--; 1475 sig_mutex_unlock(&head->lio_mutex); 1476 } 1477 } 1478 1479 /* 1480 * The request is completed; now perform the notifications. 1481 */ 1482 if (notify) { 1483 if (reqp != NULL) { 1484 /* 1485 * We usually put the request on the notification 1486 * queue because we don't want to block and delay 1487 * other operations behind us in the work queue. 1488 * Also we must never block on a cancel notification 1489 * because we are being called from an application 1490 * thread in this case and that could lead to deadlock 1491 * if no other thread is receiving notificatins. 1492 */ 1493 reqp->req_notify = np; 1494 reqp->req_op = AIONOTIFY; 1495 _aio_req_add(reqp, &__workers_no, AIONOTIFY); 1496 reqp = NULL; 1497 } else { 1498 /* 1499 * We already put the request on the done queue, 1500 * so we can't queue it to the notification queue. 1501 * Just do the notification directly. 
1502 */ 1503 send_notification(&np); 1504 } 1505 } 1506 1507 if (reqp != NULL) 1508 _aio_req_free(reqp); 1509 } 1510 1511 /* 1512 * Delete fsync requests from list head until there is 1513 * only one left. Return 0 when there is only one, 1514 * otherwise return a non-zero value. 1515 */ 1516 static int 1517 _aio_fsync_del(aio_worker_t *aiowp, aio_req_t *reqp) 1518 { 1519 aio_lio_t *head = reqp->req_head; 1520 int rval = 0; 1521 1522 ASSERT(reqp == aiowp->work_req); 1523 sig_mutex_lock(&aiowp->work_qlock1); 1524 sig_mutex_lock(&head->lio_mutex); 1525 if (head->lio_refcnt > 1) { 1526 head->lio_refcnt--; 1527 head->lio_nent--; 1528 aiowp->work_req = NULL; 1529 sig_mutex_unlock(&head->lio_mutex); 1530 sig_mutex_unlock(&aiowp->work_qlock1); 1531 sig_mutex_lock(&__aio_mutex); 1532 _aio_outstand_cnt--; 1533 _aio_waitn_wakeup(); 1534 sig_mutex_unlock(&__aio_mutex); 1535 _aio_req_free(reqp); 1536 return (1); 1537 } 1538 ASSERT(head->lio_nent == 1 && head->lio_refcnt == 1); 1539 reqp->req_head = NULL; 1540 if (head->lio_canned) 1541 reqp->req_state = AIO_REQ_CANCELED; 1542 if (head->lio_mode == LIO_DESTROY) { 1543 aiowp->work_req = NULL; 1544 rval = 1; 1545 } 1546 sig_mutex_unlock(&head->lio_mutex); 1547 sig_mutex_unlock(&aiowp->work_qlock1); 1548 head->lio_refcnt--; 1549 head->lio_nent--; 1550 _aio_lio_free(head); 1551 if (rval != 0) 1552 _aio_req_free(reqp); 1553 return (rval); 1554 } 1555 1556 /* 1557 * A worker is set idle when its work queue is empty. 1558 * The worker checks again that it has no more work 1559 * and then goes to sleep waiting for more work. 1560 */ 1561 int 1562 _aio_idle(aio_worker_t *aiowp) 1563 { 1564 int error = 0; 1565 1566 sig_mutex_lock(&aiowp->work_qlock1); 1567 if (aiowp->work_count1 == 0) { 1568 ASSERT(aiowp->work_minload1 == 0); 1569 aiowp->work_idleflg = 1; 1570 /* 1571 * A cancellation handler is not needed here. 1572 * aio worker threads are never cancelled via pthread_cancel(). 1573 */ 1574 error = sig_cond_wait(&aiowp->work_idle_cv, 1575 &aiowp->work_qlock1); 1576 /* 1577 * The idle flag is normally cleared before worker is awakened 1578 * by aio_req_add(). On error (EINTR), we clear it ourself. 1579 */ 1580 if (error) 1581 aiowp->work_idleflg = 0; 1582 } 1583 sig_mutex_unlock(&aiowp->work_qlock1); 1584 return (error); 1585 } 1586 1587 /* 1588 * A worker's completed AIO requests are placed onto a global 1589 * done queue. The application is only sent a SIGIO signal if 1590 * the process has a handler enabled and it is not waiting via 1591 * aiowait(). 1592 */ 1593 static void 1594 _aio_work_done(aio_worker_t *aiowp) 1595 { 1596 aio_req_t *reqp; 1597 1598 sig_mutex_lock(&__aio_mutex); 1599 sig_mutex_lock(&aiowp->work_qlock1); 1600 reqp = aiowp->work_prev1; 1601 reqp->req_next = NULL; 1602 aiowp->work_done1 = 0; 1603 aiowp->work_tail1 = aiowp->work_next1; 1604 if (aiowp->work_tail1 == NULL) 1605 aiowp->work_head1 = NULL; 1606 aiowp->work_prev1 = NULL; 1607 _aio_outstand_cnt--; 1608 _aio_req_done_cnt--; 1609 if (reqp->req_state == AIO_REQ_CANCELED) { 1610 /* 1611 * Request got cancelled after it was marked done. This can 1612 * happen because _aio_finish_request() marks it AIO_REQ_DONE 1613 * and drops all locks. Don't add the request to the done 1614 * queue and just discard it. 
1615 */ 1616 sig_mutex_unlock(&aiowp->work_qlock1); 1617 _aio_req_free(reqp); 1618 if (_aio_outstand_cnt == 0 && _aiowait_flag) { 1619 sig_mutex_unlock(&__aio_mutex); 1620 (void) _kaio(AIONOTIFY); 1621 } else { 1622 sig_mutex_unlock(&__aio_mutex); 1623 } 1624 return; 1625 } 1626 sig_mutex_unlock(&aiowp->work_qlock1); 1627 _aio_donecnt++; 1628 ASSERT(_aio_donecnt > 0 && 1629 _aio_outstand_cnt >= 0 && 1630 _aio_req_done_cnt >= 0); 1631 ASSERT(reqp != NULL); 1632 1633 if (_aio_done_tail == NULL) { 1634 _aio_done_head = _aio_done_tail = reqp; 1635 } else { 1636 _aio_done_head->req_next = reqp; 1637 _aio_done_head = reqp; 1638 } 1639 1640 if (_aiowait_flag) { 1641 sig_mutex_unlock(&__aio_mutex); 1642 (void) _kaio(AIONOTIFY); 1643 } else { 1644 sig_mutex_unlock(&__aio_mutex); 1645 if (_sigio_enabled) 1646 (void) kill(__pid, SIGIO); 1647 } 1648 } 1649 1650 /* 1651 * The done queue consists of AIO requests that are in either the 1652 * AIO_REQ_DONE or AIO_REQ_CANCELED state. Requests that were cancelled 1653 * are discarded. If the done queue is empty then NULL is returned. 1654 * Otherwise the address of a done aio_result_t is returned. 1655 */ 1656 aio_result_t * 1657 _aio_req_done(void) 1658 { 1659 aio_req_t *reqp; 1660 aio_result_t *resultp; 1661 1662 ASSERT(MUTEX_HELD(&__aio_mutex)); 1663 1664 if ((reqp = _aio_done_tail) != NULL) { 1665 if ((_aio_done_tail = reqp->req_next) == NULL) 1666 _aio_done_head = NULL; 1667 ASSERT(_aio_donecnt > 0); 1668 _aio_donecnt--; 1669 (void) _aio_hash_del(reqp->req_resultp); 1670 resultp = reqp->req_resultp; 1671 ASSERT(reqp->req_state == AIO_REQ_DONE); 1672 _aio_req_free(reqp); 1673 return (resultp); 1674 } 1675 /* is queue empty? */ 1676 if (reqp == NULL && _aio_outstand_cnt == 0) { 1677 return ((aio_result_t *)-1); 1678 } 1679 return (NULL); 1680 } 1681 1682 /* 1683 * Set the return and errno values for the application's use. 1684 * 1685 * For the Posix interfaces, we must set the return value first followed 1686 * by the errno value because the Posix interfaces allow for a change 1687 * in the errno value from EINPROGRESS to something else to signal 1688 * the completion of the asynchronous request. 1689 * 1690 * The opposite is true for the Solaris interfaces. These allow for 1691 * a change in the return value from AIO_INPROGRESS to something else 1692 * to signal the completion of the asynchronous request. 1693 */ 1694 void 1695 _aio_set_result(aio_req_t *reqp, ssize_t retval, int error) 1696 { 1697 aio_result_t *resultp = reqp->req_resultp; 1698 1699 if (POSIX_AIO(reqp)) { 1700 resultp->aio_return = retval; 1701 membar_producer(); 1702 resultp->aio_errno = error; 1703 } else { 1704 resultp->aio_errno = error; 1705 membar_producer(); 1706 resultp->aio_return = retval; 1707 } 1708 } 1709 1710 /* 1711 * Add an AIO request onto the next work queue. 1712 * A circular list of workers is used to choose the next worker. 1713 */ 1714 void 1715 _aio_req_add(aio_req_t *reqp, aio_worker_t **nextworker, int mode) 1716 { 1717 ulwp_t *self = curthread; 1718 aio_worker_t *aiowp; 1719 aio_worker_t *first; 1720 int load_bal_flg = 1; 1721 int found; 1722 1723 ASSERT(reqp->req_state != AIO_REQ_DONEQ); 1724 reqp->req_next = NULL; 1725 /* 1726 * Try to acquire the next worker's work queue. If it is locked, 1727 * then search the list of workers until a queue is found unlocked, 1728 * or until the list is completely traversed at which point another 1729 * worker will be created. 
1730 */ 1731 sigoff(self); /* defer SIGIO */ 1732 sig_mutex_lock(&__aio_mutex); 1733 first = aiowp = *nextworker; 1734 if (mode != AIONOTIFY) 1735 _aio_outstand_cnt++; 1736 sig_mutex_unlock(&__aio_mutex); 1737 1738 switch (mode) { 1739 case AIOREAD: 1740 case AIOWRITE: 1741 case AIOAREAD: 1742 case AIOAWRITE: 1743 #if !defined(_LP64) 1744 case AIOAREAD64: 1745 case AIOAWRITE64: 1746 #endif 1747 /* try to find an idle worker */ 1748 found = 0; 1749 do { 1750 if (sig_mutex_trylock(&aiowp->work_qlock1) == 0) { 1751 if (aiowp->work_idleflg) { 1752 found = 1; 1753 break; 1754 } 1755 sig_mutex_unlock(&aiowp->work_qlock1); 1756 } 1757 } while ((aiowp = aiowp->work_forw) != first); 1758 1759 if (found) { 1760 aiowp->work_minload1++; 1761 break; 1762 } 1763 1764 /* try to acquire some worker's queue lock */ 1765 do { 1766 if (sig_mutex_trylock(&aiowp->work_qlock1) == 0) { 1767 found = 1; 1768 break; 1769 } 1770 } while ((aiowp = aiowp->work_forw) != first); 1771 1772 /* 1773 * Create more workers when the workers appear overloaded. 1774 * Either all the workers are busy draining their queues 1775 * or no worker's queue lock could be acquired. 1776 */ 1777 if (!found) { 1778 if (_aio_worker_cnt < _max_workers) { 1779 if (_aio_create_worker(reqp, mode)) 1780 aio_panic("_aio_req_add: add worker"); 1781 sigon(self); /* reenable SIGIO */ 1782 return; 1783 } 1784 1785 /* 1786 * No worker available and we have created 1787 * _max_workers, keep going through the 1788 * list slowly until we get a lock 1789 */ 1790 while (sig_mutex_trylock(&aiowp->work_qlock1) != 0) { 1791 /* 1792 * give someone else a chance 1793 */ 1794 _aio_delay(1); 1795 aiowp = aiowp->work_forw; 1796 } 1797 } 1798 1799 ASSERT(MUTEX_HELD(&aiowp->work_qlock1)); 1800 if (_aio_worker_cnt < _max_workers && 1801 aiowp->work_minload1 >= _minworkload) { 1802 sig_mutex_unlock(&aiowp->work_qlock1); 1803 sig_mutex_lock(&__aio_mutex); 1804 *nextworker = aiowp->work_forw; 1805 sig_mutex_unlock(&__aio_mutex); 1806 if (_aio_create_worker(reqp, mode)) 1807 aio_panic("aio_req_add: add worker"); 1808 sigon(self); /* reenable SIGIO */ 1809 return; 1810 } 1811 aiowp->work_minload1++; 1812 break; 1813 case AIOFSYNC: 1814 case AIONOTIFY: 1815 load_bal_flg = 0; 1816 sig_mutex_lock(&aiowp->work_qlock1); 1817 break; 1818 default: 1819 aio_panic("_aio_req_add: invalid mode"); 1820 break; 1821 } 1822 /* 1823 * Put request onto worker's work queue. 1824 */ 1825 if (aiowp->work_tail1 == NULL) { 1826 ASSERT(aiowp->work_count1 == 0); 1827 aiowp->work_tail1 = reqp; 1828 aiowp->work_next1 = reqp; 1829 } else { 1830 aiowp->work_head1->req_next = reqp; 1831 if (aiowp->work_next1 == NULL) 1832 aiowp->work_next1 = reqp; 1833 } 1834 reqp->req_state = AIO_REQ_QUEUED; 1835 reqp->req_worker = aiowp; 1836 aiowp->work_head1 = reqp; 1837 /* 1838 * Awaken worker if it is not currently active. 1839 */ 1840 if (aiowp->work_count1++ == 0 && aiowp->work_idleflg) { 1841 aiowp->work_idleflg = 0; 1842 (void) cond_signal(&aiowp->work_idle_cv); 1843 } 1844 sig_mutex_unlock(&aiowp->work_qlock1); 1845 1846 if (load_bal_flg) { 1847 sig_mutex_lock(&__aio_mutex); 1848 *nextworker = aiowp->work_forw; 1849 sig_mutex_unlock(&__aio_mutex); 1850 } 1851 sigon(self); /* reenable SIGIO */ 1852 } 1853 1854 /* 1855 * Get an AIO request for a specified worker. 1856 * If the work queue is empty, return NULL. 
1857 */ 1858 aio_req_t * 1859 _aio_req_get(aio_worker_t *aiowp) 1860 { 1861 aio_req_t *reqp; 1862 1863 sig_mutex_lock(&aiowp->work_qlock1); 1864 if ((reqp = aiowp->work_next1) != NULL) { 1865 /* 1866 * Remove a POSIX request from the queue; the 1867 * request queue is a singularly linked list 1868 * with a previous pointer. The request is 1869 * removed by updating the previous pointer. 1870 * 1871 * Non-posix requests are left on the queue 1872 * to eventually be placed on the done queue. 1873 */ 1874 1875 if (POSIX_AIO(reqp)) { 1876 if (aiowp->work_prev1 == NULL) { 1877 aiowp->work_tail1 = reqp->req_next; 1878 if (aiowp->work_tail1 == NULL) 1879 aiowp->work_head1 = NULL; 1880 } else { 1881 aiowp->work_prev1->req_next = reqp->req_next; 1882 if (aiowp->work_head1 == reqp) 1883 aiowp->work_head1 = reqp->req_next; 1884 } 1885 1886 } else { 1887 aiowp->work_prev1 = reqp; 1888 ASSERT(aiowp->work_done1 >= 0); 1889 aiowp->work_done1++; 1890 } 1891 ASSERT(reqp != reqp->req_next); 1892 aiowp->work_next1 = reqp->req_next; 1893 ASSERT(aiowp->work_count1 >= 1); 1894 aiowp->work_count1--; 1895 switch (reqp->req_op) { 1896 case AIOREAD: 1897 case AIOWRITE: 1898 case AIOAREAD: 1899 case AIOAWRITE: 1900 #if !defined(_LP64) 1901 case AIOAREAD64: 1902 case AIOAWRITE64: 1903 #endif 1904 ASSERT(aiowp->work_minload1 > 0); 1905 aiowp->work_minload1--; 1906 break; 1907 } 1908 reqp->req_state = AIO_REQ_INPROGRESS; 1909 } 1910 aiowp->work_req = reqp; 1911 ASSERT(reqp != NULL || aiowp->work_count1 == 0); 1912 sig_mutex_unlock(&aiowp->work_qlock1); 1913 return (reqp); 1914 } 1915 1916 static void 1917 _aio_req_del(aio_worker_t *aiowp, aio_req_t *reqp, int ostate) 1918 { 1919 aio_req_t **last; 1920 aio_req_t *lastrp; 1921 aio_req_t *next; 1922 1923 ASSERT(aiowp != NULL); 1924 ASSERT(MUTEX_HELD(&aiowp->work_qlock1)); 1925 if (POSIX_AIO(reqp)) { 1926 if (ostate != AIO_REQ_QUEUED) 1927 return; 1928 } 1929 last = &aiowp->work_tail1; 1930 lastrp = aiowp->work_tail1; 1931 ASSERT(ostate == AIO_REQ_QUEUED || ostate == AIO_REQ_INPROGRESS); 1932 while ((next = *last) != NULL) { 1933 if (next == reqp) { 1934 *last = next->req_next; 1935 if (aiowp->work_next1 == next) 1936 aiowp->work_next1 = next->req_next; 1937 1938 /* 1939 * if this is the first request on the queue, move 1940 * the lastrp pointer forward. 1941 */ 1942 if (lastrp == next) 1943 lastrp = next->req_next; 1944 1945 /* 1946 * if this request is pointed by work_head1, then 1947 * make work_head1 point to the last request that is 1948 * present on the queue. 1949 */ 1950 if (aiowp->work_head1 == next) 1951 aiowp->work_head1 = lastrp; 1952 1953 /* 1954 * work_prev1 is used only in non posix case and it 1955 * points to the current AIO_REQ_INPROGRESS request. 1956 * If work_prev1 points to this request which is being 1957 * deleted, make work_prev1 NULL and set work_done1 1958 * to 0. 1959 * 1960 * A worker thread can be processing only one request 1961 * at a time. 
1962 */ 1963 if (aiowp->work_prev1 == next) { 1964 ASSERT(ostate == AIO_REQ_INPROGRESS && 1965 !POSIX_AIO(reqp) && aiowp->work_done1 > 0); 1966 aiowp->work_prev1 = NULL; 1967 aiowp->work_done1--; 1968 } 1969 1970 if (ostate == AIO_REQ_QUEUED) { 1971 ASSERT(aiowp->work_count1 >= 1); 1972 aiowp->work_count1--; 1973 ASSERT(aiowp->work_minload1 >= 1); 1974 aiowp->work_minload1--; 1975 } 1976 return; 1977 } 1978 last = &next->req_next; 1979 lastrp = next; 1980 } 1981 /* NOTREACHED */ 1982 } 1983 1984 static void 1985 _aio_enq_doneq(aio_req_t *reqp) 1986 { 1987 if (_aio_doneq == NULL) { 1988 _aio_doneq = reqp; 1989 reqp->req_next = reqp->req_prev = reqp; 1990 } else { 1991 reqp->req_next = _aio_doneq; 1992 reqp->req_prev = _aio_doneq->req_prev; 1993 _aio_doneq->req_prev->req_next = reqp; 1994 _aio_doneq->req_prev = reqp; 1995 } 1996 reqp->req_state = AIO_REQ_DONEQ; 1997 _aio_doneq_cnt++; 1998 } 1999 2000 /* 2001 * caller owns the _aio_mutex 2002 */ 2003 aio_req_t * 2004 _aio_req_remove(aio_req_t *reqp) 2005 { 2006 if (reqp && reqp->req_state != AIO_REQ_DONEQ) 2007 return (NULL); 2008 2009 if (reqp) { 2010 /* request in done queue */ 2011 if (_aio_doneq == reqp) 2012 _aio_doneq = reqp->req_next; 2013 if (_aio_doneq == reqp) { 2014 /* only one request on queue */ 2015 _aio_doneq = NULL; 2016 } else { 2017 aio_req_t *tmp = reqp->req_next; 2018 reqp->req_prev->req_next = tmp; 2019 tmp->req_prev = reqp->req_prev; 2020 } 2021 } else if ((reqp = _aio_doneq) != NULL) { 2022 if (reqp == reqp->req_next) { 2023 /* only one request on queue */ 2024 _aio_doneq = NULL; 2025 } else { 2026 reqp->req_prev->req_next = _aio_doneq = reqp->req_next; 2027 _aio_doneq->req_prev = reqp->req_prev; 2028 } 2029 } 2030 if (reqp) { 2031 _aio_doneq_cnt--; 2032 reqp->req_next = reqp->req_prev = reqp; 2033 reqp->req_state = AIO_REQ_DONE; 2034 } 2035 return (reqp); 2036 } 2037 2038 /* 2039 * An AIO request is identified by an aio_result_t pointer. The library 2040 * maps this aio_result_t pointer to its internal representation using a 2041 * hash table. This function adds an aio_result_t pointer to the hash table. 2042 */ 2043 static int 2044 _aio_hash_insert(aio_result_t *resultp, aio_req_t *reqp) 2045 { 2046 aio_hash_t *hashp; 2047 aio_req_t **prev; 2048 aio_req_t *next; 2049 2050 hashp = _aio_hash + AIOHASH(resultp); 2051 lmutex_lock(&hashp->hash_lock); 2052 prev = &hashp->hash_ptr; 2053 while ((next = *prev) != NULL) { 2054 if (resultp == next->req_resultp) { 2055 lmutex_unlock(&hashp->hash_lock); 2056 return (-1); 2057 } 2058 prev = &next->req_link; 2059 } 2060 *prev = reqp; 2061 ASSERT(reqp->req_link == NULL); 2062 lmutex_unlock(&hashp->hash_lock); 2063 return (0); 2064 } 2065 2066 /* 2067 * Remove an entry from the hash table. 

/*
 * Remove an entry from the hash table.
 */
aio_req_t *
_aio_hash_del(aio_result_t *resultp)
{
	aio_hash_t *hashp;
	aio_req_t **prev;
	aio_req_t *next = NULL;

	if (_aio_hash != NULL) {
		hashp = _aio_hash + AIOHASH(resultp);
		lmutex_lock(&hashp->hash_lock);
		prev = &hashp->hash_ptr;
		while ((next = *prev) != NULL) {
			if (resultp == next->req_resultp) {
				*prev = next->req_link;
				next->req_link = NULL;
				break;
			}
			prev = &next->req_link;
		}
		lmutex_unlock(&hashp->hash_lock);
	}
	return (next);
}

/*
 * Find an entry in the hash table.
 */
aio_req_t *
_aio_hash_find(aio_result_t *resultp)
{
	aio_hash_t *hashp;
	aio_req_t **prev;
	aio_req_t *next = NULL;

	if (_aio_hash != NULL) {
		hashp = _aio_hash + AIOHASH(resultp);
		lmutex_lock(&hashp->hash_lock);
		prev = &hashp->hash_ptr;
		while ((next = *prev) != NULL) {
			if (resultp == next->req_resultp)
				break;
			prev = &next->req_link;
		}
		lmutex_unlock(&hashp->hash_lock);
	}
	return (next);
}

/*
 * AIO interface for POSIX.
 */
int
_aio_rw(aiocb_t *aiocbp, aio_lio_t *lio_head, aio_worker_t **nextworker,
    int mode, int flg)
{
	aio_req_t *reqp;
	aio_args_t *ap;
	int kerr;

	if (aiocbp == NULL) {
		errno = EINVAL;
		return (-1);
	}

	/* initialize kaio */
	if (!_kaio_ok)
		_kaio_init();

	aiocbp->aio_state = NOCHECK;

	/*
	 * If we have been called because a list I/O
	 * kaio() failed, we don't want to repeat the
	 * system call.
	 */

	if (flg & AIO_KAIO) {
		/*
		 * Try kernel aio first.
		 * If errno is ENOTSUP/EBADFD,
		 * fall back to the thread implementation.
		 */
		if (_kaio_ok > 0 && KAIO_SUPPORTED(aiocbp->aio_fildes)) {
			aiocbp->aio_resultp.aio_errno = EINPROGRESS;
			aiocbp->aio_state = CHECK;
			kerr = (int)_kaio(mode, aiocbp);
			if (kerr == 0)
				return (0);
			if (errno != ENOTSUP && errno != EBADFD) {
				aiocbp->aio_resultp.aio_errno = errno;
				aiocbp->aio_resultp.aio_return = -1;
				aiocbp->aio_state = NOCHECK;
				return (-1);
			}
			if (errno == EBADFD)
				SET_KAIO_NOT_SUPPORTED(aiocbp->aio_fildes);
		}
	}

	aiocbp->aio_resultp.aio_errno = EINPROGRESS;
	aiocbp->aio_state = USERAIO;

	if (!__uaio_ok && __uaio_init() == -1)
		return (-1);

	if ((reqp = _aio_req_alloc()) == NULL) {
		errno = EAGAIN;
		return (-1);
	}

	/*
	 * If an LIO request, add the list head to the aio request.
	 */
	reqp->req_head = lio_head;
	reqp->req_type = AIO_POSIX_REQ;
	reqp->req_op = mode;
	reqp->req_largefile = 0;

	if (aiocbp->aio_sigevent.sigev_notify == SIGEV_NONE) {
		reqp->req_sigevent.sigev_notify = SIGEV_NONE;
	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_SIGNAL) {
		reqp->req_sigevent.sigev_notify = SIGEV_SIGNAL;
		reqp->req_sigevent.sigev_signo =
		    aiocbp->aio_sigevent.sigev_signo;
		reqp->req_sigevent.sigev_value.sival_ptr =
		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_PORT) {
		port_notify_t *pn = aiocbp->aio_sigevent.sigev_value.sival_ptr;
		reqp->req_sigevent.sigev_notify = SIGEV_PORT;
		/*
		 * Reuse the sigevent structure to contain the port number
		 * and the user value.  Same for SIGEV_THREAD, below.
		 */
		reqp->req_sigevent.sigev_signo =
		    pn->portnfy_port;
		reqp->req_sigevent.sigev_value.sival_ptr =
		    pn->portnfy_user;
	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_THREAD) {
		reqp->req_sigevent.sigev_notify = SIGEV_THREAD;
		/*
		 * The sigevent structure contains the port number
		 * and the user value.  Same for SIGEV_PORT, above.
		 */
		reqp->req_sigevent.sigev_signo =
		    aiocbp->aio_sigevent.sigev_signo;
		reqp->req_sigevent.sigev_value.sival_ptr =
		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
	}

	reqp->req_resultp = &aiocbp->aio_resultp;
	reqp->req_aiocbp = aiocbp;
	ap = &reqp->req_args;
	ap->fd = aiocbp->aio_fildes;
	ap->buf = (caddr_t)aiocbp->aio_buf;
	ap->bufsz = aiocbp->aio_nbytes;
	ap->offset = aiocbp->aio_offset;

	if ((flg & AIO_NO_DUPS) &&
	    _aio_hash_insert(&aiocbp->aio_resultp, reqp) != 0) {
		aio_panic("_aio_rw(): request already in hash table");
	}
	_aio_req_add(reqp, nextworker, mode);
	return (0);
}
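
/*
 * Usage sketch (illustrative only; the exact flags used by real callers
 * live elsewhere in libc): a POSIX wrapper such as aio_read() would submit
 * a request through this interface roughly as follows, letting kernel aio
 * be tried first and duplicate aio_result_t submissions be rejected:
 *
 *	return (_aio_rw(aiocbp, NULL, &__nextworker_rw, AIOAREAD,
 *	    (AIO_KAIO | AIO_NO_DUPS)));
 *
 * A list I/O resubmission after a failed kaio() call would instead pass
 * the aio_lio_t list head and clear AIO_KAIO so the system call is not
 * repeated.
 */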

#if !defined(_LP64)
/*
 * 64-bit AIO interface for POSIX.
 */
int
_aio_rw64(aiocb64_t *aiocbp, aio_lio_t *lio_head, aio_worker_t **nextworker,
    int mode, int flg)
{
	aio_req_t *reqp;
	aio_args_t *ap;
	int kerr;

	if (aiocbp == NULL) {
		errno = EINVAL;
		return (-1);
	}

	/* initialize kaio */
	if (!_kaio_ok)
		_kaio_init();

	aiocbp->aio_state = NOCHECK;

	/*
	 * If we have been called because a list I/O
	 * kaio() failed, we don't want to repeat the
	 * system call.
	 */

	if (flg & AIO_KAIO) {
		/*
		 * Try kernel aio first.
		 * If errno is ENOTSUP/EBADFD,
		 * fall back to the thread implementation.
		 */
		if (_kaio_ok > 0 && KAIO_SUPPORTED(aiocbp->aio_fildes)) {
			aiocbp->aio_resultp.aio_errno = EINPROGRESS;
			aiocbp->aio_state = CHECK;
			kerr = (int)_kaio(mode, aiocbp);
			if (kerr == 0)
				return (0);
			if (errno != ENOTSUP && errno != EBADFD) {
				aiocbp->aio_resultp.aio_errno = errno;
				aiocbp->aio_resultp.aio_return = -1;
				aiocbp->aio_state = NOCHECK;
				return (-1);
			}
			if (errno == EBADFD)
				SET_KAIO_NOT_SUPPORTED(aiocbp->aio_fildes);
		}
	}

	aiocbp->aio_resultp.aio_errno = EINPROGRESS;
	aiocbp->aio_state = USERAIO;

	if (!__uaio_ok && __uaio_init() == -1)
		return (-1);

	if ((reqp = _aio_req_alloc()) == NULL) {
		errno = EAGAIN;
		return (-1);
	}

	/*
	 * If an LIO request, add the list head to the aio request.
	 */
	reqp->req_head = lio_head;
	reqp->req_type = AIO_POSIX_REQ;
	reqp->req_op = mode;
	reqp->req_largefile = 1;

	if (aiocbp->aio_sigevent.sigev_notify == SIGEV_NONE) {
		reqp->req_sigevent.sigev_notify = SIGEV_NONE;
	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_SIGNAL) {
		reqp->req_sigevent.sigev_notify = SIGEV_SIGNAL;
		reqp->req_sigevent.sigev_signo =
		    aiocbp->aio_sigevent.sigev_signo;
		reqp->req_sigevent.sigev_value.sival_ptr =
		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_PORT) {
		port_notify_t *pn = aiocbp->aio_sigevent.sigev_value.sival_ptr;
		reqp->req_sigevent.sigev_notify = SIGEV_PORT;
		reqp->req_sigevent.sigev_signo =
		    pn->portnfy_port;
		reqp->req_sigevent.sigev_value.sival_ptr =
		    pn->portnfy_user;
	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_THREAD) {
		reqp->req_sigevent.sigev_notify = SIGEV_THREAD;
		reqp->req_sigevent.sigev_signo =
		    aiocbp->aio_sigevent.sigev_signo;
		reqp->req_sigevent.sigev_value.sival_ptr =
		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
	}

	reqp->req_resultp = &aiocbp->aio_resultp;
	reqp->req_aiocbp = aiocbp;
	ap = &reqp->req_args;
	ap->fd = aiocbp->aio_fildes;
	ap->buf = (caddr_t)aiocbp->aio_buf;
	ap->bufsz = aiocbp->aio_nbytes;
	ap->offset = aiocbp->aio_offset;

	if ((flg & AIO_NO_DUPS) &&
	    _aio_hash_insert(&aiocbp->aio_resultp, reqp) != 0) {
		aio_panic("_aio_rw64(): request already in hash table");
	}
	_aio_req_add(reqp, nextworker, mode);
	return (0);
}
#endif	/* !defined(_LP64) */
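
/*
 * Note: _aio_rw64() above differs from _aio_rw() only in taking an
 * aiocb64_t and in setting req_largefile, presumably so later stages can
 * tell which aiocb layout a request refers to.  In a 64-bit process
 * off_t is already 64 bits wide, which is why the transitional largefile
 * variant exists only for 32-bit (!_LP64) builds.
 */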