1 /* 2 * CDDL HEADER START 3 * 4 * The contents of this file are subject to the terms of the 5 * Common Development and Distribution License (the "License"). 6 * You may not use this file except in compliance with the License. 7 * 8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 9 * or http://www.opensolaris.org/os/licensing. 10 * See the License for the specific language governing permissions 11 * and limitations under the License. 12 * 13 * When distributing Covered Code, include this CDDL HEADER in each 14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 15 * If applicable, add the following below this CDDL HEADER, with the 16 * fields enclosed by brackets "[]" replaced with your own identifying 17 * information: Portions Copyright [yyyy] [name of copyright owner] 18 * 19 * CDDL HEADER END 20 */ 21 22 /* 23 * Copyright 2006 Sun Microsystems, Inc. All rights reserved. 24 * Use is subject to license terms. 25 */ 26 27 #pragma ident "%Z%%M% %I% %E% SMI" 28 29 #include "synonyms.h" 30 #include "thr_uberdata.h" 31 #include "asyncio.h" 32 #include <atomic.h> 33 #include <sys/param.h> 34 #include <sys/file.h> 35 #include <sys/port.h> 36 37 static int _aio_hash_insert(aio_result_t *, aio_req_t *); 38 static aio_req_t *_aio_req_get(aio_worker_t *); 39 static void _aio_req_add(aio_req_t *, aio_worker_t **, int); 40 static void _aio_req_del(aio_worker_t *, aio_req_t *, int); 41 static void _aio_work_done(aio_worker_t *); 42 static void _aio_enq_doneq(aio_req_t *); 43 44 extern void _aio_lio_free(aio_lio_t *); 45 46 extern int __fdsync(int, int); 47 extern int _port_dispatch(int, int, int, int, uintptr_t, void *); 48 49 static int _aio_fsync_del(aio_worker_t *, aio_req_t *); 50 static void _aiodone(aio_req_t *, ssize_t, int); 51 static void _aio_cancel_work(aio_worker_t *, int, int *, int *); 52 static void _aio_finish_request(aio_worker_t *, ssize_t, int); 53 54 /* 55 * switch for kernel async I/O 56 */ 57 int _kaio_ok = 0; /* 0 = 
disabled, 1 = on, -1 = error */ 58 59 /* 60 * Key for thread-specific data 61 */ 62 pthread_key_t _aio_key; 63 64 /* 65 * Array for determining whether or not a file supports kaio. 66 * Initialized in _kaio_init(). 67 */ 68 uint32_t *_kaio_supported = NULL; 69 70 /* 71 * workers for read/write requests 72 * (__aio_mutex lock protects circular linked list of workers) 73 */ 74 aio_worker_t *__workers_rw; /* circular list of AIO workers */ 75 aio_worker_t *__nextworker_rw; /* next worker in list of workers */ 76 int __rw_workerscnt; /* number of read/write workers */ 77 78 /* 79 * worker for notification requests. 80 */ 81 aio_worker_t *__workers_no; /* circular list of AIO workers */ 82 aio_worker_t *__nextworker_no; /* next worker in list of workers */ 83 int __no_workerscnt; /* number of write workers */ 84 85 aio_req_t *_aio_done_tail; /* list of done requests */ 86 aio_req_t *_aio_done_head; 87 88 mutex_t __aio_initlock = DEFAULTMUTEX; /* makes aio initialization atomic */ 89 cond_t __aio_initcv = DEFAULTCV; 90 int __aio_initbusy = 0; 91 92 mutex_t __aio_mutex = DEFAULTMUTEX; /* protects counts, and linked lists */ 93 cond_t _aio_iowait_cv = DEFAULTCV; /* wait for userland I/Os */ 94 95 pid_t __pid = (pid_t)-1; /* initialize as invalid pid */ 96 int _sigio_enabled = 0; /* when set, send SIGIO signal */ 97 98 aio_hash_t *_aio_hash; 99 100 aio_req_t *_aio_doneq; /* double linked done queue list */ 101 102 int _aio_donecnt = 0; 103 int _aio_waitncnt = 0; /* # of requests for aio_waitn */ 104 int _aio_doneq_cnt = 0; 105 int _aio_outstand_cnt = 0; /* # of outstanding requests */ 106 int _kaio_outstand_cnt = 0; /* # of outstanding kaio requests */ 107 int _aio_req_done_cnt = 0; /* req. 
done but not in "done queue" */ 108 int _aio_kernel_suspend = 0; /* active kernel kaio calls */ 109 int _aio_suscv_cnt = 0; /* aio_suspend calls waiting on cv's */ 110 111 int _max_workers = 256; /* max number of workers permitted */ 112 int _min_workers = 4; /* min number of workers */ 113 int _minworkload = 2; /* min number of request in q */ 114 int _aio_worker_cnt = 0; /* number of workers to do requests */ 115 int __uaio_ok = 0; /* AIO has been enabled */ 116 sigset_t _worker_set; /* worker's signal mask */ 117 118 int _aiowait_flag = 0; /* when set, aiowait() is inprogress */ 119 int _aio_flags = 0; /* see asyncio.h defines for */ 120 121 aio_worker_t *_kaiowp = NULL; /* points to kaio cleanup thread */ 122 123 int hz; /* clock ticks per second */ 124 125 static int 126 _kaio_supported_init(void) 127 { 128 void *ptr; 129 size_t size; 130 131 if (_kaio_supported != NULL) /* already initialized */ 132 return (0); 133 134 size = MAX_KAIO_FDARRAY_SIZE * sizeof (uint32_t); 135 ptr = mmap(NULL, size, PROT_READ | PROT_WRITE, 136 MAP_PRIVATE | MAP_ANON, -1, (off_t)0); 137 if (ptr == MAP_FAILED) 138 return (-1); 139 _kaio_supported = ptr; 140 return (0); 141 } 142 143 /* 144 * The aio subsystem is initialized when an AIO request is made. 145 * Constants are initialized like the max number of workers that 146 * the subsystem can create, and the minimum number of workers 147 * permitted before imposing some restrictions. Also, some 148 * workers are created. 
149 */ 150 int 151 __uaio_init(void) 152 { 153 int ret = -1; 154 int i; 155 156 lmutex_lock(&__aio_initlock); 157 while (__aio_initbusy) 158 (void) _cond_wait(&__aio_initcv, &__aio_initlock); 159 if (__uaio_ok) { /* already initialized */ 160 lmutex_unlock(&__aio_initlock); 161 return (0); 162 } 163 __aio_initbusy = 1; 164 lmutex_unlock(&__aio_initlock); 165 166 hz = (int)sysconf(_SC_CLK_TCK); 167 __pid = getpid(); 168 169 setup_cancelsig(SIGAIOCANCEL); 170 171 if (_kaio_supported_init() != 0) 172 goto out; 173 174 /* 175 * Allocate and initialize the hash table. 176 */ 177 /* LINTED pointer cast */ 178 _aio_hash = (aio_hash_t *)mmap(NULL, 179 HASHSZ * sizeof (aio_hash_t), PROT_READ | PROT_WRITE, 180 MAP_PRIVATE | MAP_ANON, -1, (off_t)0); 181 if ((void *)_aio_hash == MAP_FAILED) { 182 _aio_hash = NULL; 183 goto out; 184 } 185 for (i = 0; i < HASHSZ; i++) 186 (void) mutex_init(&_aio_hash[i].hash_lock, USYNC_THREAD, NULL); 187 188 /* 189 * Initialize worker's signal mask to only catch SIGAIOCANCEL. 190 */ 191 (void) sigfillset(&_worker_set); 192 (void) sigdelset(&_worker_set, SIGAIOCANCEL); 193 194 /* 195 * Create the minimum number of read/write workers. 196 */ 197 for (i = 0; i < _min_workers; i++) 198 (void) _aio_create_worker(NULL, AIOREAD); 199 200 /* 201 * Create one worker to send asynchronous notifications. 202 */ 203 (void) _aio_create_worker(NULL, AIONOTIFY); 204 205 ret = 0; 206 out: 207 lmutex_lock(&__aio_initlock); 208 if (ret == 0) 209 __uaio_ok = 1; 210 __aio_initbusy = 0; 211 (void) cond_broadcast(&__aio_initcv); 212 lmutex_unlock(&__aio_initlock); 213 return (ret); 214 } 215 216 /* 217 * Called from close() before actually performing the real _close(). 218 */ 219 void 220 _aio_close(int fd) 221 { 222 if (fd < 0) /* avoid cancelling everything */ 223 return; 224 /* 225 * Cancel all outstanding aio requests for this file descriptor. 
226 */ 227 if (__uaio_ok) 228 (void) aiocancel_all(fd); 229 /* 230 * If we have allocated the bit array, clear the bit for this file. 231 * The next open may re-use this file descriptor and the new file 232 * may have different kaio() behaviour. 233 */ 234 if (_kaio_supported != NULL) 235 CLEAR_KAIO_SUPPORTED(fd); 236 } 237 238 /* 239 * special kaio cleanup thread sits in a loop in the 240 * kernel waiting for pending kaio requests to complete. 241 */ 242 void * 243 _kaio_cleanup_thread(void *arg) 244 { 245 if (pthread_setspecific(_aio_key, arg) != 0) 246 aio_panic("_kaio_cleanup_thread, pthread_setspecific()"); 247 (void) _kaio(AIOSTART); 248 return (arg); 249 } 250 251 /* 252 * initialize kaio. 253 */ 254 void 255 _kaio_init() 256 { 257 int error; 258 sigset_t oset; 259 260 lmutex_lock(&__aio_initlock); 261 while (__aio_initbusy) 262 (void) _cond_wait(&__aio_initcv, &__aio_initlock); 263 if (_kaio_ok) { /* already initialized */ 264 lmutex_unlock(&__aio_initlock); 265 return; 266 } 267 __aio_initbusy = 1; 268 lmutex_unlock(&__aio_initlock); 269 270 if (_kaio_supported_init() != 0) 271 error = ENOMEM; 272 else if ((_kaiowp = _aio_worker_alloc()) == NULL) 273 error = ENOMEM; 274 else if ((error = (int)_kaio(AIOINIT)) == 0) { 275 (void) pthread_sigmask(SIG_SETMASK, &maskset, &oset); 276 error = thr_create(NULL, AIOSTKSIZE, _kaio_cleanup_thread, 277 _kaiowp, THR_DAEMON, &_kaiowp->work_tid); 278 (void) pthread_sigmask(SIG_SETMASK, &oset, NULL); 279 } 280 if (error && _kaiowp != NULL) { 281 _aio_worker_free(_kaiowp); 282 _kaiowp = NULL; 283 } 284 285 lmutex_lock(&__aio_initlock); 286 if (error) 287 _kaio_ok = -1; 288 else 289 _kaio_ok = 1; 290 __aio_initbusy = 0; 291 (void) cond_broadcast(&__aio_initcv); 292 lmutex_unlock(&__aio_initlock); 293 } 294 295 int 296 aioread(int fd, caddr_t buf, int bufsz, off_t offset, int whence, 297 aio_result_t *resultp) 298 { 299 return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOREAD)); 300 } 301 302 int 303 aiowrite(int fd, 
caddr_t buf, int bufsz, off_t offset, int whence, 304 aio_result_t *resultp) 305 { 306 return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOWRITE)); 307 } 308 309 #if !defined(_LP64) 310 int 311 aioread64(int fd, caddr_t buf, int bufsz, off64_t offset, int whence, 312 aio_result_t *resultp) 313 { 314 return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOAREAD64)); 315 } 316 317 int 318 aiowrite64(int fd, caddr_t buf, int bufsz, off64_t offset, int whence, 319 aio_result_t *resultp) 320 { 321 return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOAWRITE64)); 322 } 323 #endif /* !defined(_LP64) */ 324 325 int 326 _aiorw(int fd, caddr_t buf, int bufsz, offset_t offset, int whence, 327 aio_result_t *resultp, int mode) 328 { 329 aio_req_t *reqp; 330 aio_args_t *ap; 331 offset_t loffset; 332 struct stat stat; 333 int error = 0; 334 int kerr; 335 int umode; 336 337 switch (whence) { 338 339 case SEEK_SET: 340 loffset = offset; 341 break; 342 case SEEK_CUR: 343 if ((loffset = llseek(fd, 0, SEEK_CUR)) == -1) 344 error = -1; 345 else 346 loffset += offset; 347 break; 348 case SEEK_END: 349 if (fstat(fd, &stat) == -1) 350 error = -1; 351 else 352 loffset = offset + stat.st_size; 353 break; 354 default: 355 errno = EINVAL; 356 error = -1; 357 } 358 359 if (error) 360 return (error); 361 362 /* initialize kaio */ 363 if (!_kaio_ok) 364 _kaio_init(); 365 366 /* 367 * _aio_do_request() needs the original request code (mode) to be able 368 * to choose the appropiate 32/64 bit function. All other functions 369 * only require the difference between READ and WRITE (umode). 370 */ 371 if (mode == AIOAREAD64 || mode == AIOAWRITE64) 372 umode = mode - AIOAREAD64; 373 else 374 umode = mode; 375 376 /* 377 * Try kernel aio first. 378 * If errno is ENOTSUP/EBADFD, fall back to the thread implementation. 
379 */ 380 if (_kaio_ok > 0 && KAIO_SUPPORTED(fd)) { 381 resultp->aio_errno = 0; 382 sig_mutex_lock(&__aio_mutex); 383 _kaio_outstand_cnt++; 384 kerr = (int)_kaio(((resultp->aio_return == AIO_INPROGRESS) ? 385 (umode | AIO_POLL_BIT) : umode), 386 fd, buf, bufsz, loffset, resultp); 387 if (kerr == 0) { 388 sig_mutex_unlock(&__aio_mutex); 389 return (0); 390 } 391 _kaio_outstand_cnt--; 392 sig_mutex_unlock(&__aio_mutex); 393 if (errno != ENOTSUP && errno != EBADFD) 394 return (-1); 395 if (errno == EBADFD) 396 SET_KAIO_NOT_SUPPORTED(fd); 397 } 398 399 if (!__uaio_ok && __uaio_init() == -1) 400 return (-1); 401 402 if ((reqp = _aio_req_alloc()) == NULL) { 403 errno = EAGAIN; 404 return (-1); 405 } 406 407 /* 408 * _aio_do_request() checks reqp->req_op to differentiate 409 * between 32 and 64 bit access. 410 */ 411 reqp->req_op = mode; 412 reqp->req_resultp = resultp; 413 ap = &reqp->req_args; 414 ap->fd = fd; 415 ap->buf = buf; 416 ap->bufsz = bufsz; 417 ap->offset = loffset; 418 419 if (_aio_hash_insert(resultp, reqp) != 0) { 420 _aio_req_free(reqp); 421 errno = EINVAL; 422 return (-1); 423 } 424 /* 425 * _aio_req_add() only needs the difference between READ and 426 * WRITE to choose the right worker queue. 
427 */ 428 _aio_req_add(reqp, &__nextworker_rw, umode); 429 return (0); 430 } 431 432 int 433 aiocancel(aio_result_t *resultp) 434 { 435 aio_req_t *reqp; 436 aio_worker_t *aiowp; 437 int ret; 438 int done = 0; 439 int canceled = 0; 440 441 if (!__uaio_ok) { 442 errno = EINVAL; 443 return (-1); 444 } 445 446 sig_mutex_lock(&__aio_mutex); 447 reqp = _aio_hash_find(resultp); 448 if (reqp == NULL) { 449 if (_aio_outstand_cnt == _aio_req_done_cnt) 450 errno = EINVAL; 451 else 452 errno = EACCES; 453 ret = -1; 454 } else { 455 aiowp = reqp->req_worker; 456 sig_mutex_lock(&aiowp->work_qlock1); 457 (void) _aio_cancel_req(aiowp, reqp, &canceled, &done); 458 sig_mutex_unlock(&aiowp->work_qlock1); 459 460 if (canceled) { 461 ret = 0; 462 } else { 463 if (_aio_outstand_cnt == 0 || 464 _aio_outstand_cnt == _aio_req_done_cnt) 465 errno = EINVAL; 466 else 467 errno = EACCES; 468 ret = -1; 469 } 470 } 471 sig_mutex_unlock(&__aio_mutex); 472 return (ret); 473 } 474 475 /* 476 * This must be asynch safe 477 */ 478 aio_result_t * 479 aiowait(struct timeval *uwait) 480 { 481 aio_result_t *uresultp; 482 aio_result_t *kresultp; 483 aio_result_t *resultp; 484 int dontblock; 485 int timedwait = 0; 486 int kaio_errno = 0; 487 struct timeval twait; 488 struct timeval *wait = NULL; 489 hrtime_t hrtend; 490 hrtime_t hres; 491 492 if (uwait) { 493 /* 494 * Check for a valid specified wait time. 495 * If it is invalid, fail the call right away. 
496 */ 497 if (uwait->tv_sec < 0 || uwait->tv_usec < 0 || 498 uwait->tv_usec >= MICROSEC) { 499 errno = EINVAL; 500 return ((aio_result_t *)-1); 501 } 502 503 if (uwait->tv_sec > 0 || uwait->tv_usec > 0) { 504 hrtend = gethrtime() + 505 (hrtime_t)uwait->tv_sec * NANOSEC + 506 (hrtime_t)uwait->tv_usec * (NANOSEC / MICROSEC); 507 twait = *uwait; 508 wait = &twait; 509 timedwait++; 510 } else { 511 /* polling */ 512 sig_mutex_lock(&__aio_mutex); 513 if (_kaio_outstand_cnt == 0) { 514 kresultp = (aio_result_t *)-1; 515 } else { 516 kresultp = (aio_result_t *)_kaio(AIOWAIT, 517 (struct timeval *)-1, 1); 518 if (kresultp != (aio_result_t *)-1 && 519 kresultp != NULL && 520 kresultp != (aio_result_t *)1) { 521 _kaio_outstand_cnt--; 522 sig_mutex_unlock(&__aio_mutex); 523 return (kresultp); 524 } 525 } 526 uresultp = _aio_req_done(); 527 sig_mutex_unlock(&__aio_mutex); 528 if (uresultp != NULL && 529 uresultp != (aio_result_t *)-1) { 530 return (uresultp); 531 } 532 if (uresultp == (aio_result_t *)-1 && 533 kresultp == (aio_result_t *)-1) { 534 errno = EINVAL; 535 return ((aio_result_t *)-1); 536 } else { 537 return (NULL); 538 } 539 } 540 } 541 542 for (;;) { 543 sig_mutex_lock(&__aio_mutex); 544 uresultp = _aio_req_done(); 545 if (uresultp != NULL && uresultp != (aio_result_t *)-1) { 546 sig_mutex_unlock(&__aio_mutex); 547 resultp = uresultp; 548 break; 549 } 550 _aiowait_flag++; 551 dontblock = (uresultp == (aio_result_t *)-1); 552 if (dontblock && _kaio_outstand_cnt == 0) { 553 kresultp = (aio_result_t *)-1; 554 kaio_errno = EINVAL; 555 } else { 556 sig_mutex_unlock(&__aio_mutex); 557 kresultp = (aio_result_t *)_kaio(AIOWAIT, 558 wait, dontblock); 559 sig_mutex_lock(&__aio_mutex); 560 kaio_errno = errno; 561 } 562 _aiowait_flag--; 563 sig_mutex_unlock(&__aio_mutex); 564 if (kresultp == (aio_result_t *)1) { 565 /* aiowait() awakened by an aionotify() */ 566 continue; 567 } else if (kresultp != NULL && 568 kresultp != (aio_result_t *)-1) { 569 resultp = kresultp; 570 
sig_mutex_lock(&__aio_mutex); 571 _kaio_outstand_cnt--; 572 sig_mutex_unlock(&__aio_mutex); 573 break; 574 } else if (kresultp == (aio_result_t *)-1 && 575 kaio_errno == EINVAL && 576 uresultp == (aio_result_t *)-1) { 577 errno = kaio_errno; 578 resultp = (aio_result_t *)-1; 579 break; 580 } else if (kresultp == (aio_result_t *)-1 && 581 kaio_errno == EINTR) { 582 errno = kaio_errno; 583 resultp = (aio_result_t *)-1; 584 break; 585 } else if (timedwait) { 586 hres = hrtend - gethrtime(); 587 if (hres <= 0) { 588 /* time is up; return */ 589 resultp = NULL; 590 break; 591 } else { 592 /* 593 * Some time left. Round up the remaining time 594 * in nanoseconds to microsec. Retry the call. 595 */ 596 hres += (NANOSEC / MICROSEC) - 1; 597 wait->tv_sec = hres / NANOSEC; 598 wait->tv_usec = 599 (hres % NANOSEC) / (NANOSEC / MICROSEC); 600 } 601 } else { 602 ASSERT(kresultp == NULL && uresultp == NULL); 603 resultp = NULL; 604 continue; 605 } 606 } 607 return (resultp); 608 } 609 610 /* 611 * _aio_get_timedelta calculates the remaining time and stores the result 612 * into timespec_t *wait. 
613 */ 614 615 int 616 _aio_get_timedelta(timespec_t *end, timespec_t *wait) 617 { 618 int ret = 0; 619 struct timeval cur; 620 timespec_t curtime; 621 622 (void) gettimeofday(&cur, NULL); 623 curtime.tv_sec = cur.tv_sec; 624 curtime.tv_nsec = cur.tv_usec * 1000; /* convert us to ns */ 625 626 if (end->tv_sec >= curtime.tv_sec) { 627 wait->tv_sec = end->tv_sec - curtime.tv_sec; 628 if (end->tv_nsec >= curtime.tv_nsec) { 629 wait->tv_nsec = end->tv_nsec - curtime.tv_nsec; 630 if (wait->tv_sec == 0 && wait->tv_nsec == 0) 631 ret = -1; /* timer expired */ 632 } else { 633 if (end->tv_sec > curtime.tv_sec) { 634 wait->tv_sec -= 1; 635 wait->tv_nsec = NANOSEC - 636 (curtime.tv_nsec - end->tv_nsec); 637 } else { 638 ret = -1; /* timer expired */ 639 } 640 } 641 } else { 642 ret = -1; 643 } 644 return (ret); 645 } 646 647 /* 648 * If closing by file descriptor: we will simply cancel all the outstanding 649 * aio`s and return. Those aio's in question will have either noticed the 650 * cancellation notice before, during, or after initiating io. 651 */ 652 int 653 aiocancel_all(int fd) 654 { 655 aio_req_t *reqp; 656 aio_req_t **reqpp; 657 aio_worker_t *first; 658 aio_worker_t *next; 659 int canceled = 0; 660 int done = 0; 661 int cancelall = 0; 662 663 sig_mutex_lock(&__aio_mutex); 664 665 if (_aio_outstand_cnt == 0) { 666 sig_mutex_unlock(&__aio_mutex); 667 return (AIO_ALLDONE); 668 } 669 670 /* 671 * Cancel requests from the read/write workers' queues. 672 */ 673 first = __nextworker_rw; 674 next = first; 675 do { 676 _aio_cancel_work(next, fd, &canceled, &done); 677 } while ((next = next->work_forw) != first); 678 679 /* 680 * finally, check if there are requests on the done queue that 681 * should be canceled. 
682 */ 683 if (fd < 0) 684 cancelall = 1; 685 reqpp = &_aio_done_tail; 686 while ((reqp = *reqpp) != NULL) { 687 if (cancelall || reqp->req_args.fd == fd) { 688 *reqpp = reqp->req_next; 689 _aio_donecnt--; 690 (void) _aio_hash_del(reqp->req_resultp); 691 _aio_req_free(reqp); 692 } else 693 reqpp = &reqp->req_next; 694 } 695 if (cancelall) { 696 ASSERT(_aio_donecnt == 0); 697 _aio_done_head = NULL; 698 } 699 sig_mutex_unlock(&__aio_mutex); 700 701 if (canceled && done == 0) 702 return (AIO_CANCELED); 703 else if (done && canceled == 0) 704 return (AIO_ALLDONE); 705 else if ((canceled + done == 0) && KAIO_SUPPORTED(fd)) 706 return ((int)_kaio(AIOCANCEL, fd, NULL)); 707 return (AIO_NOTCANCELED); 708 } 709 710 /* 711 * Cancel requests from a given work queue. If the file descriptor 712 * parameter, fd, is non-negative, then only cancel those requests 713 * in this queue that are to this file descriptor. If the fd 714 * parameter is -1, then cancel all requests. 715 */ 716 static void 717 _aio_cancel_work(aio_worker_t *aiowp, int fd, int *canceled, int *done) 718 { 719 aio_req_t *reqp; 720 721 sig_mutex_lock(&aiowp->work_qlock1); 722 /* 723 * cancel queued requests first. 724 */ 725 reqp = aiowp->work_tail1; 726 while (reqp != NULL) { 727 if (fd < 0 || reqp->req_args.fd == fd) { 728 if (_aio_cancel_req(aiowp, reqp, canceled, done)) { 729 /* 730 * Callers locks were dropped. 731 * reqp is invalid; start traversing 732 * the list from the beginning again. 733 */ 734 reqp = aiowp->work_tail1; 735 continue; 736 } 737 } 738 reqp = reqp->req_next; 739 } 740 /* 741 * Since the queued requests have been canceled, there can 742 * only be one inprogress request that should be canceled. 743 */ 744 if ((reqp = aiowp->work_req) != NULL && 745 (fd < 0 || reqp->req_args.fd == fd)) 746 (void) _aio_cancel_req(aiowp, reqp, canceled, done); 747 sig_mutex_unlock(&aiowp->work_qlock1); 748 } 749 750 /* 751 * Cancel a request. 
Return 1 if the callers locks were temporarily 752 * dropped, otherwise return 0. 753 */ 754 int 755 _aio_cancel_req(aio_worker_t *aiowp, aio_req_t *reqp, int *canceled, int *done) 756 { 757 int ostate = reqp->req_state; 758 759 ASSERT(MUTEX_HELD(&__aio_mutex)); 760 ASSERT(MUTEX_HELD(&aiowp->work_qlock1)); 761 if (ostate == AIO_REQ_CANCELED) 762 return (0); 763 if (ostate == AIO_REQ_DONE || ostate == AIO_REQ_DONEQ) { 764 (*done)++; 765 return (0); 766 } 767 if (reqp->req_op == AIOFSYNC && reqp != aiowp->work_req) { 768 ASSERT(POSIX_AIO(reqp)); 769 /* Cancel the queued aio_fsync() request */ 770 if (!reqp->req_head->lio_canned) { 771 reqp->req_head->lio_canned = 1; 772 _aio_outstand_cnt--; 773 (*canceled)++; 774 } 775 return (0); 776 } 777 reqp->req_state = AIO_REQ_CANCELED; 778 _aio_req_del(aiowp, reqp, ostate); 779 (void) _aio_hash_del(reqp->req_resultp); 780 (*canceled)++; 781 if (reqp == aiowp->work_req) { 782 ASSERT(ostate == AIO_REQ_INPROGRESS); 783 /* 784 * Set the result values now, before _aiodone() is called. 785 * We do this because the application can expect aio_return 786 * and aio_errno to be set to -1 and ECANCELED, respectively, 787 * immediately after a successful return from aiocancel() 788 * or aio_cancel(). 789 */ 790 _aio_set_result(reqp, -1, ECANCELED); 791 (void) thr_kill(aiowp->work_tid, SIGAIOCANCEL); 792 return (0); 793 } 794 if (!POSIX_AIO(reqp)) { 795 _aio_outstand_cnt--; 796 _aio_set_result(reqp, -1, ECANCELED); 797 return (0); 798 } 799 sig_mutex_unlock(&aiowp->work_qlock1); 800 sig_mutex_unlock(&__aio_mutex); 801 _aiodone(reqp, -1, ECANCELED); 802 sig_mutex_lock(&__aio_mutex); 803 sig_mutex_lock(&aiowp->work_qlock1); 804 return (1); 805 } 806 807 int 808 _aio_create_worker(aio_req_t *reqp, int mode) 809 { 810 aio_worker_t *aiowp, **workers, **nextworker; 811 int *aio_workerscnt; 812 void *(*func)(void *); 813 sigset_t oset; 814 int error; 815 816 /* 817 * Put the new worker thread in the right queue. 
818 */ 819 switch (mode) { 820 case AIOREAD: 821 case AIOWRITE: 822 case AIOAREAD: 823 case AIOAWRITE: 824 #if !defined(_LP64) 825 case AIOAREAD64: 826 case AIOAWRITE64: 827 #endif 828 workers = &__workers_rw; 829 nextworker = &__nextworker_rw; 830 aio_workerscnt = &__rw_workerscnt; 831 func = _aio_do_request; 832 break; 833 case AIONOTIFY: 834 workers = &__workers_no; 835 nextworker = &__nextworker_no; 836 func = _aio_do_notify; 837 aio_workerscnt = &__no_workerscnt; 838 break; 839 default: 840 aio_panic("_aio_create_worker: invalid mode"); 841 break; 842 } 843 844 if ((aiowp = _aio_worker_alloc()) == NULL) 845 return (-1); 846 847 if (reqp) { 848 reqp->req_state = AIO_REQ_QUEUED; 849 reqp->req_worker = aiowp; 850 aiowp->work_head1 = reqp; 851 aiowp->work_tail1 = reqp; 852 aiowp->work_next1 = reqp; 853 aiowp->work_count1 = 1; 854 aiowp->work_minload1 = 1; 855 } 856 857 (void) pthread_sigmask(SIG_SETMASK, &maskset, &oset); 858 error = thr_create(NULL, AIOSTKSIZE, func, aiowp, 859 THR_DAEMON | THR_SUSPENDED, &aiowp->work_tid); 860 (void) pthread_sigmask(SIG_SETMASK, &oset, NULL); 861 if (error) { 862 if (reqp) { 863 reqp->req_state = 0; 864 reqp->req_worker = NULL; 865 } 866 _aio_worker_free(aiowp); 867 return (-1); 868 } 869 870 lmutex_lock(&__aio_mutex); 871 (*aio_workerscnt)++; 872 if (*workers == NULL) { 873 aiowp->work_forw = aiowp; 874 aiowp->work_backw = aiowp; 875 *nextworker = aiowp; 876 *workers = aiowp; 877 } else { 878 aiowp->work_backw = (*workers)->work_backw; 879 aiowp->work_forw = (*workers); 880 (*workers)->work_backw->work_forw = aiowp; 881 (*workers)->work_backw = aiowp; 882 } 883 _aio_worker_cnt++; 884 lmutex_unlock(&__aio_mutex); 885 886 (void) thr_continue(aiowp->work_tid); 887 888 return (0); 889 } 890 891 /* 892 * This is the worker's main routine. 893 * The task of this function is to execute all queued requests; 894 * once the last pending request is executed this function will block 895 * in _aio_idle(). 
 * A new incoming request must wake up this thread to
 * restart the work.
 * Every worker has its own work queue.  The queue lock is required
 * to synchronize the addition of new requests for this worker or
 * cancellation of pending/running requests.
 *
 * Cancellation scenarios:
 * The cancellation of a request is being done asynchronously using
 * _aio_cancel_req() from another thread context.
 * A queued request can be cancelled in different manners:
 * a) request is queued but not "in progress" or "done" (AIO_REQ_QUEUED):
 *	- lock the queue -> remove the request -> unlock the queue
 *	- this function/thread does not detect this cancellation process
 * b) request is in progress (AIO_REQ_INPROGRESS):
 *	- this function first allows the cancellation of the running
 *	  request with the flag "work_cancel_flg=1"
 *		see _aio_req_get() -> _aio_cancel_on()
 *	  During this phase, it is allowed to interrupt the worker
 *	  thread running the request (this thread) using the SIGAIOCANCEL
 *	  signal.
 *	  Once this thread returns from the kernel (because the request
 *	  is just done), then it must disable a possible cancellation
 *	  and proceed to finish the request.  To disable the cancellation
 *	  this thread must use _aio_cancel_off() to set "work_cancel_flg=0".
 * c) request is already done (AIO_REQ_DONE || AIO_REQ_DONEQ):
 *	  same procedure as in a)
 *
 * To b)
 *	This thread uses sigsetjmp() to define the position in the code, where
 *	it wishes to continue working in the case that a SIGAIOCANCEL signal
 *	is detected.
 *	Normally this thread should get the cancellation signal during the
 *	kernel phase (reading or writing).
 *	In that case the signal handler
 *	aiosigcancelhndlr() is activated using the worker thread context,
 *	which again will use the siglongjmp() function to break the standard
 *	code flow and jump to the "sigsetjmp" position, provided that
 *	"work_cancel_flg" is set to "1".
 *	Because the "work_cancel_flg" is only manipulated by this worker
 *	thread and it can only run on one CPU at a given time, it is not
 *	necessary to protect that flag with the queue lock.
 *	Returning from the kernel (read or write system call) we must
 *	first disable the use of the SIGAIOCANCEL signal and accordingly
 *	the use of the siglongjmp() function to prevent a possible deadlock:
 *	- It can happen that this worker thread returns from the kernel and
 *	  blocks in "work_qlock1",
 *	- then a second thread cancels the apparently "in progress" request
 *	  and sends the SIGAIOCANCEL signal to the worker thread,
 *	- the worker thread gets assigned the "work_qlock1" and will return
 *	  from the kernel,
 *	- the kernel detects the pending signal and activates the signal
 *	  handler instead,
 *	- if the "work_cancel_flg" is still set then the signal handler
 *	  should use siglongjmp() to cancel the "in progress" request and
 *	  it would try to acquire the same work_qlock1 in _aio_req_get()
 *	  for a second time => deadlock.
 *	To avoid that situation we disable the cancellation of the request
 *	in progress BEFORE we try to acquire the work_qlock1.
 *	In that case the signal handler will not call siglongjmp() and the
 *	worker thread will continue running the standard code flow.
 *	Then this thread must check the AIO_REQ_CANCELED flag to emulate
 *	a possibly required siglongjmp(), freeing the work_qlock1 and
 *	avoiding a deadlock.
