1 /*
2 * CDDL HEADER START
3 *
4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License (the "License").
6 * You may not use this file except in compliance with the License.
7 *
8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9 * or http://www.opensolaris.org/os/licensing.
10 * See the License for the specific language governing permissions
11 * and limitations under the License.
12 *
13 * When distributing Covered Code, include this CDDL HEADER in each
14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15 * If applicable, add the following below this CDDL HEADER, with the
16 * fields enclosed by brackets "[]" replaced with your own identifying
17 * information: Portions Copyright [yyyy] [name of copyright owner]
18 *
19 * CDDL HEADER END
20 */
21
22 /*
23 * Copyright 2008 Sun Microsystems, Inc. All rights reserved.
24 * Use is subject to license terms.
25 */
26
27 #pragma ident "%Z%%M% %I% %E% SMI"
28
29 #include "lint.h"
30 #include "thr_uberdata.h"
31 #include "asyncio.h"
32 #include <atomic.h>
33 #include <sys/param.h>
34 #include <sys/file.h>
35 #include <sys/port.h>
36
37 static int _aio_hash_insert(aio_result_t *, aio_req_t *);
38 static aio_req_t *_aio_req_get(aio_worker_t *);
39 static void _aio_req_add(aio_req_t *, aio_worker_t **, int);
40 static void _aio_req_del(aio_worker_t *, aio_req_t *, int);
41 static void _aio_work_done(aio_worker_t *);
42 static void _aio_enq_doneq(aio_req_t *);
43
44 extern void _aio_lio_free(aio_lio_t *);
45
46 extern int __fdsync(int, int);
47 extern int __fcntl(int, int, ...);
48 extern int _port_dispatch(int, int, int, int, uintptr_t, void *);
49
50 static int _aio_fsync_del(aio_worker_t *, aio_req_t *);
51 static void _aiodone(aio_req_t *, ssize_t, int);
52 static void _aio_cancel_work(aio_worker_t *, int, int *, int *);
53 static void _aio_finish_request(aio_worker_t *, ssize_t, int);
54
55 /*
56 * switch for kernel async I/O
57 */
58 int _kaio_ok = 0; /* 0 = disabled, 1 = on, -1 = error */
59
60 /*
61 * Key for thread-specific data
62 */
63 pthread_key_t _aio_key;
64
65 /*
66 * Array for determining whether or not a file supports kaio.
67 * Initialized in _kaio_init().
68 */
69 uint32_t *_kaio_supported = NULL;
70
71 /*
72 * workers for read/write requests
73 * (__aio_mutex lock protects circular linked list of workers)
74 */
75 aio_worker_t *__workers_rw; /* circular list of AIO workers */
76 aio_worker_t *__nextworker_rw; /* next worker in list of workers */
77 int __rw_workerscnt; /* number of read/write workers */
78
79 /*
80 * worker for notification requests.
81 */
82 aio_worker_t *__workers_no; /* circular list of AIO workers */
83 aio_worker_t *__nextworker_no; /* next worker in list of workers */
84 int __no_workerscnt; /* number of notification workers */
85
86 aio_req_t *_aio_done_tail; /* list of done requests */
87 aio_req_t *_aio_done_head;
88
89 mutex_t __aio_initlock = DEFAULTMUTEX; /* makes aio initialization atomic */
90 cond_t __aio_initcv = DEFAULTCV;
91 int __aio_initbusy = 0;
92
93 mutex_t __aio_mutex = DEFAULTMUTEX; /* protects counts, and linked lists */
94 cond_t _aio_iowait_cv = DEFAULTCV; /* wait for userland I/Os */
95
96 pid_t __pid = (pid_t)-1; /* initialize as invalid pid */
97 int _sigio_enabled = 0; /* when set, send SIGIO signal */
98
99 aio_hash_t *_aio_hash;
100
101 aio_req_t *_aio_doneq; /* double linked done queue list */
102
103 int _aio_donecnt = 0;
104 int _aio_waitncnt = 0; /* # of requests for aio_waitn */
105 int _aio_doneq_cnt = 0;
106 int _aio_outstand_cnt = 0; /* # of outstanding requests */
107 int _kaio_outstand_cnt = 0; /* # of outstanding kaio requests */
108 int _aio_req_done_cnt = 0; /* req. done but not in "done queue" */
109 int _aio_kernel_suspend = 0; /* active kernel kaio calls */
110 int _aio_suscv_cnt = 0; /* aio_suspend calls waiting on cv's */
111
112 int _max_workers = 256; /* max number of workers permitted */
113 int _min_workers = 4; /* min number of workers */
114 int _minworkload = 2; /* min number of requests in queue */
115 int _aio_worker_cnt = 0; /* number of workers to do requests */
116 int __uaio_ok = 0; /* AIO has been enabled */
117 sigset_t _worker_set; /* worker's signal mask */
118
119 int _aiowait_flag = 0; /* when set, aiowait() is in progress */
120 int _aio_flags = 0; /* see defines in asyncio.h */
121
122 aio_worker_t *_kaiowp = NULL; /* points to kaio cleanup thread */
123
124 int hz; /* clock ticks per second */
125
126 static int
127 _kaio_supported_init(void)
128 {
129 void *ptr;
130 size_t size;
131
132 if (_kaio_supported != NULL) /* already initialized */
133 return (0);
134
135 size = MAX_KAIO_FDARRAY_SIZE * sizeof (uint32_t);
136 ptr = mmap(NULL, size, PROT_READ | PROT_WRITE,
137 MAP_PRIVATE | MAP_ANON, -1, (off_t)0);
138 if (ptr == MAP_FAILED)
139 return (-1);
140 _kaio_supported = ptr;
141 return (0);
142 }
143
144 /*
145 * The aio subsystem is initialized when an AIO request is made.
146 * Constants such as the maximum number of workers that
147 * the subsystem can create and the minimum number of workers
148 * permitted before imposing some restrictions are initialized.
149 * Some workers are also created.
150 */
151 int
152 __uaio_init(void)
153 {
154 int ret = -1;
155 int i;
156 int cancel_state;
157
158 lmutex_lock(&__aio_initlock);
159 (void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancel_state);
160 while (__aio_initbusy)
161 (void) cond_wait(&__aio_initcv, &__aio_initlock);
162 (void) pthread_setcancelstate(cancel_state, NULL);
163 if (__uaio_ok) { /* already initialized */
164 lmutex_unlock(&__aio_initlock);
165 return (0);
166 }
167 __aio_initbusy = 1;
168 lmutex_unlock(&__aio_initlock);
169
170 hz = (int)sysconf(_SC_CLK_TCK);
171 __pid = getpid();
172
173 setup_cancelsig(SIGAIOCANCEL);
174
175 if (_kaio_supported_init() != 0)
176 goto out;
177
178 /*
179 * Allocate and initialize the hash table.
180 * Do this only once, even if __uaio_init() is called twice.
181 */
182 if (_aio_hash == NULL) {
183 /* LINTED pointer cast */
184 _aio_hash = (aio_hash_t *)mmap(NULL,
185 HASHSZ * sizeof (aio_hash_t), PROT_READ | PROT_WRITE,
186 MAP_PRIVATE | MAP_ANON, -1, (off_t)0);
187 if ((void *)_aio_hash == MAP_FAILED) {
188 _aio_hash = NULL;
189 goto out;
190 }
191 for (i = 0; i < HASHSZ; i++)
192 (void) mutex_init(&_aio_hash[i].hash_lock,
193 USYNC_THREAD, NULL);
194 }
195
196 /*
197 * Initialize worker's signal mask to only catch SIGAIOCANCEL.
198 */
199 (void) sigfillset(&_worker_set);
200 (void) sigdelset(&_worker_set, SIGAIOCANCEL);
201
202 /*
203 * Create one worker to send asynchronous notifications.
204 * Do this only once, even if __uaio_init() is called twice.
205 */
206 if (__no_workerscnt == 0 &&
207 (_aio_create_worker(NULL, AIONOTIFY) != 0)) {
208 errno = EAGAIN;
209 goto out;
210 }
211
212 /*
213 * Create the minimum number of read/write workers.
214 * Then check whether at least one worker was created;
215 * lwp_create() calls could fail because of segkp exhaustion.
216 */
217 for (i = 0; i < _min_workers; i++)
218 (void) _aio_create_worker(NULL, AIOREAD);
219 if (__rw_workerscnt == 0) {
220 errno = EAGAIN;
221 goto out;
222 }
223
224 ret = 0;
225 out:
226 lmutex_lock(&__aio_initlock);
227 if (ret == 0)
228 __uaio_ok = 1;
229 __aio_initbusy = 0;
230 (void) cond_broadcast(&__aio_initcv);
231 lmutex_unlock(&__aio_initlock);
232 return (ret);
233 }
234
235 /*
236 * Called from close() before actually performing the real _close().
237 */
238 void
239 _aio_close(int fd)
240 {
241 if (fd < 0) /* avoid cancelling everything */
242 return;
243 /*
244 * Cancel all outstanding aio requests for this file descriptor.
245 */
246 if (__uaio_ok)
247 (void) aiocancel_all(fd);
248 /*
249 * If we have allocated the bit array, clear the bit for this file.
250 * The next open may re-use this file descriptor and the new file
251 * may have different kaio() behaviour.
252 */
253 if (_kaio_supported != NULL)
254 CLEAR_KAIO_SUPPORTED(fd);
255 }
256
257 /*
258 * special kaio cleanup thread sits in a loop in the
259 * kernel waiting for pending kaio requests to complete.
260 */
261 void *
262 _kaio_cleanup_thread(void *arg)
263 {
264 if (pthread_setspecific(_aio_key, arg) != 0)
265 aio_panic("_kaio_cleanup_thread, pthread_setspecific()");
266 (void) _kaio(AIOSTART);
267 return (arg);
268 }
269
270 /*
271 * initialize kaio.
272 */
273 void
274 _kaio_init()
275 {
276 int error;
277 sigset_t oset;
278 int cancel_state;
279
280 lmutex_lock(&__aio_initlock);
281 (void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancel_state);
282 while (__aio_initbusy)
283 (void) cond_wait(&__aio_initcv, &__aio_initlock);
284 (void) pthread_setcancelstate(cancel_state, NULL);
285 if (_kaio_ok) { /* already initialized */
286 lmutex_unlock(&__aio_initlock);
287 return;
288 }
289 __aio_initbusy = 1;
290 lmutex_unlock(&__aio_initlock);
291
292 if (_kaio_supported_init() != 0)
293 error = ENOMEM;
294 else if ((_kaiowp = _aio_worker_alloc()) == NULL)
295 error = ENOMEM;
296 else if ((error = (int)_kaio(AIOINIT)) == 0) {
297 (void) pthread_sigmask(SIG_SETMASK, &maskset, &oset);
298 error = thr_create(NULL, AIOSTKSIZE, _kaio_cleanup_thread,
299 _kaiowp, THR_DAEMON, &_kaiowp->work_tid);
300 (void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
301 }
302 if (error && _kaiowp != NULL) {
303 _aio_worker_free(_kaiowp);
304 _kaiowp = NULL;
305 }
306
307 lmutex_lock(&__aio_initlock);
308 if (error)
309 _kaio_ok = -1;
310 else
311 _kaio_ok = 1;
312 __aio_initbusy = 0;
313 (void) cond_broadcast(&__aio_initcv);
314 lmutex_unlock(&__aio_initlock);
315 }
316
317 int
318 aioread(int fd, caddr_t buf, int bufsz, off_t offset, int whence,
319 aio_result_t *resultp)
320 {
321 return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOREAD));
322 }
323
324 int
325 aiowrite(int fd, caddr_t buf, int bufsz, off_t offset, int whence,
326 aio_result_t *resultp)
327 {
328 return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOWRITE));
329 }
330
331 #if !defined(_LP64)
332 int
333 aioread64(int fd, caddr_t buf, int bufsz, off64_t offset, int whence,
334 aio_result_t *resultp)
335 {
336 return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOAREAD64));
337 }
338
339 int
340 aiowrite64(int fd, caddr_t buf, int bufsz, off64_t offset, int whence,
341 aio_result_t *resultp)
342 {
343 return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOAWRITE64));
344 }
345 #endif /* !defined(_LP64) */
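
/*
 * Illustrative usage sketch (not part of the library and not compiled):
 * an application typically pairs aioread()/aiowrite() with aiowait()
 * roughly as shown below.  Error handling is omitted; the file name and
 * the process() routine are hypothetical, and pre-setting aio_return to
 * AIO_INPROGRESS is only needed by callers that also poll the result.
 *
 *	char buf[8192];
 *	aio_result_t res;
 *	aio_result_t *donep;
 *	int fd = open("/tmp/data", O_RDONLY);
 *
 *	res.aio_return = AIO_INPROGRESS;
 *	if (aioread(fd, buf, sizeof (buf), 0, SEEK_SET, &res) == 0) {
 *		donep = aiowait(NULL);		(blocks for any completion)
 *		if (donep == &res && res.aio_errno == 0)
 *			process(buf, res.aio_return);
 *	}
 */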
346
347 int
348 _aiorw(int fd, caddr_t buf, int bufsz, offset_t offset, int whence,
349 aio_result_t *resultp, int mode)
350 {
351 aio_req_t *reqp;
352 aio_args_t *ap;
353 offset_t loffset;
354 struct stat64 stat64;
355 int error = 0;
356 int kerr;
357 int umode;
358
359 switch (whence) {
360
361 case SEEK_SET:
362 loffset = offset;
363 break;
364 case SEEK_CUR:
365 if ((loffset = llseek(fd, 0, SEEK_CUR)) == -1)
366 error = -1;
367 else
368 loffset += offset;
369 break;
370 case SEEK_END:
371 if (fstat64(fd, &stat64) == -1)
372 error = -1;
373 else
374 loffset = offset + stat64.st_size;
375 break;
376 default:
377 errno = EINVAL;
378 error = -1;
379 }
380
381 if (error)
382 return (error);
383
384 /* initialize kaio */
385 if (!_kaio_ok)
386 _kaio_init();
387
388 /*
389 * _aio_do_request() needs the original request code (mode) to be able
390 * to choose the appropriate 32/64-bit function. All other functions
391 * only require the difference between READ and WRITE (umode).
392 */
393 if (mode == AIOAREAD64 || mode == AIOAWRITE64)
394 umode = mode - AIOAREAD64;
395 else
396 umode = mode;
397
398 /*
399 * Try kernel aio first.
400 * If errno is ENOTSUP/EBADFD, fall back to the thread implementation.
401 */
402 if (_kaio_ok > 0 && KAIO_SUPPORTED(fd)) {
403 resultp->aio_errno = 0;
404 sig_mutex_lock(&__aio_mutex);
405 _kaio_outstand_cnt++;
406 sig_mutex_unlock(&__aio_mutex);
407 kerr = (int)_kaio(((resultp->aio_return == AIO_INPROGRESS) ?
408 (umode | AIO_POLL_BIT) : umode),
409 fd, buf, bufsz, loffset, resultp);
410 if (kerr == 0) {
411 return (0);
412 }
413 sig_mutex_lock(&__aio_mutex);
414 _kaio_outstand_cnt--;
415 sig_mutex_unlock(&__aio_mutex);
416 if (errno != ENOTSUP && errno != EBADFD)
417 return (-1);
418 if (errno == EBADFD)
419 SET_KAIO_NOT_SUPPORTED(fd);
420 }
421
422 if (!__uaio_ok && __uaio_init() == -1)
423 return (-1);
424
425 if ((reqp = _aio_req_alloc()) == NULL) {
426 errno = EAGAIN;
427 return (-1);
428 }
429
430 /*
431 * _aio_do_request() checks reqp->req_op to differentiate
432 * between 32 and 64 bit access.
433 */
434 reqp->req_op = mode;
435 reqp->req_resultp = resultp;
436 ap = &reqp->req_args;
437 ap->fd = fd;
438 ap->buf = buf;
439 ap->bufsz = bufsz;
440 ap->offset = loffset;
441
442 if (_aio_hash_insert(resultp, reqp) != 0) {
443 _aio_req_free(reqp);
444 errno = EINVAL;
445 return (-1);
446 }
447 /*
448 * _aio_req_add() only needs the difference between READ and
449 * WRITE to choose the right worker queue.
450 */
451 _aio_req_add(reqp, &__nextworker_rw, umode);
452 return (0);
453 }
454
455 int
456 aiocancel(aio_result_t *resultp)
457 {
458 aio_req_t *reqp;
459 aio_worker_t *aiowp;
460 int ret;
461 int done = 0;
462 int canceled = 0;
463
464 if (!__uaio_ok) {
465 errno = EINVAL;
466 return (-1);
467 }
468
469 sig_mutex_lock(&__aio_mutex);
470 reqp = _aio_hash_find(resultp);
471 if (reqp == NULL) {
472 if (_aio_outstand_cnt == _aio_req_done_cnt)
473 errno = EINVAL;
474 else
475 errno = EACCES;
476 ret = -1;
477 } else {
478 aiowp = reqp->req_worker;
479 sig_mutex_lock(&aiowp->work_qlock1);
480 (void) _aio_cancel_req(aiowp, reqp, &canceled, &done);
481 sig_mutex_unlock(&aiowp->work_qlock1);
482
483 if (canceled) {
484 ret = 0;
485 } else {
486 if (_aio_outstand_cnt == 0 ||
487 _aio_outstand_cnt == _aio_req_done_cnt)
488 errno = EINVAL;
489 else
490 errno = EACCES;
491 ret = -1;
492 }
493 }
494 sig_mutex_unlock(&__aio_mutex);
495 return (ret);
496 }
497
498 /* ARGSUSED */
499 static void
500 _aiowait_cleanup(void *arg)
501 {
502 sig_mutex_lock(&__aio_mutex);
503 _aiowait_flag--;
504 sig_mutex_unlock(&__aio_mutex);
505 }
506
507 /*
508 * This must be asynch safe and cancel safe
509 */
510 aio_result_t *
511 aiowait(struct timeval *uwait)
512 {
513 aio_result_t *uresultp;
514 aio_result_t *kresultp;
515 aio_result_t *resultp;
516 int dontblock;
517 int timedwait = 0;
518 int kaio_errno = 0;
519 struct timeval twait;
520 struct timeval *wait = NULL;
521 hrtime_t hrtend;
522 hrtime_t hres;
523
524 if (uwait) {
525 /*
526 * Check for a valid specified wait time.
527 * If it is invalid, fail the call right away.
528 */
529 if (uwait->tv_sec < 0 || uwait->tv_usec < 0 ||
530 uwait->tv_usec >= MICROSEC) {
531 errno = EINVAL;
532 return ((aio_result_t *)-1);
533 }
534
535 if (uwait->tv_sec > 0 || uwait->tv_usec > 0) {
536 hrtend = gethrtime() +
537 (hrtime_t)uwait->tv_sec * NANOSEC +
538 (hrtime_t)uwait->tv_usec * (NANOSEC / MICROSEC);
539 twait = *uwait;
540 wait = &twait;
541 timedwait++;
542 } else {
543 /* polling */
544 sig_mutex_lock(&__aio_mutex);
545 if (_kaio_outstand_cnt == 0) {
546 kresultp = (aio_result_t *)-1;
547 } else {
548 kresultp = (aio_result_t *)_kaio(AIOWAIT,
549 (struct timeval *)-1, 1);
550 if (kresultp != (aio_result_t *)-1 &&
551 kresultp != NULL &&
552 kresultp != (aio_result_t *)1) {
553 _kaio_outstand_cnt--;
554 sig_mutex_unlock(&__aio_mutex);
555 return (kresultp);
556 }
557 }
558 uresultp = _aio_req_done();
559 sig_mutex_unlock(&__aio_mutex);
560 if (uresultp != NULL &&
561 uresultp != (aio_result_t *)-1) {
562 return (uresultp);
563 }
564 if (uresultp == (aio_result_t *)-1 &&
565 kresultp == (aio_result_t *)-1) {
566 errno = EINVAL;
567 return ((aio_result_t *)-1);
568 } else {
569 return (NULL);
570 }
571 }
572 }
573
574 for (;;) {
575 sig_mutex_lock(&__aio_mutex);
576 uresultp = _aio_req_done();
577 if (uresultp != NULL && uresultp != (aio_result_t *)-1) {
578 sig_mutex_unlock(&__aio_mutex);
579 resultp = uresultp;
580 break;
581 }
582 _aiowait_flag++;
583 dontblock = (uresultp == (aio_result_t *)-1);
584 if (dontblock && _kaio_outstand_cnt == 0) {
585 kresultp = (aio_result_t *)-1;
586 kaio_errno = EINVAL;
587 } else {
588 sig_mutex_unlock(&__aio_mutex);
589 pthread_cleanup_push(_aiowait_cleanup, NULL);
590 _cancel_prologue();
591 kresultp = (aio_result_t *)_kaio(AIOWAIT,
592 wait, dontblock);
593 _cancel_epilogue();
594 pthread_cleanup_pop(0);
595 sig_mutex_lock(&__aio_mutex);
596 kaio_errno = errno;
597 }
598 _aiowait_flag--;
599 sig_mutex_unlock(&__aio_mutex);
600 if (kresultp == (aio_result_t *)1) {
601 /* aiowait() awakened by an aionotify() */
602 continue;
603 } else if (kresultp != NULL &&
604 kresultp != (aio_result_t *)-1) {
605 resultp = kresultp;
606 sig_mutex_lock(&__aio_mutex);
607 _kaio_outstand_cnt--;
608 sig_mutex_unlock(&__aio_mutex);
609 break;
610 } else if (kresultp == (aio_result_t *)-1 &&
611 kaio_errno == EINVAL &&
612 uresultp == (aio_result_t *)-1) {
613 errno = kaio_errno;
614 resultp = (aio_result_t *)-1;
615 break;
616 } else if (kresultp == (aio_result_t *)-1 &&
617 kaio_errno == EINTR) {
618 errno = kaio_errno;
619 resultp = (aio_result_t *)-1;
620 break;
621 } else if (timedwait) {
622 hres = hrtend - gethrtime();
623 if (hres <= 0) {
624 /* time is up; return */
625 resultp = NULL;
626 break;
627 } else {
628 /*
629 * Some time left. Round up the remaining time
630 * in nanoseconds to microsec. Retry the call.
631 */
632 hres += (NANOSEC / MICROSEC) - 1;
633 wait->tv_sec = hres / NANOSEC;
634 wait->tv_usec =
635 (hres % NANOSEC) / (NANOSEC / MICROSEC);
636 }
637 } else {
638 ASSERT(kresultp == NULL && uresultp == NULL);
639 resultp = NULL;
640 continue;
641 }
642 }
643 return (resultp);
644 }
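
/*
 * Note on the timed-wait arithmetic above: the remaining time is kept in
 * nanoseconds and rounded up to whole microseconds before _kaio(AIOWAIT)
 * is retried.  For example (values assumed for illustration), with
 * 1,500,001 ns left:
 *	hres = 1500001 + (NANOSEC / MICROSEC) - 1 = 1501000
 *	wait->tv_sec  = 1501000 / NANOSEC = 0
 *	wait->tv_usec = (1501000 % NANOSEC) / (NANOSEC / MICROSEC) = 1501
 * so the retry waits at least as long as the time that actually remains.
 */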
645
646 /*
647 * _aio_get_timedelta calculates the remaining time and stores the result
648 * into timespec_t *wait.
649 */
650
651 int
652 _aio_get_timedelta(timespec_t *end, timespec_t *wait)
653 {
654 int ret = 0;
655 struct timeval cur;
656 timespec_t curtime;
657
658 (void) gettimeofday(&cur, NULL);
659 curtime.tv_sec = cur.tv_sec;
660 curtime.tv_nsec = cur.tv_usec * 1000; /* convert us to ns */
661
662 if (end->tv_sec >= curtime.tv_sec) {
663 wait->tv_sec = end->tv_sec - curtime.tv_sec;
664 if (end->tv_nsec >= curtime.tv_nsec) {
665 wait->tv_nsec = end->tv_nsec - curtime.tv_nsec;
666 if (wait->tv_sec == 0 && wait->tv_nsec == 0)
667 ret = -1; /* timer expired */
668 } else {
669 if (end->tv_sec > curtime.tv_sec) {
670 wait->tv_sec -= 1;
671 wait->tv_nsec = NANOSEC -
672 (curtime.tv_nsec - end->tv_nsec);
673 } else {
674 ret = -1; /* timer expired */
675 }
676 }
677 } else {
678 ret = -1;
679 }
680 return (ret);
681 }
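
/*
 * Worked example for the borrow case above (illustrative values only):
 * with end = {10, 200000000} and a current time of {9, 900000000},
 *	wait->tv_sec  = (10 - 9) - 1 = 0
 *	wait->tv_nsec = NANOSEC - (900000000 - 200000000) = 300000000
 * i.e. 0.3 seconds remain.  A remaining time of {0, 0}, or an end time
 * that has already passed, returns -1 to signal that the timer expired.
 */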
682
683 /*
684 * If closing by file descriptor: we will simply cancel all the outstanding
685 * aio requests and return. Those requests will have noticed the
686 * cancellation either before, during, or after initiating the I/O.
687 */
688 int
689 aiocancel_all(int fd)
690 {
691 aio_req_t *reqp;
692 aio_req_t **reqpp, *last;
693 aio_worker_t *first;
694 aio_worker_t *next;
695 int canceled = 0;
696 int done = 0;
697 int cancelall = 0;
698
699 sig_mutex_lock(&__aio_mutex);
700
701 if (_aio_outstand_cnt == 0) {
702 sig_mutex_unlock(&__aio_mutex);
703 return (AIO_ALLDONE);
704 }
705
706 /*
707 * Cancel requests from the read/write workers' queues.
708 */
709 first = __nextworker_rw;
710 next = first;
711 do {
712 _aio_cancel_work(next, fd, &canceled, &done);
713 } while ((next = next->work_forw) != first);
714
715 /*
716 * finally, check if there are requests on the done queue that
717 * should be canceled.
718 */
719 if (fd < 0)
720 cancelall = 1;
721 reqpp = &_aio_done_tail;
722 last = _aio_done_tail;
723 while ((reqp = *reqpp) != NULL) {
724 if (cancelall || reqp->req_args.fd == fd) {
725 *reqpp = reqp->req_next;
726 if (last == reqp) {
727 last = reqp->req_next;
728 }
729 if (_aio_done_head == reqp) {
730 /* this should be the last req in list */
731 _aio_done_head = last;
732 }
733 _aio_donecnt--;
734 _aio_set_result(reqp, -1, ECANCELED);
735 (void) _aio_hash_del(reqp->req_resultp);
736 _aio_req_free(reqp);
737 } else {
738 reqpp = &reqp->req_next;
739 last = reqp;
740 }
741 }
742
743 if (cancelall) {
744 ASSERT(_aio_donecnt == 0);
745 _aio_done_head = NULL;
746 }
747 sig_mutex_unlock(&__aio_mutex);
748
749 if (canceled && done == 0)
750 return (AIO_CANCELED);
751 else if (done && canceled == 0)
752 return (AIO_ALLDONE);
753 else if ((canceled + done == 0) && KAIO_SUPPORTED(fd))
754 return ((int)_kaio(AIOCANCEL, fd, NULL));
755 return (AIO_NOTCANCELED);
756 }
757
758 /*
759 * Cancel requests from a given work queue. If the file descriptor
760 * parameter, fd, is non-negative, then only cancel those requests
761 * in this queue that are to this file descriptor. If the fd
762 * parameter is -1, then cancel all requests.
763 */
764 static void
765 _aio_cancel_work(aio_worker_t *aiowp, int fd, int *canceled, int *done)
766 {
767 aio_req_t *reqp;
768
769 sig_mutex_lock(&aiowp->work_qlock1);
770 /*
771 * cancel queued requests first.
772 */
773 reqp = aiowp->work_tail1;
774 while (reqp != NULL) {
775 if (fd < 0 || reqp->req_args.fd == fd) {
776 if (_aio_cancel_req(aiowp, reqp, canceled, done)) {
777 /*
778 * Caller's locks were dropped.
779 * reqp is invalid; start traversing
780 * the list from the beginning again.
781 */
782 reqp = aiowp->work_tail1;
783 continue;
784 }
785 }
786 reqp = reqp->req_next;
787 }
788 /*
789 * Since the queued requests have been canceled, there can
790 * only be one inprogress request that should be canceled.
791 */
792 if ((reqp = aiowp->work_req) != NULL &&
793 (fd < 0 || reqp->req_args.fd == fd))
794 (void) _aio_cancel_req(aiowp, reqp, canceled, done);
795 sig_mutex_unlock(&aiowp->work_qlock1);
796 }
797
798 /*
799 * Cancel a request. Return 1 if the caller's locks were temporarily
800 * dropped, otherwise return 0.
801 */
802 int
803 _aio_cancel_req(aio_worker_t *aiowp, aio_req_t *reqp, int *canceled, int *done)
804 {
805 int ostate = reqp->req_state;
806
807 ASSERT(MUTEX_HELD(&__aio_mutex));
808 ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
809 if (ostate == AIO_REQ_CANCELED)
810 return (0);
811 if (ostate == AIO_REQ_DONE && !POSIX_AIO(reqp) &&
812 aiowp->work_prev1 == reqp) {
813 ASSERT(aiowp->work_done1 != 0);
814 /*
815 * If not on the done queue yet, just mark it CANCELED,
816 * _aio_work_done() will do the necessary clean up.
817 * This is required to ensure that aiocancel_all() cancels
818 * all the outstanding requests, including this one, which
819 * is not yet on the done queue but has been marked done.
820 */
821 _aio_set_result(reqp, -1, ECANCELED);
822 (void) _aio_hash_del(reqp->req_resultp);
823 reqp->req_state = AIO_REQ_CANCELED;
824 (*canceled)++;
825 return (0);
826 }
827
828 if (ostate == AIO_REQ_DONE || ostate == AIO_REQ_DONEQ) {
829 (*done)++;
830 return (0);
831 }
832 if (reqp->req_op == AIOFSYNC && reqp != aiowp->work_req) {
833 ASSERT(POSIX_AIO(reqp));
834 /* Cancel the queued aio_fsync() request */
835 if (!reqp->req_head->lio_canned) {
836 reqp->req_head->lio_canned = 1;
837 _aio_outstand_cnt--;
838 (*canceled)++;
839 }
840 return (0);
841 }
842 reqp->req_state = AIO_REQ_CANCELED;
843 _aio_req_del(aiowp, reqp, ostate);
844 (void) _aio_hash_del(reqp->req_resultp);
845 (*canceled)++;
846 if (reqp == aiowp->work_req) {
847 ASSERT(ostate == AIO_REQ_INPROGRESS);
848 /*
849 * Set the result values now, before _aiodone() is called.
850 * We do this because the application can expect aio_return
851 * and aio_errno to be set to -1 and ECANCELED, respectively,
852 * immediately after a successful return from aiocancel()
853 * or aio_cancel().
854 */
855 _aio_set_result(reqp, -1, ECANCELED);
856 (void) thr_kill(aiowp->work_tid, SIGAIOCANCEL);
857 return (0);
858 }
859 if (!POSIX_AIO(reqp)) {
860 _aio_outstand_cnt--;
861 _aio_set_result(reqp, -1, ECANCELED);
862 _aio_req_free(reqp);
863 return (0);
864 }
865 sig_mutex_unlock(&aiowp->work_qlock1);
866 sig_mutex_unlock(&__aio_mutex);
867 _aiodone(reqp, -1, ECANCELED);
868 sig_mutex_lock(&__aio_mutex);
869 sig_mutex_lock(&aiowp->work_qlock1);
870 return (1);
871 }
872
873 int
874 _aio_create_worker(aio_req_t *reqp, int mode)
875 {
876 aio_worker_t *aiowp, **workers, **nextworker;
877 int *aio_workerscnt;
878 void *(*func)(void *);
879 sigset_t oset;
880 int error;
881
882 /*
883 * Put the new worker thread in the right queue.
884 */
885 switch (mode) {
886 case AIOREAD:
887 case AIOWRITE:
888 case AIOAREAD:
889 case AIOAWRITE:
890 #if !defined(_LP64)
891 case AIOAREAD64:
892 case AIOAWRITE64:
893 #endif
894 workers = &__workers_rw;
895 nextworker = &__nextworker_rw;
896 aio_workerscnt = &__rw_workerscnt;
897 func = _aio_do_request;
898 break;
899 case AIONOTIFY:
900 workers = &__workers_no;
901 nextworker = &__nextworker_no;
902 func = _aio_do_notify;
903 aio_workerscnt = &__no_workerscnt;
904 break;
905 default:
906 aio_panic("_aio_create_worker: invalid mode");
907 break;
908 }
909
910 if ((aiowp = _aio_worker_alloc()) == NULL)
911 return (-1);
912
913 if (reqp) {
914 reqp->req_state = AIO_REQ_QUEUED;
915 reqp->req_worker = aiowp;
916 aiowp->work_head1 = reqp;
917 aiowp->work_tail1 = reqp;
918 aiowp->work_next1 = reqp;
919 aiowp->work_count1 = 1;
920 aiowp->work_minload1 = 1;
921 }
922
923 (void) pthread_sigmask(SIG_SETMASK, &maskset, &oset);
924 error = thr_create(NULL, AIOSTKSIZE, func, aiowp,
925 THR_DAEMON | THR_SUSPENDED, &aiowp->work_tid);
926 (void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
927 if (error) {
928 if (reqp) {
929 reqp->req_state = 0;
930 reqp->req_worker = NULL;
931 }
932 _aio_worker_free(aiowp);
933 return (-1);
934 }
935
936 lmutex_lock(&__aio_mutex);
937 (*aio_workerscnt)++;
938 if (*workers == NULL) {
939 aiowp->work_forw = aiowp;
940 aiowp->work_backw = aiowp;
941 *nextworker = aiowp;
942 *workers = aiowp;
943 } else {
944 aiowp->work_backw = (*workers)->work_backw;
945 aiowp->work_forw = (*workers);
946 (*workers)->work_backw->work_forw = aiowp;
947 (*workers)->work_backw = aiowp;
948 }
949 _aio_worker_cnt++;
950 lmutex_unlock(&__aio_mutex);
951
952 (void) thr_continue(aiowp->work_tid);
953
954 return (0);
955 }
956
957 /*
958 * This is the worker's main routine.
959 * The task of this function is to execute all queued requests;
960 * once the last pending request is executed this function will block
961 * in _aio_idle(). A new incoming request must wake up this thread to
962 * restart the work.
963 * Every worker has its own work queue. The queue lock is required
964 * to synchronize the addition of new requests for this worker or
965 * cancellation of pending/running requests.
966 *
967 * Cancellation scenarios:
968 * The cancellation of a request is being done asynchronously using
969 * _aio_cancel_req() from another thread context.
970 * A queued request can be cancelled in different ways:
971 * a) request is queued but not "in progress" or "done" (AIO_REQ_QUEUED):
972 * - lock the queue -> remove the request -> unlock the queue
973 * - this function/thread does not detect this cancellation process
974 * b) request is in progress (AIO_REQ_INPROGRESS):
975 * - this function first allows the cancellation of the running
976 * request with the flag "work_cancel_flg=1"
977 * see _aio_req_get() -> _aio_cancel_on()
978 * During this phase, it is allowed to interrupt the worker
979 * thread running the request (this thread) using the SIGAIOCANCEL
980 * signal.
981 * Once this thread returns from the kernel (because the request
982 * is just done), then it must disable a possible cancellation
983 * and proceed to finish the request. To disable the cancellation
984 * this thread must use _aio_cancel_off() to set "work_cancel_flg=0".
985 * c) request is already done (AIO_REQ_DONE || AIO_REQ_DONEQ):
986 * same procedure as in a)
987 *
988 * To b)
989 * This thread uses sigsetjmp() to define the position in the code where
990 * it wishes to continue working in the case that a SIGAIOCANCEL signal
991 * is detected.
992 * Normally this thread should get the cancellation signal during the
993 * kernel phase (reading or writing). In that case the signal handler
994 * aiosigcancelhndlr() is activated using the worker thread context,
995 * which again will use the siglongjmp() function to break the standard
996 * code flow and jump to the "sigsetjmp" position, provided that
997 * "work_cancel_flg" is set to "1".
998 * Because the "work_cancel_flg" is only manipulated by this worker
999 * thread and it can only run on one CPU at a given time, it is not
1000 * necessary to protect that flag with the queue lock.
1001 * Returning from the kernel (read or write system call) we must
1002 * first disable the use of the SIGAIOCANCEL signal and accordingly
1003 * the use of the siglongjmp() function to prevent a possible deadlock:
1004 * - It can happen that this worker thread returns from the kernel and
1005 * blocks in "work_qlock1",
1006 * - then a second thread cancels the apparently "in progress" request
1007 * and sends the SIGAIOCANCEL signal to the worker thread,
1008 * - the worker thread gets assigned the "work_qlock1" and will return
1009 * from the kernel,
1010 * - the kernel detects the pending signal and activates the signal
1011 * handler instead,
1012 * - if the "work_cancel_flg" is still set then the signal handler
1013 * should use siglongjmp() to cancel the "in progress" request and
1014 * it would try to acquire the same work_qlock1 in _aio_req_get()
1015 * for a second time => deadlock.
1016 * To avoid that situation we disable the cancellation of the request
1017 * in progress BEFORE we try to acquire the work_qlock1.
1018 * In that case the signal handler will not call siglongjmp() and the
1019 * worker thread will continue running the standard code flow.
1020 * Then this thread must check the AIO_REQ_CANCELED flag to emulate
1021 * the siglongjmp() that would otherwise have been required, freeing
1022 * the work_qlock1 and avoiding a deadlock.
1023 */
1024 void *
1025 _aio_do_request(void *arglist)
1026 {
1027 aio_worker_t *aiowp = (aio_worker_t *)arglist;
1028 ulwp_t *self = curthread;
1029 struct aio_args *arg;
1030 aio_req_t *reqp; /* current AIO request */
1031 ssize_t retval;
1032 int append;
1033 int error;
1034
1035 if (pthread_setspecific(_aio_key, aiowp) != 0)
1036 aio_panic("_aio_do_request, pthread_setspecific()");
1037 (void) pthread_sigmask(SIG_SETMASK, &_worker_set, NULL);
1038 ASSERT(aiowp->work_req == NULL);
1039
1040 /*
1041 * We resume here when an operation is cancelled.
1042 * On first entry, aiowp->work_req == NULL, so all
1043 * we do is block SIGAIOCANCEL.
1044 */
1045 (void) sigsetjmp(aiowp->work_jmp_buf, 0);
1046 ASSERT(self->ul_sigdefer == 0);
1047
1048 sigoff(self); /* block SIGAIOCANCEL */
1049 if (aiowp->work_req != NULL)
1050 _aio_finish_request(aiowp, -1, ECANCELED);
1051
1052 for (;;) {
1053 /*
1054 * Put completed requests on aio_done_list. This has
1055 * to be done as part of the main loop to ensure that
1056 * we don't artificially starve any aiowait'ers.
1057 */
1058 if (aiowp->work_done1)
1059 _aio_work_done(aiowp);
1060
1061 top:
1062 /* consume any deferred SIGAIOCANCEL signal here */
1063 sigon(self);
1064 sigoff(self);
1065
1066 while ((reqp = _aio_req_get(aiowp)) == NULL) {
1067 if (_aio_idle(aiowp) != 0)
1068 goto top;
1069 }
1070 arg = &reqp->req_args;
1071 ASSERT(reqp->req_state == AIO_REQ_INPROGRESS ||
1072 reqp->req_state == AIO_REQ_CANCELED);
1073 error = 0;
1074
1075 switch (reqp->req_op) {
1076 case AIOREAD:
1077 case AIOAREAD:
1078 sigon(self); /* unblock SIGAIOCANCEL */
1079 retval = pread(arg->fd, arg->buf,
1080 arg->bufsz, arg->offset);
1081 if (retval == -1) {
1082 if (errno == ESPIPE) {
1083 retval = read(arg->fd,
1084 arg->buf, arg->bufsz);
1085 if (retval == -1)
1086 error = errno;
1087 } else {
1088 error = errno;
1089 }
1090 }
1091 sigoff(self); /* block SIGAIOCANCEL */
1092 break;
1093 case AIOWRITE:
1094 case AIOAWRITE:
1095 /*
1096 * The SUSv3 POSIX spec for aio_write() states:
1097 * If O_APPEND is set for the file descriptor,
1098 * write operations append to the file in the
1099 * same order as the calls were made.
1100 * but, somewhat inconsistently, it requires pwrite()
1101 * to ignore the O_APPEND setting. So we have to use
1102 * fcntl() to get the open modes and call write() for
1103 * the O_APPEND case.
1104 */
1105 append = (__fcntl(arg->fd, F_GETFL) & O_APPEND);
1106 sigon(self); /* unblock SIGAIOCANCEL */
1107 retval = append?
1108 write(arg->fd, arg->buf, arg->bufsz) :
1109 pwrite(arg->fd, arg->buf, arg->bufsz,
1110 arg->offset);
1111 if (retval == -1) {
1112 if (errno == ESPIPE) {
1113 retval = write(arg->fd,
1114 arg->buf, arg->bufsz);
1115 if (retval == -1)
1116 error = errno;
1117 } else {
1118 error = errno;
1119 }
1120 }
1121 sigoff(self); /* block SIGAIOCANCEL */
1122 break;
1123 #if !defined(_LP64)
1124 case AIOAREAD64:
1125 sigon(self); /* unblock SIGAIOCANCEL */
1126 retval = pread64(arg->fd, arg->buf,
1127 arg->bufsz, arg->offset);
1128 if (retval == -1) {
1129 if (errno == ESPIPE) {
1130 retval = read(arg->fd,
1131 arg->buf, arg->bufsz);
1132 if (retval == -1)
1133 error = errno;
1134 } else {
1135 error = errno;
1136 }
1137 }
1138 sigoff(self); /* block SIGAIOCANCEL */
1139 break;
1140 case AIOAWRITE64:
1141 /*
1142 * The SUSv3 POSIX spec for aio_write() states:
1143 * If O_APPEND is set for the file descriptor,
1144 * write operations append to the file in the
1145 * same order as the calls were made.
1146 * but, somewhat inconsistently, it requires pwrite()
1147 * to ignore the O_APPEND setting. So we have to use
1148 * fcntl() to get the open modes and call write() for
1149 * the O_APPEND case.
1150 */
1151 append = (__fcntl(arg->fd, F_GETFL) & O_APPEND);
1152 sigon(self); /* unblock SIGAIOCANCEL */
1153 retval = append?
1154 write(arg->fd, arg->buf, arg->bufsz) :
1155 pwrite64(arg->fd, arg->buf, arg->bufsz,
1156 arg->offset);
1157 if (retval == -1) {
1158 if (errno == ESPIPE) {
1159 retval = write(arg->fd,
1160 arg->buf, arg->bufsz);
1161 if (retval == -1)
1162 error = errno;
1163 } else {
1164 error = errno;
1165 }
1166 }
1167 sigoff(self); /* block SIGAIOCANCEL */
1168 break;
1169 #endif /* !defined(_LP64) */
1170 case AIOFSYNC:
1171 if (_aio_fsync_del(aiowp, reqp))
1172 goto top;
1173 ASSERT(reqp->req_head == NULL);
1174 /*
1175 * All writes for this fsync request are now
1176 * acknowledged. Now make these writes visible
1177 * and put the final request into the hash table.
1178 */
1179 if (reqp->req_state == AIO_REQ_CANCELED) {
1180 /* EMPTY */;
1181 } else if (arg->offset == O_SYNC) {
1182 if ((retval = __fdsync(arg->fd, FSYNC)) == -1)
1183 error = errno;
1184 } else {
1185 if ((retval = __fdsync(arg->fd, FDSYNC)) == -1)
1186 error = errno;
1187 }
1188 if (_aio_hash_insert(reqp->req_resultp, reqp) != 0)
1189 aio_panic("_aio_do_request(): AIOFSYNC: "
1190 "request already in hash table");
1191 break;
1192 default:
1193 aio_panic("_aio_do_request, bad op");
1194 }
1195
1196 _aio_finish_request(aiowp, retval, error);
1197 }
1198 /* NOTREACHED */
1199 return (NULL);
1200 }
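
/*
 * Condensed sketch of the cancellation flow implemented above (for
 * orientation only; see the block comment before _aio_do_request()
 * for the full discussion):
 *
 *	sigsetjmp(work_jmp_buf)		restart point for a cancelled request
 *	sigon()				SIGAIOCANCEL may now interrupt us
 *	pread()/pwrite()		the cancellable window
 *	  SIGAIOCANCEL -> aiosigcancelhndlr() -> siglongjmp(work_jmp_buf)
 *	sigoff()			cancellation no longer possible
 *	_aio_finish_request()		complete or discard the request
 */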
1201
1202 /*
1203 * Perform the tail processing for _aio_do_request().
1204 * The in-progress request may or may not have been cancelled.
1205 */
1206 static void
1207 _aio_finish_request(aio_worker_t *aiowp, ssize_t retval, int error)
1208 {
1209 aio_req_t *reqp;
1210
1211 sig_mutex_lock(&aiowp->work_qlock1);
1212 if ((reqp = aiowp->work_req) == NULL)
1213 sig_mutex_unlock(&aiowp->work_qlock1);
1214 else {
1215 aiowp->work_req = NULL;
1216 if (reqp->req_state == AIO_REQ_CANCELED) {
1217 retval = -1;
1218 error = ECANCELED;
1219 }
1220 if (!POSIX_AIO(reqp)) {
1221 int notify;
1222 if (reqp->req_state == AIO_REQ_INPROGRESS) {
1223 reqp->req_state = AIO_REQ_DONE;
1224 _aio_set_result(reqp, retval, error);
1225 }
1226 sig_mutex_unlock(&aiowp->work_qlock1);
1227 sig_mutex_lock(&__aio_mutex);
1228 /*
1229 * If it was canceled, this request will not be
1230 * added to the done list. Just free it.
1231 */
1232 if (error == ECANCELED) {
1233 _aio_outstand_cnt--;
1234 _aio_req_free(reqp);
1235 } else {
1236 _aio_req_done_cnt++;
1237 }
1238 /*
1239 * Notify any thread that may have blocked
1240 * because it saw an outstanding request.
1241 */
1242 notify = 0;
1243 if (_aio_outstand_cnt == 0 && _aiowait_flag) {
1244 notify = 1;
1245 }
1246 sig_mutex_unlock(&__aio_mutex);
1247 if (notify) {
1248 (void) _kaio(AIONOTIFY);
1249 }
1250 } else {
1251 if (reqp->req_state == AIO_REQ_INPROGRESS)
1252 reqp->req_state = AIO_REQ_DONE;
1253 sig_mutex_unlock(&aiowp->work_qlock1);
1254 _aiodone(reqp, retval, error);
1255 }
1256 }
1257 }
1258
1259 void
1260 _aio_req_mark_done(aio_req_t *reqp)
1261 {
1262 #if !defined(_LP64)
1263 if (reqp->req_largefile)
1264 ((aiocb64_t *)reqp->req_aiocbp)->aio_state = USERAIO_DONE;
1265 else
1266 #endif
1267 ((aiocb_t *)reqp->req_aiocbp)->aio_state = USERAIO_DONE;
1268 }
1269
1270 /*
1271 * Sleep for 'ticks' clock ticks to give somebody else a chance to run,
1272 * hopefully to consume one of our queued signals.
1273 */
1274 static void
1275 _aio_delay(int ticks)
1276 {
1277 (void) usleep(ticks * (MICROSEC / hz));
1278 }
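
/*
 * For example, with a typical hz value of 100 (an assumption; hz comes
 * from sysconf(_SC_CLK_TCK)), _aio_delay(1) sleeps for roughly
 * MICROSEC / 100 == 10000 microseconds, i.e. about 10 ms.
 */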
1279
1280 /*
1281 * Actually send the notifications.
1282 * We could block indefinitely here if the application
1283 * is not listening for the signal or port notifications.
1284 */
1285 static void
1286 send_notification(notif_param_t *npp)
1287 {
1288 extern int __sigqueue(pid_t pid, int signo,
1289 /* const union sigval */ void *value, int si_code, int block);
1290
1291 if (npp->np_signo)
1292 (void) __sigqueue(__pid, npp->np_signo, npp->np_user,
1293 SI_ASYNCIO, 1);
1294 else if (npp->np_port >= 0)
1295 (void) _port_dispatch(npp->np_port, 0, PORT_SOURCE_AIO,
1296 npp->np_event, npp->np_object, npp->np_user);
1297
1298 if (npp->np_lio_signo)
1299 (void) __sigqueue(__pid, npp->np_lio_signo, npp->np_lio_user,
1300 SI_ASYNCIO, 1);
1301 else if (npp->np_lio_port >= 0)
1302 (void) _port_dispatch(npp->np_lio_port, 0, PORT_SOURCE_AIO,
1303 npp->np_lio_event, npp->np_lio_object, npp->np_lio_user);
1304 }
1305
1306 /*
1307 * Asynchronous notification worker.
1308 */
1309 void *
1310 _aio_do_notify(void *arg)
1311 {
1312 aio_worker_t *aiowp = (aio_worker_t *)arg;
1313 aio_req_t *reqp;
1314
1315 /*
1316 * This isn't really necessary. All signals are blocked.
1317 */
1318 if (pthread_setspecific(_aio_key, aiowp) != 0)
1319 aio_panic("_aio_do_notify, pthread_setspecific()");
1320
1321 /*
1322 * Notifications are never cancelled.
1323 * All signals remain blocked, forever.
1324 */
1325 for (;;) {
1326 while ((reqp = _aio_req_get(aiowp)) == NULL) {
1327 if (_aio_idle(aiowp) != 0)
1328 aio_panic("_aio_do_notify: _aio_idle() failed");
1329 }
1330 send_notification(&reqp->req_notify);
1331 _aio_req_free(reqp);
1332 }
1333
1334 /* NOTREACHED */
1335 return (NULL);
1336 }
1337
1338 /*
1339 * Do the completion semantics for a request that was either canceled
1340 * by _aio_cancel_req() or was completed by _aio_do_request().
1341 */
1342 static void
1343 _aiodone(aio_req_t *reqp, ssize_t retval, int error)
1344 {
1345 aio_result_t *resultp = reqp->req_resultp;
1346 int notify = 0;
1347 aio_lio_t *head;
1348 int sigev_none;
1349 int sigev_signal;
1350 int sigev_thread;
1351 int sigev_port;
1352 notif_param_t np;
1353
1354 /*
1355 * We call _aiodone() only for Posix I/O.
1356 */
1357 ASSERT(POSIX_AIO(reqp));
1358
1359 sigev_none = 0;
1360 sigev_signal = 0;
1361 sigev_thread = 0;
1362 sigev_port = 0;
1363 np.np_signo = 0;
1364 np.np_port = -1;
1365 np.np_lio_signo = 0;
1366 np.np_lio_port = -1;
1367
1368 switch (reqp->req_sigevent.sigev_notify) {
1369 case SIGEV_NONE:
1370 sigev_none = 1;
1371 break;
1372 case SIGEV_SIGNAL:
1373 sigev_signal = 1;
1374 break;
1375 case SIGEV_THREAD:
1376 sigev_thread = 1;
1377 break;
1378 case SIGEV_PORT:
1379 sigev_port = 1;
1380 break;
1381 default:
1382 aio_panic("_aiodone: improper sigev_notify");
1383 break;
1384 }
1385
1386 /*
1387 * Figure out the notification parameters while holding __aio_mutex.
1388 * Actually perform the notifications after dropping __aio_mutex.
1389 * This allows us to sleep for a long time (if the notifications
1390 * incur delays) without impeding other async I/O operations.
1391 */
1392
1393 sig_mutex_lock(&__aio_mutex);
1394
1395 if (sigev_signal) {
1396 if ((np.np_signo = reqp->req_sigevent.sigev_signo) != 0)
1397 notify = 1;
1398 np.np_user = reqp->req_sigevent.sigev_value.sival_ptr;
1399 } else if (sigev_thread | sigev_port) {
1400 if ((np.np_port = reqp->req_sigevent.sigev_signo) >= 0)
1401 notify = 1;
1402 np.np_event = reqp->req_op;
1403 if (np.np_event == AIOFSYNC && reqp->req_largefile)
1404 np.np_event = AIOFSYNC64;
1405 np.np_object = (uintptr_t)reqp->req_aiocbp;
1406 np.np_user = reqp->req_sigevent.sigev_value.sival_ptr;
1407 }
1408
1409 if (resultp->aio_errno == EINPROGRESS)
1410 _aio_set_result(reqp, retval, error);
1411
1412 _aio_outstand_cnt--;
1413
1414 head = reqp->req_head;
1415 reqp->req_head = NULL;
1416
1417 if (sigev_none) {
1418 _aio_enq_doneq(reqp);
1419 reqp = NULL;
1420 } else {
1421 (void) _aio_hash_del(resultp);
1422 _aio_req_mark_done(reqp);
1423 }
1424
1425 _aio_waitn_wakeup();
1426
1427 /*
1428 * __aio_waitn() sets AIO_WAIT_INPROGRESS and
1429 * __aio_suspend() increments "_aio_kernel_suspend"
1430 * when they are waiting in the kernel for completed I/Os.
1431 *
1432 * _kaio(AIONOTIFY) awakes the corresponding function
1433 * in the kernel; then the corresponding __aio_waitn() or
1434 * __aio_suspend() function could reap the recently
1435 * completed I/Os (_aiodone()).
1436 */
1437 if ((_aio_flags & AIO_WAIT_INPROGRESS) || _aio_kernel_suspend > 0)
1438 (void) _kaio(AIONOTIFY);
1439
1440 sig_mutex_unlock(&__aio_mutex);
1441
1442 if (head != NULL) {
1443 /*
1444 * If all the lio requests have completed,
1445 * prepare to notify the waiting thread.
1446 */
1447 sig_mutex_lock(&head->lio_mutex);
1448 ASSERT(head->lio_refcnt == head->lio_nent);
1449 if (head->lio_refcnt == 1) {
1450 int waiting = 0;
1451 if (head->lio_mode == LIO_WAIT) {
1452 if ((waiting = head->lio_waiting) != 0)
1453 (void) cond_signal(&head->lio_cond_cv);
1454 } else if (head->lio_port < 0) { /* none or signal */
1455 if ((np.np_lio_signo = head->lio_signo) != 0)
1456 notify = 1;
1457 np.np_lio_user = head->lio_sigval.sival_ptr;
1458 } else { /* thread or port */
1459 notify = 1;
1460 np.np_lio_port = head->lio_port;
1461 np.np_lio_event = head->lio_event;
1462 np.np_lio_object =
1463 (uintptr_t)head->lio_sigevent;
1464 np.np_lio_user = head->lio_sigval.sival_ptr;
1465 }
1466 head->lio_nent = head->lio_refcnt = 0;
1467 sig_mutex_unlock(&head->lio_mutex);
1468 if (waiting == 0)
1469 _aio_lio_free(head);
1470 } else {
1471 head->lio_nent--;
1472 head->lio_refcnt--;
1473 sig_mutex_unlock(&head->lio_mutex);
1474 }
1475 }
1476
1477 /*
1478 * The request is completed; now perform the notifications.
1479 */
1480 if (notify) {
1481 if (reqp != NULL) {
1482 /*
1483 * We usually put the request on the notification
1484 * queue because we don't want to block and delay
1485 * other operations behind us in the work queue.
1486 * Also we must never block on a cancel notification
1487 * because we are being called from an application
1488 * thread in this case and that could lead to deadlock
1489 * if no other thread is receiving notifications.
1490 */
1491 reqp->req_notify = np;
1492 reqp->req_op = AIONOTIFY;
1493 _aio_req_add(reqp, &__workers_no, AIONOTIFY);
1494 reqp = NULL;
1495 } else {
1496 /*
1497 * We already put the request on the done queue,
1498 * so we can't queue it to the notification queue.
1499 * Just do the notification directly.
1500 */
1501 send_notification(&np);
1502 }
1503 }
1504
1505 if (reqp != NULL)
1506 _aio_req_free(reqp);
1507 }
1508
1509 /*
1510 * Delete fsync requests from list head until there is
1511 * only one left. Return 0 when there is only one,
1512 * otherwise return a non-zero value.
1513 */
1514 static int
1515 _aio_fsync_del(aio_worker_t *aiowp, aio_req_t *reqp)
1516 {
1517 aio_lio_t *head = reqp->req_head;
1518 int rval = 0;
1519
1520 ASSERT(reqp == aiowp->work_req);
1521 sig_mutex_lock(&aiowp->work_qlock1);
1522 sig_mutex_lock(&head->lio_mutex);
1523 if (head->lio_refcnt > 1) {
1524 head->lio_refcnt--;
1525 head->lio_nent--;
1526 aiowp->work_req = NULL;
1527 sig_mutex_unlock(&head->lio_mutex);
1528 sig_mutex_unlock(&aiowp->work_qlock1);
1529 sig_mutex_lock(&__aio_mutex);
1530 _aio_outstand_cnt--;
1531 _aio_waitn_wakeup();
1532 sig_mutex_unlock(&__aio_mutex);
1533 _aio_req_free(reqp);
1534 return (1);
1535 }
1536 ASSERT(head->lio_nent == 1 && head->lio_refcnt == 1);
1537 reqp->req_head = NULL;
1538 if (head->lio_canned)
1539 reqp->req_state = AIO_REQ_CANCELED;
1540 if (head->lio_mode == LIO_DESTROY) {
1541 aiowp->work_req = NULL;
1542 rval = 1;
1543 }
1544 sig_mutex_unlock(&head->lio_mutex);
1545 sig_mutex_unlock(&aiowp->work_qlock1);
1546 head->lio_refcnt--;
1547 head->lio_nent--;
1548 _aio_lio_free(head);
1549 if (rval != 0)
1550 _aio_req_free(reqp);
1551 return (rval);
1552 }
1553
1554 /*
1555 * A worker is set idle when its work queue is empty.
1556 * The worker checks again that it has no more work
1557 * and then goes to sleep waiting for more work.
1558 */
1559 int
1560 _aio_idle(aio_worker_t *aiowp)
1561 {
1562 int error = 0;
1563
1564 sig_mutex_lock(&aiowp->work_qlock1);
1565 if (aiowp->work_count1 == 0) {
1566 ASSERT(aiowp->work_minload1 == 0);
1567 aiowp->work_idleflg = 1;
1568 /*
1569 * A cancellation handler is not needed here.
1570 * aio worker threads are never cancelled via pthread_cancel().
1571 */
1572 error = sig_cond_wait(&aiowp->work_idle_cv,
1573 &aiowp->work_qlock1);
1574 /*
1575 * The idle flag is normally cleared before worker is awakened
1576 * by _aio_req_add(). On error (EINTR), we clear it ourselves.
1577 */
1578 if (error)
1579 aiowp->work_idleflg = 0;
1580 }
1581 sig_mutex_unlock(&aiowp->work_qlock1);
1582 return (error);
1583 }
1584
1585 /*
1586 * A worker's completed AIO requests are placed onto a global
1587 * done queue. The application is only sent a SIGIO signal if
1588 * the process has a handler enabled and it is not waiting via
1589 * aiowait().
1590 */
1591 static void
1592 _aio_work_done(aio_worker_t *aiowp)
1593 {
1594 aio_req_t *reqp;
1595
1596 sig_mutex_lock(&__aio_mutex);
1597 sig_mutex_lock(&aiowp->work_qlock1);
1598 reqp = aiowp->work_prev1;
1599 reqp->req_next = NULL;
1600 aiowp->work_done1 = 0;
1601 aiowp->work_tail1 = aiowp->work_next1;
1602 if (aiowp->work_tail1 == NULL)
1603 aiowp->work_head1 = NULL;
1604 aiowp->work_prev1 = NULL;
1605 _aio_outstand_cnt--;
1606 _aio_req_done_cnt--;
1607 if (reqp->req_state == AIO_REQ_CANCELED) {
1608 /*
1609 * Request got cancelled after it was marked done. This can
1610 * happen because _aio_finish_request() marks it AIO_REQ_DONE
1611 * and drops all locks. Don't add the request to the done
1612 * queue and just discard it.
1613 */
1614 sig_mutex_unlock(&aiowp->work_qlock1);
1615 _aio_req_free(reqp);
1616 if (_aio_outstand_cnt == 0 && _aiowait_flag) {
1617 sig_mutex_unlock(&__aio_mutex);
1618 (void) _kaio(AIONOTIFY);
1619 } else {
1620 sig_mutex_unlock(&__aio_mutex);
1621 }
1622 return;
1623 }
1624 sig_mutex_unlock(&aiowp->work_qlock1);
1625 _aio_donecnt++;
1626 ASSERT(_aio_donecnt > 0 &&
1627 _aio_outstand_cnt >= 0 &&
1628 _aio_req_done_cnt >= 0);
1629 ASSERT(reqp != NULL);
1630
1631 if (_aio_done_tail == NULL) {
1632 _aio_done_head = _aio_done_tail = reqp;
1633 } else {
1634 _aio_done_head->req_next = reqp;
1635 _aio_done_head = reqp;
1636 }
1637
1638 if (_aiowait_flag) {
1639 sig_mutex_unlock(&__aio_mutex);
1640 (void) _kaio(AIONOTIFY);
1641 } else {
1642 sig_mutex_unlock(&__aio_mutex);
1643 if (_sigio_enabled)
1644 (void) kill(__pid, SIGIO);
1645 }
1646 }
1647
1648 /*
1649 * The done queue consists of AIO requests that are in either the
1650 * AIO_REQ_DONE or AIO_REQ_CANCELED state. Requests that were cancelled
1651 * are discarded. If the done queue is empty then NULL is returned.
1652 * Otherwise the address of a done aio_result_t is returned.
1653 */
1654 aio_result_t *
1655 _aio_req_done(void)
1656 {
1657 aio_req_t *reqp;
1658 aio_result_t *resultp;
1659
1660 ASSERT(MUTEX_HELD(&__aio_mutex));
1661
1662 if ((reqp = _aio_done_tail) != NULL) {
1663 if ((_aio_done_tail = reqp->req_next) == NULL)
1664 _aio_done_head = NULL;
1665 ASSERT(_aio_donecnt > 0);
1666 _aio_donecnt--;
1667 (void) _aio_hash_del(reqp->req_resultp);
1668 resultp = reqp->req_resultp;
1669 ASSERT(reqp->req_state == AIO_REQ_DONE);
1670 _aio_req_free(reqp);
1671 return (resultp);
1672 }
1673 /* is queue empty? */
1674 if (reqp == NULL && _aio_outstand_cnt == 0) {
1675 return ((aio_result_t *)-1);
1676 }
1677 return (NULL);
1678 }
1679
1680 /*
1681 * Set the return and errno values for the application's use.
1682 *
1683 * For the Posix interfaces, we must set the return value first followed
1684 * by the errno value because the Posix interfaces allow for a change
1685 * in the errno value from EINPROGRESS to something else to signal
1686 * the completion of the asynchronous request.
1687 *
1688 * The opposite is true for the Solaris interfaces. These allow for
1689 * a change in the return value from AIO_INPROGRESS to something else
1690 * to signal the completion of the asynchronous request.
1691 */
1692 void
1693 _aio_set_result(aio_req_t *reqp, ssize_t retval, int error)
1694 {
1695 aio_result_t *resultp = reqp->req_resultp;
1696
1697 if (POSIX_AIO(reqp)) {
1698 resultp->aio_return = retval;
1699 membar_producer();
1700 resultp->aio_errno = error;
1701 } else {
1702 resultp->aio_errno = error;
1703 membar_producer();
1704 resultp->aio_return = retval;
1705 }
1706 }
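
/*
 * Illustrative sketch (not library code) of how pollers rely on the
 * store ordering above.  A Solaris-style poller watches aio_return:
 *
 *	while (resultp->aio_return == AIO_INPROGRESS)
 *		yield();			(hypothetical back-off)
 *	error = resultp->aio_errno;		(now guaranteed final)
 *
 * A POSIX poller instead calls aio_error() until it stops returning
 * EINPROGRESS and only then calls aio_return(), which is why the store
 * order is reversed for POSIX requests.
 */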
1707
1708 /*
1709 * Add an AIO request onto the next work queue.
1710 * A circular list of workers is used to choose the next worker.
1711 */
1712 void
1713 _aio_req_add(aio_req_t *reqp, aio_worker_t **nextworker, int mode)
1714 {
1715 ulwp_t *self = curthread;
1716 aio_worker_t *aiowp;
1717 aio_worker_t *first;
1718 int load_bal_flg = 1;
1719 int found;
1720
1721 ASSERT(reqp->req_state != AIO_REQ_DONEQ);
1722 reqp->req_next = NULL;
1723 /*
1724 * Try to acquire the next worker's work queue. If it is locked,
1725 * then search the list of workers until a queue is found unlocked,
1726 * or until the list is completely traversed at which point another
1727 * worker will be created.
1728 */
1729 sigoff(self); /* defer SIGIO */
1730 sig_mutex_lock(&__aio_mutex);
1731 first = aiowp = *nextworker;
1732 if (mode != AIONOTIFY)
1733 _aio_outstand_cnt++;
1734 sig_mutex_unlock(&__aio_mutex);
1735
1736 switch (mode) {
1737 case AIOREAD:
1738 case AIOWRITE:
1739 case AIOAREAD:
1740 case AIOAWRITE:
1741 #if !defined(_LP64)
1742 case AIOAREAD64:
1743 case AIOAWRITE64:
1744 #endif
1745 /* try to find an idle worker */
1746 found = 0;
1747 do {
1748 if (sig_mutex_trylock(&aiowp->work_qlock1) == 0) {
1749 if (aiowp->work_idleflg) {
1750 found = 1;
1751 break;
1752 }
1753 sig_mutex_unlock(&aiowp->work_qlock1);
1754 }
1755 } while ((aiowp = aiowp->work_forw) != first);
1756
1757 if (found) {
1758 aiowp->work_minload1++;
1759 break;
1760 }
1761
1762 /* try to acquire some worker's queue lock */
1763 do {
1764 if (sig_mutex_trylock(&aiowp->work_qlock1) == 0) {
1765 found = 1;
1766 break;
1767 }
1768 } while ((aiowp = aiowp->work_forw) != first);
1769
1770 /*
1771 * Create more workers when the workers appear overloaded.
1772 * Either all the workers are busy draining their queues
1773 * or no worker's queue lock could be acquired.
1774 */
1775 if (!found) {
1776 if (_aio_worker_cnt < _max_workers) {
1777 if (_aio_create_worker(reqp, mode))
1778 aio_panic("_aio_req_add: add worker");
1779 sigon(self); /* reenable SIGIO */
1780 return;
1781 }
1782
1783 /*
1784 * No worker available and we have created
1785 * _max_workers, keep going through the
1786 * list slowly until we get a lock
1787 */
1788 while (sig_mutex_trylock(&aiowp->work_qlock1) != 0) {
1789 /*
1790 * give someone else a chance
1791 */
1792 _aio_delay(1);
1793 aiowp = aiowp->work_forw;
1794 }
1795 }
1796
1797 ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
1798 if (_aio_worker_cnt < _max_workers &&
1799 aiowp->work_minload1 >= _minworkload) {
1800 sig_mutex_unlock(&aiowp->work_qlock1);
1801 sig_mutex_lock(&__aio_mutex);
1802 *nextworker = aiowp->work_forw;
1803 sig_mutex_unlock(&__aio_mutex);
1804 if (_aio_create_worker(reqp, mode))
1805 aio_panic("aio_req_add: add worker");
1806 sigon(self); /* reenable SIGIO */
1807 return;
1808 }
1809 aiowp->work_minload1++;
1810 break;
1811 case AIOFSYNC:
1812 case AIONOTIFY:
1813 load_bal_flg = 0;
1814 sig_mutex_lock(&aiowp->work_qlock1);
1815 break;
1816 default:
1817 aio_panic("_aio_req_add: invalid mode");
1818 break;
1819 }
1820 /*
1821 * Put request onto worker's work queue.
1822 */
1823 if (aiowp->work_tail1 == NULL) {
1824 ASSERT(aiowp->work_count1 == 0);
1825 aiowp->work_tail1 = reqp;
1826 aiowp->work_next1 = reqp;
1827 } else {
1828 aiowp->work_head1->req_next = reqp;
1829 if (aiowp->work_next1 == NULL)
1830 aiowp->work_next1 = reqp;
1831 }
1832 reqp->req_state = AIO_REQ_QUEUED;
1833 reqp->req_worker = aiowp;
1834 aiowp->work_head1 = reqp;
1835 /*
1836 * Awaken worker if it is not currently active.
1837 */
1838 if (aiowp->work_count1++ == 0 && aiowp->work_idleflg) {
1839 aiowp->work_idleflg = 0;
1840 (void) cond_signal(&aiowp->work_idle_cv);
1841 }
1842 sig_mutex_unlock(&aiowp->work_qlock1);
1843
1844 if (load_bal_flg) {
1845 sig_mutex_lock(&__aio_mutex);
1846 *nextworker = aiowp->work_forw;
1847 sig_mutex_unlock(&__aio_mutex);
1848 }
1849 sigon(self); /* reenable SIGIO */
1850 }
1851
1852 /*
1853 * Get an AIO request for a specified worker.
1854 * If the work queue is empty, return NULL.
1855 */
1856 aio_req_t *
1857 _aio_req_get(aio_worker_t *aiowp)
1858 {
1859 aio_req_t *reqp;
1860
1861 sig_mutex_lock(&aiowp->work_qlock1);
1862 if ((reqp = aiowp->work_next1) != NULL) {
1863 /*
1864 * Remove a POSIX request from the queue; the
1865 * request queue is a singly linked list
1866 * with a previous pointer. The request is
1867 * removed by updating the previous pointer.
1868 *
1869 * Non-posix requests are left on the queue
1870 * to eventually be placed on the done queue.
1871 */
1872
1873 if (POSIX_AIO(reqp)) {
1874 if (aiowp->work_prev1 == NULL) {
1875 aiowp->work_tail1 = reqp->req_next;
1876 if (aiowp->work_tail1 == NULL)
1877 aiowp->work_head1 = NULL;
1878 } else {
1879 aiowp->work_prev1->req_next = reqp->req_next;
1880 if (aiowp->work_head1 == reqp)
1881 aiowp->work_head1 = reqp->req_next;
1882 }
1883
1884 } else {
1885 aiowp->work_prev1 = reqp;
1886 ASSERT(aiowp->work_done1 >= 0);
1887 aiowp->work_done1++;
1888 }
1889 ASSERT(reqp != reqp->req_next);
1890 aiowp->work_next1 = reqp->req_next;
1891 ASSERT(aiowp->work_count1 >= 1);
1892 aiowp->work_count1--;
1893 switch (reqp->req_op) {
1894 case AIOREAD:
1895 case AIOWRITE:
1896 case AIOAREAD:
1897 case AIOAWRITE:
1898 #if !defined(_LP64)
1899 case AIOAREAD64:
1900 case AIOAWRITE64:
1901 #endif
1902 ASSERT(aiowp->work_minload1 > 0);
1903 aiowp->work_minload1--;
1904 break;
1905 }
1906 reqp->req_state = AIO_REQ_INPROGRESS;
1907 }
1908 aiowp->work_req = reqp;
1909 ASSERT(reqp != NULL || aiowp->work_count1 == 0);
1910 sig_mutex_unlock(&aiowp->work_qlock1);
1911 return (reqp);
1912 }
1913
1914 static void
1915 _aio_req_del(aio_worker_t *aiowp, aio_req_t *reqp, int ostate)
1916 {
1917 aio_req_t **last;
1918 aio_req_t *lastrp;
1919 aio_req_t *next;
1920
1921 ASSERT(aiowp != NULL);
1922 ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
1923 if (POSIX_AIO(reqp)) {
1924 if (ostate != AIO_REQ_QUEUED)
1925 return;
1926 }
1927 last = &aiowp->work_tail1;
1928 lastrp = aiowp->work_tail1;
1929 ASSERT(ostate == AIO_REQ_QUEUED || ostate == AIO_REQ_INPROGRESS);
1930 while ((next = *last) != NULL) {
1931 if (next == reqp) {
1932 *last = next->req_next;
1933 if (aiowp->work_next1 == next)
1934 aiowp->work_next1 = next->req_next;
1935
1936 /*
1937 * If this is the first request on the queue, move
1938 * the lastrp pointer forward.
1939 */
1940 if (lastrp == next)
1941 lastrp = next->req_next;
1942
1943 /*
1944 * If work_head1 points to this request, then make
1945 * work_head1 point to the last request that remains
1946 * on the queue.
1947 */
1948 if (aiowp->work_head1 == next)
1949 aiowp->work_head1 = lastrp;
1950
1951 /*
1952 * work_prev1 is used only in the non-POSIX case and it
1953 * points to the current AIO_REQ_INPROGRESS request.
1954 * If work_prev1 points to this request which is being
1955 * deleted, make work_prev1 NULL and set work_done1
1956 * to 0.
1957 *
1958 * A worker thread can be processing only one request
1959 * at a time.
1960 */
1961 if (aiowp->work_prev1 == next) {
1962 ASSERT(ostate == AIO_REQ_INPROGRESS &&
1963 !POSIX_AIO(reqp) && aiowp->work_done1 > 0);
1964 aiowp->work_prev1 = NULL;
1965 aiowp->work_done1--;
1966 }
1967
1968 if (ostate == AIO_REQ_QUEUED) {
1969 ASSERT(aiowp->work_count1 >= 1);
1970 aiowp->work_count1--;
1971 ASSERT(aiowp->work_minload1 >= 1);
1972 aiowp->work_minload1--;
1973 }
1974 return;
1975 }
1976 last = &next->req_next;
1977 lastrp = next;
1978 }
1979 /* NOTREACHED */
1980 }
1981
1982 static void
1983 _aio_enq_doneq(aio_req_t *reqp)
1984 {
1985 if (_aio_doneq == NULL) {
1986 _aio_doneq = reqp;
1987 reqp->req_next = reqp->req_prev = reqp;
1988 } else {
1989 reqp->req_next = _aio_doneq;
1990 reqp->req_prev = _aio_doneq->req_prev;
1991 _aio_doneq->req_prev->req_next = reqp;
1992 _aio_doneq->req_prev = reqp;
1993 }
1994 reqp->req_state = AIO_REQ_DONEQ;
1995 _aio_doneq_cnt++;
1996 }
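/*
 * The done queue built above is circular and doubly linked: with a
 * single entry reqp->req_next == reqp->req_prev == reqp, _aio_doneq
 * always points at the oldest entry, and new requests are linked in
 * just ahead of _aio_doneq, i.e. at the tail of the queue.
 */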
1997
1998 /*
1999 * The caller owns __aio_mutex.
2000 */
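/*
 * If reqp is non-NULL, remove that specific request from the done
 * queue (returning NULL if it is not there).  If reqp is NULL,
 * dequeue the request at the head of the done queue, if any.
 */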
2001 aio_req_t *
2002 _aio_req_remove(aio_req_t *reqp)
2003 {
2004 if (reqp && reqp->req_state != AIO_REQ_DONEQ)
2005 return (NULL);
2006
2007 if (reqp) {
2008 /* request in done queue */
2009 if (_aio_doneq == reqp)
2010 _aio_doneq = reqp->req_next;
2011 if (_aio_doneq == reqp) {
2012 /* only one request on queue */
2013 _aio_doneq = NULL;
2014 } else {
2015 aio_req_t *tmp = reqp->req_next;
2016 reqp->req_prev->req_next = tmp;
2017 tmp->req_prev = reqp->req_prev;
2018 }
2019 } else if ((reqp = _aio_doneq) != NULL) {
2020 if (reqp == reqp->req_next) {
2021 /* only one request on queue */
2022 _aio_doneq = NULL;
2023 } else {
2024 reqp->req_prev->req_next = _aio_doneq = reqp->req_next;
2025 _aio_doneq->req_prev = reqp->req_prev;
2026 }
2027 }
2028 if (reqp) {
2029 _aio_doneq_cnt--;
2030 reqp->req_next = reqp->req_prev = reqp;
2031 reqp->req_state = AIO_REQ_DONE;
2032 }
2033 return (reqp);
2034 }
2035
2036 /*
2037 * An AIO request is identified by an aio_result_t pointer. The library
2038 * maps this aio_result_t pointer to its internal representation using a
2039 * hash table. This function adds an aio_result_t pointer to the hash table.
2040 */
2041 static int
2042 _aio_hash_insert(aio_result_t *resultp, aio_req_t *reqp)
2043 {
2044 aio_hash_t *hashp;
2045 aio_req_t **prev;
2046 aio_req_t *next;
2047
2048 hashp = _aio_hash + AIOHASH(resultp);
2049 lmutex_lock(&hashp->hash_lock);
2050 prev = &hashp->hash_ptr;
2051 while ((next = *prev) != NULL) {
2052 if (resultp == next->req_resultp) {
2053 lmutex_unlock(&hashp->hash_lock);
2054 return (-1);
2055 }
2056 prev = &next->req_link;
2057 }
2058 *prev = reqp;
2059 ASSERT(reqp->req_link == NULL);
2060 lmutex_unlock(&hashp->hash_lock);
2061 return (0);
2062 }
2063
2064 /*
2065 * Remove an entry from the hash table.
2066 */
2067 aio_req_t *
2068 _aio_hash_del(aio_result_t *resultp)
2069 {
2070 aio_hash_t *hashp;
2071 aio_req_t **prev;
2072 aio_req_t *next = NULL;
2073
2074 if (_aio_hash != NULL) {
2075 hashp = _aio_hash + AIOHASH(resultp);
2076 lmutex_lock(&hashp->hash_lock);
2077 prev = &hashp->hash_ptr;
2078 while ((next = *prev) != NULL) {
2079 if (resultp == next->req_resultp) {
2080 *prev = next->req_link;
2081 next->req_link = NULL;
2082 break;
2083 }
2084 prev = &next->req_link;
2085 }
2086 lmutex_unlock(&hashp->hash_lock);
2087 }
2088 return (next);
2089 }
2090
2091 /*
2092 * Find an entry in the hash table.
2093 */
2094 aio_req_t *
2095 _aio_hash_find(aio_result_t *resultp)
2096 {
2097 aio_hash_t *hashp;
2098 aio_req_t **prev;
2099 aio_req_t *next = NULL;
2100
2101 if (_aio_hash != NULL) {
2102 hashp = _aio_hash + AIOHASH(resultp);
2103 lmutex_lock(&hashp->hash_lock);
2104 prev = &hashp->hash_ptr;
2105 while ((next = *prev) != NULL) {
2106 if (resultp == next->req_resultp)
2107 break;
2108 prev = &next->req_link;
2109 }
2110 lmutex_unlock(&hashp->hash_lock);
2111 }
2112 return (next);
2113 }
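/*
 * A minimal sketch (illustrative only, not compiled) of how a caller
 * in the style of aio_error(3C) can use the hash table to map an
 * application aiocb back to the library's aio_req_t; the name
 * example_aio_error() is hypothetical and not part of this library.
 */
#if 0
static int
example_aio_error(aiocb_t *aiocbp)
{
	aio_req_t *reqp;

	/* _aio_hash_find() takes the per-bucket hash_lock itself */
	if ((reqp = _aio_hash_find(&aiocbp->aio_resultp)) == NULL) {
		errno = EINVAL;		/* no such outstanding request */
		return (-1);
	}
	return (reqp->req_resultp->aio_errno);
}
#endif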
2114
2115 /*
2116 * AIO interface for POSIX
2117 */
2118 int
2119 _aio_rw(aiocb_t *aiocbp, aio_lio_t *lio_head, aio_worker_t **nextworker,
2120 int mode, int flg)
2121 {
2122 aio_req_t *reqp;
2123 aio_args_t *ap;
2124 int kerr;
2125
2126 if (aiocbp == NULL) {
2127 errno = EINVAL;
2128 return (-1);
2129 }
2130
2131 /* initialize kaio */
2132 if (!_kaio_ok)
2133 _kaio_init();
2134
2135 aiocbp->aio_state = NOCHECK;
2136
2137 /*
2138 * If we have been called because a list I/O
2139 * kaio() failed, we don't want to repeat the
2140 * system call.
2141 */
2142
2143 if (flg & AIO_KAIO) {
2144 /*
2145 * Try kernel aio first.
2146 * If errno is ENOTSUP/EBADFD,
2147 * fall back to the thread implementation.
2148 */
2149 if (_kaio_ok > 0 && KAIO_SUPPORTED(aiocbp->aio_fildes)) {
2150 aiocbp->aio_resultp.aio_errno = EINPROGRESS;
2151 aiocbp->aio_state = CHECK;
2152 kerr = (int)_kaio(mode, aiocbp);
2153 if (kerr == 0)
2154 return (0);
2155 if (errno != ENOTSUP && errno != EBADFD) {
2156 aiocbp->aio_resultp.aio_errno = errno;
2157 aiocbp->aio_resultp.aio_return = -1;
2158 aiocbp->aio_state = NOCHECK;
2159 return (-1);
2160 }
2161 if (errno == EBADFD)
2162 SET_KAIO_NOT_SUPPORTED(aiocbp->aio_fildes);
2163 }
2164 }
2165
2166 aiocbp->aio_resultp.aio_errno = EINPROGRESS;
2167 aiocbp->aio_state = USERAIO;
2168
2169 if (!__uaio_ok && __uaio_init() == -1)
2170 return (-1);
2171
2172 if ((reqp = _aio_req_alloc()) == NULL) {
2173 errno = EAGAIN;
2174 return (-1);
2175 }
2176
2177 /*
2178 * If an LIO request, add the list head to the aio request
2179 */
2180 reqp->req_head = lio_head;
2181 reqp->req_type = AIO_POSIX_REQ;
2182 reqp->req_op = mode;
2183 reqp->req_largefile = 0;
2184
2185 if (aiocbp->aio_sigevent.sigev_notify == SIGEV_NONE) {
2186 reqp->req_sigevent.sigev_notify = SIGEV_NONE;
2187 } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_SIGNAL) {
2188 reqp->req_sigevent.sigev_notify = SIGEV_SIGNAL;
2189 reqp->req_sigevent.sigev_signo =
2190 aiocbp->aio_sigevent.sigev_signo;
2191 reqp->req_sigevent.sigev_value.sival_ptr =
2192 aiocbp->aio_sigevent.sigev_value.sival_ptr;
2193 } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_PORT) {
2194 port_notify_t *pn = aiocbp->aio_sigevent.sigev_value.sival_ptr;
2195 reqp->req_sigevent.sigev_notify = SIGEV_PORT;
2196 /*
2197 * Reuse the sigevent structure to contain the port number
2198 * and the user value. Same for SIGEV_THREAD, below.
2199 */
2200 reqp->req_sigevent.sigev_signo =
2201 pn->portnfy_port;
2202 reqp->req_sigevent.sigev_value.sival_ptr =
2203 pn->portnfy_user;
2204 } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_THREAD) {
2205 reqp->req_sigevent.sigev_notify = SIGEV_THREAD;
2206 /*
2207 * The sigevent structure contains the port number
2208 * and the user value. Same for SIGEV_PORT, above.
2209 */
2210 reqp->req_sigevent.sigev_signo =
2211 aiocbp->aio_sigevent.sigev_signo;
2212 reqp->req_sigevent.sigev_value.sival_ptr =
2213 aiocbp->aio_sigevent.sigev_value.sival_ptr;
2214 }
2215
2216 reqp->req_resultp = &aiocbp->aio_resultp;
2217 reqp->req_aiocbp = aiocbp;
2218 ap = &reqp->req_args;
2219 ap->fd = aiocbp->aio_fildes;
2220 ap->buf = (caddr_t)aiocbp->aio_buf;
2221 ap->bufsz = aiocbp->aio_nbytes;
2222 ap->offset = aiocbp->aio_offset;
2223
2224 if ((flg & AIO_NO_DUPS) &&
2225 _aio_hash_insert(&aiocbp->aio_resultp, reqp) != 0) {
2226 aio_panic("_aio_rw(): request already in hash table");
2227 _aio_req_free(reqp);
2228 errno = EINVAL;
2229 return (-1);
2230 }
2231 _aio_req_add(reqp, nextworker, mode);
2232 return (0);
2233 }
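/*
 * A minimal sketch (illustrative only, not compiled) of the kind of
 * call a POSIX front-end in the style of aio_read(3C) makes into
 * _aio_rw(): AIO_KAIO asks for kernel aio first with fallback to the
 * user-level workers, and AIO_NO_DUPS rejects an aiocb whose result
 * pointer is already in the hash table.  example_aio_read() is a
 * hypothetical name, not part of this file.
 */
#if 0
static int
example_aio_read(aiocb_t *aiocbp)
{
	if (aiocbp == NULL || aiocbp->aio_reqprio != 0) {
		errno = EINVAL;
		return (-1);
	}
	/* reject an aiocb that is already outstanding */
	if (_aio_hash_find(&aiocbp->aio_resultp) != NULL) {
		errno = EBUSY;
		return (-1);
	}
	return (_aio_rw(aiocbp, NULL, &__nextworker_rw, AIOAREAD,
	    (AIO_KAIO | AIO_NO_DUPS)));
}
#endif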
2234
2235 #if !defined(_LP64)
2236 /*
2237 * 64-bit AIO interface for POSIX
2238 */
2239 int
2240 _aio_rw64(aiocb64_t *aiocbp, aio_lio_t *lio_head, aio_worker_t **nextworker,
2241 int mode, int flg)
2242 {
2243 aio_req_t *reqp;
2244 aio_args_t *ap;
2245 int kerr;
2246
2247 if (aiocbp == NULL) {
2248 errno = EINVAL;
2249 return (-1);
2250 }
2251
2252 /* initialize kaio */
2253 if (!_kaio_ok)
2254 _kaio_init();
2255
2256 aiocbp->aio_state = NOCHECK;
2257
2258 /*
2259 * If we have been called because a list I/O
2260 * kaio() failed, we don't want to repeat the
2261 * system call.
2262 */
2263
2264 if (flg & AIO_KAIO) {
2265 /*
2266 * Try kernel aio first.
2267 * If errno is ENOTSUP/EBADFD,
2268 * fall back to the thread implementation.
2269 */
2270 if (_kaio_ok > 0 && KAIO_SUPPORTED(aiocbp->aio_fildes)) {
2271 aiocbp->aio_resultp.aio_errno = EINPROGRESS;
2272 aiocbp->aio_state = CHECK;
2273 kerr = (int)_kaio(mode, aiocbp);
2274 if (kerr == 0)
2275 return (0);
2276 if (errno != ENOTSUP && errno != EBADFD) {
2277 aiocbp->aio_resultp.aio_errno = errno;
2278 aiocbp->aio_resultp.aio_return = -1;
2279 aiocbp->aio_state = NOCHECK;
2280 return (-1);
2281 }
2282 if (errno == EBADFD)
2283 SET_KAIO_NOT_SUPPORTED(aiocbp->aio_fildes);
2284 }
2285 }
2286
2287 aiocbp->aio_resultp.aio_errno = EINPROGRESS;
2288 aiocbp->aio_state = USERAIO;
2289
2290 if (!__uaio_ok && __uaio_init() == -1)
2291 return (-1);
2292
2293 if ((reqp = _aio_req_alloc()) == NULL) {
2294 errno = EAGAIN;
2295 return (-1);
2296 }
2297
2298 /*
2299 * If an LIO request, add the list head to the aio request
2300 */
2301 reqp->req_head = lio_head;
2302 reqp->req_type = AIO_POSIX_REQ;
2303 reqp->req_op = mode;
2304 reqp->req_largefile = 1;
2305
2306 if (aiocbp->aio_sigevent.sigev_notify == SIGEV_NONE) {
2307 reqp->req_sigevent.sigev_notify = SIGEV_NONE;
2308 } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_SIGNAL) {
2309 reqp->req_sigevent.sigev_notify = SIGEV_SIGNAL;
2310 reqp->req_sigevent.sigev_signo =
2311 aiocbp->aio_sigevent.sigev_signo;
2312 reqp->req_sigevent.sigev_value.sival_ptr =
2313 aiocbp->aio_sigevent.sigev_value.sival_ptr;
2314 } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_PORT) {
2315 port_notify_t *pn = aiocbp->aio_sigevent.sigev_value.sival_ptr;
2316 reqp->req_sigevent.sigev_notify = SIGEV_PORT;
2317 reqp->req_sigevent.sigev_signo =
2318 pn->portnfy_port;
2319 reqp->req_sigevent.sigev_value.sival_ptr =
2320 pn->portnfy_user;
2321 } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_THREAD) {
2322 reqp->req_sigevent.sigev_notify = SIGEV_THREAD;
2323 reqp->req_sigevent.sigev_signo =
2324 aiocbp->aio_sigevent.sigev_signo;
2325 reqp->req_sigevent.sigev_value.sival_ptr =
2326 aiocbp->aio_sigevent.sigev_value.sival_ptr;
2327 }
2328
2329 reqp->req_resultp = &aiocbp->aio_resultp;
2330 reqp->req_aiocbp = aiocbp;
2331 ap = &reqp->req_args;
2332 ap->fd = aiocbp->aio_fildes;
2333 ap->buf = (caddr_t)aiocbp->aio_buf;
2334 ap->bufsz = aiocbp->aio_nbytes;
2335 ap->offset = aiocbp->aio_offset;
2336
2337 if ((flg & AIO_NO_DUPS) &&
2338 _aio_hash_insert(&aiocbp->aio_resultp, reqp) != 0) {
2339 aio_panic("_aio_rw64(): request already in hash table");
2340 _aio_req_free(reqp);
2341 errno = EINVAL;
2342 return (-1);
2343 }
2344 _aio_req_add(reqp, nextworker, mode);
2345 return (0);
2346 }
2347 #endif /* !defined(_LP64) */
2348