1 /* 2 * CDDL HEADER START 3 * 4 * The contents of this file are subject to the terms of the 5 * Common Development and Distribution License (the "License"). 6 * You may not use this file except in compliance with the License. 7 * 8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 9 * or http://www.opensolaris.org/os/licensing. 10 * See the License for the specific language governing permissions 11 * and limitations under the License. 12 * 13 * When distributing Covered Code, include this CDDL HEADER in each 14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 15 * If applicable, add the following below this CDDL HEADER, with the 16 * fields enclosed by brackets "[]" replaced with your own identifying 17 * information: Portions Copyright [yyyy] [name of copyright owner] 18 * 19 * CDDL HEADER END 20 */ 21 22 /* 23 * Copyright 2006 Sun Microsystems, Inc. All rights reserved. 24 * Use is subject to license terms. 25 */ 26 27 #pragma ident "%Z%%M% %I% %E% SMI" 28 29 #include "synonyms.h" 30 #include "thr_uberdata.h" 31 #include "asyncio.h" 32 #include <atomic.h> 33 #include <sys/param.h> 34 #include <sys/file.h> 35 #include <sys/port.h> 36 37 static int _aio_hash_insert(aio_result_t *, aio_req_t *); 38 static aio_req_t *_aio_req_get(aio_worker_t *); 39 static void _aio_req_add(aio_req_t *, aio_worker_t **, int); 40 static void _aio_req_del(aio_worker_t *, aio_req_t *, int); 41 static void _aio_work_done(aio_worker_t *); 42 static void _aio_enq_doneq(aio_req_t *); 43 44 extern void _aio_lio_free(aio_lio_t *); 45 46 extern int __fdsync(int, int); 47 extern int _port_dispatch(int, int, int, int, uintptr_t, void *); 48 49 static int _aio_fsync_del(aio_worker_t *, aio_req_t *); 50 static void _aiodone(aio_req_t *, ssize_t, int); 51 static void _aio_cancel_work(aio_worker_t *, int, int *, int *); 52 static void _aio_finish_request(aio_worker_t *, ssize_t, int); 53 54 /* 55 * switch for kernel async I/O 56 */ 57 int _kaio_ok = 0; /* 0 = disabled, 1 = on, -1 = error */ 58 59 /* 60 * Key for thread-specific data 61 */ 62 pthread_key_t _aio_key; 63 64 /* 65 * Array for determining whether or not a file supports kaio. 66 * Initialized in _kaio_init(). 67 */ 68 uint32_t *_kaio_supported = NULL; 69 70 /* 71 * workers for read/write requests 72 * (__aio_mutex lock protects circular linked list of workers) 73 */ 74 aio_worker_t *__workers_rw; /* circular list of AIO workers */ 75 aio_worker_t *__nextworker_rw; /* next worker in list of workers */ 76 int __rw_workerscnt; /* number of read/write workers */ 77 78 /* 79 * worker for notification requests. 
 */
aio_worker_t *__workers_no;	/* circular list of AIO workers */
aio_worker_t *__nextworker_no;	/* next worker in list of workers */
int __no_workerscnt;		/* number of notification workers */

aio_req_t *_aio_done_tail;	/* list of done requests */
aio_req_t *_aio_done_head;

mutex_t __aio_initlock = DEFAULTMUTEX;	/* makes aio initialization atomic */
cond_t __aio_initcv = DEFAULTCV;
int __aio_initbusy = 0;

mutex_t __aio_mutex = DEFAULTMUTEX;	/* protects counts, and linked lists */
cond_t _aio_iowait_cv = DEFAULTCV;	/* wait for userland I/Os */

pid_t __pid = (pid_t)-1;	/* initialize as invalid pid */
int _sigio_enabled = 0;		/* when set, send SIGIO signal */

aio_hash_t *_aio_hash;

aio_req_t *_aio_doneq;		/* double linked done queue list */

int _aio_donecnt = 0;
int _aio_waitncnt = 0;		/* # of requests for aio_waitn */
int _aio_doneq_cnt = 0;
int _aio_outstand_cnt = 0;	/* # of outstanding requests */
int _kaio_outstand_cnt = 0;	/* # of outstanding kaio requests */
int _aio_req_done_cnt = 0;	/* req. done but not in "done queue" */
int _aio_kernel_suspend = 0;	/* active kernel kaio calls */
int _aio_suscv_cnt = 0;		/* aio_suspend calls waiting on cv's */

int _max_workers = 256;		/* max number of workers permitted */
int _min_workers = 4;		/* min number of workers */
int _minworkload = 2;		/* min number of requests in q */
int _aio_worker_cnt = 0;	/* number of workers to do requests */
int __uaio_ok = 0;		/* AIO has been enabled */
sigset_t _worker_set;		/* worker's signal mask */

int _aiowait_flag = 0;		/* when set, aiowait() is in progress */
int _aio_flags = 0;		/* see asyncio.h defines */

aio_worker_t *_kaiowp = NULL;	/* points to kaio cleanup thread */

int hz;				/* clock ticks per second */

static int
_kaio_supported_init(void)
{
	void *ptr;
	size_t size;

	if (_kaio_supported != NULL)	/* already initialized */
		return (0);

	size = MAX_KAIO_FDARRAY_SIZE * sizeof (uint32_t);
	ptr = mmap(NULL, size, PROT_READ | PROT_WRITE,
	    MAP_PRIVATE | MAP_ANON, -1, (off_t)0);
	if (ptr == MAP_FAILED)
		return (-1);
	_kaio_supported = ptr;
	return (0);
}
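
/*
 * Illustrative sketch (not part of the library): _kaio_supported is used
 * as a bitmap with one bit per file descriptor and is queried through the
 * KAIO_SUPPORTED() / SET_KAIO_NOT_SUPPORTED() / CLEAR_KAIO_SUPPORTED()
 * macros declared in asyncio.h.  The helpers below only show one plausible
 * encoding (32 descriptors per uint32_t, a set bit meaning "kaio not
 * supported"); they are assumptions, not the actual macro definitions.
 */
#if 0	/* example only, never compiled */
static int
example_kaio_fd_supported(int fd)
{
	/* a clear bit would mean kaio may still be attempted for fd */
	return ((_kaio_supported[fd / 32] & (1U << (fd % 32))) == 0);
}

static void
example_kaio_fd_disable(int fd)
{
	/* remember that the kernel refused kaio for this descriptor */
	_kaio_supported[fd / 32] |= (1U << (fd % 32));
}
#endif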

/*
 * The aio subsystem is initialized when an AIO request is made.
 * Constants are initialized, such as the maximum number of workers
 * that the subsystem can create and the minimum number of workers
 * permitted before imposing some restrictions.  Also, some
 * workers are created.
 */
int
__uaio_init(void)
{
	int ret = -1;
	int i;

	lmutex_lock(&__aio_initlock);
	while (__aio_initbusy)
		(void) _cond_wait(&__aio_initcv, &__aio_initlock);
	if (__uaio_ok) {	/* already initialized */
		lmutex_unlock(&__aio_initlock);
		return (0);
	}
	__aio_initbusy = 1;
	lmutex_unlock(&__aio_initlock);

	hz = (int)sysconf(_SC_CLK_TCK);
	__pid = getpid();

	setup_cancelsig(SIGAIOCANCEL);

	if (_kaio_supported_init() != 0)
		goto out;

	/*
	 * Allocate and initialize the hash table.
	 * Do this only once, even if __uaio_init() is called twice.
	 */
	if (_aio_hash == NULL) {
		/* LINTED pointer cast */
		_aio_hash = (aio_hash_t *)mmap(NULL,
		    HASHSZ * sizeof (aio_hash_t), PROT_READ | PROT_WRITE,
		    MAP_PRIVATE | MAP_ANON, -1, (off_t)0);
		if ((void *)_aio_hash == MAP_FAILED) {
			_aio_hash = NULL;
			goto out;
		}
		for (i = 0; i < HASHSZ; i++)
			(void) mutex_init(&_aio_hash[i].hash_lock,
			    USYNC_THREAD, NULL);
	}

	/*
	 * Initialize worker's signal mask to only catch SIGAIOCANCEL.
	 */
	(void) sigfillset(&_worker_set);
	(void) sigdelset(&_worker_set, SIGAIOCANCEL);

	/*
	 * Create one worker to send asynchronous notifications.
	 * Do this only once, even if __uaio_init() is called twice.
	 */
	if (__no_workerscnt == 0 &&
	    (_aio_create_worker(NULL, AIONOTIFY) != 0)) {
		errno = EAGAIN;
		goto out;
	}

	/*
	 * Create the minimum number of read/write workers.
	 * Later, check whether at least one worker was created;
	 * lwp_create() calls could fail because of segkp exhaustion.
	 */
	for (i = 0; i < _min_workers; i++)
		(void) _aio_create_worker(NULL, AIOREAD);
	if (__rw_workerscnt == 0) {
		errno = EAGAIN;
		goto out;
	}

	ret = 0;
out:
	lmutex_lock(&__aio_initlock);
	if (ret == 0)
		__uaio_ok = 1;
	__aio_initbusy = 0;
	(void) cond_broadcast(&__aio_initcv);
	lmutex_unlock(&__aio_initlock);
	return (ret);
}
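
/*
 * Illustrative sketch (not part of the library): both __uaio_init() above
 * and _kaio_init() below follow the same "initialize once, let concurrent
 * callers wait" shape built from __aio_initlock, __aio_initcv and
 * __aio_initbusy.  The standalone fragment restates that pattern with
 * plain pthread primitives and hypothetical names; it is a sketch of the
 * idea, not the libc implementation.
 */
#if 0	/* example only, never compiled */
static pthread_mutex_t init_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t init_cv = PTHREAD_COND_INITIALIZER;
static int init_busy = 0;
static int init_done = 0;

static int
example_init_once(int (*do_init)(void))
{
	int ret;

	(void) pthread_mutex_lock(&init_lock);
	while (init_busy)		/* somebody else is initializing */
		(void) pthread_cond_wait(&init_cv, &init_lock);
	if (init_done) {		/* already initialized */
		(void) pthread_mutex_unlock(&init_lock);
		return (0);
	}
	init_busy = 1;
	(void) pthread_mutex_unlock(&init_lock);

	ret = do_init();		/* heavy work done without the lock */

	(void) pthread_mutex_lock(&init_lock);
	if (ret == 0)
		init_done = 1;
	init_busy = 0;
	(void) pthread_cond_broadcast(&init_cv);
	(void) pthread_mutex_unlock(&init_lock);
	return (ret);
}
#endif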

/*
 * Called from close() before actually performing the real _close().
 */
void
_aio_close(int fd)
{
	if (fd < 0)	/* avoid cancelling everything */
		return;
	/*
	 * Cancel all outstanding aio requests for this file descriptor.
	 */
	if (__uaio_ok)
		(void) aiocancel_all(fd);
	/*
	 * If we have allocated the bit array, clear the bit for this file.
	 * The next open may re-use this file descriptor and the new file
	 * may have different kaio() behaviour.
	 */
	if (_kaio_supported != NULL)
		CLEAR_KAIO_SUPPORTED(fd);
}

/*
 * The special kaio cleanup thread sits in a loop in the
 * kernel waiting for pending kaio requests to complete.
 */
void *
_kaio_cleanup_thread(void *arg)
{
	if (pthread_setspecific(_aio_key, arg) != 0)
		aio_panic("_kaio_cleanup_thread, pthread_setspecific()");
	(void) _kaio(AIOSTART);
	return (arg);
}

/*
 * Initialize kaio.
 */
void
_kaio_init()
{
	int error;
	sigset_t oset;

	lmutex_lock(&__aio_initlock);
	while (__aio_initbusy)
		(void) _cond_wait(&__aio_initcv, &__aio_initlock);
	if (_kaio_ok) {		/* already initialized */
		lmutex_unlock(&__aio_initlock);
		return;
	}
	__aio_initbusy = 1;
	lmutex_unlock(&__aio_initlock);

	if (_kaio_supported_init() != 0)
		error = ENOMEM;
	else if ((_kaiowp = _aio_worker_alloc()) == NULL)
		error = ENOMEM;
	else if ((error = (int)_kaio(AIOINIT)) == 0) {
		(void) pthread_sigmask(SIG_SETMASK, &maskset, &oset);
		error = thr_create(NULL, AIOSTKSIZE, _kaio_cleanup_thread,
		    _kaiowp, THR_DAEMON, &_kaiowp->work_tid);
		(void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
	}
	if (error && _kaiowp != NULL) {
		_aio_worker_free(_kaiowp);
		_kaiowp = NULL;
	}

	lmutex_lock(&__aio_initlock);
	if (error)
		_kaio_ok = -1;
	else
		_kaio_ok = 1;
	__aio_initbusy = 0;
	(void) cond_broadcast(&__aio_initcv);
	lmutex_unlock(&__aio_initlock);
}

int
aioread(int fd, caddr_t buf, int bufsz, off_t offset, int whence,
    aio_result_t *resultp)
{
	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOREAD));
}

int
aiowrite(int fd, caddr_t buf, int bufsz, off_t offset, int whence,
    aio_result_t *resultp)
{
	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOWRITE));
}

#if !defined(_LP64)
int
aioread64(int fd, caddr_t buf, int bufsz, off64_t offset, int whence,
    aio_result_t *resultp)
{
	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOAREAD64));
}

int
aiowrite64(int fd, caddr_t buf, int bufsz, off64_t offset, int whence,
    aio_result_t *resultp)
{
	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOAWRITE64));
}
#endif	/* !defined(_LP64) */

int
_aiorw(int fd, caddr_t buf, int bufsz, offset_t offset, int whence,
    aio_result_t *resultp, int mode)
{
	aio_req_t *reqp;
	aio_args_t *ap;
	offset_t loffset;
	struct stat stat;
	int error = 0;
	int kerr;
	int umode;

	switch (whence) {

	case SEEK_SET:
		loffset = offset;
		break;
	case SEEK_CUR:
		if ((loffset = llseek(fd, 0, SEEK_CUR)) == -1)
			error = -1;
		else
			loffset += offset;
		break;
	case SEEK_END:
		if (fstat(fd, &stat) == -1)
			error = -1;
		else
			loffset = offset + stat.st_size;
		break;
	default:
		errno = EINVAL;
		error = -1;
	}

	if (error)
		return (error);

	/* initialize kaio */
	if (!_kaio_ok)
		_kaio_init();

	/*
	 * _aio_do_request() needs the original request code (mode) to be able
	 * to choose the appropriate 32/64 bit function.  All other functions
	 * only require the difference between READ and WRITE (umode).
	 */
	if (mode == AIOAREAD64 || mode == AIOAWRITE64)
		umode = mode - AIOAREAD64;
	else
		umode = mode;

	/*
	 * Try kernel aio first.
	 * If errno is ENOTSUP/EBADFD, fall back to the thread implementation.
	 */
	if (_kaio_ok > 0 && KAIO_SUPPORTED(fd)) {
		resultp->aio_errno = 0;
		sig_mutex_lock(&__aio_mutex);
		_kaio_outstand_cnt++;
		kerr = (int)_kaio(((resultp->aio_return == AIO_INPROGRESS) ?
400 (umode | AIO_POLL_BIT) : umode), 401 fd, buf, bufsz, loffset, resultp); 402 if (kerr == 0) { 403 sig_mutex_unlock(&__aio_mutex); 404 return (0); 405 } 406 _kaio_outstand_cnt--; 407 sig_mutex_unlock(&__aio_mutex); 408 if (errno != ENOTSUP && errno != EBADFD) 409 return (-1); 410 if (errno == EBADFD) 411 SET_KAIO_NOT_SUPPORTED(fd); 412 } 413 414 if (!__uaio_ok && __uaio_init() == -1) 415 return (-1); 416 417 if ((reqp = _aio_req_alloc()) == NULL) { 418 errno = EAGAIN; 419 return (-1); 420 } 421 422 /* 423 * _aio_do_request() checks reqp->req_op to differentiate 424 * between 32 and 64 bit access. 425 */ 426 reqp->req_op = mode; 427 reqp->req_resultp = resultp; 428 ap = &reqp->req_args; 429 ap->fd = fd; 430 ap->buf = buf; 431 ap->bufsz = bufsz; 432 ap->offset = loffset; 433 434 if (_aio_hash_insert(resultp, reqp) != 0) { 435 _aio_req_free(reqp); 436 errno = EINVAL; 437 return (-1); 438 } 439 /* 440 * _aio_req_add() only needs the difference between READ and 441 * WRITE to choose the right worker queue. 442 */ 443 _aio_req_add(reqp, &__nextworker_rw, umode); 444 return (0); 445 } 446 447 int 448 aiocancel(aio_result_t *resultp) 449 { 450 aio_req_t *reqp; 451 aio_worker_t *aiowp; 452 int ret; 453 int done = 0; 454 int canceled = 0; 455 456 if (!__uaio_ok) { 457 errno = EINVAL; 458 return (-1); 459 } 460 461 sig_mutex_lock(&__aio_mutex); 462 reqp = _aio_hash_find(resultp); 463 if (reqp == NULL) { 464 if (_aio_outstand_cnt == _aio_req_done_cnt) 465 errno = EINVAL; 466 else 467 errno = EACCES; 468 ret = -1; 469 } else { 470 aiowp = reqp->req_worker; 471 sig_mutex_lock(&aiowp->work_qlock1); 472 (void) _aio_cancel_req(aiowp, reqp, &canceled, &done); 473 sig_mutex_unlock(&aiowp->work_qlock1); 474 475 if (canceled) { 476 ret = 0; 477 } else { 478 if (_aio_outstand_cnt == 0 || 479 _aio_outstand_cnt == _aio_req_done_cnt) 480 errno = EINVAL; 481 else 482 errno = EACCES; 483 ret = -1; 484 } 485 } 486 sig_mutex_unlock(&__aio_mutex); 487 return (ret); 488 } 489 490 /* 491 * This must be asynch safe 492 */ 493 aio_result_t * 494 aiowait(struct timeval *uwait) 495 { 496 aio_result_t *uresultp; 497 aio_result_t *kresultp; 498 aio_result_t *resultp; 499 int dontblock; 500 int timedwait = 0; 501 int kaio_errno = 0; 502 struct timeval twait; 503 struct timeval *wait = NULL; 504 hrtime_t hrtend; 505 hrtime_t hres; 506 507 if (uwait) { 508 /* 509 * Check for a valid specified wait time. 510 * If it is invalid, fail the call right away. 
511 */ 512 if (uwait->tv_sec < 0 || uwait->tv_usec < 0 || 513 uwait->tv_usec >= MICROSEC) { 514 errno = EINVAL; 515 return ((aio_result_t *)-1); 516 } 517 518 if (uwait->tv_sec > 0 || uwait->tv_usec > 0) { 519 hrtend = gethrtime() + 520 (hrtime_t)uwait->tv_sec * NANOSEC + 521 (hrtime_t)uwait->tv_usec * (NANOSEC / MICROSEC); 522 twait = *uwait; 523 wait = &twait; 524 timedwait++; 525 } else { 526 /* polling */ 527 sig_mutex_lock(&__aio_mutex); 528 if (_kaio_outstand_cnt == 0) { 529 kresultp = (aio_result_t *)-1; 530 } else { 531 kresultp = (aio_result_t *)_kaio(AIOWAIT, 532 (struct timeval *)-1, 1); 533 if (kresultp != (aio_result_t *)-1 && 534 kresultp != NULL && 535 kresultp != (aio_result_t *)1) { 536 _kaio_outstand_cnt--; 537 sig_mutex_unlock(&__aio_mutex); 538 return (kresultp); 539 } 540 } 541 uresultp = _aio_req_done(); 542 sig_mutex_unlock(&__aio_mutex); 543 if (uresultp != NULL && 544 uresultp != (aio_result_t *)-1) { 545 return (uresultp); 546 } 547 if (uresultp == (aio_result_t *)-1 && 548 kresultp == (aio_result_t *)-1) { 549 errno = EINVAL; 550 return ((aio_result_t *)-1); 551 } else { 552 return (NULL); 553 } 554 } 555 } 556 557 for (;;) { 558 sig_mutex_lock(&__aio_mutex); 559 uresultp = _aio_req_done(); 560 if (uresultp != NULL && uresultp != (aio_result_t *)-1) { 561 sig_mutex_unlock(&__aio_mutex); 562 resultp = uresultp; 563 break; 564 } 565 _aiowait_flag++; 566 dontblock = (uresultp == (aio_result_t *)-1); 567 if (dontblock && _kaio_outstand_cnt == 0) { 568 kresultp = (aio_result_t *)-1; 569 kaio_errno = EINVAL; 570 } else { 571 sig_mutex_unlock(&__aio_mutex); 572 kresultp = (aio_result_t *)_kaio(AIOWAIT, 573 wait, dontblock); 574 sig_mutex_lock(&__aio_mutex); 575 kaio_errno = errno; 576 } 577 _aiowait_flag--; 578 sig_mutex_unlock(&__aio_mutex); 579 if (kresultp == (aio_result_t *)1) { 580 /* aiowait() awakened by an aionotify() */ 581 continue; 582 } else if (kresultp != NULL && 583 kresultp != (aio_result_t *)-1) { 584 resultp = kresultp; 585 sig_mutex_lock(&__aio_mutex); 586 _kaio_outstand_cnt--; 587 sig_mutex_unlock(&__aio_mutex); 588 break; 589 } else if (kresultp == (aio_result_t *)-1 && 590 kaio_errno == EINVAL && 591 uresultp == (aio_result_t *)-1) { 592 errno = kaio_errno; 593 resultp = (aio_result_t *)-1; 594 break; 595 } else if (kresultp == (aio_result_t *)-1 && 596 kaio_errno == EINTR) { 597 errno = kaio_errno; 598 resultp = (aio_result_t *)-1; 599 break; 600 } else if (timedwait) { 601 hres = hrtend - gethrtime(); 602 if (hres <= 0) { 603 /* time is up; return */ 604 resultp = NULL; 605 break; 606 } else { 607 /* 608 * Some time left. Round up the remaining time 609 * in nanoseconds to microsec. Retry the call. 610 */ 611 hres += (NANOSEC / MICROSEC) - 1; 612 wait->tv_sec = hres / NANOSEC; 613 wait->tv_usec = 614 (hres % NANOSEC) / (NANOSEC / MICROSEC); 615 } 616 } else { 617 ASSERT(kresultp == NULL && uresultp == NULL); 618 resultp = NULL; 619 continue; 620 } 621 } 622 return (resultp); 623 } 624 625 /* 626 * _aio_get_timedelta calculates the remaining time and stores the result 627 * into timespec_t *wait. 
 */

int
_aio_get_timedelta(timespec_t *end, timespec_t *wait)
{
	int ret = 0;
	struct timeval cur;
	timespec_t curtime;

	(void) gettimeofday(&cur, NULL);
	curtime.tv_sec = cur.tv_sec;
	curtime.tv_nsec = cur.tv_usec * 1000;	/* convert us to ns */

	if (end->tv_sec >= curtime.tv_sec) {
		wait->tv_sec = end->tv_sec - curtime.tv_sec;
		if (end->tv_nsec >= curtime.tv_nsec) {
			wait->tv_nsec = end->tv_nsec - curtime.tv_nsec;
			if (wait->tv_sec == 0 && wait->tv_nsec == 0)
				ret = -1;	/* timer expired */
		} else {
			if (end->tv_sec > curtime.tv_sec) {
				wait->tv_sec -= 1;
				wait->tv_nsec = NANOSEC -
				    (curtime.tv_nsec - end->tv_nsec);
			} else {
				ret = -1;	/* timer expired */
			}
		}
	} else {
		ret = -1;
	}
	return (ret);
}

/*
 * If closing by file descriptor, simply cancel all the outstanding
 * aios and return.  The aios in question will have noticed the
 * cancellation before, during, or after initiating I/O.
 */
int
aiocancel_all(int fd)
{
	aio_req_t *reqp;
	aio_req_t **reqpp;
	aio_worker_t *first;
	aio_worker_t *next;
	int canceled = 0;
	int done = 0;
	int cancelall = 0;

	sig_mutex_lock(&__aio_mutex);

	if (_aio_outstand_cnt == 0) {
		sig_mutex_unlock(&__aio_mutex);
		return (AIO_ALLDONE);
	}

	/*
	 * Cancel requests from the read/write workers' queues.
	 */
	first = __nextworker_rw;
	next = first;
	do {
		_aio_cancel_work(next, fd, &canceled, &done);
	} while ((next = next->work_forw) != first);

	/*
	 * Finally, check if there are requests on the done queue that
	 * should be canceled.
	 */
	if (fd < 0)
		cancelall = 1;
	reqpp = &_aio_done_tail;
	while ((reqp = *reqpp) != NULL) {
		if (cancelall || reqp->req_args.fd == fd) {
			*reqpp = reqp->req_next;
			_aio_donecnt--;
			(void) _aio_hash_del(reqp->req_resultp);
			_aio_req_free(reqp);
		} else
			reqpp = &reqp->req_next;
	}
	if (cancelall) {
		ASSERT(_aio_donecnt == 0);
		_aio_done_head = NULL;
	}
	sig_mutex_unlock(&__aio_mutex);

	if (canceled && done == 0)
		return (AIO_CANCELED);
	else if (done && canceled == 0)
		return (AIO_ALLDONE);
	else if ((canceled + done == 0) && KAIO_SUPPORTED(fd))
		return ((int)_kaio(AIOCANCEL, fd, NULL));
	return (AIO_NOTCANCELED);
}

/*
 * Cancel requests from a given work queue.  If the file descriptor
 * parameter, fd, is non-negative, then only cancel those requests
 * in this queue that are to this file descriptor.  If the fd
 * parameter is -1, then cancel all requests.
 */
static void
_aio_cancel_work(aio_worker_t *aiowp, int fd, int *canceled, int *done)
{
	aio_req_t *reqp;

	sig_mutex_lock(&aiowp->work_qlock1);
	/*
	 * Cancel queued requests first.
	 */
	reqp = aiowp->work_tail1;
	while (reqp != NULL) {
		if (fd < 0 || reqp->req_args.fd == fd) {
			if (_aio_cancel_req(aiowp, reqp, canceled, done)) {
				/*
				 * The caller's locks were dropped.
				 * reqp is invalid; start traversing
				 * the list from the beginning again.
				 */
				reqp = aiowp->work_tail1;
				continue;
			}
		}
		reqp = reqp->req_next;
	}
	/*
	 * Since the queued requests have been canceled, there can
	 * only be one in-progress request that should be canceled.
	 */
	if ((reqp = aiowp->work_req) != NULL &&
	    (fd < 0 || reqp->req_args.fd == fd))
		(void) _aio_cancel_req(aiowp, reqp, canceled, done);
	sig_mutex_unlock(&aiowp->work_qlock1);
}

/*
 * Cancel a request.  Return 1 if the caller's locks were temporarily
 * dropped, otherwise return 0.
 */
int
_aio_cancel_req(aio_worker_t *aiowp, aio_req_t *reqp, int *canceled, int *done)
{
	int ostate = reqp->req_state;

	ASSERT(MUTEX_HELD(&__aio_mutex));
	ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
	if (ostate == AIO_REQ_CANCELED)
		return (0);
	if (ostate == AIO_REQ_DONE || ostate == AIO_REQ_DONEQ) {
		(*done)++;
		return (0);
	}
	if (reqp->req_op == AIOFSYNC && reqp != aiowp->work_req) {
		ASSERT(POSIX_AIO(reqp));
		/* Cancel the queued aio_fsync() request */
		if (!reqp->req_head->lio_canned) {
			reqp->req_head->lio_canned = 1;
			_aio_outstand_cnt--;
			(*canceled)++;
		}
		return (0);
	}
	reqp->req_state = AIO_REQ_CANCELED;
	_aio_req_del(aiowp, reqp, ostate);
	(void) _aio_hash_del(reqp->req_resultp);
	(*canceled)++;
	if (reqp == aiowp->work_req) {
		ASSERT(ostate == AIO_REQ_INPROGRESS);
		/*
		 * Set the result values now, before _aiodone() is called.
		 * We do this because the application can expect aio_return
		 * and aio_errno to be set to -1 and ECANCELED, respectively,
		 * immediately after a successful return from aiocancel()
		 * or aio_cancel().
		 */
		_aio_set_result(reqp, -1, ECANCELED);
		(void) thr_kill(aiowp->work_tid, SIGAIOCANCEL);
		return (0);
	}
	if (!POSIX_AIO(reqp)) {
		_aio_outstand_cnt--;
		_aio_set_result(reqp, -1, ECANCELED);
		return (0);
	}
	sig_mutex_unlock(&aiowp->work_qlock1);
	sig_mutex_unlock(&__aio_mutex);
	_aiodone(reqp, -1, ECANCELED);
	sig_mutex_lock(&__aio_mutex);
	sig_mutex_lock(&aiowp->work_qlock1);
	return (1);
}

int
_aio_create_worker(aio_req_t *reqp, int mode)
{
	aio_worker_t *aiowp, **workers, **nextworker;
	int *aio_workerscnt;
	void *(*func)(void *);
	sigset_t oset;
	int error;

	/*
	 * Put the new worker thread in the right queue.
	 */
	switch (mode) {
	case AIOREAD:
	case AIOWRITE:
	case AIOAREAD:
	case AIOAWRITE:
#if !defined(_LP64)
	case AIOAREAD64:
	case AIOAWRITE64:
#endif
		workers = &__workers_rw;
		nextworker = &__nextworker_rw;
		aio_workerscnt = &__rw_workerscnt;
		func = _aio_do_request;
		break;
	case AIONOTIFY:
		workers = &__workers_no;
		nextworker = &__nextworker_no;
		func = _aio_do_notify;
		aio_workerscnt = &__no_workerscnt;
		break;
	default:
		aio_panic("_aio_create_worker: invalid mode");
		break;
	}

	if ((aiowp = _aio_worker_alloc()) == NULL)
		return (-1);

	if (reqp) {
		reqp->req_state = AIO_REQ_QUEUED;
		reqp->req_worker = aiowp;
		aiowp->work_head1 = reqp;
		aiowp->work_tail1 = reqp;
		aiowp->work_next1 = reqp;
		aiowp->work_count1 = 1;
		aiowp->work_minload1 = 1;
	}

	(void) pthread_sigmask(SIG_SETMASK, &maskset, &oset);
	error = thr_create(NULL, AIOSTKSIZE, func, aiowp,
	    THR_DAEMON | THR_SUSPENDED, &aiowp->work_tid);
	(void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
	if (error) {
		if (reqp) {
			reqp->req_state = 0;
			reqp->req_worker = NULL;
		}
		_aio_worker_free(aiowp);
		return (-1);
	}

	lmutex_lock(&__aio_mutex);
	(*aio_workerscnt)++;
	if (*workers == NULL) {
		aiowp->work_forw = aiowp;
		aiowp->work_backw = aiowp;
		*nextworker = aiowp;
		*workers = aiowp;
	} else {
		aiowp->work_backw = (*workers)->work_backw;
		aiowp->work_forw = (*workers);
		(*workers)->work_backw->work_forw = aiowp;
		(*workers)->work_backw = aiowp;
	}
	_aio_worker_cnt++;
	lmutex_unlock(&__aio_mutex);

	(void) thr_continue(aiowp->work_tid);

	return (0);
}
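
/*
 * Illustrative sketch (not part of the library): the block comment below
 * describes how a worker combines sigsetjmp()/siglongjmp() with a private
 * "cancellation allowed" flag so that SIGAIOCANCEL can abort a blocking
 * read or write but can never fire while queue locks are held.  The
 * standalone fragment restates that pattern with a hypothetical cancel
 * signal and hypothetical names; it is a sketch of the idea, not the
 * libc implementation.
 */
#if 0	/* example only, never compiled */
static __thread sigjmp_buf cancel_jmp_buf;
static __thread volatile sig_atomic_t cancel_enabled;

static void
example_cancel_handler(int sig)
{
	if (cancel_enabled) {
		cancel_enabled = 0;
		siglongjmp(cancel_jmp_buf, 1);	/* abort the blocked call */
	}
	/* otherwise ignore; the worker notices the cancellation later */
}

static ssize_t
example_cancellable_pread(int fd, void *buf, size_t n, off_t off)
{
	ssize_t rv;

	if (sigsetjmp(cancel_jmp_buf, 1) != 0) {
		errno = ECANCELED;	/* arrived here via siglongjmp() */
		return (-1);
	}
	cancel_enabled = 1;	/* window where cancellation is allowed */
	rv = pread(fd, buf, n, off);
	cancel_enabled = 0;	/* must be off before taking any locks */
	return (rv);
}
#endif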

/*
 * This is the worker's main routine.
 * The task of this function is to execute all queued requests;
 * once the last pending request is executed this function will block
 * in _aio_idle().  A new incoming request must wake up this thread to
 * restart the work.
 * Every worker has its own work queue.  The queue lock is required
 * to synchronize the addition of new requests for this worker or
 * cancellation of pending/running requests.
 *
 * Cancellation scenarios:
 * The cancellation of a request is done asynchronously using
 * _aio_cancel_req() from another thread context.
 * A queued request can be cancelled in different ways:
 * a) request is queued but not "in progress" or "done" (AIO_REQ_QUEUED):
 *	- lock the queue -> remove the request -> unlock the queue
 *	- this function/thread does not detect this cancellation process
 * b) request is in progress (AIO_REQ_INPROGRESS):
 *	- this function first allows the cancellation of the running
 *	  request with the flag "work_cancel_flg=1"
 *		see _aio_req_get() -> _aio_cancel_on()
 *	  During this phase, it is allowed to interrupt the worker
 *	  thread running the request (this thread) using the SIGAIOCANCEL
 *	  signal.
 *	  Once this thread returns from the kernel (because the request
 *	  is just done), it must disable a possible cancellation
 *	  and proceed to finish the request.  To disable the cancellation
 *	  this thread must use _aio_cancel_off() to set "work_cancel_flg=0".
 * c) request is already done (AIO_REQ_DONE || AIO_REQ_DONEQ):
 *	  same procedure as in a)
 *
 * To b)
 *	This thread uses sigsetjmp() to define the position in the code
 *	where it wishes to continue working when a SIGAIOCANCEL signal
 *	is detected.
 *	Normally this thread should get the cancellation signal during the
 *	kernel phase (reading or writing).  In that case the signal handler
 *	aiosigcancelhndlr() is activated in the worker thread context,
 *	which in turn uses siglongjmp() to break the standard
 *	code flow and jump to the "sigsetjmp" position, provided that
 *	"work_cancel_flg" is set to "1".
 *	Because "work_cancel_flg" is only manipulated by this worker
 *	thread and it can only run on one CPU at a given time, it is not
 *	necessary to protect that flag with the queue lock.
 *	Returning from the kernel (read or write system call) we must
 *	first disable the use of the SIGAIOCANCEL signal and accordingly
 *	the use of siglongjmp() to prevent a possible deadlock:
 *	- It can happen that this worker thread returns from the kernel
 *	  and blocks in "work_qlock1",
 *	- then a second thread cancels the apparently "in progress" request
 *	  and sends the SIGAIOCANCEL signal to the worker thread,
 *	- the worker thread is assigned "work_qlock1" and returns
 *	  from the kernel,
 *	- the kernel detects the pending signal and activates the signal
 *	  handler instead,
 *	- if "work_cancel_flg" is still set then the signal handler
 *	  uses siglongjmp() to cancel the "in progress" request and
 *	  would try to acquire the same work_qlock1 in _aio_req_get()
 *	  for a second time => deadlock.
 *	To avoid that situation we disable the cancellation of the request
 *	in progress BEFORE we try to acquire work_qlock1.
 *	In that case the signal handler will not call siglongjmp() and the
 *	worker thread will continue running the standard code flow.
 *	This thread must then check for the AIO_REQ_CANCELED state to
 *	emulate any siglongjmp() that would otherwise have been required,
 *	freeing work_qlock1 and avoiding a deadlock.
 */
void *
_aio_do_request(void *arglist)
{
	aio_worker_t *aiowp = (aio_worker_t *)arglist;
	ulwp_t *self = curthread;
	struct aio_args *arg;
	aio_req_t *reqp;	/* current AIO request */
	ssize_t retval;
	int error;

	if (pthread_setspecific(_aio_key, aiowp) != 0)
		aio_panic("_aio_do_request, pthread_setspecific()");
	(void) pthread_sigmask(SIG_SETMASK, &_worker_set, NULL);
	ASSERT(aiowp->work_req == NULL);

	/*
	 * We resume here when an operation is cancelled.
	 * On first entry, aiowp->work_req == NULL, so all
	 * we do is block SIGAIOCANCEL.
	 */
	(void) sigsetjmp(aiowp->work_jmp_buf, 0);
	ASSERT(self->ul_sigdefer == 0);

	sigoff(self);	/* block SIGAIOCANCEL */
	if (aiowp->work_req != NULL)
		_aio_finish_request(aiowp, -1, ECANCELED);

	for (;;) {
		/*
		 * Put completed requests on aio_done_list.  This has
		 * to be done as part of the main loop to ensure that
		 * we don't artificially starve any aiowait'ers.
1005 */ 1006 if (aiowp->work_done1) 1007 _aio_work_done(aiowp); 1008 1009 top: 1010 /* consume any deferred SIGAIOCANCEL signal here */ 1011 sigon(self); 1012 sigoff(self); 1013 1014 while ((reqp = _aio_req_get(aiowp)) == NULL) { 1015 if (_aio_idle(aiowp) != 0) 1016 goto top; 1017 } 1018 arg = &reqp->req_args; 1019 ASSERT(reqp->req_state == AIO_REQ_INPROGRESS || 1020 reqp->req_state == AIO_REQ_CANCELED); 1021 error = 0; 1022 1023 switch (reqp->req_op) { 1024 case AIOREAD: 1025 case AIOAREAD: 1026 sigon(self); /* unblock SIGAIOCANCEL */ 1027 retval = pread(arg->fd, arg->buf, 1028 arg->bufsz, arg->offset); 1029 if (retval == -1) { 1030 if (errno == ESPIPE) { 1031 retval = read(arg->fd, 1032 arg->buf, arg->bufsz); 1033 if (retval == -1) 1034 error = errno; 1035 } else { 1036 error = errno; 1037 } 1038 } 1039 sigoff(self); /* block SIGAIOCANCEL */ 1040 break; 1041 case AIOWRITE: 1042 case AIOAWRITE: 1043 sigon(self); /* unblock SIGAIOCANCEL */ 1044 retval = pwrite(arg->fd, arg->buf, 1045 arg->bufsz, arg->offset); 1046 if (retval == -1) { 1047 if (errno == ESPIPE) { 1048 retval = write(arg->fd, 1049 arg->buf, arg->bufsz); 1050 if (retval == -1) 1051 error = errno; 1052 } else { 1053 error = errno; 1054 } 1055 } 1056 sigoff(self); /* block SIGAIOCANCEL */ 1057 break; 1058 #if !defined(_LP64) 1059 case AIOAREAD64: 1060 sigon(self); /* unblock SIGAIOCANCEL */ 1061 retval = pread64(arg->fd, arg->buf, 1062 arg->bufsz, arg->offset); 1063 if (retval == -1) { 1064 if (errno == ESPIPE) { 1065 retval = read(arg->fd, 1066 arg->buf, arg->bufsz); 1067 if (retval == -1) 1068 error = errno; 1069 } else { 1070 error = errno; 1071 } 1072 } 1073 sigoff(self); /* block SIGAIOCANCEL */ 1074 break; 1075 case AIOAWRITE64: 1076 sigon(self); /* unblock SIGAIOCANCEL */ 1077 retval = pwrite64(arg->fd, arg->buf, 1078 arg->bufsz, arg->offset); 1079 if (retval == -1) { 1080 if (errno == ESPIPE) { 1081 retval = write(arg->fd, 1082 arg->buf, arg->bufsz); 1083 if (retval == -1) 1084 error = errno; 1085 } else { 1086 error = errno; 1087 } 1088 } 1089 sigoff(self); /* block SIGAIOCANCEL */ 1090 break; 1091 #endif /* !defined(_LP64) */ 1092 case AIOFSYNC: 1093 if (_aio_fsync_del(aiowp, reqp)) 1094 goto top; 1095 ASSERT(reqp->req_head == NULL); 1096 /* 1097 * All writes for this fsync request are now 1098 * acknowledged. Now make these writes visible 1099 * and put the final request into the hash table. 1100 */ 1101 if (reqp->req_state == AIO_REQ_CANCELED) { 1102 /* EMPTY */; 1103 } else if (arg->offset == O_SYNC) { 1104 if ((retval = __fdsync(arg->fd, FSYNC)) == -1) 1105 error = errno; 1106 } else { 1107 if ((retval = __fdsync(arg->fd, FDSYNC)) == -1) 1108 error = errno; 1109 } 1110 if (_aio_hash_insert(reqp->req_resultp, reqp) != 0) 1111 aio_panic("_aio_do_request(): AIOFSYNC: " 1112 "request already in hash table"); 1113 break; 1114 default: 1115 aio_panic("_aio_do_request, bad op"); 1116 } 1117 1118 _aio_finish_request(aiowp, retval, error); 1119 } 1120 /* NOTREACHED */ 1121 return (NULL); 1122 } 1123 1124 /* 1125 * Perform the tail processing for _aio_do_request(). 1126 * The in-progress request may or may not have been cancelled. 
1127 */ 1128 static void 1129 _aio_finish_request(aio_worker_t *aiowp, ssize_t retval, int error) 1130 { 1131 aio_req_t *reqp; 1132 1133 sig_mutex_lock(&aiowp->work_qlock1); 1134 if ((reqp = aiowp->work_req) == NULL) 1135 sig_mutex_unlock(&aiowp->work_qlock1); 1136 else { 1137 aiowp->work_req = NULL; 1138 if (reqp->req_state == AIO_REQ_CANCELED) { 1139 retval = -1; 1140 error = ECANCELED; 1141 } 1142 if (!POSIX_AIO(reqp)) { 1143 sig_mutex_unlock(&aiowp->work_qlock1); 1144 sig_mutex_lock(&__aio_mutex); 1145 if (reqp->req_state == AIO_REQ_INPROGRESS) 1146 reqp->req_state = AIO_REQ_DONE; 1147 _aio_req_done_cnt++; 1148 _aio_set_result(reqp, retval, error); 1149 if (error == ECANCELED) 1150 _aio_outstand_cnt--; 1151 sig_mutex_unlock(&__aio_mutex); 1152 } else { 1153 if (reqp->req_state == AIO_REQ_INPROGRESS) 1154 reqp->req_state = AIO_REQ_DONE; 1155 sig_mutex_unlock(&aiowp->work_qlock1); 1156 _aiodone(reqp, retval, error); 1157 } 1158 } 1159 } 1160 1161 void 1162 _aio_req_mark_done(aio_req_t *reqp) 1163 { 1164 #if !defined(_LP64) 1165 if (reqp->req_largefile) 1166 ((aiocb64_t *)reqp->req_aiocbp)->aio_state = USERAIO_DONE; 1167 else 1168 #endif 1169 ((aiocb_t *)reqp->req_aiocbp)->aio_state = USERAIO_DONE; 1170 } 1171 1172 /* 1173 * Sleep for 'ticks' clock ticks to give somebody else a chance to run, 1174 * hopefully to consume one of our queued signals. 1175 */ 1176 static void 1177 _aio_delay(int ticks) 1178 { 1179 (void) usleep(ticks * (MICROSEC / hz)); 1180 } 1181 1182 /* 1183 * Actually send the notifications. 1184 * We could block indefinitely here if the application 1185 * is not listening for the signal or port notifications. 1186 */ 1187 static void 1188 send_notification(notif_param_t *npp) 1189 { 1190 extern int __sigqueue(pid_t pid, int signo, 1191 /* const union sigval */ void *value, int si_code, int block); 1192 1193 if (npp->np_signo) 1194 (void) __sigqueue(__pid, npp->np_signo, npp->np_user, 1195 SI_ASYNCIO, 1); 1196 else if (npp->np_port >= 0) 1197 (void) _port_dispatch(npp->np_port, 0, PORT_SOURCE_AIO, 1198 npp->np_event, npp->np_object, npp->np_user); 1199 1200 if (npp->np_lio_signo) 1201 (void) __sigqueue(__pid, npp->np_lio_signo, npp->np_lio_user, 1202 SI_ASYNCIO, 1); 1203 else if (npp->np_lio_port >= 0) 1204 (void) _port_dispatch(npp->np_lio_port, 0, PORT_SOURCE_AIO, 1205 npp->np_lio_event, npp->np_lio_object, npp->np_lio_user); 1206 } 1207 1208 /* 1209 * Asynchronous notification worker. 1210 */ 1211 void * 1212 _aio_do_notify(void *arg) 1213 { 1214 aio_worker_t *aiowp = (aio_worker_t *)arg; 1215 aio_req_t *reqp; 1216 1217 /* 1218 * This isn't really necessary. All signals are blocked. 1219 */ 1220 if (pthread_setspecific(_aio_key, aiowp) != 0) 1221 aio_panic("_aio_do_notify, pthread_setspecific()"); 1222 1223 /* 1224 * Notifications are never cancelled. 1225 * All signals remain blocked, forever. 1226 */ 1227 for (;;) { 1228 while ((reqp = _aio_req_get(aiowp)) == NULL) { 1229 if (_aio_idle(aiowp) != 0) 1230 aio_panic("_aio_do_notify: _aio_idle() failed"); 1231 } 1232 send_notification(&reqp->req_notify); 1233 _aio_req_free(reqp); 1234 } 1235 1236 /* NOTREACHED */ 1237 return (NULL); 1238 } 1239 1240 /* 1241 * Do the completion semantics for a request that was either canceled 1242 * by _aio_cancel_req() or was completed by _aio_do_request(). 
1243 */ 1244 static void 1245 _aiodone(aio_req_t *reqp, ssize_t retval, int error) 1246 { 1247 aio_result_t *resultp = reqp->req_resultp; 1248 int notify = 0; 1249 aio_lio_t *head; 1250 int sigev_none; 1251 int sigev_signal; 1252 int sigev_thread; 1253 int sigev_port; 1254 notif_param_t np; 1255 1256 /* 1257 * We call _aiodone() only for Posix I/O. 1258 */ 1259 ASSERT(POSIX_AIO(reqp)); 1260 1261 sigev_none = 0; 1262 sigev_signal = 0; 1263 sigev_thread = 0; 1264 sigev_port = 0; 1265 np.np_signo = 0; 1266 np.np_port = -1; 1267 np.np_lio_signo = 0; 1268 np.np_lio_port = -1; 1269 1270 switch (reqp->req_sigevent.sigev_notify) { 1271 case SIGEV_NONE: 1272 sigev_none = 1; 1273 break; 1274 case SIGEV_SIGNAL: 1275 sigev_signal = 1; 1276 break; 1277 case SIGEV_THREAD: 1278 sigev_thread = 1; 1279 break; 1280 case SIGEV_PORT: 1281 sigev_port = 1; 1282 break; 1283 default: 1284 aio_panic("_aiodone: improper sigev_notify"); 1285 break; 1286 } 1287 1288 /* 1289 * Figure out the notification parameters while holding __aio_mutex. 1290 * Actually perform the notifications after dropping __aio_mutex. 1291 * This allows us to sleep for a long time (if the notifications 1292 * incur delays) without impeding other async I/O operations. 1293 */ 1294 1295 sig_mutex_lock(&__aio_mutex); 1296 1297 if (sigev_signal) { 1298 if ((np.np_signo = reqp->req_sigevent.sigev_signo) != 0) 1299 notify = 1; 1300 np.np_user = reqp->req_sigevent.sigev_value.sival_ptr; 1301 } else if (sigev_thread | sigev_port) { 1302 if ((np.np_port = reqp->req_sigevent.sigev_signo) >= 0) 1303 notify = 1; 1304 np.np_event = reqp->req_op; 1305 if (np.np_event == AIOFSYNC && reqp->req_largefile) 1306 np.np_event = AIOFSYNC64; 1307 np.np_object = (uintptr_t)reqp->req_aiocbp; 1308 np.np_user = reqp->req_sigevent.sigev_value.sival_ptr; 1309 } 1310 1311 if (resultp->aio_errno == EINPROGRESS) 1312 _aio_set_result(reqp, retval, error); 1313 1314 _aio_outstand_cnt--; 1315 1316 head = reqp->req_head; 1317 reqp->req_head = NULL; 1318 1319 if (sigev_none) { 1320 _aio_enq_doneq(reqp); 1321 reqp = NULL; 1322 } else { 1323 (void) _aio_hash_del(resultp); 1324 _aio_req_mark_done(reqp); 1325 } 1326 1327 _aio_waitn_wakeup(); 1328 1329 /* 1330 * __aio_waitn() sets AIO_WAIT_INPROGRESS and 1331 * __aio_suspend() increments "_aio_kernel_suspend" 1332 * when they are waiting in the kernel for completed I/Os. 1333 * 1334 * _kaio(AIONOTIFY) awakes the corresponding function 1335 * in the kernel; then the corresponding __aio_waitn() or 1336 * __aio_suspend() function could reap the recently 1337 * completed I/Os (_aiodone()). 1338 */ 1339 if ((_aio_flags & AIO_WAIT_INPROGRESS) || _aio_kernel_suspend > 0) 1340 (void) _kaio(AIONOTIFY); 1341 1342 sig_mutex_unlock(&__aio_mutex); 1343 1344 if (head != NULL) { 1345 /* 1346 * If all the lio requests have completed, 1347 * prepare to notify the waiting thread. 
1348 */ 1349 sig_mutex_lock(&head->lio_mutex); 1350 ASSERT(head->lio_refcnt == head->lio_nent); 1351 if (head->lio_refcnt == 1) { 1352 int waiting = 0; 1353 if (head->lio_mode == LIO_WAIT) { 1354 if ((waiting = head->lio_waiting) != 0) 1355 (void) cond_signal(&head->lio_cond_cv); 1356 } else if (head->lio_port < 0) { /* none or signal */ 1357 if ((np.np_lio_signo = head->lio_signo) != 0) 1358 notify = 1; 1359 np.np_lio_user = head->lio_sigval.sival_ptr; 1360 } else { /* thread or port */ 1361 notify = 1; 1362 np.np_lio_port = head->lio_port; 1363 np.np_lio_event = head->lio_event; 1364 np.np_lio_object = 1365 (uintptr_t)head->lio_sigevent; 1366 np.np_lio_user = head->lio_sigval.sival_ptr; 1367 } 1368 head->lio_nent = head->lio_refcnt = 0; 1369 sig_mutex_unlock(&head->lio_mutex); 1370 if (waiting == 0) 1371 _aio_lio_free(head); 1372 } else { 1373 head->lio_nent--; 1374 head->lio_refcnt--; 1375 sig_mutex_unlock(&head->lio_mutex); 1376 } 1377 } 1378 1379 /* 1380 * The request is completed; now perform the notifications. 1381 */ 1382 if (notify) { 1383 if (reqp != NULL) { 1384 /* 1385 * We usually put the request on the notification 1386 * queue because we don't want to block and delay 1387 * other operations behind us in the work queue. 1388 * Also we must never block on a cancel notification 1389 * because we are being called from an application 1390 * thread in this case and that could lead to deadlock 1391 * if no other thread is receiving notificatins. 1392 */ 1393 reqp->req_notify = np; 1394 reqp->req_op = AIONOTIFY; 1395 _aio_req_add(reqp, &__workers_no, AIONOTIFY); 1396 reqp = NULL; 1397 } else { 1398 /* 1399 * We already put the request on the done queue, 1400 * so we can't queue it to the notification queue. 1401 * Just do the notification directly. 1402 */ 1403 send_notification(&np); 1404 } 1405 } 1406 1407 if (reqp != NULL) 1408 _aio_req_free(reqp); 1409 } 1410 1411 /* 1412 * Delete fsync requests from list head until there is 1413 * only one left. Return 0 when there is only one, 1414 * otherwise return a non-zero value. 1415 */ 1416 static int 1417 _aio_fsync_del(aio_worker_t *aiowp, aio_req_t *reqp) 1418 { 1419 aio_lio_t *head = reqp->req_head; 1420 int rval = 0; 1421 1422 ASSERT(reqp == aiowp->work_req); 1423 sig_mutex_lock(&aiowp->work_qlock1); 1424 sig_mutex_lock(&head->lio_mutex); 1425 if (head->lio_refcnt > 1) { 1426 head->lio_refcnt--; 1427 head->lio_nent--; 1428 aiowp->work_req = NULL; 1429 sig_mutex_unlock(&head->lio_mutex); 1430 sig_mutex_unlock(&aiowp->work_qlock1); 1431 sig_mutex_lock(&__aio_mutex); 1432 _aio_outstand_cnt--; 1433 _aio_waitn_wakeup(); 1434 sig_mutex_unlock(&__aio_mutex); 1435 _aio_req_free(reqp); 1436 return (1); 1437 } 1438 ASSERT(head->lio_nent == 1 && head->lio_refcnt == 1); 1439 reqp->req_head = NULL; 1440 if (head->lio_canned) 1441 reqp->req_state = AIO_REQ_CANCELED; 1442 if (head->lio_mode == LIO_DESTROY) { 1443 aiowp->work_req = NULL; 1444 rval = 1; 1445 } 1446 sig_mutex_unlock(&head->lio_mutex); 1447 sig_mutex_unlock(&aiowp->work_qlock1); 1448 head->lio_refcnt--; 1449 head->lio_nent--; 1450 _aio_lio_free(head); 1451 if (rval != 0) 1452 _aio_req_free(reqp); 1453 return (rval); 1454 } 1455 1456 /* 1457 * A worker is set idle when its work queue is empty. 1458 * The worker checks again that it has no more work 1459 * and then goes to sleep waiting for more work. 
1460 */ 1461 int 1462 _aio_idle(aio_worker_t *aiowp) 1463 { 1464 int error = 0; 1465 1466 sig_mutex_lock(&aiowp->work_qlock1); 1467 if (aiowp->work_count1 == 0) { 1468 ASSERT(aiowp->work_minload1 == 0); 1469 aiowp->work_idleflg = 1; 1470 /* 1471 * A cancellation handler is not needed here. 1472 * aio worker threads are never cancelled via pthread_cancel(). 1473 */ 1474 error = sig_cond_wait(&aiowp->work_idle_cv, 1475 &aiowp->work_qlock1); 1476 /* 1477 * The idle flag is normally cleared before worker is awakened 1478 * by aio_req_add(). On error (EINTR), we clear it ourself. 1479 */ 1480 if (error) 1481 aiowp->work_idleflg = 0; 1482 } 1483 sig_mutex_unlock(&aiowp->work_qlock1); 1484 return (error); 1485 } 1486 1487 /* 1488 * A worker's completed AIO requests are placed onto a global 1489 * done queue. The application is only sent a SIGIO signal if 1490 * the process has a handler enabled and it is not waiting via 1491 * aiowait(). 1492 */ 1493 static void 1494 _aio_work_done(aio_worker_t *aiowp) 1495 { 1496 aio_req_t *reqp; 1497 1498 sig_mutex_lock(&aiowp->work_qlock1); 1499 reqp = aiowp->work_prev1; 1500 reqp->req_next = NULL; 1501 aiowp->work_done1 = 0; 1502 aiowp->work_tail1 = aiowp->work_next1; 1503 if (aiowp->work_tail1 == NULL) 1504 aiowp->work_head1 = NULL; 1505 aiowp->work_prev1 = NULL; 1506 sig_mutex_unlock(&aiowp->work_qlock1); 1507 sig_mutex_lock(&__aio_mutex); 1508 _aio_donecnt++; 1509 _aio_outstand_cnt--; 1510 _aio_req_done_cnt--; 1511 ASSERT(_aio_donecnt > 0 && 1512 _aio_outstand_cnt >= 0 && 1513 _aio_req_done_cnt >= 0); 1514 ASSERT(reqp != NULL); 1515 1516 if (_aio_done_tail == NULL) { 1517 _aio_done_head = _aio_done_tail = reqp; 1518 } else { 1519 _aio_done_head->req_next = reqp; 1520 _aio_done_head = reqp; 1521 } 1522 1523 if (_aiowait_flag) { 1524 sig_mutex_unlock(&__aio_mutex); 1525 (void) _kaio(AIONOTIFY); 1526 } else { 1527 sig_mutex_unlock(&__aio_mutex); 1528 if (_sigio_enabled) 1529 (void) kill(__pid, SIGIO); 1530 } 1531 } 1532 1533 /* 1534 * The done queue consists of AIO requests that are in either the 1535 * AIO_REQ_DONE or AIO_REQ_CANCELED state. Requests that were cancelled 1536 * are discarded. If the done queue is empty then NULL is returned. 1537 * Otherwise the address of a done aio_result_t is returned. 1538 */ 1539 aio_result_t * 1540 _aio_req_done(void) 1541 { 1542 aio_req_t *reqp; 1543 aio_result_t *resultp; 1544 1545 ASSERT(MUTEX_HELD(&__aio_mutex)); 1546 1547 if ((reqp = _aio_done_tail) != NULL) { 1548 if ((_aio_done_tail = reqp->req_next) == NULL) 1549 _aio_done_head = NULL; 1550 ASSERT(_aio_donecnt > 0); 1551 _aio_donecnt--; 1552 (void) _aio_hash_del(reqp->req_resultp); 1553 resultp = reqp->req_resultp; 1554 ASSERT(reqp->req_state == AIO_REQ_DONE); 1555 _aio_req_free(reqp); 1556 return (resultp); 1557 } 1558 /* is queue empty? */ 1559 if (reqp == NULL && _aio_outstand_cnt == 0) { 1560 return ((aio_result_t *)-1); 1561 } 1562 return (NULL); 1563 } 1564 1565 /* 1566 * Set the return and errno values for the application's use. 1567 * 1568 * For the Posix interfaces, we must set the return value first followed 1569 * by the errno value because the Posix interfaces allow for a change 1570 * in the errno value from EINPROGRESS to something else to signal 1571 * the completion of the asynchronous request. 1572 * 1573 * The opposite is true for the Solaris interfaces. These allow for 1574 * a change in the return value from AIO_INPROGRESS to something else 1575 * to signal the completion of the asynchronous request. 
1576 */ 1577 void 1578 _aio_set_result(aio_req_t *reqp, ssize_t retval, int error) 1579 { 1580 aio_result_t *resultp = reqp->req_resultp; 1581 1582 if (POSIX_AIO(reqp)) { 1583 resultp->aio_return = retval; 1584 membar_producer(); 1585 resultp->aio_errno = error; 1586 } else { 1587 resultp->aio_errno = error; 1588 membar_producer(); 1589 resultp->aio_return = retval; 1590 } 1591 } 1592 1593 /* 1594 * Add an AIO request onto the next work queue. 1595 * A circular list of workers is used to choose the next worker. 1596 */ 1597 void 1598 _aio_req_add(aio_req_t *reqp, aio_worker_t **nextworker, int mode) 1599 { 1600 ulwp_t *self = curthread; 1601 aio_worker_t *aiowp; 1602 aio_worker_t *first; 1603 int load_bal_flg = 1; 1604 int found; 1605 1606 ASSERT(reqp->req_state != AIO_REQ_DONEQ); 1607 reqp->req_next = NULL; 1608 /* 1609 * Try to acquire the next worker's work queue. If it is locked, 1610 * then search the list of workers until a queue is found unlocked, 1611 * or until the list is completely traversed at which point another 1612 * worker will be created. 1613 */ 1614 sigoff(self); /* defer SIGIO */ 1615 sig_mutex_lock(&__aio_mutex); 1616 first = aiowp = *nextworker; 1617 if (mode != AIONOTIFY) 1618 _aio_outstand_cnt++; 1619 sig_mutex_unlock(&__aio_mutex); 1620 1621 switch (mode) { 1622 case AIOREAD: 1623 case AIOWRITE: 1624 case AIOAREAD: 1625 case AIOAWRITE: 1626 #if !defined(_LP64) 1627 case AIOAREAD64: 1628 case AIOAWRITE64: 1629 #endif 1630 /* try to find an idle worker */ 1631 found = 0; 1632 do { 1633 if (sig_mutex_trylock(&aiowp->work_qlock1) == 0) { 1634 if (aiowp->work_idleflg) { 1635 found = 1; 1636 break; 1637 } 1638 sig_mutex_unlock(&aiowp->work_qlock1); 1639 } 1640 } while ((aiowp = aiowp->work_forw) != first); 1641 1642 if (found) { 1643 aiowp->work_minload1++; 1644 break; 1645 } 1646 1647 /* try to acquire some worker's queue lock */ 1648 do { 1649 if (sig_mutex_trylock(&aiowp->work_qlock1) == 0) { 1650 found = 1; 1651 break; 1652 } 1653 } while ((aiowp = aiowp->work_forw) != first); 1654 1655 /* 1656 * Create more workers when the workers appear overloaded. 1657 * Either all the workers are busy draining their queues 1658 * or no worker's queue lock could be acquired. 1659 */ 1660 if (!found) { 1661 if (_aio_worker_cnt < _max_workers) { 1662 if (_aio_create_worker(reqp, mode)) 1663 aio_panic("_aio_req_add: add worker"); 1664 sigon(self); /* reenable SIGIO */ 1665 return; 1666 } 1667 1668 /* 1669 * No worker available and we have created 1670 * _max_workers, keep going through the 1671 * list slowly until we get a lock 1672 */ 1673 while (sig_mutex_trylock(&aiowp->work_qlock1) != 0) { 1674 /* 1675 * give someone else a chance 1676 */ 1677 _aio_delay(1); 1678 aiowp = aiowp->work_forw; 1679 } 1680 } 1681 1682 ASSERT(MUTEX_HELD(&aiowp->work_qlock1)); 1683 if (_aio_worker_cnt < _max_workers && 1684 aiowp->work_minload1 >= _minworkload) { 1685 sig_mutex_unlock(&aiowp->work_qlock1); 1686 sig_mutex_lock(&__aio_mutex); 1687 *nextworker = aiowp->work_forw; 1688 sig_mutex_unlock(&__aio_mutex); 1689 if (_aio_create_worker(reqp, mode)) 1690 aio_panic("aio_req_add: add worker"); 1691 sigon(self); /* reenable SIGIO */ 1692 return; 1693 } 1694 aiowp->work_minload1++; 1695 break; 1696 case AIOFSYNC: 1697 case AIONOTIFY: 1698 load_bal_flg = 0; 1699 sig_mutex_lock(&aiowp->work_qlock1); 1700 break; 1701 default: 1702 aio_panic("_aio_req_add: invalid mode"); 1703 break; 1704 } 1705 /* 1706 * Put request onto worker's work queue. 
1707 */ 1708 if (aiowp->work_tail1 == NULL) { 1709 ASSERT(aiowp->work_count1 == 0); 1710 aiowp->work_tail1 = reqp; 1711 aiowp->work_next1 = reqp; 1712 } else { 1713 aiowp->work_head1->req_next = reqp; 1714 if (aiowp->work_next1 == NULL) 1715 aiowp->work_next1 = reqp; 1716 } 1717 reqp->req_state = AIO_REQ_QUEUED; 1718 reqp->req_worker = aiowp; 1719 aiowp->work_head1 = reqp; 1720 /* 1721 * Awaken worker if it is not currently active. 1722 */ 1723 if (aiowp->work_count1++ == 0 && aiowp->work_idleflg) { 1724 aiowp->work_idleflg = 0; 1725 (void) cond_signal(&aiowp->work_idle_cv); 1726 } 1727 sig_mutex_unlock(&aiowp->work_qlock1); 1728 1729 if (load_bal_flg) { 1730 sig_mutex_lock(&__aio_mutex); 1731 *nextworker = aiowp->work_forw; 1732 sig_mutex_unlock(&__aio_mutex); 1733 } 1734 sigon(self); /* reenable SIGIO */ 1735 } 1736 1737 /* 1738 * Get an AIO request for a specified worker. 1739 * If the work queue is empty, return NULL. 1740 */ 1741 aio_req_t * 1742 _aio_req_get(aio_worker_t *aiowp) 1743 { 1744 aio_req_t *reqp; 1745 1746 sig_mutex_lock(&aiowp->work_qlock1); 1747 if ((reqp = aiowp->work_next1) != NULL) { 1748 /* 1749 * Remove a POSIX request from the queue; the 1750 * request queue is a singularly linked list 1751 * with a previous pointer. The request is 1752 * removed by updating the previous pointer. 1753 * 1754 * Non-posix requests are left on the queue 1755 * to eventually be placed on the done queue. 1756 */ 1757 1758 if (POSIX_AIO(reqp)) { 1759 if (aiowp->work_prev1 == NULL) { 1760 aiowp->work_tail1 = reqp->req_next; 1761 if (aiowp->work_tail1 == NULL) 1762 aiowp->work_head1 = NULL; 1763 } else { 1764 aiowp->work_prev1->req_next = reqp->req_next; 1765 if (aiowp->work_head1 == reqp) 1766 aiowp->work_head1 = reqp->req_next; 1767 } 1768 1769 } else { 1770 aiowp->work_prev1 = reqp; 1771 ASSERT(aiowp->work_done1 >= 0); 1772 aiowp->work_done1++; 1773 } 1774 ASSERT(reqp != reqp->req_next); 1775 aiowp->work_next1 = reqp->req_next; 1776 ASSERT(aiowp->work_count1 >= 1); 1777 aiowp->work_count1--; 1778 switch (reqp->req_op) { 1779 case AIOREAD: 1780 case AIOWRITE: 1781 case AIOAREAD: 1782 case AIOAWRITE: 1783 #if !defined(_LP64) 1784 case AIOAREAD64: 1785 case AIOAWRITE64: 1786 #endif 1787 ASSERT(aiowp->work_minload1 > 0); 1788 aiowp->work_minload1--; 1789 break; 1790 } 1791 reqp->req_state = AIO_REQ_INPROGRESS; 1792 } 1793 aiowp->work_req = reqp; 1794 ASSERT(reqp != NULL || aiowp->work_count1 == 0); 1795 sig_mutex_unlock(&aiowp->work_qlock1); 1796 return (reqp); 1797 } 1798 1799 static void 1800 _aio_req_del(aio_worker_t *aiowp, aio_req_t *reqp, int ostate) 1801 { 1802 aio_req_t **last; 1803 aio_req_t *lastrp; 1804 aio_req_t *next; 1805 1806 ASSERT(aiowp != NULL); 1807 ASSERT(MUTEX_HELD(&aiowp->work_qlock1)); 1808 if (POSIX_AIO(reqp)) { 1809 if (ostate != AIO_REQ_QUEUED) 1810 return; 1811 } 1812 last = &aiowp->work_tail1; 1813 lastrp = aiowp->work_tail1; 1814 ASSERT(ostate == AIO_REQ_QUEUED || ostate == AIO_REQ_INPROGRESS); 1815 while ((next = *last) != NULL) { 1816 if (next == reqp) { 1817 *last = next->req_next; 1818 if (aiowp->work_next1 == next) 1819 aiowp->work_next1 = next->req_next; 1820 1821 if ((next->req_next != NULL) || 1822 (aiowp->work_done1 == 0)) { 1823 if (aiowp->work_head1 == next) 1824 aiowp->work_head1 = next->req_next; 1825 if (aiowp->work_prev1 == next) 1826 aiowp->work_prev1 = next->req_next; 1827 } else { 1828 if (aiowp->work_head1 == next) 1829 aiowp->work_head1 = lastrp; 1830 if (aiowp->work_prev1 == next) 1831 aiowp->work_prev1 = lastrp; 1832 } 1833 1834 if (ostate 
== AIO_REQ_QUEUED) { 1835 ASSERT(aiowp->work_count1 >= 1); 1836 aiowp->work_count1--; 1837 ASSERT(aiowp->work_minload1 >= 1); 1838 aiowp->work_minload1--; 1839 } else { 1840 ASSERT(ostate == AIO_REQ_INPROGRESS && 1841 !POSIX_AIO(reqp)); 1842 aiowp->work_done1--; 1843 } 1844 return; 1845 } 1846 last = &next->req_next; 1847 lastrp = next; 1848 } 1849 /* NOTREACHED */ 1850 } 1851 1852 static void 1853 _aio_enq_doneq(aio_req_t *reqp) 1854 { 1855 if (_aio_doneq == NULL) { 1856 _aio_doneq = reqp; 1857 reqp->req_next = reqp->req_prev = reqp; 1858 } else { 1859 reqp->req_next = _aio_doneq; 1860 reqp->req_prev = _aio_doneq->req_prev; 1861 _aio_doneq->req_prev->req_next = reqp; 1862 _aio_doneq->req_prev = reqp; 1863 } 1864 reqp->req_state = AIO_REQ_DONEQ; 1865 _aio_doneq_cnt++; 1866 } 1867 1868 /* 1869 * caller owns the _aio_mutex 1870 */ 1871 aio_req_t * 1872 _aio_req_remove(aio_req_t *reqp) 1873 { 1874 if (reqp && reqp->req_state != AIO_REQ_DONEQ) 1875 return (NULL); 1876 1877 if (reqp) { 1878 /* request in done queue */ 1879 if (_aio_doneq == reqp) 1880 _aio_doneq = reqp->req_next; 1881 if (_aio_doneq == reqp) { 1882 /* only one request on queue */ 1883 _aio_doneq = NULL; 1884 } else { 1885 aio_req_t *tmp = reqp->req_next; 1886 reqp->req_prev->req_next = tmp; 1887 tmp->req_prev = reqp->req_prev; 1888 } 1889 } else if ((reqp = _aio_doneq) != NULL) { 1890 if (reqp == reqp->req_next) { 1891 /* only one request on queue */ 1892 _aio_doneq = NULL; 1893 } else { 1894 reqp->req_prev->req_next = _aio_doneq = reqp->req_next; 1895 _aio_doneq->req_prev = reqp->req_prev; 1896 } 1897 } 1898 if (reqp) { 1899 _aio_doneq_cnt--; 1900 reqp->req_next = reqp->req_prev = reqp; 1901 reqp->req_state = AIO_REQ_DONE; 1902 } 1903 return (reqp); 1904 } 1905 1906 /* 1907 * An AIO request is identified by an aio_result_t pointer. The library 1908 * maps this aio_result_t pointer to its internal representation using a 1909 * hash table. This function adds an aio_result_t pointer to the hash table. 1910 */ 1911 static int 1912 _aio_hash_insert(aio_result_t *resultp, aio_req_t *reqp) 1913 { 1914 aio_hash_t *hashp; 1915 aio_req_t **prev; 1916 aio_req_t *next; 1917 1918 hashp = _aio_hash + AIOHASH(resultp); 1919 lmutex_lock(&hashp->hash_lock); 1920 prev = &hashp->hash_ptr; 1921 while ((next = *prev) != NULL) { 1922 if (resultp == next->req_resultp) { 1923 lmutex_unlock(&hashp->hash_lock); 1924 return (-1); 1925 } 1926 prev = &next->req_link; 1927 } 1928 *prev = reqp; 1929 ASSERT(reqp->req_link == NULL); 1930 lmutex_unlock(&hashp->hash_lock); 1931 return (0); 1932 } 1933 1934 /* 1935 * Remove an entry from the hash table. 
1936 */ 1937 aio_req_t * 1938 _aio_hash_del(aio_result_t *resultp) 1939 { 1940 aio_hash_t *hashp; 1941 aio_req_t **prev; 1942 aio_req_t *next = NULL; 1943 1944 if (_aio_hash != NULL) { 1945 hashp = _aio_hash + AIOHASH(resultp); 1946 lmutex_lock(&hashp->hash_lock); 1947 prev = &hashp->hash_ptr; 1948 while ((next = *prev) != NULL) { 1949 if (resultp == next->req_resultp) { 1950 *prev = next->req_link; 1951 next->req_link = NULL; 1952 break; 1953 } 1954 prev = &next->req_link; 1955 } 1956 lmutex_unlock(&hashp->hash_lock); 1957 } 1958 return (next); 1959 } 1960 1961 /* 1962 * find an entry in the hash table 1963 */ 1964 aio_req_t * 1965 _aio_hash_find(aio_result_t *resultp) 1966 { 1967 aio_hash_t *hashp; 1968 aio_req_t **prev; 1969 aio_req_t *next = NULL; 1970 1971 if (_aio_hash != NULL) { 1972 hashp = _aio_hash + AIOHASH(resultp); 1973 lmutex_lock(&hashp->hash_lock); 1974 prev = &hashp->hash_ptr; 1975 while ((next = *prev) != NULL) { 1976 if (resultp == next->req_resultp) 1977 break; 1978 prev = &next->req_link; 1979 } 1980 lmutex_unlock(&hashp->hash_lock); 1981 } 1982 return (next); 1983 } 1984 1985 /* 1986 * AIO interface for POSIX 1987 */ 1988 int 1989 _aio_rw(aiocb_t *aiocbp, aio_lio_t *lio_head, aio_worker_t **nextworker, 1990 int mode, int flg) 1991 { 1992 aio_req_t *reqp; 1993 aio_args_t *ap; 1994 int kerr; 1995 1996 if (aiocbp == NULL) { 1997 errno = EINVAL; 1998 return (-1); 1999 } 2000 2001 /* initialize kaio */ 2002 if (!_kaio_ok) 2003 _kaio_init(); 2004 2005 aiocbp->aio_state = NOCHECK; 2006 2007 /* 2008 * If we have been called because a list I/O 2009 * kaio() failed, we dont want to repeat the 2010 * system call 2011 */ 2012 2013 if (flg & AIO_KAIO) { 2014 /* 2015 * Try kernel aio first. 2016 * If errno is ENOTSUP/EBADFD, 2017 * fall back to the thread implementation. 2018 */ 2019 if (_kaio_ok > 0 && KAIO_SUPPORTED(aiocbp->aio_fildes)) { 2020 aiocbp->aio_resultp.aio_errno = EINPROGRESS; 2021 aiocbp->aio_state = CHECK; 2022 kerr = (int)_kaio(mode, aiocbp); 2023 if (kerr == 0) 2024 return (0); 2025 if (errno != ENOTSUP && errno != EBADFD) { 2026 aiocbp->aio_resultp.aio_errno = errno; 2027 aiocbp->aio_resultp.aio_return = -1; 2028 aiocbp->aio_state = NOCHECK; 2029 return (-1); 2030 } 2031 if (errno == EBADFD) 2032 SET_KAIO_NOT_SUPPORTED(aiocbp->aio_fildes); 2033 } 2034 } 2035 2036 aiocbp->aio_resultp.aio_errno = EINPROGRESS; 2037 aiocbp->aio_state = USERAIO; 2038 2039 if (!__uaio_ok && __uaio_init() == -1) 2040 return (-1); 2041 2042 if ((reqp = _aio_req_alloc()) == NULL) { 2043 errno = EAGAIN; 2044 return (-1); 2045 } 2046 2047 /* 2048 * If an LIO request, add the list head to the aio request 2049 */ 2050 reqp->req_head = lio_head; 2051 reqp->req_type = AIO_POSIX_REQ; 2052 reqp->req_op = mode; 2053 reqp->req_largefile = 0; 2054 2055 if (aiocbp->aio_sigevent.sigev_notify == SIGEV_NONE) { 2056 reqp->req_sigevent.sigev_notify = SIGEV_NONE; 2057 } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_SIGNAL) { 2058 reqp->req_sigevent.sigev_notify = SIGEV_SIGNAL; 2059 reqp->req_sigevent.sigev_signo = 2060 aiocbp->aio_sigevent.sigev_signo; 2061 reqp->req_sigevent.sigev_value.sival_ptr = 2062 aiocbp->aio_sigevent.sigev_value.sival_ptr; 2063 } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_PORT) { 2064 port_notify_t *pn = aiocbp->aio_sigevent.sigev_value.sival_ptr; 2065 reqp->req_sigevent.sigev_notify = SIGEV_PORT; 2066 /* 2067 * Reuse the sigevent structure to contain the port number 2068 * and the user value. Same for SIGEV_THREAD, below. 
2069 */ 2070 reqp->req_sigevent.sigev_signo = 2071 pn->portnfy_port; 2072 reqp->req_sigevent.sigev_value.sival_ptr = 2073 pn->portnfy_user; 2074 } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_THREAD) { 2075 reqp->req_sigevent.sigev_notify = SIGEV_THREAD; 2076 /* 2077 * The sigevent structure contains the port number 2078 * and the user value. Same for SIGEV_PORT, above. 2079 */ 2080 reqp->req_sigevent.sigev_signo = 2081 aiocbp->aio_sigevent.sigev_signo; 2082 reqp->req_sigevent.sigev_value.sival_ptr = 2083 aiocbp->aio_sigevent.sigev_value.sival_ptr; 2084 } 2085 2086 reqp->req_resultp = &aiocbp->aio_resultp; 2087 reqp->req_aiocbp = aiocbp; 2088 ap = &reqp->req_args; 2089 ap->fd = aiocbp->aio_fildes; 2090 ap->buf = (caddr_t)aiocbp->aio_buf; 2091 ap->bufsz = aiocbp->aio_nbytes; 2092 ap->offset = aiocbp->aio_offset; 2093 2094 if ((flg & AIO_NO_DUPS) && 2095 _aio_hash_insert(&aiocbp->aio_resultp, reqp) != 0) { 2096 aio_panic("_aio_rw(): request already in hash table"); 2097 _aio_req_free(reqp); 2098 errno = EINVAL; 2099 return (-1); 2100 } 2101 _aio_req_add(reqp, nextworker, mode); 2102 return (0); 2103 } 2104 2105 #if !defined(_LP64) 2106 /* 2107 * 64-bit AIO interface for POSIX 2108 */ 2109 int 2110 _aio_rw64(aiocb64_t *aiocbp, aio_lio_t *lio_head, aio_worker_t **nextworker, 2111 int mode, int flg) 2112 { 2113 aio_req_t *reqp; 2114 aio_args_t *ap; 2115 int kerr; 2116 2117 if (aiocbp == NULL) { 2118 errno = EINVAL; 2119 return (-1); 2120 } 2121 2122 /* initialize kaio */ 2123 if (!_kaio_ok) 2124 _kaio_init(); 2125 2126 aiocbp->aio_state = NOCHECK; 2127 2128 /* 2129 * If we have been called because a list I/O 2130 * kaio() failed, we dont want to repeat the 2131 * system call 2132 */ 2133 2134 if (flg & AIO_KAIO) { 2135 /* 2136 * Try kernel aio first. 2137 * If errno is ENOTSUP/EBADFD, 2138 * fall back to the thread implementation. 
2139 */ 2140 if (_kaio_ok > 0 && KAIO_SUPPORTED(aiocbp->aio_fildes)) { 2141 aiocbp->aio_resultp.aio_errno = EINPROGRESS; 2142 aiocbp->aio_state = CHECK; 2143 kerr = (int)_kaio(mode, aiocbp); 2144 if (kerr == 0) 2145 return (0); 2146 if (errno != ENOTSUP && errno != EBADFD) { 2147 aiocbp->aio_resultp.aio_errno = errno; 2148 aiocbp->aio_resultp.aio_return = -1; 2149 aiocbp->aio_state = NOCHECK; 2150 return (-1); 2151 } 2152 if (errno == EBADFD) 2153 SET_KAIO_NOT_SUPPORTED(aiocbp->aio_fildes); 2154 } 2155 } 2156 2157 aiocbp->aio_resultp.aio_errno = EINPROGRESS; 2158 aiocbp->aio_state = USERAIO; 2159 2160 if (!__uaio_ok && __uaio_init() == -1) 2161 return (-1); 2162 2163 if ((reqp = _aio_req_alloc()) == NULL) { 2164 errno = EAGAIN; 2165 return (-1); 2166 } 2167 2168 /* 2169 * If an LIO request, add the list head to the aio request 2170 */ 2171 reqp->req_head = lio_head; 2172 reqp->req_type = AIO_POSIX_REQ; 2173 reqp->req_op = mode; 2174 reqp->req_largefile = 1; 2175 2176 if (aiocbp->aio_sigevent.sigev_notify == SIGEV_NONE) { 2177 reqp->req_sigevent.sigev_notify = SIGEV_NONE; 2178 } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_SIGNAL) { 2179 reqp->req_sigevent.sigev_notify = SIGEV_SIGNAL; 2180 reqp->req_sigevent.sigev_signo = 2181 aiocbp->aio_sigevent.sigev_signo; 2182 reqp->req_sigevent.sigev_value.sival_ptr = 2183 aiocbp->aio_sigevent.sigev_value.sival_ptr; 2184 } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_PORT) { 2185 port_notify_t *pn = aiocbp->aio_sigevent.sigev_value.sival_ptr; 2186 reqp->req_sigevent.sigev_notify = SIGEV_PORT; 2187 reqp->req_sigevent.sigev_signo = 2188 pn->portnfy_port; 2189 reqp->req_sigevent.sigev_value.sival_ptr = 2190 pn->portnfy_user; 2191 } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_THREAD) { 2192 reqp->req_sigevent.sigev_notify = SIGEV_THREAD; 2193 reqp->req_sigevent.sigev_signo = 2194 aiocbp->aio_sigevent.sigev_signo; 2195 reqp->req_sigevent.sigev_value.sival_ptr = 2196 aiocbp->aio_sigevent.sigev_value.sival_ptr; 2197 } 2198 2199 reqp->req_resultp = &aiocbp->aio_resultp; 2200 reqp->req_aiocbp = aiocbp; 2201 ap = &reqp->req_args; 2202 ap->fd = aiocbp->aio_fildes; 2203 ap->buf = (caddr_t)aiocbp->aio_buf; 2204 ap->bufsz = aiocbp->aio_nbytes; 2205 ap->offset = aiocbp->aio_offset; 2206 2207 if ((flg & AIO_NO_DUPS) && 2208 _aio_hash_insert(&aiocbp->aio_resultp, reqp) != 0) { 2209 aio_panic("_aio_rw64(): request already in hash table"); 2210 _aio_req_free(reqp); 2211 errno = EINVAL; 2212 return (-1); 2213 } 2214 _aio_req_add(reqp, nextworker, mode); 2215 return (0); 2216 } 2217 #endif /* !defined(_LP64) */ 2218
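
/*
 * Illustrative sketch (not part of the library): a minimal consumer of the
 * Solaris aioread()/aiowait() interface implemented above.  This is
 * application-level example code that would live in a separate program,
 * not in this file; the function name is hypothetical and error handling
 * is reduced to the bare minimum.
 */
#if 0	/* example only, never compiled */
#include <sys/asynch.h>
#include <sys/types.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdio.h>

int
example_read_first_block(const char *path)
{
	char buf[8192];
	aio_result_t result;
	aio_result_t *donep;
	int fd;

	if ((fd = open(path, O_RDONLY)) == -1)
		return (-1);

	/* mark the result as in progress before issuing the request */
	result.aio_return = AIO_INPROGRESS;
	if (aioread(fd, buf, sizeof (buf), 0, SEEK_SET, &result) == -1) {
		(void) close(fd);
		return (-1);
	}

	/* block until some outstanding request completes (no timeout) */
	donep = aiowait(NULL);
	if (donep == (aio_result_t *)-1 || donep != &result) {
		(void) close(fd);
		return (-1);
	}

	if (donep->aio_return == -1)
		(void) fprintf(stderr, "aio error: %d\n", donep->aio_errno);
	else
		(void) printf("read %ld bytes\n", (long)donep->aio_return);

	(void) close(fd);
	return (0);
}
#endif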