957 */ 958 void * 959 _aio_do_request(void *arglist) 960 { 961 aio_worker_t *aiowp = (aio_worker_t *)arglist; 962 ulwp_t *self = curthread; 963 struct aio_args *arg; 964 aio_req_t *reqp; /* current AIO request */ 965 ssize_t retval; 966 int error; 967 968 if (pthread_setspecific(_aio_key, aiowp) != 0) 969 aio_panic("_aio_do_request, pthread_setspecific()"); 970 (void) pthread_sigmask(SIG_SETMASK, &_worker_set, NULL); 971 ASSERT(aiowp->work_req == NULL); 972 973 /* 974 * We resume here when an operation is cancelled. 975 * On first entry, aiowp->work_req == NULL, so all 976 * we do is block SIGAIOCANCEL. 977 */ 978 (void) sigsetjmp(aiowp->work_jmp_buf, 0); 979 ASSERT(self->ul_sigdefer == 0); 980 981 sigoff(self); /* block SIGAIOCANCEL */ 982 if (aiowp->work_req != NULL) 983 _aio_finish_request(aiowp, -1, ECANCELED); 984 985 for (;;) { 986 /* 987 * Put completed requests on aio_done_list. This has 988 * to be done as part of the main loop to ensure that 989 * we don't artificially starve any aiowait'ers. 
990 */ 991 if (aiowp->work_done1) 992 _aio_work_done(aiowp); 993 994 top: 995 /* consume any deferred SIGAIOCANCEL signal here */ 996 sigon(self); 997 sigoff(self); 998 999 while ((reqp = _aio_req_get(aiowp)) == NULL) { 1000 if (_aio_idle(aiowp) != 0) 1001 goto top; 1002 } 1003 arg = &reqp->req_args; 1004 ASSERT(reqp->req_state == AIO_REQ_INPROGRESS || 1005 reqp->req_state == AIO_REQ_CANCELED); 1006 error = 0; 1007 1008 switch (reqp->req_op) { 1009 case AIOREAD: 1010 case AIOAREAD: 1011 sigon(self); /* unblock SIGAIOCANCEL */ 1012 retval = pread(arg->fd, arg->buf, 1013 arg->bufsz, arg->offset); 1014 if (retval == -1) { 1015 if (errno == ESPIPE) { 1016 retval = read(arg->fd, 1017 arg->buf, arg->bufsz); 1018 if (retval == -1) 1019 error = errno; 1020 } else { 1021 error = errno; 1022 } 1023 } 1024 sigoff(self); /* block SIGAIOCANCEL */ 1025 break; 1026 case AIOWRITE: 1027 case AIOAWRITE: 1028 sigon(self); /* unblock SIGAIOCANCEL */ 1029 retval = pwrite(arg->fd, arg->buf, 1030 arg->bufsz, arg->offset); 1031 if (retval == -1) { 1032 if (errno == ESPIPE) { 1033 retval = write(arg->fd, 1034 arg->buf, arg->bufsz); 1035 if (retval == -1) 1036 error = errno; 1037 } else { 1038 error = errno; 1039 } 1040 } 1041 sigoff(self); /* block SIGAIOCANCEL */ 1042 break; 1043 #if !defined(_LP64) 1044 case AIOAREAD64: 1045 sigon(self); /* unblock SIGAIOCANCEL */ 1046 retval = pread64(arg->fd, arg->buf, 1047 arg->bufsz, arg->offset); 1048 if (retval == -1) { 1049 if (errno == ESPIPE) { 1050 retval = read(arg->fd, 1051 arg->buf, arg->bufsz); 1052 if (retval == -1) 1053 error = errno; 1054 } else { 1055 error = errno; 1056 } 1057 } 1058 sigoff(self); /* block SIGAIOCANCEL */ 1059 break; 1060 case AIOAWRITE64: 1061 sigon(self); /* unblock SIGAIOCANCEL */ 1062 retval = pwrite64(arg->fd, arg->buf, 1063 arg->bufsz, arg->offset); 1064 if (retval == -1) { 1065 if (errno == ESPIPE) { 1066 retval = write(arg->fd, 1067 arg->buf, arg->bufsz); 1068 if (retval == -1) 1069 error = errno; 1070 } else { 
1071 error = errno; 1072 } 1073 } 1074 sigoff(self); /* block SIGAIOCANCEL */ 1075 break; 1076 #endif /* !defined(_LP64) */ 1077 case AIOFSYNC: 1078 if (_aio_fsync_del(aiowp, reqp)) 1079 goto top; 1080 ASSERT(reqp->req_head == NULL); 1081 /* 1082 * All writes for this fsync request are now 1083 * acknowledged. Now make these writes visible 1084 * and put the final request into the hash table. 1085 */ 1086 if (reqp->req_state == AIO_REQ_CANCELED) { 1087 /* EMPTY */; 1088 } else if (arg->offset == O_SYNC) { 1089 if ((retval = __fdsync(arg->fd, FSYNC)) == -1) 1090 error = errno; 1091 } else { 1092 if ((retval = __fdsync(arg->fd, FDSYNC)) == -1) 1093 error = errno; 1094 } 1095 if (_aio_hash_insert(reqp->req_resultp, reqp) != 0) 1096 aio_panic("_aio_do_request(): AIOFSYNC: " 1097 "request already in hash table"); 1098 break; 1099 default: 1100 aio_panic("_aio_do_request, bad op"); 1101 } 1102 1103 _aio_finish_request(aiowp, retval, error); 1104 } 1105 /* NOTREACHED */ 1106 return (NULL); 1107 } 1108 1109 /* 1110 * Perform the tail processing for _aio_do_request(). 1111 * The in-progress request may or may not have been cancelled. 
1112 */ 1113 static void 1114 _aio_finish_request(aio_worker_t *aiowp, ssize_t retval, int error) 1115 { 1116 aio_req_t *reqp; 1117 1118 sig_mutex_lock(&aiowp->work_qlock1); 1119 if ((reqp = aiowp->work_req) == NULL) 1120 sig_mutex_unlock(&aiowp->work_qlock1); 1121 else { 1122 aiowp->work_req = NULL; 1123 if (reqp->req_state == AIO_REQ_CANCELED) { 1124 retval = -1; 1125 error = ECANCELED; 1126 } 1127 if (!POSIX_AIO(reqp)) { 1128 sig_mutex_unlock(&aiowp->work_qlock1); 1129 sig_mutex_lock(&__aio_mutex); 1130 if (reqp->req_state == AIO_REQ_INPROGRESS) 1131 reqp->req_state = AIO_REQ_DONE; 1132 _aio_req_done_cnt++; 1133 _aio_set_result(reqp, retval, error); 1134 if (error == ECANCELED) 1135 _aio_outstand_cnt--; 1136 sig_mutex_unlock(&__aio_mutex); 1137 } else { 1138 if (reqp->req_state == AIO_REQ_INPROGRESS) 1139 reqp->req_state = AIO_REQ_DONE; 1140 sig_mutex_unlock(&aiowp->work_qlock1); 1141 _aiodone(reqp, retval, error); 1142 } 1143 } 1144 } 1145 1146 void 1147 _aio_req_mark_done(aio_req_t *reqp) 1148 { 1149 #if !defined(_LP64) 1150 if (reqp->req_largefile) 1151 ((aiocb64_t *)reqp->req_aiocbp)->aio_state = USERAIO_DONE; 1152 else 1153 #endif 1154 ((aiocb_t *)reqp->req_aiocbp)->aio_state = USERAIO_DONE; 1155 } 1156 1157 /* 1158 * Sleep for 'ticks' clock ticks to give somebody else a chance to run, 1159 * hopefully to consume one of our queued signals. 1160 */ 1161 static void 1162 _aio_delay(int ticks) 1163 { 1164 (void) usleep(ticks * (MICROSEC / hz)); 1165 } 1166 1167 /* 1168 * Actually send the notifications. 1169 * We could block indefinitely here if the application 1170 * is not listening for the signal or port notifications. 
1171 */ 1172 static void 1173 send_notification(notif_param_t *npp) 1174 { 1175 extern int __sigqueue(pid_t pid, int signo, 1176 /* const union sigval */ void *value, int si_code, int block); 1177 1178 if (npp->np_signo) 1179 (void) __sigqueue(__pid, npp->np_signo, npp->np_user, 1180 SI_ASYNCIO, 1); 1181 else if (npp->np_port >= 0) 1182 (void) _port_dispatch(npp->np_port, 0, PORT_SOURCE_AIO, 1183 npp->np_event, npp->np_object, npp->np_user); 1184 1185 if (npp->np_lio_signo) 1186 (void) __sigqueue(__pid, npp->np_lio_signo, npp->np_lio_user, 1187 SI_ASYNCIO, 1); 1188 else if (npp->np_lio_port >= 0) 1189 (void) _port_dispatch(npp->np_lio_port, 0, PORT_SOURCE_AIO, 1190 npp->np_lio_event, npp->np_lio_object, npp->np_lio_user); 1191 } 1192 1193 /* 1194 * Asynchronous notification worker. 1195 */ 1196 void * 1197 _aio_do_notify(void *arg) 1198 { 1199 aio_worker_t *aiowp = (aio_worker_t *)arg; 1200 aio_req_t *reqp; 1201 1202 /* 1203 * This isn't really necessary. All signals are blocked. 1204 */ 1205 if (pthread_setspecific(_aio_key, aiowp) != 0) 1206 aio_panic("_aio_do_notify, pthread_setspecific()"); 1207 1208 /* 1209 * Notifications are never cancelled. 1210 * All signals remain blocked, forever. 1211 */ 1212 for (;;) { 1213 while ((reqp = _aio_req_get(aiowp)) == NULL) { 1214 if (_aio_idle(aiowp) != 0) 1215 aio_panic("_aio_do_notify: _aio_idle() failed"); 1216 } 1217 send_notification(&reqp->req_notify); 1218 _aio_req_free(reqp); 1219 } 1220 1221 /* NOTREACHED */ 1222 return (NULL); 1223 } 1224 1225 /* 1226 * Do the completion semantics for a request that was either canceled 1227 * by _aio_cancel_req() or was completed by _aio_do_request(). 
1228 */ 1229 static void 1230 _aiodone(aio_req_t *reqp, ssize_t retval, int error) 1231 { 1232 aio_result_t *resultp = reqp->req_resultp; 1233 int notify = 0; 1234 aio_lio_t *head; 1235 int sigev_none; 1236 int sigev_signal; 1237 int sigev_thread; 1238 int sigev_port; 1239 notif_param_t np; 1240 1241 /* 1242 * We call _aiodone() only for Posix I/O. 1243 */ 1244 ASSERT(POSIX_AIO(reqp)); 1245 1246 sigev_none = 0; 1247 sigev_signal = 0; 1248 sigev_thread = 0; 1249 sigev_port = 0; 1250 np.np_signo = 0; 1251 np.np_port = -1; 1252 np.np_lio_signo = 0; 1253 np.np_lio_port = -1; 1254 1255 switch (reqp->req_sigevent.sigev_notify) { 1256 case SIGEV_NONE: 1257 sigev_none = 1; 1258 break; 1259 case SIGEV_SIGNAL: 1260 sigev_signal = 1; 1261 break; 1262 case SIGEV_THREAD: 1263 sigev_thread = 1; 1264 break; 1265 case SIGEV_PORT: 1266 sigev_port = 1; 1267 break; 1268 default: 1269 aio_panic("_aiodone: improper sigev_notify"); 1270 break; 1271 } 1272 1273 /* 1274 * Figure out the notification parameters while holding __aio_mutex. 1275 * Actually perform the notifications after dropping __aio_mutex. 1276 * This allows us to sleep for a long time (if the notifications 1277 * incur delays) without impeding other async I/O operations. 
1278 */ 1279 1280 sig_mutex_lock(&__aio_mutex); 1281 1282 if (sigev_signal) { 1283 if ((np.np_signo = reqp->req_sigevent.sigev_signo) != 0) 1284 notify = 1; 1285 np.np_user = reqp->req_sigevent.sigev_value.sival_ptr; 1286 } else if (sigev_thread | sigev_port) { 1287 if ((np.np_port = reqp->req_sigevent.sigev_signo) >= 0) 1288 notify = 1; 1289 np.np_event = reqp->req_op; 1290 if (np.np_event == AIOFSYNC && reqp->req_largefile) 1291 np.np_event = AIOFSYNC64; 1292 np.np_object = (uintptr_t)reqp->req_aiocbp; 1293 np.np_user = reqp->req_sigevent.sigev_value.sival_ptr; 1294 } 1295 1296 if (resultp->aio_errno == EINPROGRESS) 1297 _aio_set_result(reqp, retval, error); 1298 1299 _aio_outstand_cnt--; 1300 1301 head = reqp->req_head; 1302 reqp->req_head = NULL; 1303 1304 if (sigev_none) { 1305 _aio_enq_doneq(reqp); 1306 reqp = NULL; 1307 } else { 1308 (void) _aio_hash_del(resultp); 1309 _aio_req_mark_done(reqp); 1310 } 1311 1312 _aio_waitn_wakeup(); 1313 1314 /* 1315 * __aio_waitn() sets AIO_WAIT_INPROGRESS and 1316 * __aio_suspend() increments "_aio_kernel_suspend" 1317 * when they are waiting in the kernel for completed I/Os. 1318 * 1319 * _kaio(AIONOTIFY) awakes the corresponding function 1320 * in the kernel; then the corresponding __aio_waitn() or 1321 * __aio_suspend() function could reap the recently 1322 * completed I/Os (_aiodone()). 1323 */ 1324 if ((_aio_flags & AIO_WAIT_INPROGRESS) || _aio_kernel_suspend > 0) 1325 (void) _kaio(AIONOTIFY); 1326 1327 sig_mutex_unlock(&__aio_mutex); 1328 1329 if (head != NULL) { 1330 /* 1331 * If all the lio requests have completed, 1332 * prepare to notify the waiting thread. 
1333 */ 1334 sig_mutex_lock(&head->lio_mutex); 1335 ASSERT(head->lio_refcnt == head->lio_nent); 1336 if (head->lio_refcnt == 1) { 1337 int waiting = 0; 1338 if (head->lio_mode == LIO_WAIT) { 1339 if ((waiting = head->lio_waiting) != 0) 1340 (void) cond_signal(&head->lio_cond_cv); 1341 } else if (head->lio_port < 0) { /* none or signal */ 1342 if ((np.np_lio_signo = head->lio_signo) != 0) 1343 notify = 1; 1344 np.np_lio_user = head->lio_sigval.sival_ptr; 1345 } else { /* thread or port */ 1346 notify = 1; 1347 np.np_lio_port = head->lio_port; 1348 np.np_lio_event = head->lio_event; 1349 np.np_lio_object = 1350 (uintptr_t)head->lio_sigevent; 1351 np.np_lio_user = head->lio_sigval.sival_ptr; 1352 } 1353 head->lio_nent = head->lio_refcnt = 0; 1354 sig_mutex_unlock(&head->lio_mutex); 1355 if (waiting == 0) 1356 _aio_lio_free(head); 1357 } else { 1358 head->lio_nent--; 1359 head->lio_refcnt--; 1360 sig_mutex_unlock(&head->lio_mutex); 1361 } 1362 } 1363 1364 /* 1365 * The request is completed; now perform the notifications. 1366 */ 1367 if (notify) { 1368 if (reqp != NULL) { 1369 /* 1370 * We usually put the request on the notification 1371 * queue because we don't want to block and delay 1372 * other operations behind us in the work queue. 1373 * Also we must never block on a cancel notification 1374 * because we are being called from an application 1375 * thread in this case and that could lead to deadlock 1376 * if no other thread is receiving notificatins. 1377 */ 1378 reqp->req_notify = np; 1379 reqp->req_op = AIONOTIFY; 1380 _aio_req_add(reqp, &__workers_no, AIONOTIFY); 1381 reqp = NULL; 1382 } else { 1383 /* 1384 * We already put the request on the done queue, 1385 * so we can't queue it to the notification queue. 1386 * Just do the notification directly. 1387 */ 1388 send_notification(&np); 1389 } 1390 } 1391 1392 if (reqp != NULL) 1393 _aio_req_free(reqp); 1394 } 1395 1396 /* 1397 * Delete fsync requests from list head until there is 1398 * only one left. 
Return 0 when there is only one, 1399 * otherwise return a non-zero value. 1400 */ 1401 static int 1402 _aio_fsync_del(aio_worker_t *aiowp, aio_req_t *reqp) 1403 { 1404 aio_lio_t *head = reqp->req_head; 1405 int rval = 0; 1406 1407 ASSERT(reqp == aiowp->work_req); 1408 sig_mutex_lock(&aiowp->work_qlock1); 1409 sig_mutex_lock(&head->lio_mutex); 1410 if (head->lio_refcnt > 1) { 1411 head->lio_refcnt--; 1412 head->lio_nent--; 1413 aiowp->work_req = NULL; 1414 sig_mutex_unlock(&head->lio_mutex); 1415 sig_mutex_unlock(&aiowp->work_qlock1); 1416 sig_mutex_lock(&__aio_mutex); 1417 _aio_outstand_cnt--; 1418 _aio_waitn_wakeup(); 1419 sig_mutex_unlock(&__aio_mutex); 1420 _aio_req_free(reqp); 1421 return (1); 1422 } 1423 ASSERT(head->lio_nent == 1 && head->lio_refcnt == 1); 1424 reqp->req_head = NULL; 1425 if (head->lio_canned) 1426 reqp->req_state = AIO_REQ_CANCELED; 1427 if (head->lio_mode == LIO_DESTROY) { 1428 aiowp->work_req = NULL; 1429 rval = 1; 1430 } 1431 sig_mutex_unlock(&head->lio_mutex); 1432 sig_mutex_unlock(&aiowp->work_qlock1); 1433 head->lio_refcnt--; 1434 head->lio_nent--; 1435 _aio_lio_free(head); 1436 if (rval != 0) 1437 _aio_req_free(reqp); 1438 return (rval); 1439 } 1440 1441 /* 1442 * A worker is set idle when its work queue is empty. 1443 * The worker checks again that it has no more work 1444 * and then goes to sleep waiting for more work. 1445 */ 1446 int 1447 _aio_idle(aio_worker_t *aiowp) 1448 { 1449 int error = 0; 1450 1451 sig_mutex_lock(&aiowp->work_qlock1); 1452 if (aiowp->work_count1 == 0) { 1453 ASSERT(aiowp->work_minload1 == 0); 1454 aiowp->work_idleflg = 1; 1455 /* 1456 * A cancellation handler is not needed here. 1457 * aio worker threads are never cancelled via pthread_cancel(). 1458 */ 1459 error = sig_cond_wait(&aiowp->work_idle_cv, 1460 &aiowp->work_qlock1); 1461 /* 1462 * The idle flag is normally cleared before worker is awakened 1463 * by aio_req_add(). On error (EINTR), we clear it ourself. 
1464 */ 1465 if (error) 1466 aiowp->work_idleflg = 0; 1467 } 1468 sig_mutex_unlock(&aiowp->work_qlock1); 1469 return (error); 1470 } 1471 1472 /* 1473 * A worker's completed AIO requests are placed onto a global 1474 * done queue. The application is only sent a SIGIO signal if 1475 * the process has a handler enabled and it is not waiting via 1476 * aiowait(). 1477 */ 1478 static void 1479 _aio_work_done(aio_worker_t *aiowp) 1480 { 1481 aio_req_t *reqp; 1482 1483 sig_mutex_lock(&aiowp->work_qlock1); 1484 reqp = aiowp->work_prev1; 1485 reqp->req_next = NULL; 1486 aiowp->work_done1 = 0; 1487 aiowp->work_tail1 = aiowp->work_next1; 1488 if (aiowp->work_tail1 == NULL) 1489 aiowp->work_head1 = NULL; 1490 aiowp->work_prev1 = NULL; 1491 sig_mutex_unlock(&aiowp->work_qlock1); 1492 sig_mutex_lock(&__aio_mutex); 1493 _aio_donecnt++; 1494 _aio_outstand_cnt--; 1495 _aio_req_done_cnt--; 1496 ASSERT(_aio_donecnt > 0 && 1497 _aio_outstand_cnt >= 0 && 1498 _aio_req_done_cnt >= 0); 1499 ASSERT(reqp != NULL); 1500 1501 if (_aio_done_tail == NULL) { 1502 _aio_done_head = _aio_done_tail = reqp; 1503 } else { 1504 _aio_done_head->req_next = reqp; 1505 _aio_done_head = reqp; 1506 } 1507 1508 if (_aiowait_flag) { 1509 sig_mutex_unlock(&__aio_mutex); 1510 (void) _kaio(AIONOTIFY); 1511 } else { 1512 sig_mutex_unlock(&__aio_mutex); 1513 if (_sigio_enabled) 1514 (void) kill(__pid, SIGIO); 1515 } 1516 } 1517 1518 /* 1519 * The done queue consists of AIO requests that are in either the 1520 * AIO_REQ_DONE or AIO_REQ_CANCELED state. Requests that were cancelled 1521 * are discarded. If the done queue is empty then NULL is returned. 1522 * Otherwise the address of a done aio_result_t is returned. 
1523 */ 1524 aio_result_t * 1525 _aio_req_done(void) 1526 { 1527 aio_req_t *reqp; 1528 aio_result_t *resultp; 1529 1530 ASSERT(MUTEX_HELD(&__aio_mutex)); 1531 1532 if ((reqp = _aio_done_tail) != NULL) { 1533 if ((_aio_done_tail = reqp->req_next) == NULL) 1534 _aio_done_head = NULL; 1535 ASSERT(_aio_donecnt > 0); 1536 _aio_donecnt--; 1537 (void) _aio_hash_del(reqp->req_resultp); 1538 resultp = reqp->req_resultp; 1539 ASSERT(reqp->req_state == AIO_REQ_DONE); 1540 _aio_req_free(reqp); 1541 return (resultp); 1542 } 1543 /* is queue empty? */ 1544 if (reqp == NULL && _aio_outstand_cnt == 0) { 1545 return ((aio_result_t *)-1); 1546 } 1547 return (NULL); 1548 } 1549 1550 /* 1551 * Set the return and errno values for the application's use. 1552 * 1553 * For the Posix interfaces, we must set the return value first followed 1554 * by the errno value because the Posix interfaces allow for a change 1555 * in the errno value from EINPROGRESS to something else to signal 1556 * the completion of the asynchronous request. 1557 * 1558 * The opposite is true for the Solaris interfaces. These allow for 1559 * a change in the return value from AIO_INPROGRESS to something else 1560 * to signal the completion of the asynchronous request. 1561 */ 1562 void 1563 _aio_set_result(aio_req_t *reqp, ssize_t retval, int error) 1564 { 1565 aio_result_t *resultp = reqp->req_resultp; 1566 1567 if (POSIX_AIO(reqp)) { 1568 resultp->aio_return = retval; 1569 membar_producer(); 1570 resultp->aio_errno = error; 1571 } else { 1572 resultp->aio_errno = error; 1573 membar_producer(); 1574 resultp->aio_return = retval; 1575 } 1576 } 1577 1578 /* 1579 * Add an AIO request onto the next work queue. 1580 * A circular list of workers is used to choose the next worker. 
1581 */ 1582 void 1583 _aio_req_add(aio_req_t *reqp, aio_worker_t **nextworker, int mode) 1584 { 1585 ulwp_t *self = curthread; 1586 aio_worker_t *aiowp; 1587 aio_worker_t *first; 1588 int load_bal_flg = 1; 1589 int found; 1590 1591 ASSERT(reqp->req_state != AIO_REQ_DONEQ); 1592 reqp->req_next = NULL; 1593 /* 1594 * Try to acquire the next worker's work queue. If it is locked, 1595 * then search the list of workers until a queue is found unlocked, 1596 * or until the list is completely traversed at which point another 1597 * worker will be created. 1598 */ 1599 sigoff(self); /* defer SIGIO */ 1600 sig_mutex_lock(&__aio_mutex); 1601 first = aiowp = *nextworker; 1602 if (mode != AIONOTIFY) 1603 _aio_outstand_cnt++; 1604 sig_mutex_unlock(&__aio_mutex); 1605 1606 switch (mode) { 1607 case AIOREAD: 1608 case AIOWRITE: 1609 case AIOAREAD: 1610 case AIOAWRITE: 1611 #if !defined(_LP64) 1612 case AIOAREAD64: 1613 case AIOAWRITE64: 1614 #endif 1615 /* try to find an idle worker */ 1616 found = 0; 1617 do { 1618 if (sig_mutex_trylock(&aiowp->work_qlock1) == 0) { 1619 if (aiowp->work_idleflg) { 1620 found = 1; 1621 break; 1622 } 1623 sig_mutex_unlock(&aiowp->work_qlock1); 1624 } 1625 } while ((aiowp = aiowp->work_forw) != first); 1626 1627 if (found) { 1628 aiowp->work_minload1++; 1629 break; 1630 } 1631 1632 /* try to acquire some worker's queue lock */ 1633 do { 1634 if (sig_mutex_trylock(&aiowp->work_qlock1) == 0) { 1635 found = 1; 1636 break; 1637 } 1638 } while ((aiowp = aiowp->work_forw) != first); 1639 1640 /* 1641 * Create more workers when the workers appear overloaded. 1642 * Either all the workers are busy draining their queues 1643 * or no worker's queue lock could be acquired. 
1644 */ 1645 if (!found) { 1646 if (_aio_worker_cnt < _max_workers) { 1647 if (_aio_create_worker(reqp, mode)) 1648 aio_panic("_aio_req_add: add worker"); 1649 sigon(self); /* reenable SIGIO */ 1650 return; 1651 } 1652 1653 /* 1654 * No worker available and we have created 1655 * _max_workers, keep going through the 1656 * list slowly until we get a lock 1657 */ 1658 while (sig_mutex_trylock(&aiowp->work_qlock1) != 0) { 1659 /* 1660 * give someone else a chance 1661 */ 1662 _aio_delay(1); 1663 aiowp = aiowp->work_forw; 1664 } 1665 } 1666 1667 ASSERT(MUTEX_HELD(&aiowp->work_qlock1)); 1668 if (_aio_worker_cnt < _max_workers && 1669 aiowp->work_minload1 >= _minworkload) { 1670 sig_mutex_unlock(&aiowp->work_qlock1); 1671 sig_mutex_lock(&__aio_mutex); 1672 *nextworker = aiowp->work_forw; 1673 sig_mutex_unlock(&__aio_mutex); 1674 if (_aio_create_worker(reqp, mode)) 1675 aio_panic("aio_req_add: add worker"); 1676 sigon(self); /* reenable SIGIO */ 1677 return; 1678 } 1679 aiowp->work_minload1++; 1680 break; 1681 case AIOFSYNC: 1682 case AIONOTIFY: 1683 load_bal_flg = 0; 1684 sig_mutex_lock(&aiowp->work_qlock1); 1685 break; 1686 default: 1687 aio_panic("_aio_req_add: invalid mode"); 1688 break; 1689 } 1690 /* 1691 * Put request onto worker's work queue. 1692 */ 1693 if (aiowp->work_tail1 == NULL) { 1694 ASSERT(aiowp->work_count1 == 0); 1695 aiowp->work_tail1 = reqp; 1696 aiowp->work_next1 = reqp; 1697 } else { 1698 aiowp->work_head1->req_next = reqp; 1699 if (aiowp->work_next1 == NULL) 1700 aiowp->work_next1 = reqp; 1701 } 1702 reqp->req_state = AIO_REQ_QUEUED; 1703 reqp->req_worker = aiowp; 1704 aiowp->work_head1 = reqp; 1705 /* 1706 * Awaken worker if it is not currently active. 
1707 */ 1708 if (aiowp->work_count1++ == 0 && aiowp->work_idleflg) { 1709 aiowp->work_idleflg = 0; 1710 (void) cond_signal(&aiowp->work_idle_cv); 1711 } 1712 sig_mutex_unlock(&aiowp->work_qlock1); 1713 1714 if (load_bal_flg) { 1715 sig_mutex_lock(&__aio_mutex); 1716 *nextworker = aiowp->work_forw; 1717 sig_mutex_unlock(&__aio_mutex); 1718 } 1719 sigon(self); /* reenable SIGIO */ 1720 } 1721 1722 /* 1723 * Get an AIO request for a specified worker. 1724 * If the work queue is empty, return NULL. 1725 */ 1726 aio_req_t * 1727 _aio_req_get(aio_worker_t *aiowp) 1728 { 1729 aio_req_t *reqp; 1730 1731 sig_mutex_lock(&aiowp->work_qlock1); 1732 if ((reqp = aiowp->work_next1) != NULL) { 1733 /* 1734 * Remove a POSIX request from the queue; the 1735 * request queue is a singularly linked list 1736 * with a previous pointer. The request is 1737 * removed by updating the previous pointer. 1738 * 1739 * Non-posix requests are left on the queue 1740 * to eventually be placed on the done queue. 1741 */ 1742 1743 if (POSIX_AIO(reqp)) { 1744 if (aiowp->work_prev1 == NULL) { 1745 aiowp->work_tail1 = reqp->req_next; 1746 if (aiowp->work_tail1 == NULL) 1747 aiowp->work_head1 = NULL; 1748 } else { 1749 aiowp->work_prev1->req_next = reqp->req_next; 1750 if (aiowp->work_head1 == reqp) 1751 aiowp->work_head1 = reqp->req_next; 1752 } 1753 1754 } else { 1755 aiowp->work_prev1 = reqp; 1756 ASSERT(aiowp->work_done1 >= 0); 1757 aiowp->work_done1++; 1758 } 1759 ASSERT(reqp != reqp->req_next); 1760 aiowp->work_next1 = reqp->req_next; 1761 ASSERT(aiowp->work_count1 >= 1); 1762 aiowp->work_count1--; 1763 switch (reqp->req_op) { 1764 case AIOREAD: 1765 case AIOWRITE: 1766 case AIOAREAD: 1767 case AIOAWRITE: 1768 #if !defined(_LP64) 1769 case AIOAREAD64: 1770 case AIOAWRITE64: 1771 #endif 1772 ASSERT(aiowp->work_minload1 > 0); 1773 aiowp->work_minload1--; 1774 break; 1775 } 1776 reqp->req_state = AIO_REQ_INPROGRESS; 1777 } 1778 aiowp->work_req = reqp; 1779 ASSERT(reqp != NULL || aiowp->work_count1 
== 0); 1780 sig_mutex_unlock(&aiowp->work_qlock1); 1781 return (reqp); 1782 } 1783 1784 static void 1785 _aio_req_del(aio_worker_t *aiowp, aio_req_t *reqp, int ostate) 1786 { 1787 aio_req_t **last; 1788 aio_req_t *lastrp; 1789 aio_req_t *next; 1790 1791 ASSERT(aiowp != NULL); 1792 ASSERT(MUTEX_HELD(&aiowp->work_qlock1)); 1793 if (POSIX_AIO(reqp)) { 1794 if (ostate != AIO_REQ_QUEUED) 1795 return; 1796 } 1797 last = &aiowp->work_tail1; 1798 lastrp = aiowp->work_tail1; 1799 ASSERT(ostate == AIO_REQ_QUEUED || ostate == AIO_REQ_INPROGRESS); 1800 while ((next = *last) != NULL) { 1801 if (next == reqp) { 1802 *last = next->req_next; 1803 if (aiowp->work_next1 == next) 1804 aiowp->work_next1 = next->req_next; 1805 1806 if ((next->req_next != NULL) || 1807 (aiowp->work_done1 == 0)) { 1808 if (aiowp->work_head1 == next) 1809 aiowp->work_head1 = next->req_next; 1810 if (aiowp->work_prev1 == next) 1811 aiowp->work_prev1 = next->req_next; 1812 } else { 1813 if (aiowp->work_head1 == next) 1814 aiowp->work_head1 = lastrp; 1815 if (aiowp->work_prev1 == next) 1816 aiowp->work_prev1 = lastrp; 1817 } 1818 1819 if (ostate == AIO_REQ_QUEUED) { 1820 ASSERT(aiowp->work_count1 >= 1); 1821 aiowp->work_count1--; 1822 ASSERT(aiowp->work_minload1 >= 1); 1823 aiowp->work_minload1--; 1824 } else { 1825 ASSERT(ostate == AIO_REQ_INPROGRESS && 1826 !POSIX_AIO(reqp)); 1827 aiowp->work_done1--; 1828 } 1829 return; 1830 } 1831 last = &next->req_next; 1832 lastrp = next; 1833 } 1834 /* NOTREACHED */ 1835 } 1836 1837 static void 1838 _aio_enq_doneq(aio_req_t *reqp) 1839 { 1840 if (_aio_doneq == NULL) { 1841 _aio_doneq = reqp; 1842 reqp->req_next = reqp->req_prev = reqp; 1843 } else { 1844 reqp->req_next = _aio_doneq; 1845 reqp->req_prev = _aio_doneq->req_prev; 1846 _aio_doneq->req_prev->req_next = reqp; 1847 _aio_doneq->req_prev = reqp; 1848 } 1849 reqp->req_state = AIO_REQ_DONEQ; 1850 _aio_doneq_cnt++; 1851 } 1852 1853 /* 1854 * caller owns the _aio_mutex 1855 */ 1856 aio_req_t * 1857 
_aio_req_remove(aio_req_t *reqp) 1858 { 1859 if (reqp && reqp->req_state != AIO_REQ_DONEQ) 1860 return (NULL); 1861 1862 if (reqp) { 1863 /* request in done queue */ 1864 if (_aio_doneq == reqp) 1865 _aio_doneq = reqp->req_next; 1866 if (_aio_doneq == reqp) { 1867 /* only one request on queue */ 1868 _aio_doneq = NULL; 1869 } else { 1870 aio_req_t *tmp = reqp->req_next; 1871 reqp->req_prev->req_next = tmp; 1872 tmp->req_prev = reqp->req_prev; 1873 } 1874 } else if ((reqp = _aio_doneq) != NULL) { 1875 if (reqp == reqp->req_next) { 1876 /* only one request on queue */ 1877 _aio_doneq = NULL; 1878 } else { 1879 reqp->req_prev->req_next = _aio_doneq = reqp->req_next; 1880 _aio_doneq->req_prev = reqp->req_prev; 1881 } 1882 } 1883 if (reqp) { 1884 _aio_doneq_cnt--; 1885 reqp->req_next = reqp->req_prev = reqp; 1886 reqp->req_state = AIO_REQ_DONE; 1887 } 1888 return (reqp); 1889 } 1890 1891 /* 1892 * An AIO request is identified by an aio_result_t pointer. The library 1893 * maps this aio_result_t pointer to its internal representation using a 1894 * hash table. This function adds an aio_result_t pointer to the hash table. 1895 */ 1896 static int 1897 _aio_hash_insert(aio_result_t *resultp, aio_req_t *reqp) 1898 { 1899 aio_hash_t *hashp; 1900 aio_req_t **prev; 1901 aio_req_t *next; 1902 1903 hashp = _aio_hash + AIOHASH(resultp); 1904 lmutex_lock(&hashp->hash_lock); 1905 prev = &hashp->hash_ptr; 1906 while ((next = *prev) != NULL) { 1907 if (resultp == next->req_resultp) { 1908 lmutex_unlock(&hashp->hash_lock); 1909 return (-1); 1910 } 1911 prev = &next->req_link; 1912 } 1913 *prev = reqp; 1914 ASSERT(reqp->req_link == NULL); 1915 lmutex_unlock(&hashp->hash_lock); 1916 return (0); 1917 } 1918 1919 /* 1920 * Remove an entry from the hash table. 
1921 */ 1922 aio_req_t * 1923 _aio_hash_del(aio_result_t *resultp) 1924 { 1925 aio_hash_t *hashp; 1926 aio_req_t **prev; 1927 aio_req_t *next = NULL; 1928 1929 if (_aio_hash != NULL) { 1930 hashp = _aio_hash + AIOHASH(resultp); 1931 lmutex_lock(&hashp->hash_lock); 1932 prev = &hashp->hash_ptr; 1933 while ((next = *prev) != NULL) { 1934 if (resultp == next->req_resultp) { 1935 *prev = next->req_link; 1936 next->req_link = NULL; 1937 break; 1938 } 1939 prev = &next->req_link; 1940 } 1941 lmutex_unlock(&hashp->hash_lock); 1942 } 1943 return (next); 1944 } 1945 1946 /* 1947 * find an entry in the hash table 1948 */ 1949 aio_req_t * 1950 _aio_hash_find(aio_result_t *resultp) 1951 { 1952 aio_hash_t *hashp; 1953 aio_req_t **prev; 1954 aio_req_t *next = NULL; 1955 1956 if (_aio_hash != NULL) { 1957 hashp = _aio_hash + AIOHASH(resultp); 1958 lmutex_lock(&hashp->hash_lock); 1959 prev = &hashp->hash_ptr; 1960 while ((next = *prev) != NULL) { 1961 if (resultp == next->req_resultp) 1962 break; 1963 prev = &next->req_link; 1964 } 1965 lmutex_unlock(&hashp->hash_lock); 1966 } 1967 return (next); 1968 } 1969 1970 /* 1971 * AIO interface for POSIX 1972 */ 1973 int 1974 _aio_rw(aiocb_t *aiocbp, aio_lio_t *lio_head, aio_worker_t **nextworker, 1975 int mode, int flg) 1976 { 1977 aio_req_t *reqp; 1978 aio_args_t *ap; 1979 int kerr; 1980 1981 if (aiocbp == NULL) { 1982 errno = EINVAL; 1983 return (-1); 1984 } 1985 1986 /* initialize kaio */ 1987 if (!_kaio_ok) 1988 _kaio_init(); 1989 1990 aiocbp->aio_state = NOCHECK; 1991 1992 /* 1993 * If we have been called because a list I/O 1994 * kaio() failed, we dont want to repeat the 1995 * system call 1996 */ 1997 1998 if (flg & AIO_KAIO) { 1999 /* 2000 * Try kernel aio first. 2001 * If errno is ENOTSUP/EBADFD, 2002 * fall back to the thread implementation. 
2003 */ 2004 if (_kaio_ok > 0 && KAIO_SUPPORTED(aiocbp->aio_fildes)) { 2005 aiocbp->aio_resultp.aio_errno = EINPROGRESS; 2006 aiocbp->aio_state = CHECK; 2007 kerr = (int)_kaio(mode, aiocbp); 2008 if (kerr == 0) 2009 return (0); 2010 if (errno != ENOTSUP && errno != EBADFD) { 2011 aiocbp->aio_resultp.aio_errno = errno; 2012 aiocbp->aio_resultp.aio_return = -1; 2013 aiocbp->aio_state = NOCHECK; 2014 return (-1); 2015 } 2016 if (errno == EBADFD) 2017 SET_KAIO_NOT_SUPPORTED(aiocbp->aio_fildes); 2018 } 2019 } 2020 2021 aiocbp->aio_resultp.aio_errno = EINPROGRESS; 2022 aiocbp->aio_state = USERAIO; 2023 2024 if (!__uaio_ok && __uaio_init() == -1) 2025 return (-1); 2026 2027 if ((reqp = _aio_req_alloc()) == NULL) { 2028 errno = EAGAIN; 2029 return (-1); 2030 } 2031 2032 /* 2033 * If an LIO request, add the list head to the aio request 2034 */ 2035 reqp->req_head = lio_head; 2036 reqp->req_type = AIO_POSIX_REQ; 2037 reqp->req_op = mode; 2038 reqp->req_largefile = 0; 2039 2040 if (aiocbp->aio_sigevent.sigev_notify == SIGEV_NONE) { 2041 reqp->req_sigevent.sigev_notify = SIGEV_NONE; 2042 } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_SIGNAL) { 2043 reqp->req_sigevent.sigev_notify = SIGEV_SIGNAL; 2044 reqp->req_sigevent.sigev_signo = 2045 aiocbp->aio_sigevent.sigev_signo; 2046 reqp->req_sigevent.sigev_value.sival_ptr = 2047 aiocbp->aio_sigevent.sigev_value.sival_ptr; 2048 } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_PORT) { 2049 port_notify_t *pn = aiocbp->aio_sigevent.sigev_value.sival_ptr; 2050 reqp->req_sigevent.sigev_notify = SIGEV_PORT; 2051 /* 2052 * Reuse the sigevent structure to contain the port number 2053 * and the user value. Same for SIGEV_THREAD, below. 
2054 */ 2055 reqp->req_sigevent.sigev_signo = 2056 pn->portnfy_port; 2057 reqp->req_sigevent.sigev_value.sival_ptr = 2058 pn->portnfy_user; 2059 } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_THREAD) { 2060 reqp->req_sigevent.sigev_notify = SIGEV_THREAD; 2061 /* 2062 * The sigevent structure contains the port number 2063 * and the user value. Same for SIGEV_PORT, above. 2064 */ 2065 reqp->req_sigevent.sigev_signo = 2066 aiocbp->aio_sigevent.sigev_signo; 2067 reqp->req_sigevent.sigev_value.sival_ptr = 2068 aiocbp->aio_sigevent.sigev_value.sival_ptr; 2069 } 2070 2071 reqp->req_resultp = &aiocbp->aio_resultp; 2072 reqp->req_aiocbp = aiocbp; 2073 ap = &reqp->req_args; 2074 ap->fd = aiocbp->aio_fildes; 2075 ap->buf = (caddr_t)aiocbp->aio_buf; 2076 ap->bufsz = aiocbp->aio_nbytes; 2077 ap->offset = aiocbp->aio_offset; 2078 2079 if ((flg & AIO_NO_DUPS) && 2080 _aio_hash_insert(&aiocbp->aio_resultp, reqp) != 0) { 2081 aio_panic("_aio_rw(): request already in hash table"); 2082 _aio_req_free(reqp); 2083 errno = EINVAL; 2084 return (-1); 2085 } 2086 _aio_req_add(reqp, nextworker, mode); 2087 return (0); 2088 } 2089 2090 #if !defined(_LP64) 2091 /* 2092 * 64-bit AIO interface for POSIX 2093 */ 2094 int 2095 _aio_rw64(aiocb64_t *aiocbp, aio_lio_t *lio_head, aio_worker_t **nextworker, 2096 int mode, int flg) 2097 { 2098 aio_req_t *reqp; 2099 aio_args_t *ap; 2100 int kerr; 2101 2102 if (aiocbp == NULL) { 2103 errno = EINVAL; 2104 return (-1); 2105 } 2106 2107 /* initialize kaio */ 2108 if (!_kaio_ok) 2109 _kaio_init(); 2110 2111 aiocbp->aio_state = NOCHECK; 2112 2113 /* 2114 * If we have been called because a list I/O 2115 * kaio() failed, we dont want to repeat the 2116 * system call 2117 */ 2118 2119 if (flg & AIO_KAIO) { 2120 /* 2121 * Try kernel aio first. 2122 * If errno is ENOTSUP/EBADFD, 2123 * fall back to the thread implementation. 
2124 */ 2125 if (_kaio_ok > 0 && KAIO_SUPPORTED(aiocbp->aio_fildes)) { 2126 aiocbp->aio_resultp.aio_errno = EINPROGRESS; 2127 aiocbp->aio_state = CHECK; 2128 kerr = (int)_kaio(mode, aiocbp); 2129 if (kerr == 0) 2130 return (0); 2131 if (errno != ENOTSUP && errno != EBADFD) { 2132 aiocbp->aio_resultp.aio_errno = errno; 2133 aiocbp->aio_resultp.aio_return = -1; 2134 aiocbp->aio_state = NOCHECK; 2135 return (-1); 2136 } 2137 if (errno == EBADFD) 2138 SET_KAIO_NOT_SUPPORTED(aiocbp->aio_fildes); 2139 } 2140 } 2141 2142 aiocbp->aio_resultp.aio_errno = EINPROGRESS; 2143 aiocbp->aio_state = USERAIO; 2144 2145 if (!__uaio_ok && __uaio_init() == -1) 2146 return (-1); 2147 2148 if ((reqp = _aio_req_alloc()) == NULL) { 2149 errno = EAGAIN; 2150 return (-1); 2151 } 2152 2153 /* 2154 * If an LIO request, add the list head to the aio request 2155 */ 2156 reqp->req_head = lio_head; 2157 reqp->req_type = AIO_POSIX_REQ; 2158 reqp->req_op = mode; 2159 reqp->req_largefile = 1; 2160 2161 if (aiocbp->aio_sigevent.sigev_notify == SIGEV_NONE) { 2162 reqp->req_sigevent.sigev_notify = SIGEV_NONE; 2163 } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_SIGNAL) { 2164 reqp->req_sigevent.sigev_notify = SIGEV_SIGNAL; 2165 reqp->req_sigevent.sigev_signo = 2166 aiocbp->aio_sigevent.sigev_signo; 2167 reqp->req_sigevent.sigev_value.sival_ptr = 2168 aiocbp->aio_sigevent.sigev_value.sival_ptr; 2169 } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_PORT) { 2170 port_notify_t *pn = aiocbp->aio_sigevent.sigev_value.sival_ptr; 2171 reqp->req_sigevent.sigev_notify = SIGEV_PORT; 2172 reqp->req_sigevent.sigev_signo = 2173 pn->portnfy_port; 2174 reqp->req_sigevent.sigev_value.sival_ptr = 2175 pn->portnfy_user; 2176 } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_THREAD) { 2177 reqp->req_sigevent.sigev_notify = SIGEV_THREAD; 2178 reqp->req_sigevent.sigev_signo = 2179 aiocbp->aio_sigevent.sigev_signo; 2180 reqp->req_sigevent.sigev_value.sival_ptr = 2181 aiocbp->aio_sigevent.sigev_value.sival_ptr; 
2182 } 2183 2184 reqp->req_resultp = &aiocbp->aio_resultp; 2185 reqp->req_aiocbp = aiocbp; 2186 ap = &reqp->req_args; 2187 ap->fd = aiocbp->aio_fildes; 2188 ap->buf = (caddr_t)aiocbp->aio_buf; 2189 ap->bufsz = aiocbp->aio_nbytes; 2190 ap->offset = aiocbp->aio_offset; 2191 2192 if ((flg & AIO_NO_DUPS) && 2193 _aio_hash_insert(&aiocbp->aio_resultp, reqp) != 0) { 2194 aio_panic("_aio_rw64(): request already in hash table"); 2195 _aio_req_free(reqp); 2196 errno = EINVAL; 2197 return (-1); 2198 } 2199 _aio_req_add(reqp, nextworker, mode); 2200 return (0); 2201 } 2202 #endif /* !defined(_LP64) */ 2203