1 /* 2 * CDDL HEADER START 3 * 4 * The contents of this file are subject to the terms of the 5 * Common Development and Distribution License (the "License"). 6 * You may not use this file except in compliance with the License. 7 * 8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 9 * or http://www.opensolaris.org/os/licensing. 10 * See the License for the specific language governing permissions 11 * and limitations under the License. 12 * 13 * When distributing Covered Code, include this CDDL HEADER in each 14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 15 * If applicable, add the following below this CDDL HEADER, with the 16 * fields enclosed by brackets "[]" replaced with your own identifying 17 * information: Portions Copyright [yyyy] [name of copyright owner] 18 * 19 * CDDL HEADER END 20 */ 21 22 /* 23 * Copyright 2007 Sun Microsystems, Inc. All rights reserved. 24 * Use is subject to license terms. 25 */ 26 27 #pragma ident "%Z%%M% %I% %E% SMI" 28 29 #include "synonyms.h" 30 #include "thr_uberdata.h" 31 #include "asyncio.h" 32 #include <atomic.h> 33 #include <sys/param.h> 34 #include <sys/file.h> 35 #include <sys/port.h> 36 37 static int _aio_hash_insert(aio_result_t *, aio_req_t *); 38 static aio_req_t *_aio_req_get(aio_worker_t *); 39 static void _aio_req_add(aio_req_t *, aio_worker_t **, int); 40 static void _aio_req_del(aio_worker_t *, aio_req_t *, int); 41 static void _aio_work_done(aio_worker_t *); 42 static void _aio_enq_doneq(aio_req_t *); 43 44 extern void _aio_lio_free(aio_lio_t *); 45 46 extern int __fdsync(int, int); 47 extern int _port_dispatch(int, int, int, int, uintptr_t, void *); 48 49 static int _aio_fsync_del(aio_worker_t *, aio_req_t *); 50 static void _aiodone(aio_req_t *, ssize_t, int); 51 static void _aio_cancel_work(aio_worker_t *, int, int *, int *); 52 static void _aio_finish_request(aio_worker_t *, ssize_t, int); 53 54 /* 55 * switch for kernel async I/O 56 */ 57 int _kaio_ok = 0; /* 0 = disabled, 1 = on, -1 = error */ 58 59 /* 60 * Key for thread-specific data 61 */ 62 pthread_key_t _aio_key; 63 64 /* 65 * Array for determining whether or not a file supports kaio. 66 * Initialized in _kaio_init(). 67 */ 68 uint32_t *_kaio_supported = NULL; 69 70 /* 71 * workers for read/write requests 72 * (__aio_mutex lock protects circular linked list of workers) 73 */ 74 aio_worker_t *__workers_rw; /* circular list of AIO workers */ 75 aio_worker_t *__nextworker_rw; /* next worker in list of workers */ 76 int __rw_workerscnt; /* number of read/write workers */ 77 78 /* 79 * worker for notification requests. 
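 * (A single AIONOTIFY worker runs _aio_do_notify() and delivers the
 * completion signals and event-port notifications queued by _aiodone(),
 * so the read/write workers never block in notification delivery.)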
80 */ 81 aio_worker_t *__workers_no; /* circular list of AIO workers */ 82 aio_worker_t *__nextworker_no; /* next worker in list of workers */ 83 int __no_workerscnt; /* number of write workers */ 84 85 aio_req_t *_aio_done_tail; /* list of done requests */ 86 aio_req_t *_aio_done_head; 87 88 mutex_t __aio_initlock = DEFAULTMUTEX; /* makes aio initialization atomic */ 89 cond_t __aio_initcv = DEFAULTCV; 90 int __aio_initbusy = 0; 91 92 mutex_t __aio_mutex = DEFAULTMUTEX; /* protects counts, and linked lists */ 93 cond_t _aio_iowait_cv = DEFAULTCV; /* wait for userland I/Os */ 94 95 pid_t __pid = (pid_t)-1; /* initialize as invalid pid */ 96 int _sigio_enabled = 0; /* when set, send SIGIO signal */ 97 98 aio_hash_t *_aio_hash; 99 100 aio_req_t *_aio_doneq; /* double linked done queue list */ 101 102 int _aio_donecnt = 0; 103 int _aio_waitncnt = 0; /* # of requests for aio_waitn */ 104 int _aio_doneq_cnt = 0; 105 int _aio_outstand_cnt = 0; /* # of outstanding requests */ 106 int _kaio_outstand_cnt = 0; /* # of outstanding kaio requests */ 107 int _aio_req_done_cnt = 0; /* req. done but not in "done queue" */ 108 int _aio_kernel_suspend = 0; /* active kernel kaio calls */ 109 int _aio_suscv_cnt = 0; /* aio_suspend calls waiting on cv's */ 110 111 int _max_workers = 256; /* max number of workers permitted */ 112 int _min_workers = 4; /* min number of workers */ 113 int _minworkload = 2; /* min number of request in q */ 114 int _aio_worker_cnt = 0; /* number of workers to do requests */ 115 int __uaio_ok = 0; /* AIO has been enabled */ 116 sigset_t _worker_set; /* worker's signal mask */ 117 118 int _aiowait_flag = 0; /* when set, aiowait() is inprogress */ 119 int _aio_flags = 0; /* see asyncio.h defines for */ 120 121 aio_worker_t *_kaiowp = NULL; /* points to kaio cleanup thread */ 122 123 int hz; /* clock ticks per second */ 124 125 static int 126 _kaio_supported_init(void) 127 { 128 void *ptr; 129 size_t size; 130 131 if (_kaio_supported != NULL) /* already initialized */ 132 return (0); 133 134 size = MAX_KAIO_FDARRAY_SIZE * sizeof (uint32_t); 135 ptr = mmap(NULL, size, PROT_READ | PROT_WRITE, 136 MAP_PRIVATE | MAP_ANON, -1, (off_t)0); 137 if (ptr == MAP_FAILED) 138 return (-1); 139 _kaio_supported = ptr; 140 return (0); 141 } 142 143 /* 144 * The aio subsystem is initialized when an AIO request is made. 145 * Constants are initialized like the max number of workers that 146 * the subsystem can create, and the minimum number of workers 147 * permitted before imposing some restrictions. Also, some 148 * workers are created. 149 */ 150 int 151 __uaio_init(void) 152 { 153 int ret = -1; 154 int i; 155 156 lmutex_lock(&__aio_initlock); 157 while (__aio_initbusy) 158 (void) _cond_wait(&__aio_initcv, &__aio_initlock); 159 if (__uaio_ok) { /* already initialized */ 160 lmutex_unlock(&__aio_initlock); 161 return (0); 162 } 163 __aio_initbusy = 1; 164 lmutex_unlock(&__aio_initlock); 165 166 hz = (int)sysconf(_SC_CLK_TCK); 167 __pid = getpid(); 168 169 setup_cancelsig(SIGAIOCANCEL); 170 171 if (_kaio_supported_init() != 0) 172 goto out; 173 174 /* 175 * Allocate and initialize the hash table. 176 * Do this only once, even if __uaio_init() is called twice. 
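	 *
	 * (Requests are keyed by their aio_result_t pointer: AIOHASH()
	 * maps the pointer to one of HASHSZ buckets and each bucket is
	 * protected by its own hash_lock; see _aio_hash_insert() below.)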
177 */ 178 if (_aio_hash == NULL) { 179 /* LINTED pointer cast */ 180 _aio_hash = (aio_hash_t *)mmap(NULL, 181 HASHSZ * sizeof (aio_hash_t), PROT_READ | PROT_WRITE, 182 MAP_PRIVATE | MAP_ANON, -1, (off_t)0); 183 if ((void *)_aio_hash == MAP_FAILED) { 184 _aio_hash = NULL; 185 goto out; 186 } 187 for (i = 0; i < HASHSZ; i++) 188 (void) mutex_init(&_aio_hash[i].hash_lock, 189 USYNC_THREAD, NULL); 190 } 191 192 /* 193 * Initialize worker's signal mask to only catch SIGAIOCANCEL. 194 */ 195 (void) sigfillset(&_worker_set); 196 (void) sigdelset(&_worker_set, SIGAIOCANCEL); 197 198 /* 199 * Create one worker to send asynchronous notifications. 200 * Do this only once, even if __uaio_init() is called twice. 201 */ 202 if (__no_workerscnt == 0 && 203 (_aio_create_worker(NULL, AIONOTIFY) != 0)) { 204 errno = EAGAIN; 205 goto out; 206 } 207 208 /* 209 * Create the minimum number of read/write workers. 210 * And later check whether atleast one worker is created; 211 * lwp_create() calls could fail because of segkp exhaustion. 212 */ 213 for (i = 0; i < _min_workers; i++) 214 (void) _aio_create_worker(NULL, AIOREAD); 215 if (__rw_workerscnt == 0) { 216 errno = EAGAIN; 217 goto out; 218 } 219 220 ret = 0; 221 out: 222 lmutex_lock(&__aio_initlock); 223 if (ret == 0) 224 __uaio_ok = 1; 225 __aio_initbusy = 0; 226 (void) cond_broadcast(&__aio_initcv); 227 lmutex_unlock(&__aio_initlock); 228 return (ret); 229 } 230 231 /* 232 * Called from close() before actually performing the real _close(). 233 */ 234 void 235 _aio_close(int fd) 236 { 237 if (fd < 0) /* avoid cancelling everything */ 238 return; 239 /* 240 * Cancel all outstanding aio requests for this file descriptor. 241 */ 242 if (__uaio_ok) 243 (void) aiocancel_all(fd); 244 /* 245 * If we have allocated the bit array, clear the bit for this file. 246 * The next open may re-use this file descriptor and the new file 247 * may have different kaio() behaviour. 248 */ 249 if (_kaio_supported != NULL) 250 CLEAR_KAIO_SUPPORTED(fd); 251 } 252 253 /* 254 * special kaio cleanup thread sits in a loop in the 255 * kernel waiting for pending kaio requests to complete. 256 */ 257 void * 258 _kaio_cleanup_thread(void *arg) 259 { 260 if (pthread_setspecific(_aio_key, arg) != 0) 261 aio_panic("_kaio_cleanup_thread, pthread_setspecific()"); 262 (void) _kaio(AIOSTART); 263 return (arg); 264 } 265 266 /* 267 * initialize kaio. 
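 * (On success _kaio_ok becomes 1 and a daemon thread parks in the kernel
 * via _kaio(AIOSTART) to clean up completed kernel requests; on failure
 * _kaio_ok is set to -1 so callers skip the kernel path from then on.)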
268 */ 269 void 270 _kaio_init() 271 { 272 int error; 273 sigset_t oset; 274 275 lmutex_lock(&__aio_initlock); 276 while (__aio_initbusy) 277 (void) _cond_wait(&__aio_initcv, &__aio_initlock); 278 if (_kaio_ok) { /* already initialized */ 279 lmutex_unlock(&__aio_initlock); 280 return; 281 } 282 __aio_initbusy = 1; 283 lmutex_unlock(&__aio_initlock); 284 285 if (_kaio_supported_init() != 0) 286 error = ENOMEM; 287 else if ((_kaiowp = _aio_worker_alloc()) == NULL) 288 error = ENOMEM; 289 else if ((error = (int)_kaio(AIOINIT)) == 0) { 290 (void) pthread_sigmask(SIG_SETMASK, &maskset, &oset); 291 error = thr_create(NULL, AIOSTKSIZE, _kaio_cleanup_thread, 292 _kaiowp, THR_DAEMON, &_kaiowp->work_tid); 293 (void) pthread_sigmask(SIG_SETMASK, &oset, NULL); 294 } 295 if (error && _kaiowp != NULL) { 296 _aio_worker_free(_kaiowp); 297 _kaiowp = NULL; 298 } 299 300 lmutex_lock(&__aio_initlock); 301 if (error) 302 _kaio_ok = -1; 303 else 304 _kaio_ok = 1; 305 __aio_initbusy = 0; 306 (void) cond_broadcast(&__aio_initcv); 307 lmutex_unlock(&__aio_initlock); 308 } 309 310 int 311 aioread(int fd, caddr_t buf, int bufsz, off_t offset, int whence, 312 aio_result_t *resultp) 313 { 314 return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOREAD)); 315 } 316 317 int 318 aiowrite(int fd, caddr_t buf, int bufsz, off_t offset, int whence, 319 aio_result_t *resultp) 320 { 321 return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOWRITE)); 322 } 323 324 #if !defined(_LP64) 325 int 326 aioread64(int fd, caddr_t buf, int bufsz, off64_t offset, int whence, 327 aio_result_t *resultp) 328 { 329 return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOAREAD64)); 330 } 331 332 int 333 aiowrite64(int fd, caddr_t buf, int bufsz, off64_t offset, int whence, 334 aio_result_t *resultp) 335 { 336 return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOAWRITE64)); 337 } 338 #endif /* !defined(_LP64) */ 339 340 int 341 _aiorw(int fd, caddr_t buf, int bufsz, offset_t offset, int whence, 342 aio_result_t *resultp, int mode) 343 { 344 aio_req_t *reqp; 345 aio_args_t *ap; 346 offset_t loffset; 347 struct stat64 stat64; 348 int error = 0; 349 int kerr; 350 int umode; 351 352 switch (whence) { 353 354 case SEEK_SET: 355 loffset = offset; 356 break; 357 case SEEK_CUR: 358 if ((loffset = llseek(fd, 0, SEEK_CUR)) == -1) 359 error = -1; 360 else 361 loffset += offset; 362 break; 363 case SEEK_END: 364 if (fstat64(fd, &stat64) == -1) 365 error = -1; 366 else 367 loffset = offset + stat64.st_size; 368 break; 369 default: 370 errno = EINVAL; 371 error = -1; 372 } 373 374 if (error) 375 return (error); 376 377 /* initialize kaio */ 378 if (!_kaio_ok) 379 _kaio_init(); 380 381 /* 382 * _aio_do_request() needs the original request code (mode) to be able 383 * to choose the appropiate 32/64 bit function. All other functions 384 * only require the difference between READ and WRITE (umode). 385 */ 386 if (mode == AIOAREAD64 || mode == AIOAWRITE64) 387 umode = mode - AIOAREAD64; 388 else 389 umode = mode; 390 391 /* 392 * Try kernel aio first. 393 * If errno is ENOTSUP/EBADFD, fall back to the thread implementation. 394 */ 395 if (_kaio_ok > 0 && KAIO_SUPPORTED(fd)) { 396 resultp->aio_errno = 0; 397 sig_mutex_lock(&__aio_mutex); 398 _kaio_outstand_cnt++; 399 sig_mutex_unlock(&__aio_mutex); 400 kerr = (int)_kaio(((resultp->aio_return == AIO_INPROGRESS) ? 
401 (umode | AIO_POLL_BIT) : umode), 402 fd, buf, bufsz, loffset, resultp); 403 if (kerr == 0) { 404 return (0); 405 } 406 sig_mutex_lock(&__aio_mutex); 407 _kaio_outstand_cnt--; 408 sig_mutex_unlock(&__aio_mutex); 409 if (errno != ENOTSUP && errno != EBADFD) 410 return (-1); 411 if (errno == EBADFD) 412 SET_KAIO_NOT_SUPPORTED(fd); 413 } 414 415 if (!__uaio_ok && __uaio_init() == -1) 416 return (-1); 417 418 if ((reqp = _aio_req_alloc()) == NULL) { 419 errno = EAGAIN; 420 return (-1); 421 } 422 423 /* 424 * _aio_do_request() checks reqp->req_op to differentiate 425 * between 32 and 64 bit access. 426 */ 427 reqp->req_op = mode; 428 reqp->req_resultp = resultp; 429 ap = &reqp->req_args; 430 ap->fd = fd; 431 ap->buf = buf; 432 ap->bufsz = bufsz; 433 ap->offset = loffset; 434 435 if (_aio_hash_insert(resultp, reqp) != 0) { 436 _aio_req_free(reqp); 437 errno = EINVAL; 438 return (-1); 439 } 440 /* 441 * _aio_req_add() only needs the difference between READ and 442 * WRITE to choose the right worker queue. 443 */ 444 _aio_req_add(reqp, &__nextworker_rw, umode); 445 return (0); 446 } 447 448 int 449 aiocancel(aio_result_t *resultp) 450 { 451 aio_req_t *reqp; 452 aio_worker_t *aiowp; 453 int ret; 454 int done = 0; 455 int canceled = 0; 456 457 if (!__uaio_ok) { 458 errno = EINVAL; 459 return (-1); 460 } 461 462 sig_mutex_lock(&__aio_mutex); 463 reqp = _aio_hash_find(resultp); 464 if (reqp == NULL) { 465 if (_aio_outstand_cnt == _aio_req_done_cnt) 466 errno = EINVAL; 467 else 468 errno = EACCES; 469 ret = -1; 470 } else { 471 aiowp = reqp->req_worker; 472 sig_mutex_lock(&aiowp->work_qlock1); 473 (void) _aio_cancel_req(aiowp, reqp, &canceled, &done); 474 sig_mutex_unlock(&aiowp->work_qlock1); 475 476 if (canceled) { 477 ret = 0; 478 } else { 479 if (_aio_outstand_cnt == 0 || 480 _aio_outstand_cnt == _aio_req_done_cnt) 481 errno = EINVAL; 482 else 483 errno = EACCES; 484 ret = -1; 485 } 486 } 487 sig_mutex_unlock(&__aio_mutex); 488 return (ret); 489 } 490 491 /* 492 * This must be asynch safe 493 */ 494 aio_result_t * 495 aiowait(struct timeval *uwait) 496 { 497 aio_result_t *uresultp; 498 aio_result_t *kresultp; 499 aio_result_t *resultp; 500 int dontblock; 501 int timedwait = 0; 502 int kaio_errno = 0; 503 struct timeval twait; 504 struct timeval *wait = NULL; 505 hrtime_t hrtend; 506 hrtime_t hres; 507 508 if (uwait) { 509 /* 510 * Check for a valid specified wait time. 511 * If it is invalid, fail the call right away. 
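		 *
		 * (A zero timeout means poll: the kernel and userland
		 * queues are each checked once and NULL is returned if
		 * nothing has completed.  A nonzero timeout is converted
		 * to an hrtime deadline below so the remaining time can
		 * be recomputed after every wakeup.)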
512 */ 513 if (uwait->tv_sec < 0 || uwait->tv_usec < 0 || 514 uwait->tv_usec >= MICROSEC) { 515 errno = EINVAL; 516 return ((aio_result_t *)-1); 517 } 518 519 if (uwait->tv_sec > 0 || uwait->tv_usec > 0) { 520 hrtend = gethrtime() + 521 (hrtime_t)uwait->tv_sec * NANOSEC + 522 (hrtime_t)uwait->tv_usec * (NANOSEC / MICROSEC); 523 twait = *uwait; 524 wait = &twait; 525 timedwait++; 526 } else { 527 /* polling */ 528 sig_mutex_lock(&__aio_mutex); 529 if (_kaio_outstand_cnt == 0) { 530 kresultp = (aio_result_t *)-1; 531 } else { 532 kresultp = (aio_result_t *)_kaio(AIOWAIT, 533 (struct timeval *)-1, 1); 534 if (kresultp != (aio_result_t *)-1 && 535 kresultp != NULL && 536 kresultp != (aio_result_t *)1) { 537 _kaio_outstand_cnt--; 538 sig_mutex_unlock(&__aio_mutex); 539 return (kresultp); 540 } 541 } 542 uresultp = _aio_req_done(); 543 sig_mutex_unlock(&__aio_mutex); 544 if (uresultp != NULL && 545 uresultp != (aio_result_t *)-1) { 546 return (uresultp); 547 } 548 if (uresultp == (aio_result_t *)-1 && 549 kresultp == (aio_result_t *)-1) { 550 errno = EINVAL; 551 return ((aio_result_t *)-1); 552 } else { 553 return (NULL); 554 } 555 } 556 } 557 558 for (;;) { 559 sig_mutex_lock(&__aio_mutex); 560 uresultp = _aio_req_done(); 561 if (uresultp != NULL && uresultp != (aio_result_t *)-1) { 562 sig_mutex_unlock(&__aio_mutex); 563 resultp = uresultp; 564 break; 565 } 566 _aiowait_flag++; 567 dontblock = (uresultp == (aio_result_t *)-1); 568 if (dontblock && _kaio_outstand_cnt == 0) { 569 kresultp = (aio_result_t *)-1; 570 kaio_errno = EINVAL; 571 } else { 572 sig_mutex_unlock(&__aio_mutex); 573 kresultp = (aio_result_t *)_kaio(AIOWAIT, 574 wait, dontblock); 575 sig_mutex_lock(&__aio_mutex); 576 kaio_errno = errno; 577 } 578 _aiowait_flag--; 579 sig_mutex_unlock(&__aio_mutex); 580 if (kresultp == (aio_result_t *)1) { 581 /* aiowait() awakened by an aionotify() */ 582 continue; 583 } else if (kresultp != NULL && 584 kresultp != (aio_result_t *)-1) { 585 resultp = kresultp; 586 sig_mutex_lock(&__aio_mutex); 587 _kaio_outstand_cnt--; 588 sig_mutex_unlock(&__aio_mutex); 589 break; 590 } else if (kresultp == (aio_result_t *)-1 && 591 kaio_errno == EINVAL && 592 uresultp == (aio_result_t *)-1) { 593 errno = kaio_errno; 594 resultp = (aio_result_t *)-1; 595 break; 596 } else if (kresultp == (aio_result_t *)-1 && 597 kaio_errno == EINTR) { 598 errno = kaio_errno; 599 resultp = (aio_result_t *)-1; 600 break; 601 } else if (timedwait) { 602 hres = hrtend - gethrtime(); 603 if (hres <= 0) { 604 /* time is up; return */ 605 resultp = NULL; 606 break; 607 } else { 608 /* 609 * Some time left. Round up the remaining time 610 * in nanoseconds to microsec. Retry the call. 611 */ 612 hres += (NANOSEC / MICROSEC) - 1; 613 wait->tv_sec = hres / NANOSEC; 614 wait->tv_usec = 615 (hres % NANOSEC) / (NANOSEC / MICROSEC); 616 } 617 } else { 618 ASSERT(kresultp == NULL && uresultp == NULL); 619 resultp = NULL; 620 continue; 621 } 622 } 623 return (resultp); 624 } 625 626 /* 627 * _aio_get_timedelta calculates the remaining time and stores the result 628 * into timespec_t *wait. 
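 * It returns 0 when time remains (with *wait set to the difference
 * end - now) and -1 when the deadline has already passed.  For example,
 * if *end lies 1.5 seconds in the future, *wait becomes {1, 500000000}.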
629 */ 630 631 int 632 _aio_get_timedelta(timespec_t *end, timespec_t *wait) 633 { 634 int ret = 0; 635 struct timeval cur; 636 timespec_t curtime; 637 638 (void) gettimeofday(&cur, NULL); 639 curtime.tv_sec = cur.tv_sec; 640 curtime.tv_nsec = cur.tv_usec * 1000; /* convert us to ns */ 641 642 if (end->tv_sec >= curtime.tv_sec) { 643 wait->tv_sec = end->tv_sec - curtime.tv_sec; 644 if (end->tv_nsec >= curtime.tv_nsec) { 645 wait->tv_nsec = end->tv_nsec - curtime.tv_nsec; 646 if (wait->tv_sec == 0 && wait->tv_nsec == 0) 647 ret = -1; /* timer expired */ 648 } else { 649 if (end->tv_sec > curtime.tv_sec) { 650 wait->tv_sec -= 1; 651 wait->tv_nsec = NANOSEC - 652 (curtime.tv_nsec - end->tv_nsec); 653 } else { 654 ret = -1; /* timer expired */ 655 } 656 } 657 } else { 658 ret = -1; 659 } 660 return (ret); 661 } 662 663 /* 664 * If closing by file descriptor: we will simply cancel all the outstanding 665 * aio`s and return. Those aio's in question will have either noticed the 666 * cancellation notice before, during, or after initiating io. 667 */ 668 int 669 aiocancel_all(int fd) 670 { 671 aio_req_t *reqp; 672 aio_req_t **reqpp; 673 aio_worker_t *first; 674 aio_worker_t *next; 675 int canceled = 0; 676 int done = 0; 677 int cancelall = 0; 678 679 sig_mutex_lock(&__aio_mutex); 680 681 if (_aio_outstand_cnt == 0) { 682 sig_mutex_unlock(&__aio_mutex); 683 return (AIO_ALLDONE); 684 } 685 686 /* 687 * Cancel requests from the read/write workers' queues. 688 */ 689 first = __nextworker_rw; 690 next = first; 691 do { 692 _aio_cancel_work(next, fd, &canceled, &done); 693 } while ((next = next->work_forw) != first); 694 695 /* 696 * finally, check if there are requests on the done queue that 697 * should be canceled. 698 */ 699 if (fd < 0) 700 cancelall = 1; 701 reqpp = &_aio_done_tail; 702 while ((reqp = *reqpp) != NULL) { 703 if (cancelall || reqp->req_args.fd == fd) { 704 *reqpp = reqp->req_next; 705 _aio_donecnt--; 706 (void) _aio_hash_del(reqp->req_resultp); 707 _aio_req_free(reqp); 708 } else 709 reqpp = &reqp->req_next; 710 } 711 if (cancelall) { 712 ASSERT(_aio_donecnt == 0); 713 _aio_done_head = NULL; 714 } 715 sig_mutex_unlock(&__aio_mutex); 716 717 if (canceled && done == 0) 718 return (AIO_CANCELED); 719 else if (done && canceled == 0) 720 return (AIO_ALLDONE); 721 else if ((canceled + done == 0) && KAIO_SUPPORTED(fd)) 722 return ((int)_kaio(AIOCANCEL, fd, NULL)); 723 return (AIO_NOTCANCELED); 724 } 725 726 /* 727 * Cancel requests from a given work queue. If the file descriptor 728 * parameter, fd, is non-negative, then only cancel those requests 729 * in this queue that are to this file descriptor. If the fd 730 * parameter is -1, then cancel all requests. 731 */ 732 static void 733 _aio_cancel_work(aio_worker_t *aiowp, int fd, int *canceled, int *done) 734 { 735 aio_req_t *reqp; 736 737 sig_mutex_lock(&aiowp->work_qlock1); 738 /* 739 * cancel queued requests first. 740 */ 741 reqp = aiowp->work_tail1; 742 while (reqp != NULL) { 743 if (fd < 0 || reqp->req_args.fd == fd) { 744 if (_aio_cancel_req(aiowp, reqp, canceled, done)) { 745 /* 746 * Callers locks were dropped. 747 * reqp is invalid; start traversing 748 * the list from the beginning again. 749 */ 750 reqp = aiowp->work_tail1; 751 continue; 752 } 753 } 754 reqp = reqp->req_next; 755 } 756 /* 757 * Since the queued requests have been canceled, there can 758 * only be one inprogress request that should be canceled. 
 */
	if ((reqp = aiowp->work_req) != NULL &&
	    (fd < 0 || reqp->req_args.fd == fd))
		(void) _aio_cancel_req(aiowp, reqp, canceled, done);
	sig_mutex_unlock(&aiowp->work_qlock1);
}

/*
 * Cancel a request.  Return 1 if the caller's locks were temporarily
 * dropped, otherwise return 0.
 */
int
_aio_cancel_req(aio_worker_t *aiowp, aio_req_t *reqp, int *canceled, int *done)
{
	int ostate = reqp->req_state;

	ASSERT(MUTEX_HELD(&__aio_mutex));
	ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
	if (ostate == AIO_REQ_CANCELED)
		return (0);
	if (ostate == AIO_REQ_DONE || ostate == AIO_REQ_DONEQ) {
		(*done)++;
		return (0);
	}
	if (reqp->req_op == AIOFSYNC && reqp != aiowp->work_req) {
		ASSERT(POSIX_AIO(reqp));
		/* Cancel the queued aio_fsync() request */
		if (!reqp->req_head->lio_canned) {
			reqp->req_head->lio_canned = 1;
			_aio_outstand_cnt--;
			(*canceled)++;
		}
		return (0);
	}
	reqp->req_state = AIO_REQ_CANCELED;
	_aio_req_del(aiowp, reqp, ostate);
	(void) _aio_hash_del(reqp->req_resultp);
	(*canceled)++;
	if (reqp == aiowp->work_req) {
		ASSERT(ostate == AIO_REQ_INPROGRESS);
		/*
		 * Set the result values now, before _aiodone() is called.
		 * We do this because the application can expect aio_return
		 * and aio_errno to be set to -1 and ECANCELED, respectively,
		 * immediately after a successful return from aiocancel()
		 * or aio_cancel().
		 */
		_aio_set_result(reqp, -1, ECANCELED);
		(void) thr_kill(aiowp->work_tid, SIGAIOCANCEL);
		return (0);
	}
	if (!POSIX_AIO(reqp)) {
		_aio_outstand_cnt--;
		_aio_set_result(reqp, -1, ECANCELED);
		return (0);
	}
	sig_mutex_unlock(&aiowp->work_qlock1);
	sig_mutex_unlock(&__aio_mutex);
	_aiodone(reqp, -1, ECANCELED);
	sig_mutex_lock(&__aio_mutex);
	sig_mutex_lock(&aiowp->work_qlock1);
	return (1);
}

int
_aio_create_worker(aio_req_t *reqp, int mode)
{
	aio_worker_t *aiowp, **workers, **nextworker;
	int *aio_workerscnt;
	void *(*func)(void *);
	sigset_t oset;
	int error;

	/*
	 * Put the new worker thread in the right queue.
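	 *
	 * (The read/write modes share the __workers_rw ring and run
	 * _aio_do_request(); AIONOTIFY workers go on the __workers_no
	 * ring and run _aio_do_notify().  When reqp is non-NULL the new
	 * worker starts out with that request already queued and is
	 * created suspended, then continued once it has been linked
	 * into the ring.)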
 */
	switch (mode) {
	case AIOREAD:
	case AIOWRITE:
	case AIOAREAD:
	case AIOAWRITE:
#if !defined(_LP64)
	case AIOAREAD64:
	case AIOAWRITE64:
#endif
		workers = &__workers_rw;
		nextworker = &__nextworker_rw;
		aio_workerscnt = &__rw_workerscnt;
		func = _aio_do_request;
		break;
	case AIONOTIFY:
		workers = &__workers_no;
		nextworker = &__nextworker_no;
		func = _aio_do_notify;
		aio_workerscnt = &__no_workerscnt;
		break;
	default:
		aio_panic("_aio_create_worker: invalid mode");
		break;
	}

	if ((aiowp = _aio_worker_alloc()) == NULL)
		return (-1);

	if (reqp) {
		reqp->req_state = AIO_REQ_QUEUED;
		reqp->req_worker = aiowp;
		aiowp->work_head1 = reqp;
		aiowp->work_tail1 = reqp;
		aiowp->work_next1 = reqp;
		aiowp->work_count1 = 1;
		aiowp->work_minload1 = 1;
	}

	(void) pthread_sigmask(SIG_SETMASK, &maskset, &oset);
	error = thr_create(NULL, AIOSTKSIZE, func, aiowp,
	    THR_DAEMON | THR_SUSPENDED, &aiowp->work_tid);
	(void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
	if (error) {
		if (reqp) {
			reqp->req_state = 0;
			reqp->req_worker = NULL;
		}
		_aio_worker_free(aiowp);
		return (-1);
	}

	lmutex_lock(&__aio_mutex);
	(*aio_workerscnt)++;
	if (*workers == NULL) {
		aiowp->work_forw = aiowp;
		aiowp->work_backw = aiowp;
		*nextworker = aiowp;
		*workers = aiowp;
	} else {
		aiowp->work_backw = (*workers)->work_backw;
		aiowp->work_forw = (*workers);
		(*workers)->work_backw->work_forw = aiowp;
		(*workers)->work_backw = aiowp;
	}
	_aio_worker_cnt++;
	lmutex_unlock(&__aio_mutex);

	(void) thr_continue(aiowp->work_tid);

	return (0);
}

/*
 * This is the worker's main routine.
 * The task of this function is to execute all queued requests;
 * once the last pending request is executed this function will block
 * in _aio_idle().  A new incoming request must wake up this thread to
 * restart the work.
 * Every worker has its own work queue.  The queue lock is required
 * to synchronize the addition of new requests for this worker or
 * the cancellation of pending/running requests.
 *
 * Cancellation scenarios:
 * The cancellation of a request is done asynchronously using
 * _aio_cancel_req() from another thread context.
 * A queued request can be cancelled in different ways:
 * a) request is queued but not "in progress" or "done" (AIO_REQ_QUEUED):
 *	- lock the queue -> remove the request -> unlock the queue
 *	- this function/thread does not detect this cancellation process
 * b) request is in progress (AIO_REQ_INPROGRESS):
 *	- this function first allows the cancellation of the running
 *	  request with the flag "work_cancel_flg=1"
 *		see _aio_req_get() -> _aio_cancel_on()
 *	  During this phase, it is allowed to interrupt the worker
 *	  thread running the request (this thread) using the SIGAIOCANCEL
 *	  signal.
 *	  Once this thread returns from the kernel (because the request
 *	  is just done), it must disable a possible cancellation
 *	  and proceed to finish the request.  To disable the cancellation
 *	  this thread must use _aio_cancel_off() to set "work_cancel_flg=0".
 * c) request is already done (AIO_REQ_DONE || AIO_REQ_DONEQ):
 *	  same procedure as in a)
 *
 * To b)
 *	This thread uses sigsetjmp() to define the position in the code
 *	where it wishes to continue working in the case that a SIGAIOCANCEL
 *	signal is detected.
 *	Normally this thread should get the cancellation signal during the
 *	kernel phase (reading or writing).  In that case the signal handler
 *	aiosigcancelhndlr() is activated using the worker thread context,
 *	which in turn uses the siglongjmp() function to break the standard
 *	code flow and jump to the "sigsetjmp" position, provided that
 *	"work_cancel_flg" is set to "1".
 *	Because "work_cancel_flg" is only manipulated by this worker
 *	thread and it can only run on one CPU at a given time, it is not
 *	necessary to protect that flag with the queue lock.
 *	Returning from the kernel (read or write system call) we must
 *	first disable the use of the SIGAIOCANCEL signal and accordingly
 *	the use of the siglongjmp() function to prevent a possible deadlock:
 *	- It can happen that this worker thread returns from the kernel
 *	  and blocks in "work_qlock1",
 *	- then a second thread cancels the apparently "in progress" request
 *	  and sends the SIGAIOCANCEL signal to the worker thread,
 *	- the worker thread acquires "work_qlock1" and returns from
 *	  the kernel,
 *	- the kernel detects the pending signal and activates the signal
 *	  handler instead,
 *	- if "work_cancel_flg" were still set then the signal handler
 *	  would use siglongjmp() to cancel the "in progress" request and
 *	  would try to acquire the same work_qlock1 in _aio_req_get()
 *	  for a second time => deadlock.
 *	To avoid that situation we disable the cancellation of the request
 *	in progress BEFORE we try to acquire the work_qlock1.
 *	In that case the signal handler will not call siglongjmp() and the
 *	worker thread will continue running the standard code flow.
 *	This thread must then check the AIO_REQ_CANCELED state to emulate
 *	the siglongjmp() that would otherwise have been required, freeing
 *	the work_qlock1 and avoiding a deadlock.
 */
void *
_aio_do_request(void *arglist)
{
	aio_worker_t *aiowp = (aio_worker_t *)arglist;
	ulwp_t *self = curthread;
	struct aio_args *arg;
	aio_req_t *reqp;		/* current AIO request */
	ssize_t retval;
	int error;

	if (pthread_setspecific(_aio_key, aiowp) != 0)
		aio_panic("_aio_do_request, pthread_setspecific()");
	(void) pthread_sigmask(SIG_SETMASK, &_worker_set, NULL);
	ASSERT(aiowp->work_req == NULL);

	/*
	 * We resume here when an operation is cancelled.
	 * On first entry, aiowp->work_req == NULL, so all
	 * we do is block SIGAIOCANCEL.
	 */
	(void) sigsetjmp(aiowp->work_jmp_buf, 0);
	ASSERT(self->ul_sigdefer == 0);

	sigoff(self);	/* block SIGAIOCANCEL */
	if (aiowp->work_req != NULL)
		_aio_finish_request(aiowp, -1, ECANCELED);

	for (;;) {
		/*
		 * Put completed requests on aio_done_list.  This has
		 * to be done as part of the main loop to ensure that
		 * we don't artificially starve any aiowait'ers.
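		 *
		 * (work_done1 is nonzero when a completed Solaris-style,
		 * i.e. non-POSIX, request is still sitting on this worker's
		 * queue; _aio_work_done() moves it to the global done list
		 * that aiowait() consumes.)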
1006 */ 1007 if (aiowp->work_done1) 1008 _aio_work_done(aiowp); 1009 1010 top: 1011 /* consume any deferred SIGAIOCANCEL signal here */ 1012 sigon(self); 1013 sigoff(self); 1014 1015 while ((reqp = _aio_req_get(aiowp)) == NULL) { 1016 if (_aio_idle(aiowp) != 0) 1017 goto top; 1018 } 1019 arg = &reqp->req_args; 1020 ASSERT(reqp->req_state == AIO_REQ_INPROGRESS || 1021 reqp->req_state == AIO_REQ_CANCELED); 1022 error = 0; 1023 1024 switch (reqp->req_op) { 1025 case AIOREAD: 1026 case AIOAREAD: 1027 sigon(self); /* unblock SIGAIOCANCEL */ 1028 retval = pread(arg->fd, arg->buf, 1029 arg->bufsz, arg->offset); 1030 if (retval == -1) { 1031 if (errno == ESPIPE) { 1032 retval = read(arg->fd, 1033 arg->buf, arg->bufsz); 1034 if (retval == -1) 1035 error = errno; 1036 } else { 1037 error = errno; 1038 } 1039 } 1040 sigoff(self); /* block SIGAIOCANCEL */ 1041 break; 1042 case AIOWRITE: 1043 case AIOAWRITE: 1044 sigon(self); /* unblock SIGAIOCANCEL */ 1045 retval = pwrite(arg->fd, arg->buf, 1046 arg->bufsz, arg->offset); 1047 if (retval == -1) { 1048 if (errno == ESPIPE) { 1049 retval = write(arg->fd, 1050 arg->buf, arg->bufsz); 1051 if (retval == -1) 1052 error = errno; 1053 } else { 1054 error = errno; 1055 } 1056 } 1057 sigoff(self); /* block SIGAIOCANCEL */ 1058 break; 1059 #if !defined(_LP64) 1060 case AIOAREAD64: 1061 sigon(self); /* unblock SIGAIOCANCEL */ 1062 retval = pread64(arg->fd, arg->buf, 1063 arg->bufsz, arg->offset); 1064 if (retval == -1) { 1065 if (errno == ESPIPE) { 1066 retval = read(arg->fd, 1067 arg->buf, arg->bufsz); 1068 if (retval == -1) 1069 error = errno; 1070 } else { 1071 error = errno; 1072 } 1073 } 1074 sigoff(self); /* block SIGAIOCANCEL */ 1075 break; 1076 case AIOAWRITE64: 1077 sigon(self); /* unblock SIGAIOCANCEL */ 1078 retval = pwrite64(arg->fd, arg->buf, 1079 arg->bufsz, arg->offset); 1080 if (retval == -1) { 1081 if (errno == ESPIPE) { 1082 retval = write(arg->fd, 1083 arg->buf, arg->bufsz); 1084 if (retval == -1) 1085 error = errno; 1086 } else { 1087 error = errno; 1088 } 1089 } 1090 sigoff(self); /* block SIGAIOCANCEL */ 1091 break; 1092 #endif /* !defined(_LP64) */ 1093 case AIOFSYNC: 1094 if (_aio_fsync_del(aiowp, reqp)) 1095 goto top; 1096 ASSERT(reqp->req_head == NULL); 1097 /* 1098 * All writes for this fsync request are now 1099 * acknowledged. Now make these writes visible 1100 * and put the final request into the hash table. 1101 */ 1102 if (reqp->req_state == AIO_REQ_CANCELED) { 1103 /* EMPTY */; 1104 } else if (arg->offset == O_SYNC) { 1105 if ((retval = __fdsync(arg->fd, FSYNC)) == -1) 1106 error = errno; 1107 } else { 1108 if ((retval = __fdsync(arg->fd, FDSYNC)) == -1) 1109 error = errno; 1110 } 1111 if (_aio_hash_insert(reqp->req_resultp, reqp) != 0) 1112 aio_panic("_aio_do_request(): AIOFSYNC: " 1113 "request already in hash table"); 1114 break; 1115 default: 1116 aio_panic("_aio_do_request, bad op"); 1117 } 1118 1119 _aio_finish_request(aiowp, retval, error); 1120 } 1121 /* NOTREACHED */ 1122 return (NULL); 1123 } 1124 1125 /* 1126 * Perform the tail processing for _aio_do_request(). 1127 * The in-progress request may or may not have been cancelled. 
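 * (For Solaris-style requests the result is recorded here and the request
 * is left on the worker queue until _aio_work_done() moves it to the done
 * list, or is freed immediately if it was canceled; POSIX requests are
 * handed to _aiodone() for notification processing.)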
1128 */ 1129 static void 1130 _aio_finish_request(aio_worker_t *aiowp, ssize_t retval, int error) 1131 { 1132 aio_req_t *reqp; 1133 1134 sig_mutex_lock(&aiowp->work_qlock1); 1135 if ((reqp = aiowp->work_req) == NULL) 1136 sig_mutex_unlock(&aiowp->work_qlock1); 1137 else { 1138 aiowp->work_req = NULL; 1139 if (reqp->req_state == AIO_REQ_CANCELED) { 1140 retval = -1; 1141 error = ECANCELED; 1142 } 1143 if (!POSIX_AIO(reqp)) { 1144 int notify; 1145 sig_mutex_unlock(&aiowp->work_qlock1); 1146 sig_mutex_lock(&__aio_mutex); 1147 if (reqp->req_state == AIO_REQ_INPROGRESS) 1148 reqp->req_state = AIO_REQ_DONE; 1149 /* 1150 * If it was canceled, this request will not be 1151 * added to done list. Just free it. 1152 */ 1153 if (error == ECANCELED) { 1154 _aio_outstand_cnt--; 1155 _aio_req_free(reqp); 1156 } else { 1157 _aio_set_result(reqp, retval, error); 1158 _aio_req_done_cnt++; 1159 } 1160 /* 1161 * Notify any thread that may have blocked 1162 * because it saw an outstanding request. 1163 */ 1164 notify = 0; 1165 if (_aio_outstand_cnt == 0 && _aiowait_flag) { 1166 notify = 1; 1167 } 1168 sig_mutex_unlock(&__aio_mutex); 1169 if (notify) { 1170 (void) _kaio(AIONOTIFY); 1171 } 1172 } else { 1173 if (reqp->req_state == AIO_REQ_INPROGRESS) 1174 reqp->req_state = AIO_REQ_DONE; 1175 sig_mutex_unlock(&aiowp->work_qlock1); 1176 _aiodone(reqp, retval, error); 1177 } 1178 } 1179 } 1180 1181 void 1182 _aio_req_mark_done(aio_req_t *reqp) 1183 { 1184 #if !defined(_LP64) 1185 if (reqp->req_largefile) 1186 ((aiocb64_t *)reqp->req_aiocbp)->aio_state = USERAIO_DONE; 1187 else 1188 #endif 1189 ((aiocb_t *)reqp->req_aiocbp)->aio_state = USERAIO_DONE; 1190 } 1191 1192 /* 1193 * Sleep for 'ticks' clock ticks to give somebody else a chance to run, 1194 * hopefully to consume one of our queued signals. 1195 */ 1196 static void 1197 _aio_delay(int ticks) 1198 { 1199 (void) usleep(ticks * (MICROSEC / hz)); 1200 } 1201 1202 /* 1203 * Actually send the notifications. 1204 * We could block indefinitely here if the application 1205 * is not listening for the signal or port notifications. 1206 */ 1207 static void 1208 send_notification(notif_param_t *npp) 1209 { 1210 extern int __sigqueue(pid_t pid, int signo, 1211 /* const union sigval */ void *value, int si_code, int block); 1212 1213 if (npp->np_signo) 1214 (void) __sigqueue(__pid, npp->np_signo, npp->np_user, 1215 SI_ASYNCIO, 1); 1216 else if (npp->np_port >= 0) 1217 (void) _port_dispatch(npp->np_port, 0, PORT_SOURCE_AIO, 1218 npp->np_event, npp->np_object, npp->np_user); 1219 1220 if (npp->np_lio_signo) 1221 (void) __sigqueue(__pid, npp->np_lio_signo, npp->np_lio_user, 1222 SI_ASYNCIO, 1); 1223 else if (npp->np_lio_port >= 0) 1224 (void) _port_dispatch(npp->np_lio_port, 0, PORT_SOURCE_AIO, 1225 npp->np_lio_event, npp->np_lio_object, npp->np_lio_user); 1226 } 1227 1228 /* 1229 * Asynchronous notification worker. 1230 */ 1231 void * 1232 _aio_do_notify(void *arg) 1233 { 1234 aio_worker_t *aiowp = (aio_worker_t *)arg; 1235 aio_req_t *reqp; 1236 1237 /* 1238 * This isn't really necessary. All signals are blocked. 1239 */ 1240 if (pthread_setspecific(_aio_key, aiowp) != 0) 1241 aio_panic("_aio_do_notify, pthread_setspecific()"); 1242 1243 /* 1244 * Notifications are never cancelled. 1245 * All signals remain blocked, forever. 
1246 */ 1247 for (;;) { 1248 while ((reqp = _aio_req_get(aiowp)) == NULL) { 1249 if (_aio_idle(aiowp) != 0) 1250 aio_panic("_aio_do_notify: _aio_idle() failed"); 1251 } 1252 send_notification(&reqp->req_notify); 1253 _aio_req_free(reqp); 1254 } 1255 1256 /* NOTREACHED */ 1257 return (NULL); 1258 } 1259 1260 /* 1261 * Do the completion semantics for a request that was either canceled 1262 * by _aio_cancel_req() or was completed by _aio_do_request(). 1263 */ 1264 static void 1265 _aiodone(aio_req_t *reqp, ssize_t retval, int error) 1266 { 1267 aio_result_t *resultp = reqp->req_resultp; 1268 int notify = 0; 1269 aio_lio_t *head; 1270 int sigev_none; 1271 int sigev_signal; 1272 int sigev_thread; 1273 int sigev_port; 1274 notif_param_t np; 1275 1276 /* 1277 * We call _aiodone() only for Posix I/O. 1278 */ 1279 ASSERT(POSIX_AIO(reqp)); 1280 1281 sigev_none = 0; 1282 sigev_signal = 0; 1283 sigev_thread = 0; 1284 sigev_port = 0; 1285 np.np_signo = 0; 1286 np.np_port = -1; 1287 np.np_lio_signo = 0; 1288 np.np_lio_port = -1; 1289 1290 switch (reqp->req_sigevent.sigev_notify) { 1291 case SIGEV_NONE: 1292 sigev_none = 1; 1293 break; 1294 case SIGEV_SIGNAL: 1295 sigev_signal = 1; 1296 break; 1297 case SIGEV_THREAD: 1298 sigev_thread = 1; 1299 break; 1300 case SIGEV_PORT: 1301 sigev_port = 1; 1302 break; 1303 default: 1304 aio_panic("_aiodone: improper sigev_notify"); 1305 break; 1306 } 1307 1308 /* 1309 * Figure out the notification parameters while holding __aio_mutex. 1310 * Actually perform the notifications after dropping __aio_mutex. 1311 * This allows us to sleep for a long time (if the notifications 1312 * incur delays) without impeding other async I/O operations. 1313 */ 1314 1315 sig_mutex_lock(&__aio_mutex); 1316 1317 if (sigev_signal) { 1318 if ((np.np_signo = reqp->req_sigevent.sigev_signo) != 0) 1319 notify = 1; 1320 np.np_user = reqp->req_sigevent.sigev_value.sival_ptr; 1321 } else if (sigev_thread | sigev_port) { 1322 if ((np.np_port = reqp->req_sigevent.sigev_signo) >= 0) 1323 notify = 1; 1324 np.np_event = reqp->req_op; 1325 if (np.np_event == AIOFSYNC && reqp->req_largefile) 1326 np.np_event = AIOFSYNC64; 1327 np.np_object = (uintptr_t)reqp->req_aiocbp; 1328 np.np_user = reqp->req_sigevent.sigev_value.sival_ptr; 1329 } 1330 1331 if (resultp->aio_errno == EINPROGRESS) 1332 _aio_set_result(reqp, retval, error); 1333 1334 _aio_outstand_cnt--; 1335 1336 head = reqp->req_head; 1337 reqp->req_head = NULL; 1338 1339 if (sigev_none) { 1340 _aio_enq_doneq(reqp); 1341 reqp = NULL; 1342 } else { 1343 (void) _aio_hash_del(resultp); 1344 _aio_req_mark_done(reqp); 1345 } 1346 1347 _aio_waitn_wakeup(); 1348 1349 /* 1350 * __aio_waitn() sets AIO_WAIT_INPROGRESS and 1351 * __aio_suspend() increments "_aio_kernel_suspend" 1352 * when they are waiting in the kernel for completed I/Os. 1353 * 1354 * _kaio(AIONOTIFY) awakes the corresponding function 1355 * in the kernel; then the corresponding __aio_waitn() or 1356 * __aio_suspend() function could reap the recently 1357 * completed I/Os (_aiodone()). 1358 */ 1359 if ((_aio_flags & AIO_WAIT_INPROGRESS) || _aio_kernel_suspend > 0) 1360 (void) _kaio(AIONOTIFY); 1361 1362 sig_mutex_unlock(&__aio_mutex); 1363 1364 if (head != NULL) { 1365 /* 1366 * If all the lio requests have completed, 1367 * prepare to notify the waiting thread. 
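		 *
		 * (lio_refcnt == 1 means this request is the last member of
		 * the list.  For LIO_WAIT the waiter blocked on lio_cond_cv
		 * is signaled; otherwise the list-wide signal or port
		 * notification is captured in np and sent together with the
		 * per-request notification below.)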
1368 */ 1369 sig_mutex_lock(&head->lio_mutex); 1370 ASSERT(head->lio_refcnt == head->lio_nent); 1371 if (head->lio_refcnt == 1) { 1372 int waiting = 0; 1373 if (head->lio_mode == LIO_WAIT) { 1374 if ((waiting = head->lio_waiting) != 0) 1375 (void) cond_signal(&head->lio_cond_cv); 1376 } else if (head->lio_port < 0) { /* none or signal */ 1377 if ((np.np_lio_signo = head->lio_signo) != 0) 1378 notify = 1; 1379 np.np_lio_user = head->lio_sigval.sival_ptr; 1380 } else { /* thread or port */ 1381 notify = 1; 1382 np.np_lio_port = head->lio_port; 1383 np.np_lio_event = head->lio_event; 1384 np.np_lio_object = 1385 (uintptr_t)head->lio_sigevent; 1386 np.np_lio_user = head->lio_sigval.sival_ptr; 1387 } 1388 head->lio_nent = head->lio_refcnt = 0; 1389 sig_mutex_unlock(&head->lio_mutex); 1390 if (waiting == 0) 1391 _aio_lio_free(head); 1392 } else { 1393 head->lio_nent--; 1394 head->lio_refcnt--; 1395 sig_mutex_unlock(&head->lio_mutex); 1396 } 1397 } 1398 1399 /* 1400 * The request is completed; now perform the notifications. 1401 */ 1402 if (notify) { 1403 if (reqp != NULL) { 1404 /* 1405 * We usually put the request on the notification 1406 * queue because we don't want to block and delay 1407 * other operations behind us in the work queue. 1408 * Also we must never block on a cancel notification 1409 * because we are being called from an application 1410 * thread in this case and that could lead to deadlock 1411 * if no other thread is receiving notificatins. 1412 */ 1413 reqp->req_notify = np; 1414 reqp->req_op = AIONOTIFY; 1415 _aio_req_add(reqp, &__workers_no, AIONOTIFY); 1416 reqp = NULL; 1417 } else { 1418 /* 1419 * We already put the request on the done queue, 1420 * so we can't queue it to the notification queue. 1421 * Just do the notification directly. 1422 */ 1423 send_notification(&np); 1424 } 1425 } 1426 1427 if (reqp != NULL) 1428 _aio_req_free(reqp); 1429 } 1430 1431 /* 1432 * Delete fsync requests from list head until there is 1433 * only one left. Return 0 when there is only one, 1434 * otherwise return a non-zero value. 1435 */ 1436 static int 1437 _aio_fsync_del(aio_worker_t *aiowp, aio_req_t *reqp) 1438 { 1439 aio_lio_t *head = reqp->req_head; 1440 int rval = 0; 1441 1442 ASSERT(reqp == aiowp->work_req); 1443 sig_mutex_lock(&aiowp->work_qlock1); 1444 sig_mutex_lock(&head->lio_mutex); 1445 if (head->lio_refcnt > 1) { 1446 head->lio_refcnt--; 1447 head->lio_nent--; 1448 aiowp->work_req = NULL; 1449 sig_mutex_unlock(&head->lio_mutex); 1450 sig_mutex_unlock(&aiowp->work_qlock1); 1451 sig_mutex_lock(&__aio_mutex); 1452 _aio_outstand_cnt--; 1453 _aio_waitn_wakeup(); 1454 sig_mutex_unlock(&__aio_mutex); 1455 _aio_req_free(reqp); 1456 return (1); 1457 } 1458 ASSERT(head->lio_nent == 1 && head->lio_refcnt == 1); 1459 reqp->req_head = NULL; 1460 if (head->lio_canned) 1461 reqp->req_state = AIO_REQ_CANCELED; 1462 if (head->lio_mode == LIO_DESTROY) { 1463 aiowp->work_req = NULL; 1464 rval = 1; 1465 } 1466 sig_mutex_unlock(&head->lio_mutex); 1467 sig_mutex_unlock(&aiowp->work_qlock1); 1468 head->lio_refcnt--; 1469 head->lio_nent--; 1470 _aio_lio_free(head); 1471 if (rval != 0) 1472 _aio_req_free(reqp); 1473 return (rval); 1474 } 1475 1476 /* 1477 * A worker is set idle when its work queue is empty. 1478 * The worker checks again that it has no more work 1479 * and then goes to sleep waiting for more work. 
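 * It returns 0 when the worker is awakened normally (by _aio_req_add()
 * signaling work_idle_cv) and nonzero when sig_cond_wait() fails, e.g.
 * with EINTR; _aio_do_request() simply retries in that case while
 * _aio_do_notify() treats it as fatal.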
1480 */ 1481 int 1482 _aio_idle(aio_worker_t *aiowp) 1483 { 1484 int error = 0; 1485 1486 sig_mutex_lock(&aiowp->work_qlock1); 1487 if (aiowp->work_count1 == 0) { 1488 ASSERT(aiowp->work_minload1 == 0); 1489 aiowp->work_idleflg = 1; 1490 /* 1491 * A cancellation handler is not needed here. 1492 * aio worker threads are never cancelled via pthread_cancel(). 1493 */ 1494 error = sig_cond_wait(&aiowp->work_idle_cv, 1495 &aiowp->work_qlock1); 1496 /* 1497 * The idle flag is normally cleared before worker is awakened 1498 * by aio_req_add(). On error (EINTR), we clear it ourself. 1499 */ 1500 if (error) 1501 aiowp->work_idleflg = 0; 1502 } 1503 sig_mutex_unlock(&aiowp->work_qlock1); 1504 return (error); 1505 } 1506 1507 /* 1508 * A worker's completed AIO requests are placed onto a global 1509 * done queue. The application is only sent a SIGIO signal if 1510 * the process has a handler enabled and it is not waiting via 1511 * aiowait(). 1512 */ 1513 static void 1514 _aio_work_done(aio_worker_t *aiowp) 1515 { 1516 aio_req_t *reqp; 1517 1518 sig_mutex_lock(&aiowp->work_qlock1); 1519 reqp = aiowp->work_prev1; 1520 reqp->req_next = NULL; 1521 aiowp->work_done1 = 0; 1522 aiowp->work_tail1 = aiowp->work_next1; 1523 if (aiowp->work_tail1 == NULL) 1524 aiowp->work_head1 = NULL; 1525 aiowp->work_prev1 = NULL; 1526 sig_mutex_unlock(&aiowp->work_qlock1); 1527 sig_mutex_lock(&__aio_mutex); 1528 _aio_donecnt++; 1529 _aio_outstand_cnt--; 1530 _aio_req_done_cnt--; 1531 ASSERT(_aio_donecnt > 0 && 1532 _aio_outstand_cnt >= 0 && 1533 _aio_req_done_cnt >= 0); 1534 ASSERT(reqp != NULL); 1535 1536 if (_aio_done_tail == NULL) { 1537 _aio_done_head = _aio_done_tail = reqp; 1538 } else { 1539 _aio_done_head->req_next = reqp; 1540 _aio_done_head = reqp; 1541 } 1542 1543 if (_aiowait_flag) { 1544 sig_mutex_unlock(&__aio_mutex); 1545 (void) _kaio(AIONOTIFY); 1546 } else { 1547 sig_mutex_unlock(&__aio_mutex); 1548 if (_sigio_enabled) 1549 (void) kill(__pid, SIGIO); 1550 } 1551 } 1552 1553 /* 1554 * The done queue consists of AIO requests that are in either the 1555 * AIO_REQ_DONE or AIO_REQ_CANCELED state. Requests that were cancelled 1556 * are discarded. If the done queue is empty then NULL is returned. 1557 * Otherwise the address of a done aio_result_t is returned. 1558 */ 1559 aio_result_t * 1560 _aio_req_done(void) 1561 { 1562 aio_req_t *reqp; 1563 aio_result_t *resultp; 1564 1565 ASSERT(MUTEX_HELD(&__aio_mutex)); 1566 1567 if ((reqp = _aio_done_tail) != NULL) { 1568 if ((_aio_done_tail = reqp->req_next) == NULL) 1569 _aio_done_head = NULL; 1570 ASSERT(_aio_donecnt > 0); 1571 _aio_donecnt--; 1572 (void) _aio_hash_del(reqp->req_resultp); 1573 resultp = reqp->req_resultp; 1574 ASSERT(reqp->req_state == AIO_REQ_DONE); 1575 _aio_req_free(reqp); 1576 return (resultp); 1577 } 1578 /* is queue empty? */ 1579 if (reqp == NULL && _aio_outstand_cnt == 0) { 1580 return ((aio_result_t *)-1); 1581 } 1582 return (NULL); 1583 } 1584 1585 /* 1586 * Set the return and errno values for the application's use. 1587 * 1588 * For the Posix interfaces, we must set the return value first followed 1589 * by the errno value because the Posix interfaces allow for a change 1590 * in the errno value from EINPROGRESS to something else to signal 1591 * the completion of the asynchronous request. 1592 * 1593 * The opposite is true for the Solaris interfaces. These allow for 1594 * a change in the return value from AIO_INPROGRESS to something else 1595 * to signal the completion of the asynchronous request. 
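 *
 * The membar_producer() between the two stores keeps them ordered on
 * weakly ordered hardware, so a caller polling the "completion" field
 * never observes it before the companion value is also visible.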
1596 */ 1597 void 1598 _aio_set_result(aio_req_t *reqp, ssize_t retval, int error) 1599 { 1600 aio_result_t *resultp = reqp->req_resultp; 1601 1602 if (POSIX_AIO(reqp)) { 1603 resultp->aio_return = retval; 1604 membar_producer(); 1605 resultp->aio_errno = error; 1606 } else { 1607 resultp->aio_errno = error; 1608 membar_producer(); 1609 resultp->aio_return = retval; 1610 } 1611 } 1612 1613 /* 1614 * Add an AIO request onto the next work queue. 1615 * A circular list of workers is used to choose the next worker. 1616 */ 1617 void 1618 _aio_req_add(aio_req_t *reqp, aio_worker_t **nextworker, int mode) 1619 { 1620 ulwp_t *self = curthread; 1621 aio_worker_t *aiowp; 1622 aio_worker_t *first; 1623 int load_bal_flg = 1; 1624 int found; 1625 1626 ASSERT(reqp->req_state != AIO_REQ_DONEQ); 1627 reqp->req_next = NULL; 1628 /* 1629 * Try to acquire the next worker's work queue. If it is locked, 1630 * then search the list of workers until a queue is found unlocked, 1631 * or until the list is completely traversed at which point another 1632 * worker will be created. 1633 */ 1634 sigoff(self); /* defer SIGIO */ 1635 sig_mutex_lock(&__aio_mutex); 1636 first = aiowp = *nextworker; 1637 if (mode != AIONOTIFY) 1638 _aio_outstand_cnt++; 1639 sig_mutex_unlock(&__aio_mutex); 1640 1641 switch (mode) { 1642 case AIOREAD: 1643 case AIOWRITE: 1644 case AIOAREAD: 1645 case AIOAWRITE: 1646 #if !defined(_LP64) 1647 case AIOAREAD64: 1648 case AIOAWRITE64: 1649 #endif 1650 /* try to find an idle worker */ 1651 found = 0; 1652 do { 1653 if (sig_mutex_trylock(&aiowp->work_qlock1) == 0) { 1654 if (aiowp->work_idleflg) { 1655 found = 1; 1656 break; 1657 } 1658 sig_mutex_unlock(&aiowp->work_qlock1); 1659 } 1660 } while ((aiowp = aiowp->work_forw) != first); 1661 1662 if (found) { 1663 aiowp->work_minload1++; 1664 break; 1665 } 1666 1667 /* try to acquire some worker's queue lock */ 1668 do { 1669 if (sig_mutex_trylock(&aiowp->work_qlock1) == 0) { 1670 found = 1; 1671 break; 1672 } 1673 } while ((aiowp = aiowp->work_forw) != first); 1674 1675 /* 1676 * Create more workers when the workers appear overloaded. 1677 * Either all the workers are busy draining their queues 1678 * or no worker's queue lock could be acquired. 1679 */ 1680 if (!found) { 1681 if (_aio_worker_cnt < _max_workers) { 1682 if (_aio_create_worker(reqp, mode)) 1683 aio_panic("_aio_req_add: add worker"); 1684 sigon(self); /* reenable SIGIO */ 1685 return; 1686 } 1687 1688 /* 1689 * No worker available and we have created 1690 * _max_workers, keep going through the 1691 * list slowly until we get a lock 1692 */ 1693 while (sig_mutex_trylock(&aiowp->work_qlock1) != 0) { 1694 /* 1695 * give someone else a chance 1696 */ 1697 _aio_delay(1); 1698 aiowp = aiowp->work_forw; 1699 } 1700 } 1701 1702 ASSERT(MUTEX_HELD(&aiowp->work_qlock1)); 1703 if (_aio_worker_cnt < _max_workers && 1704 aiowp->work_minload1 >= _minworkload) { 1705 sig_mutex_unlock(&aiowp->work_qlock1); 1706 sig_mutex_lock(&__aio_mutex); 1707 *nextworker = aiowp->work_forw; 1708 sig_mutex_unlock(&__aio_mutex); 1709 if (_aio_create_worker(reqp, mode)) 1710 aio_panic("aio_req_add: add worker"); 1711 sigon(self); /* reenable SIGIO */ 1712 return; 1713 } 1714 aiowp->work_minload1++; 1715 break; 1716 case AIOFSYNC: 1717 case AIONOTIFY: 1718 load_bal_flg = 0; 1719 sig_mutex_lock(&aiowp->work_qlock1); 1720 break; 1721 default: 1722 aio_panic("_aio_req_add: invalid mode"); 1723 break; 1724 } 1725 /* 1726 * Put request onto worker's work queue. 
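	 *
	 * (New requests are appended at work_head1; work_tail1 is the
	 * oldest entry and work_next1 is the next request the worker
	 * will pick up in _aio_req_get().)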
1727 */ 1728 if (aiowp->work_tail1 == NULL) { 1729 ASSERT(aiowp->work_count1 == 0); 1730 aiowp->work_tail1 = reqp; 1731 aiowp->work_next1 = reqp; 1732 } else { 1733 aiowp->work_head1->req_next = reqp; 1734 if (aiowp->work_next1 == NULL) 1735 aiowp->work_next1 = reqp; 1736 } 1737 reqp->req_state = AIO_REQ_QUEUED; 1738 reqp->req_worker = aiowp; 1739 aiowp->work_head1 = reqp; 1740 /* 1741 * Awaken worker if it is not currently active. 1742 */ 1743 if (aiowp->work_count1++ == 0 && aiowp->work_idleflg) { 1744 aiowp->work_idleflg = 0; 1745 (void) cond_signal(&aiowp->work_idle_cv); 1746 } 1747 sig_mutex_unlock(&aiowp->work_qlock1); 1748 1749 if (load_bal_flg) { 1750 sig_mutex_lock(&__aio_mutex); 1751 *nextworker = aiowp->work_forw; 1752 sig_mutex_unlock(&__aio_mutex); 1753 } 1754 sigon(self); /* reenable SIGIO */ 1755 } 1756 1757 /* 1758 * Get an AIO request for a specified worker. 1759 * If the work queue is empty, return NULL. 1760 */ 1761 aio_req_t * 1762 _aio_req_get(aio_worker_t *aiowp) 1763 { 1764 aio_req_t *reqp; 1765 1766 sig_mutex_lock(&aiowp->work_qlock1); 1767 if ((reqp = aiowp->work_next1) != NULL) { 1768 /* 1769 * Remove a POSIX request from the queue; the 1770 * request queue is a singularly linked list 1771 * with a previous pointer. The request is 1772 * removed by updating the previous pointer. 1773 * 1774 * Non-posix requests are left on the queue 1775 * to eventually be placed on the done queue. 1776 */ 1777 1778 if (POSIX_AIO(reqp)) { 1779 if (aiowp->work_prev1 == NULL) { 1780 aiowp->work_tail1 = reqp->req_next; 1781 if (aiowp->work_tail1 == NULL) 1782 aiowp->work_head1 = NULL; 1783 } else { 1784 aiowp->work_prev1->req_next = reqp->req_next; 1785 if (aiowp->work_head1 == reqp) 1786 aiowp->work_head1 = reqp->req_next; 1787 } 1788 1789 } else { 1790 aiowp->work_prev1 = reqp; 1791 ASSERT(aiowp->work_done1 >= 0); 1792 aiowp->work_done1++; 1793 } 1794 ASSERT(reqp != reqp->req_next); 1795 aiowp->work_next1 = reqp->req_next; 1796 ASSERT(aiowp->work_count1 >= 1); 1797 aiowp->work_count1--; 1798 switch (reqp->req_op) { 1799 case AIOREAD: 1800 case AIOWRITE: 1801 case AIOAREAD: 1802 case AIOAWRITE: 1803 #if !defined(_LP64) 1804 case AIOAREAD64: 1805 case AIOAWRITE64: 1806 #endif 1807 ASSERT(aiowp->work_minload1 > 0); 1808 aiowp->work_minload1--; 1809 break; 1810 } 1811 reqp->req_state = AIO_REQ_INPROGRESS; 1812 } 1813 aiowp->work_req = reqp; 1814 ASSERT(reqp != NULL || aiowp->work_count1 == 0); 1815 sig_mutex_unlock(&aiowp->work_qlock1); 1816 return (reqp); 1817 } 1818 1819 static void 1820 _aio_req_del(aio_worker_t *aiowp, aio_req_t *reqp, int ostate) 1821 { 1822 aio_req_t **last; 1823 aio_req_t *lastrp; 1824 aio_req_t *next; 1825 1826 ASSERT(aiowp != NULL); 1827 ASSERT(MUTEX_HELD(&aiowp->work_qlock1)); 1828 if (POSIX_AIO(reqp)) { 1829 if (ostate != AIO_REQ_QUEUED) 1830 return; 1831 } 1832 last = &aiowp->work_tail1; 1833 lastrp = aiowp->work_tail1; 1834 ASSERT(ostate == AIO_REQ_QUEUED || ostate == AIO_REQ_INPROGRESS); 1835 while ((next = *last) != NULL) { 1836 if (next == reqp) { 1837 *last = next->req_next; 1838 if (aiowp->work_next1 == next) 1839 aiowp->work_next1 = next->req_next; 1840 1841 if ((next->req_next != NULL) || 1842 (aiowp->work_done1 == 0)) { 1843 if (aiowp->work_head1 == next) 1844 aiowp->work_head1 = next->req_next; 1845 if (aiowp->work_prev1 == next) 1846 aiowp->work_prev1 = next->req_next; 1847 } else { 1848 if (aiowp->work_head1 == next) 1849 aiowp->work_head1 = lastrp; 1850 if (aiowp->work_prev1 == next) 1851 aiowp->work_prev1 = lastrp; 1852 } 1853 1854 if (ostate 
== AIO_REQ_QUEUED) { 1855 ASSERT(aiowp->work_count1 >= 1); 1856 aiowp->work_count1--; 1857 ASSERT(aiowp->work_minload1 >= 1); 1858 aiowp->work_minload1--; 1859 } else { 1860 ASSERT(ostate == AIO_REQ_INPROGRESS && 1861 !POSIX_AIO(reqp)); 1862 aiowp->work_done1--; 1863 } 1864 return; 1865 } 1866 last = &next->req_next; 1867 lastrp = next; 1868 } 1869 /* NOTREACHED */ 1870 } 1871 1872 static void 1873 _aio_enq_doneq(aio_req_t *reqp) 1874 { 1875 if (_aio_doneq == NULL) { 1876 _aio_doneq = reqp; 1877 reqp->req_next = reqp->req_prev = reqp; 1878 } else { 1879 reqp->req_next = _aio_doneq; 1880 reqp->req_prev = _aio_doneq->req_prev; 1881 _aio_doneq->req_prev->req_next = reqp; 1882 _aio_doneq->req_prev = reqp; 1883 } 1884 reqp->req_state = AIO_REQ_DONEQ; 1885 _aio_doneq_cnt++; 1886 } 1887 1888 /* 1889 * caller owns the _aio_mutex 1890 */ 1891 aio_req_t * 1892 _aio_req_remove(aio_req_t *reqp) 1893 { 1894 if (reqp && reqp->req_state != AIO_REQ_DONEQ) 1895 return (NULL); 1896 1897 if (reqp) { 1898 /* request in done queue */ 1899 if (_aio_doneq == reqp) 1900 _aio_doneq = reqp->req_next; 1901 if (_aio_doneq == reqp) { 1902 /* only one request on queue */ 1903 _aio_doneq = NULL; 1904 } else { 1905 aio_req_t *tmp = reqp->req_next; 1906 reqp->req_prev->req_next = tmp; 1907 tmp->req_prev = reqp->req_prev; 1908 } 1909 } else if ((reqp = _aio_doneq) != NULL) { 1910 if (reqp == reqp->req_next) { 1911 /* only one request on queue */ 1912 _aio_doneq = NULL; 1913 } else { 1914 reqp->req_prev->req_next = _aio_doneq = reqp->req_next; 1915 _aio_doneq->req_prev = reqp->req_prev; 1916 } 1917 } 1918 if (reqp) { 1919 _aio_doneq_cnt--; 1920 reqp->req_next = reqp->req_prev = reqp; 1921 reqp->req_state = AIO_REQ_DONE; 1922 } 1923 return (reqp); 1924 } 1925 1926 /* 1927 * An AIO request is identified by an aio_result_t pointer. The library 1928 * maps this aio_result_t pointer to its internal representation using a 1929 * hash table. This function adds an aio_result_t pointer to the hash table. 1930 */ 1931 static int 1932 _aio_hash_insert(aio_result_t *resultp, aio_req_t *reqp) 1933 { 1934 aio_hash_t *hashp; 1935 aio_req_t **prev; 1936 aio_req_t *next; 1937 1938 hashp = _aio_hash + AIOHASH(resultp); 1939 lmutex_lock(&hashp->hash_lock); 1940 prev = &hashp->hash_ptr; 1941 while ((next = *prev) != NULL) { 1942 if (resultp == next->req_resultp) { 1943 lmutex_unlock(&hashp->hash_lock); 1944 return (-1); 1945 } 1946 prev = &next->req_link; 1947 } 1948 *prev = reqp; 1949 ASSERT(reqp->req_link == NULL); 1950 lmutex_unlock(&hashp->hash_lock); 1951 return (0); 1952 } 1953 1954 /* 1955 * Remove an entry from the hash table. 
1956 */ 1957 aio_req_t * 1958 _aio_hash_del(aio_result_t *resultp) 1959 { 1960 aio_hash_t *hashp; 1961 aio_req_t **prev; 1962 aio_req_t *next = NULL; 1963 1964 if (_aio_hash != NULL) { 1965 hashp = _aio_hash + AIOHASH(resultp); 1966 lmutex_lock(&hashp->hash_lock); 1967 prev = &hashp->hash_ptr; 1968 while ((next = *prev) != NULL) { 1969 if (resultp == next->req_resultp) { 1970 *prev = next->req_link; 1971 next->req_link = NULL; 1972 break; 1973 } 1974 prev = &next->req_link; 1975 } 1976 lmutex_unlock(&hashp->hash_lock); 1977 } 1978 return (next); 1979 } 1980 1981 /* 1982 * find an entry in the hash table 1983 */ 1984 aio_req_t * 1985 _aio_hash_find(aio_result_t *resultp) 1986 { 1987 aio_hash_t *hashp; 1988 aio_req_t **prev; 1989 aio_req_t *next = NULL; 1990 1991 if (_aio_hash != NULL) { 1992 hashp = _aio_hash + AIOHASH(resultp); 1993 lmutex_lock(&hashp->hash_lock); 1994 prev = &hashp->hash_ptr; 1995 while ((next = *prev) != NULL) { 1996 if (resultp == next->req_resultp) 1997 break; 1998 prev = &next->req_link; 1999 } 2000 lmutex_unlock(&hashp->hash_lock); 2001 } 2002 return (next); 2003 } 2004 2005 /* 2006 * AIO interface for POSIX 2007 */ 2008 int 2009 _aio_rw(aiocb_t *aiocbp, aio_lio_t *lio_head, aio_worker_t **nextworker, 2010 int mode, int flg) 2011 { 2012 aio_req_t *reqp; 2013 aio_args_t *ap; 2014 int kerr; 2015 2016 if (aiocbp == NULL) { 2017 errno = EINVAL; 2018 return (-1); 2019 } 2020 2021 /* initialize kaio */ 2022 if (!_kaio_ok) 2023 _kaio_init(); 2024 2025 aiocbp->aio_state = NOCHECK; 2026 2027 /* 2028 * If we have been called because a list I/O 2029 * kaio() failed, we dont want to repeat the 2030 * system call 2031 */ 2032 2033 if (flg & AIO_KAIO) { 2034 /* 2035 * Try kernel aio first. 2036 * If errno is ENOTSUP/EBADFD, 2037 * fall back to the thread implementation. 2038 */ 2039 if (_kaio_ok > 0 && KAIO_SUPPORTED(aiocbp->aio_fildes)) { 2040 aiocbp->aio_resultp.aio_errno = EINPROGRESS; 2041 aiocbp->aio_state = CHECK; 2042 kerr = (int)_kaio(mode, aiocbp); 2043 if (kerr == 0) 2044 return (0); 2045 if (errno != ENOTSUP && errno != EBADFD) { 2046 aiocbp->aio_resultp.aio_errno = errno; 2047 aiocbp->aio_resultp.aio_return = -1; 2048 aiocbp->aio_state = NOCHECK; 2049 return (-1); 2050 } 2051 if (errno == EBADFD) 2052 SET_KAIO_NOT_SUPPORTED(aiocbp->aio_fildes); 2053 } 2054 } 2055 2056 aiocbp->aio_resultp.aio_errno = EINPROGRESS; 2057 aiocbp->aio_state = USERAIO; 2058 2059 if (!__uaio_ok && __uaio_init() == -1) 2060 return (-1); 2061 2062 if ((reqp = _aio_req_alloc()) == NULL) { 2063 errno = EAGAIN; 2064 return (-1); 2065 } 2066 2067 /* 2068 * If an LIO request, add the list head to the aio request 2069 */ 2070 reqp->req_head = lio_head; 2071 reqp->req_type = AIO_POSIX_REQ; 2072 reqp->req_op = mode; 2073 reqp->req_largefile = 0; 2074 2075 if (aiocbp->aio_sigevent.sigev_notify == SIGEV_NONE) { 2076 reqp->req_sigevent.sigev_notify = SIGEV_NONE; 2077 } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_SIGNAL) { 2078 reqp->req_sigevent.sigev_notify = SIGEV_SIGNAL; 2079 reqp->req_sigevent.sigev_signo = 2080 aiocbp->aio_sigevent.sigev_signo; 2081 reqp->req_sigevent.sigev_value.sival_ptr = 2082 aiocbp->aio_sigevent.sigev_value.sival_ptr; 2083 } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_PORT) { 2084 port_notify_t *pn = aiocbp->aio_sigevent.sigev_value.sival_ptr; 2085 reqp->req_sigevent.sigev_notify = SIGEV_PORT; 2086 /* 2087 * Reuse the sigevent structure to contain the port number 2088 * and the user value. Same for SIGEV_THREAD, below. 
2089 */ 2090 reqp->req_sigevent.sigev_signo = 2091 pn->portnfy_port; 2092 reqp->req_sigevent.sigev_value.sival_ptr = 2093 pn->portnfy_user; 2094 } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_THREAD) { 2095 reqp->req_sigevent.sigev_notify = SIGEV_THREAD; 2096 /* 2097 * The sigevent structure contains the port number 2098 * and the user value. Same for SIGEV_PORT, above. 2099 */ 2100 reqp->req_sigevent.sigev_signo = 2101 aiocbp->aio_sigevent.sigev_signo; 2102 reqp->req_sigevent.sigev_value.sival_ptr = 2103 aiocbp->aio_sigevent.sigev_value.sival_ptr; 2104 } 2105 2106 reqp->req_resultp = &aiocbp->aio_resultp; 2107 reqp->req_aiocbp = aiocbp; 2108 ap = &reqp->req_args; 2109 ap->fd = aiocbp->aio_fildes; 2110 ap->buf = (caddr_t)aiocbp->aio_buf; 2111 ap->bufsz = aiocbp->aio_nbytes; 2112 ap->offset = aiocbp->aio_offset; 2113 2114 if ((flg & AIO_NO_DUPS) && 2115 _aio_hash_insert(&aiocbp->aio_resultp, reqp) != 0) { 2116 aio_panic("_aio_rw(): request already in hash table"); 2117 _aio_req_free(reqp); 2118 errno = EINVAL; 2119 return (-1); 2120 } 2121 _aio_req_add(reqp, nextworker, mode); 2122 return (0); 2123 } 2124 2125 #if !defined(_LP64) 2126 /* 2127 * 64-bit AIO interface for POSIX 2128 */ 2129 int 2130 _aio_rw64(aiocb64_t *aiocbp, aio_lio_t *lio_head, aio_worker_t **nextworker, 2131 int mode, int flg) 2132 { 2133 aio_req_t *reqp; 2134 aio_args_t *ap; 2135 int kerr; 2136 2137 if (aiocbp == NULL) { 2138 errno = EINVAL; 2139 return (-1); 2140 } 2141 2142 /* initialize kaio */ 2143 if (!_kaio_ok) 2144 _kaio_init(); 2145 2146 aiocbp->aio_state = NOCHECK; 2147 2148 /* 2149 * If we have been called because a list I/O 2150 * kaio() failed, we dont want to repeat the 2151 * system call 2152 */ 2153 2154 if (flg & AIO_KAIO) { 2155 /* 2156 * Try kernel aio first. 2157 * If errno is ENOTSUP/EBADFD, 2158 * fall back to the thread implementation. 
2159 */ 2160 if (_kaio_ok > 0 && KAIO_SUPPORTED(aiocbp->aio_fildes)) { 2161 aiocbp->aio_resultp.aio_errno = EINPROGRESS; 2162 aiocbp->aio_state = CHECK; 2163 kerr = (int)_kaio(mode, aiocbp); 2164 if (kerr == 0) 2165 return (0); 2166 if (errno != ENOTSUP && errno != EBADFD) { 2167 aiocbp->aio_resultp.aio_errno = errno; 2168 aiocbp->aio_resultp.aio_return = -1; 2169 aiocbp->aio_state = NOCHECK; 2170 return (-1); 2171 } 2172 if (errno == EBADFD) 2173 SET_KAIO_NOT_SUPPORTED(aiocbp->aio_fildes); 2174 } 2175 } 2176 2177 aiocbp->aio_resultp.aio_errno = EINPROGRESS; 2178 aiocbp->aio_state = USERAIO; 2179 2180 if (!__uaio_ok && __uaio_init() == -1) 2181 return (-1); 2182 2183 if ((reqp = _aio_req_alloc()) == NULL) { 2184 errno = EAGAIN; 2185 return (-1); 2186 } 2187 2188 /* 2189 * If an LIO request, add the list head to the aio request 2190 */ 2191 reqp->req_head = lio_head; 2192 reqp->req_type = AIO_POSIX_REQ; 2193 reqp->req_op = mode; 2194 reqp->req_largefile = 1; 2195 2196 if (aiocbp->aio_sigevent.sigev_notify == SIGEV_NONE) { 2197 reqp->req_sigevent.sigev_notify = SIGEV_NONE; 2198 } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_SIGNAL) { 2199 reqp->req_sigevent.sigev_notify = SIGEV_SIGNAL; 2200 reqp->req_sigevent.sigev_signo = 2201 aiocbp->aio_sigevent.sigev_signo; 2202 reqp->req_sigevent.sigev_value.sival_ptr = 2203 aiocbp->aio_sigevent.sigev_value.sival_ptr; 2204 } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_PORT) { 2205 port_notify_t *pn = aiocbp->aio_sigevent.sigev_value.sival_ptr; 2206 reqp->req_sigevent.sigev_notify = SIGEV_PORT; 2207 reqp->req_sigevent.sigev_signo = 2208 pn->portnfy_port; 2209 reqp->req_sigevent.sigev_value.sival_ptr = 2210 pn->portnfy_user; 2211 } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_THREAD) { 2212 reqp->req_sigevent.sigev_notify = SIGEV_THREAD; 2213 reqp->req_sigevent.sigev_signo = 2214 aiocbp->aio_sigevent.sigev_signo; 2215 reqp->req_sigevent.sigev_value.sival_ptr = 2216 aiocbp->aio_sigevent.sigev_value.sival_ptr; 2217 } 2218 2219 reqp->req_resultp = &aiocbp->aio_resultp; 2220 reqp->req_aiocbp = aiocbp; 2221 ap = &reqp->req_args; 2222 ap->fd = aiocbp->aio_fildes; 2223 ap->buf = (caddr_t)aiocbp->aio_buf; 2224 ap->bufsz = aiocbp->aio_nbytes; 2225 ap->offset = aiocbp->aio_offset; 2226 2227 if ((flg & AIO_NO_DUPS) && 2228 _aio_hash_insert(&aiocbp->aio_resultp, reqp) != 0) { 2229 aio_panic("_aio_rw64(): request already in hash table"); 2230 _aio_req_free(reqp); 2231 errno = EINVAL; 2232 return (-1); 2233 } 2234 _aio_req_add(reqp, nextworker, mode); 2235 return (0); 2236 } 2237 #endif /* !defined(_LP64) */ 2238
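
/*
 * Illustrative usage sketch (not part of the original file): a minimal
 * consumer of the Solaris aioread()/aiowait() interface implemented above
 * might look like the following, assuming an open file descriptor fd and
 * a buffer buf of bufsz bytes (aio_result_t and the aio functions are
 * declared in <sys/asynch.h>):
 *
 *	aio_result_t res;
 *
 *	if (aioread(fd, buf, bufsz, (off_t)0, SEEK_SET, &res) == 0) {
 *		aio_result_t *done = aiowait(NULL);
 *		if (done == &res && res.aio_errno == 0)
 *			process res.aio_return bytes of buf;
 *	}
 */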