1 /*
2 * CDDL HEADER START
3 *
4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License (the "License").
6 * You may not use this file except in compliance with the License.
7 *
8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9 * or http://www.opensolaris.org/os/licensing.
10 * See the License for the specific language governing permissions
11 * and limitations under the License.
12 *
13 * When distributing Covered Code, include this CDDL HEADER in each
14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15 * If applicable, add the following below this CDDL HEADER, with the
16 * fields enclosed by brackets "[]" replaced with your own identifying
17 * information: Portions Copyright [yyyy] [name of copyright owner]
18 *
19 * CDDL HEADER END
20 */
21
22 /*
23 * Copyright 2008 Sun Microsystems, Inc. All rights reserved.
24 * Use is subject to license terms.
25 * Copyright 2025 MNX Cloud, Inc.
26 */
27
28 #include "lint.h"
29 #include "thr_uberdata.h"
30 #include "libc.h"
31 #include "asyncio.h"
32 #include <atomic.h>
33 #include <sys/param.h>
34 #include <sys/file.h>
35 #include <sys/port.h>
36
37 static int _aio_hash_insert(aio_result_t *, aio_req_t *);
38 static aio_req_t *_aio_req_get(aio_worker_t *);
39 static void _aio_req_add(aio_req_t *, aio_worker_t **, int);
40 static void _aio_req_del(aio_worker_t *, aio_req_t *, int);
41 static void _aio_work_done(aio_worker_t *);
42 static void _aio_enq_doneq(aio_req_t *);
43
44 extern void _aio_lio_free(aio_lio_t *);
45
46 extern int __fcntl(int, int, ...);
47 extern int _port_dispatch(int, int, int, int, uintptr_t, void *);
48
49 static int _aio_fsync_del(aio_worker_t *, aio_req_t *);
50 static void _aiodone(aio_req_t *, ssize_t, int);
51 static void _aio_cancel_work(aio_worker_t *, int, int *, int *);
52 static void _aio_finish_request(aio_worker_t *, ssize_t, int);
53
54 /*
55 * switch for kernel async I/O
56 */
57 int _kaio_ok = 0; /* 0 = disabled, 1 = on, -1 = error */
58
59 /*
60 * Key for thread-specific data
61 */
62 pthread_key_t _aio_key;
63
64 /*
65 * Array for determining whether or not a file supports kaio.
66 * Initialized in _kaio_init().
67 */
68 uint32_t *_kaio_supported = NULL;
69
70 /*
71 * workers for read/write requests
72 * (__aio_mutex lock protects circular linked list of workers)
73 */
74 aio_worker_t *__workers_rw; /* circular list of AIO workers */
75 aio_worker_t *__nextworker_rw; /* next worker in list of workers */
76 int __rw_workerscnt; /* number of read/write workers */
77
78 /*
79 * worker for notification requests.
80 */
81 aio_worker_t *__workers_no; /* circular list of AIO workers */
82 aio_worker_t *__nextworker_no; /* next worker in list of workers */
83 int __no_workerscnt;		/* number of notification workers */
84
85 aio_req_t *_aio_done_tail; /* list of done requests */
86 aio_req_t *_aio_done_head;
87
88 mutex_t __aio_initlock = DEFAULTMUTEX; /* makes aio initialization atomic */
89 cond_t __aio_initcv = DEFAULTCV;
90 int __aio_initbusy = 0;
91
92 mutex_t __aio_mutex = DEFAULTMUTEX; /* protects counts, and linked lists */
93 cond_t _aio_iowait_cv = DEFAULTCV; /* wait for userland I/Os */
94
95 pid_t __pid = (pid_t)-1; /* initialize as invalid pid */
96 int _sigio_enabled = 0; /* when set, send SIGIO signal */
97
98 aio_hash_t *_aio_hash;
99
100 aio_req_t *_aio_doneq; /* double linked done queue list */
101
102 int _aio_donecnt = 0;
103 int _aio_waitncnt = 0; /* # of requests for aio_waitn */
104 int _aio_doneq_cnt = 0;
105 int _aio_outstand_cnt = 0; /* # of outstanding requests */
106 int _kaio_outstand_cnt = 0; /* # of outstanding kaio requests */
107 int _aio_req_done_cnt = 0; /* req. done but not in "done queue" */
108 int _aio_kernel_suspend = 0; /* active kernel kaio calls */
109 int _aio_suscv_cnt = 0; /* aio_suspend calls waiting on cv's */
110
111 int _max_workers = 256; /* max number of workers permitted */
112 int _min_workers = 4; /* min number of workers */
113 int _minworkload = 2;		/* min number of requests in q */
114 int _aio_worker_cnt = 0; /* number of workers to do requests */
115 int __uaio_ok = 0; /* AIO has been enabled */
116 sigset_t _worker_set; /* worker's signal mask */
117
118 int _aiowait_flag = 0;		/* when set, aiowait() is in progress */
119 int _aio_flags = 0;		/* see the defines in asyncio.h */
120
121 aio_worker_t *_kaiowp = NULL; /* points to kaio cleanup thread */
122
123 int hz; /* clock ticks per second */
124
125 static int
126 _kaio_supported_init(void)
127 {
128 void *ptr;
129 size_t size;
130
131 if (_kaio_supported != NULL) /* already initialized */
132 return (0);
133
134 size = MAX_KAIO_FDARRAY_SIZE * sizeof (uint32_t);
135 ptr = mmap(NULL, size, PROT_READ | PROT_WRITE,
136 MAP_PRIVATE | MAP_ANON, -1, (off_t)0);
137 if (ptr == MAP_FAILED)
138 return (-1);
139 _kaio_supported = ptr;
140 return (0);
141 }
142
143 /*
144  * The aio subsystem is initialized when the first AIO request is made.
145  * Constants are initialized here, such as the maximum number of workers
146  * that the subsystem can create and the minimum number of workers
147  * permitted before imposing some restrictions. Also, some
148  * workers are created.
149 */
150 int
151 __uaio_init(void)
152 {
153 int ret = -1;
154 int i;
155 int cancel_state;
156
157 lmutex_lock(&__aio_initlock);
158 (void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancel_state);
159 while (__aio_initbusy)
160 (void) cond_wait(&__aio_initcv, &__aio_initlock);
161 (void) pthread_setcancelstate(cancel_state, NULL);
162 if (__uaio_ok) { /* already initialized */
163 lmutex_unlock(&__aio_initlock);
164 return (0);
165 }
166 __aio_initbusy = 1;
167 lmutex_unlock(&__aio_initlock);
168
169 hz = (int)sysconf(_SC_CLK_TCK);
170 __pid = getpid();
171
172 setup_cancelsig(SIGAIOCANCEL);
173
174 if (_kaio_supported_init() != 0)
175 goto out;
176
177 /*
178 * Allocate and initialize the hash table.
179 * Do this only once, even if __uaio_init() is called twice.
180 */
181 if (_aio_hash == NULL) {
182 /* LINTED pointer cast */
183 _aio_hash = (aio_hash_t *)mmap(NULL,
184 HASHSZ * sizeof (aio_hash_t), PROT_READ | PROT_WRITE,
185 MAP_PRIVATE | MAP_ANON, -1, (off_t)0);
186 if ((void *)_aio_hash == MAP_FAILED) {
187 _aio_hash = NULL;
188 goto out;
189 }
190 for (i = 0; i < HASHSZ; i++)
191 (void) mutex_init(&_aio_hash[i].hash_lock,
192 USYNC_THREAD, NULL);
193 }
194
195 /*
196 * Initialize worker's signal mask to only catch SIGAIOCANCEL.
197 */
198 (void) sigfillset(&_worker_set);
199 (void) sigdelset(&_worker_set, SIGAIOCANCEL);
200
201 /*
202 * Create one worker to send asynchronous notifications.
203 * Do this only once, even if __uaio_init() is called twice.
204 */
205 if (__no_workerscnt == 0 &&
206 (_aio_create_worker(NULL, AIONOTIFY) != 0)) {
207 errno = EAGAIN;
208 goto out;
209 }
210
211 /*
212 * Create the minimum number of read/write workers.
213	 * Later, check whether at least one worker was created;
214 * lwp_create() calls could fail because of segkp exhaustion.
215 */
216 for (i = 0; i < _min_workers; i++)
217 (void) _aio_create_worker(NULL, AIOREAD);
218 if (__rw_workerscnt == 0) {
219 errno = EAGAIN;
220 goto out;
221 }
222
223 ret = 0;
224 out:
225 lmutex_lock(&__aio_initlock);
226 if (ret == 0)
227 __uaio_ok = 1;
228 __aio_initbusy = 0;
229 (void) cond_broadcast(&__aio_initcv);
230 lmutex_unlock(&__aio_initlock);
231 return (ret);
232 }
233
234 /*
235 * Called from close() before actually performing the real _close().
236 */
237 void
238 _aio_close(int fd)
239 {
240 if (fd < 0) /* avoid cancelling everything */
241 return;
242 /*
243 * Cancel all outstanding aio requests for this file descriptor.
244 */
245 if (__uaio_ok)
246 (void) aiocancel_all(fd);
247 /*
248 * If we have allocated the bit array, clear the bit for this file.
249 * The next open may re-use this file descriptor and the new file
250 * may have different kaio() behaviour.
251 */
252 if (_kaio_supported != NULL)
253 CLEAR_KAIO_SUPPORTED(fd);
254 }
255
256 /*
257 * special kaio cleanup thread sits in a loop in the
258 * kernel waiting for pending kaio requests to complete.
259 */
260 void *
261 _kaio_cleanup_thread(void *arg)
262 {
263 if (pthread_setspecific(_aio_key, arg) != 0)
264 aio_panic("_kaio_cleanup_thread, pthread_setspecific()");
265 (void) _kaio(AIOSTART);
266 return (arg);
267 }
268
269 /*
270 * initialize kaio.
271 */
272 void
273 _kaio_init()
274 {
275 int error;
276 sigset_t oset;
277 int cancel_state;
278
279 lmutex_lock(&__aio_initlock);
280 (void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancel_state);
281 while (__aio_initbusy)
282 (void) cond_wait(&__aio_initcv, &__aio_initlock);
283 (void) pthread_setcancelstate(cancel_state, NULL);
284 if (_kaio_ok) { /* already initialized */
285 lmutex_unlock(&__aio_initlock);
286 return;
287 }
288 __aio_initbusy = 1;
289 lmutex_unlock(&__aio_initlock);
290
291 if (_kaio_supported_init() != 0)
292 error = ENOMEM;
293 else if ((_kaiowp = _aio_worker_alloc()) == NULL)
294 error = ENOMEM;
295 else if ((error = (int)_kaio(AIOINIT)) == 0) {
296 (void) pthread_sigmask(SIG_SETMASK, &maskset, &oset);
297 error = thr_create(NULL, AIOSTKSIZE, _kaio_cleanup_thread,
298 _kaiowp, THR_DAEMON, &_kaiowp->work_tid);
299 (void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
300 }
301 if (error && _kaiowp != NULL) {
302 _aio_worker_free(_kaiowp);
303 _kaiowp = NULL;
304 }
305
306 lmutex_lock(&__aio_initlock);
307 if (error)
308 _kaio_ok = -1;
309 else
310 _kaio_ok = 1;
311 __aio_initbusy = 0;
312 (void) cond_broadcast(&__aio_initcv);
313 lmutex_unlock(&__aio_initlock);
314 }
315
316 int
317 aioread(int fd, caddr_t buf, int bufsz, off_t offset, int whence,
318 aio_result_t *resultp)
319 {
320 return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOREAD));
321 }
322
323 int
324 aiowrite(int fd, caddr_t buf, int bufsz, off_t offset, int whence,
325 aio_result_t *resultp)
326 {
327 return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOWRITE));
328 }
329
330 #if !defined(_LP64)
331 int
332 aioread64(int fd, caddr_t buf, int bufsz, off64_t offset, int whence,
333 aio_result_t *resultp)
334 {
335 return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOAREAD64));
336 }
337
338 int
339 aiowrite64(int fd, caddr_t buf, int bufsz, off64_t offset, int whence,
340 aio_result_t *resultp)
341 {
342 return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOAWRITE64));
343 }
344 #endif /* !defined(_LP64) */
345
346 int
347 _aiorw(int fd, caddr_t buf, int bufsz, offset_t offset, int whence,
348 aio_result_t *resultp, int mode)
349 {
350 aio_req_t *reqp;
351 aio_args_t *ap;
352 offset_t loffset;
353 struct stat64 stat64;
354 int error = 0;
355 int kerr;
356 int umode;
357
358 switch (whence) {
359
360 case SEEK_SET:
361 loffset = offset;
362 break;
363 case SEEK_CUR:
364 if ((loffset = llseek(fd, 0, SEEK_CUR)) == -1)
365 error = -1;
366 else
367 loffset += offset;
368 break;
369 case SEEK_END:
370 if (fstat64(fd, &stat64) == -1)
371 error = -1;
372 else
373 loffset = offset + stat64.st_size;
374 break;
375 default:
376 errno = EINVAL;
377 error = -1;
378 }
379
380 if (error)
381 return (error);
382
383 /* initialize kaio */
384 if (!_kaio_ok)
385 _kaio_init();
386
387 /*
388 * _aio_do_request() needs the original request code (mode) to be able
389	 * to choose the appropriate 32/64 bit function. All other functions
390 * only require the difference between READ and WRITE (umode).
391 */
392 if (mode == AIOAREAD64 || mode == AIOAWRITE64)
393 umode = mode - AIOAREAD64;
394 else
395 umode = mode;
396
397 /*
398 * Try kernel aio first.
399 * If errno is ENOTSUP/EBADFD, fall back to the thread implementation.
400 */
401 if (_kaio_ok > 0 && KAIO_SUPPORTED(fd)) {
402 resultp->aio_errno = 0;
403 sig_mutex_lock(&__aio_mutex);
404 _kaio_outstand_cnt++;
405 sig_mutex_unlock(&__aio_mutex);
406 kerr = (int)_kaio(((resultp->aio_return == AIO_INPROGRESS) ?
407 (umode | AIO_POLL_BIT) : umode),
408 fd, buf, bufsz, loffset, resultp);
409 if (kerr == 0) {
410 return (0);
411 }
412 sig_mutex_lock(&__aio_mutex);
413 _kaio_outstand_cnt--;
414 sig_mutex_unlock(&__aio_mutex);
415 if (errno != ENOTSUP && errno != EBADFD)
416 return (-1);
417 if (errno == EBADFD)
418 SET_KAIO_NOT_SUPPORTED(fd);
419 }
420
421 if (!__uaio_ok && __uaio_init() == -1)
422 return (-1);
423
424 if ((reqp = _aio_req_alloc()) == NULL) {
425 errno = EAGAIN;
426 return (-1);
427 }
428
429 /*
430 * _aio_do_request() checks reqp->req_op to differentiate
431 * between 32 and 64 bit access.
432 */
433 reqp->req_op = mode;
434 reqp->req_resultp = resultp;
435 ap = &reqp->req_args;
436 ap->fd = fd;
437 ap->buf = buf;
438 ap->bufsz = bufsz;
439 ap->offset = loffset;
440
441 if (_aio_hash_insert(resultp, reqp) != 0) {
442 _aio_req_free(reqp);
443 errno = EINVAL;
444 return (-1);
445 }
446 /*
447 * _aio_req_add() only needs the difference between READ and
448 * WRITE to choose the right worker queue.
449 */
450 _aio_req_add(reqp, &__nextworker_rw, umode);
451 return (0);
452 }
453
454 int
455 aiocancel(aio_result_t *resultp)
456 {
457 aio_req_t *reqp;
458 aio_worker_t *aiowp;
459 int ret;
460 int done = 0;
461 int canceled = 0;
462
463 if (!__uaio_ok) {
464 errno = EINVAL;
465 return (-1);
466 }
467
468 sig_mutex_lock(&__aio_mutex);
469 reqp = _aio_hash_find(resultp);
470 if (reqp == NULL) {
471 if (_aio_outstand_cnt == _aio_req_done_cnt)
472 errno = EINVAL;
473 else
474 errno = EACCES;
475 ret = -1;
476 } else {
477 aiowp = reqp->req_worker;
478 sig_mutex_lock(&aiowp->work_qlock1);
479 (void) _aio_cancel_req(aiowp, reqp, &canceled, &done);
480 sig_mutex_unlock(&aiowp->work_qlock1);
481
482 if (canceled) {
483 ret = 0;
484 } else {
485 if (_aio_outstand_cnt == 0 ||
486 _aio_outstand_cnt == _aio_req_done_cnt)
487 errno = EINVAL;
488 else
489 errno = EACCES;
490 ret = -1;
491 }
492 }
493 sig_mutex_unlock(&__aio_mutex);
494 return (ret);
495 }
496
497 static void
498 _aiowait_cleanup(void *arg __unused)
499 {
500 sig_mutex_lock(&__aio_mutex);
501 _aiowait_flag--;
502 sig_mutex_unlock(&__aio_mutex);
503 }
504
505 /*
506 * This must be asynch safe and cancel safe
507 */
508 aio_result_t *
509 aiowait(struct timeval *uwait)
510 {
511 aio_result_t *uresultp;
512 aio_result_t *kresultp;
513 aio_result_t *resultp;
514 int dontblock;
515 int timedwait = 0;
516 int kaio_errno = 0;
517 struct timeval twait;
518 struct timeval *wait = NULL;
519 hrtime_t hrtend;
520 hrtime_t hres;
521
522 if (uwait) {
523 /*
524 * Check for a valid specified wait time.
525 * If it is invalid, fail the call right away.
526 */
527 if (uwait->tv_sec < 0 || uwait->tv_usec < 0 ||
528 uwait->tv_usec >= MICROSEC) {
529 errno = EINVAL;
530 return ((aio_result_t *)-1);
531 }
532
533 if (uwait->tv_sec > 0 || uwait->tv_usec > 0) {
534 hrtend = gethrtime() +
535 (hrtime_t)uwait->tv_sec * NANOSEC +
536 (hrtime_t)uwait->tv_usec * (NANOSEC / MICROSEC);
537 twait = *uwait;
538 wait = &twait;
539 timedwait++;
540 } else {
541 /* polling */
542 sig_mutex_lock(&__aio_mutex);
543 if (_kaio_outstand_cnt == 0) {
544 kresultp = (aio_result_t *)-1;
545 } else {
546 kresultp = (aio_result_t *)_kaio(AIOWAIT,
547 (struct timeval *)-1, 1);
548 if (kresultp != (aio_result_t *)-1 &&
549 kresultp != NULL &&
550 kresultp != (aio_result_t *)1) {
551 _kaio_outstand_cnt--;
552 sig_mutex_unlock(&__aio_mutex);
553 return (kresultp);
554 }
555 }
556 uresultp = _aio_req_done();
557 sig_mutex_unlock(&__aio_mutex);
558 if (uresultp != NULL &&
559 uresultp != (aio_result_t *)-1) {
560 return (uresultp);
561 }
562 if (uresultp == (aio_result_t *)-1 &&
563 kresultp == (aio_result_t *)-1) {
564 errno = EINVAL;
565 return ((aio_result_t *)-1);
566 } else {
567 return (NULL);
568 }
569 }
570 }
571
572 for (;;) {
573 sig_mutex_lock(&__aio_mutex);
574 uresultp = _aio_req_done();
575 if (uresultp != NULL && uresultp != (aio_result_t *)-1) {
576 sig_mutex_unlock(&__aio_mutex);
577 resultp = uresultp;
578 break;
579 }
580 _aiowait_flag++;
581 dontblock = (uresultp == (aio_result_t *)-1);
582 if (dontblock && _kaio_outstand_cnt == 0) {
583 kresultp = (aio_result_t *)-1;
584 kaio_errno = EINVAL;
585 } else {
586 sig_mutex_unlock(&__aio_mutex);
587 pthread_cleanup_push(_aiowait_cleanup, NULL);
588 _cancel_prologue();
589 kresultp = (aio_result_t *)_kaio(AIOWAIT,
590 wait, dontblock);
591 _cancel_epilogue();
592 pthread_cleanup_pop(0);
593 sig_mutex_lock(&__aio_mutex);
594 kaio_errno = errno;
595 }
596 _aiowait_flag--;
597 sig_mutex_unlock(&__aio_mutex);
598 if (kresultp == (aio_result_t *)1) {
599 /* aiowait() awakened by an aionotify() */
600 continue;
601 } else if (kresultp != NULL &&
602 kresultp != (aio_result_t *)-1) {
603 resultp = kresultp;
604 sig_mutex_lock(&__aio_mutex);
605 _kaio_outstand_cnt--;
606 sig_mutex_unlock(&__aio_mutex);
607 break;
608 } else if (kresultp == (aio_result_t *)-1 &&
609 kaio_errno == EINVAL &&
610 uresultp == (aio_result_t *)-1) {
611 errno = kaio_errno;
612 resultp = (aio_result_t *)-1;
613 break;
614 } else if (kresultp == (aio_result_t *)-1 &&
615 kaio_errno == EINTR) {
616 errno = kaio_errno;
617 resultp = (aio_result_t *)-1;
618 break;
619 } else if (timedwait) {
620 hres = hrtend - gethrtime();
621 if (hres <= 0) {
622 /* time is up; return */
623 resultp = NULL;
624 break;
625 } else {
626 /*
627 * Some time left. Round up the remaining time
628 * in nanoseconds to microsec. Retry the call.
629 */
630 hres += (NANOSEC / MICROSEC) - 1;
631 wait->tv_sec = hres / NANOSEC;
632 wait->tv_usec =
633 (hres % NANOSEC) / (NANOSEC / MICROSEC);
634 }
635 } else {
636 ASSERT(kresultp == NULL && uresultp == NULL);
637 resultp = NULL;
638 continue;
639 }
640 }
641 return (resultp);
642 }
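
/*
 * Illustrative sketch (comment only, not compiled into the library):
 * roughly how an application might drive the code above with aioread()
 * and a timed aiowait().  The declarations are assumed to come from
 * <sys/asynch.h>; see aioread(3AIO) and aiowait(3AIO) for the
 * authoritative interfaces.
 *
 *	char buf[512];
 *	aio_result_t res, *donep;
 *	struct timeval tv;
 *
 *	tv.tv_sec = 5;		(wait at most five seconds)
 *	tv.tv_usec = 0;
 *	if (aioread(fd, buf, sizeof (buf), 0, SEEK_SET, &res) == -1)
 *		return (-1);	(could not queue the request)
 *	donep = aiowait(&tv);
 *	if (donep == (aio_result_t *)-1)
 *		return (-1);	(aiowait() itself failed; see errno)
 *	if (donep == NULL)
 *		return (0);	(timed out; the request is still pending)
 *
 * On success donep == &res, and donep->aio_return/donep->aio_errno hold
 * the result of the I/O.
 */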
643
644 /*
645 * _aio_get_timedelta calculates the remaining time and stores the result
646 * into timespec_t *wait.
647 */
648
649 int
650 _aio_get_timedelta(timespec_t *end, timespec_t *wait)
651 {
652 int ret = 0;
653 struct timeval cur;
654 timespec_t curtime;
655
656 (void) gettimeofday(&cur, NULL);
657 curtime.tv_sec = cur.tv_sec;
658 curtime.tv_nsec = cur.tv_usec * 1000; /* convert us to ns */
659
660 if (end->tv_sec >= curtime.tv_sec) {
661 wait->tv_sec = end->tv_sec - curtime.tv_sec;
662 if (end->tv_nsec >= curtime.tv_nsec) {
663 wait->tv_nsec = end->tv_nsec - curtime.tv_nsec;
664 if (wait->tv_sec == 0 && wait->tv_nsec == 0)
665 ret = -1; /* timer expired */
666 } else {
667 if (end->tv_sec > curtime.tv_sec) {
668 wait->tv_sec -= 1;
669 wait->tv_nsec = NANOSEC -
670 (curtime.tv_nsec - end->tv_nsec);
671 } else {
672 ret = -1; /* timer expired */
673 }
674 }
675 } else {
676 ret = -1;
677 }
678 return (ret);
679 }
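
/*
 * Worked example (comment only) of the borrow case above, using made-up
 * values: if end = { 10, 200000000 } and the current time is
 * { 9, 800000000 }, then end->tv_nsec < curtime.tv_nsec while
 * end->tv_sec > curtime.tv_sec, so one second is borrowed:
 *
 *	wait->tv_sec  = (10 - 9) - 1 = 0
 *	wait->tv_nsec = NANOSEC - (800000000 - 200000000) = 400000000
 *
 * i.e. 0.4 seconds remain.  Had the current time already passed "end",
 * the function would instead return -1 (timer expired).
 */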
680
681 /*
682  * If closing by file descriptor, we simply cancel all of the outstanding
683  * AIOs and return. The AIOs in question will have noticed the
684  * cancellation either before, during, or after initiating I/O.
685 */
686 int
687 aiocancel_all(int fd)
688 {
689 aio_req_t *reqp;
690 aio_req_t **reqpp, *last;
691 aio_worker_t *first;
692 aio_worker_t *next;
693 int canceled = 0;
694 int done = 0;
695 int cancelall = 0;
696
697 sig_mutex_lock(&__aio_mutex);
698
699 if (_aio_outstand_cnt == 0) {
700 sig_mutex_unlock(&__aio_mutex);
701 return (AIO_ALLDONE);
702 }
703
704 /*
705 * Cancel requests from the read/write workers' queues.
706 */
707 first = __nextworker_rw;
708 next = first;
709 do {
710 _aio_cancel_work(next, fd, &canceled, &done);
711 } while ((next = next->work_forw) != first);
712
713 /*
714 * finally, check if there are requests on the done queue that
715 * should be canceled.
716 */
717 if (fd < 0)
718 cancelall = 1;
719 reqpp = &_aio_done_tail;
720 last = _aio_done_tail;
721 while ((reqp = *reqpp) != NULL) {
722 if (cancelall || reqp->req_args.fd == fd) {
723 *reqpp = reqp->req_next;
724 if (last == reqp) {
725 last = reqp->req_next;
726 }
727 if (_aio_done_head == reqp) {
728 /* this should be the last req in list */
729 _aio_done_head = last;
730 }
731 _aio_donecnt--;
732 _aio_set_result(reqp, -1, ECANCELED);
733 (void) _aio_hash_del(reqp->req_resultp);
734 _aio_req_free(reqp);
735 } else {
736 reqpp = &reqp->req_next;
737 last = reqp;
738 }
739 }
740
741 if (cancelall) {
742 ASSERT(_aio_donecnt == 0);
743 _aio_done_head = NULL;
744 }
745 sig_mutex_unlock(&__aio_mutex);
746
747 if (canceled && done == 0)
748 return (AIO_CANCELED);
749 else if (done && canceled == 0)
750 return (AIO_ALLDONE);
751 else if ((canceled + done == 0) && KAIO_SUPPORTED(fd))
752 return ((int)_kaio(AIOCANCEL, fd, NULL));
753 return (AIO_NOTCANCELED);
754 }
755
756 /*
757 * Cancel requests from a given work queue. If the file descriptor
758 * parameter, fd, is non-negative, then only cancel those requests
759 * in this queue that are to this file descriptor. If the fd
760 * parameter is -1, then cancel all requests.
761 */
762 static void
763 _aio_cancel_work(aio_worker_t *aiowp, int fd, int *canceled, int *done)
764 {
765 aio_req_t *reqp;
766
767 sig_mutex_lock(&aiowp->work_qlock1);
768 /*
769 * cancel queued requests first.
770 */
771 reqp = aiowp->work_tail1;
772 while (reqp != NULL) {
773 if (fd < 0 || reqp->req_args.fd == fd) {
774 if (_aio_cancel_req(aiowp, reqp, canceled, done)) {
775 /*
776				 * Caller's locks were dropped.
777 * reqp is invalid; start traversing
778 * the list from the beginning again.
779 */
780 reqp = aiowp->work_tail1;
781 continue;
782 }
783 }
784 reqp = reqp->req_next;
785 }
786 /*
787 * Since the queued requests have been canceled, there can
788	 * only be one in-progress request that should be canceled.
789 */
790 if ((reqp = aiowp->work_req) != NULL &&
791 (fd < 0 || reqp->req_args.fd == fd))
792 (void) _aio_cancel_req(aiowp, reqp, canceled, done);
793 sig_mutex_unlock(&aiowp->work_qlock1);
794 }
795
796 /*
797  * Cancel a request. Return 1 if the caller's locks were temporarily
798 * dropped, otherwise return 0.
799 */
800 int
801 _aio_cancel_req(aio_worker_t *aiowp, aio_req_t *reqp, int *canceled, int *done)
802 {
803 int ostate = reqp->req_state;
804
805 ASSERT(MUTEX_HELD(&__aio_mutex));
806 ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
807 if (ostate == AIO_REQ_CANCELED)
808 return (0);
809 if (ostate == AIO_REQ_DONE && !POSIX_AIO(reqp) &&
810 aiowp->work_prev1 == reqp) {
811 ASSERT(aiowp->work_done1 != 0);
812 /*
813 * If not on the done queue yet, just mark it CANCELED,
814 * _aio_work_done() will do the necessary clean up.
815 * This is required to ensure that aiocancel_all() cancels
816 * all the outstanding requests, including this one which
817 * is not yet on done queue but has been marked done.
818 */
819 _aio_set_result(reqp, -1, ECANCELED);
820 (void) _aio_hash_del(reqp->req_resultp);
821 reqp->req_state = AIO_REQ_CANCELED;
822 (*canceled)++;
823 return (0);
824 }
825
826 if (ostate == AIO_REQ_DONE || ostate == AIO_REQ_DONEQ) {
827 (*done)++;
828 return (0);
829 }
830 if (reqp->req_op == AIOFSYNC && reqp != aiowp->work_req) {
831 ASSERT(POSIX_AIO(reqp));
832 /* Cancel the queued aio_fsync() request */
833 if (!reqp->req_head->lio_canned) {
834 reqp->req_head->lio_canned = 1;
835 _aio_outstand_cnt--;
836 (*canceled)++;
837 }
838 return (0);
839 }
840 reqp->req_state = AIO_REQ_CANCELED;
841 _aio_req_del(aiowp, reqp, ostate);
842 (void) _aio_hash_del(reqp->req_resultp);
843 (*canceled)++;
844 if (reqp == aiowp->work_req) {
845 ASSERT(ostate == AIO_REQ_INPROGRESS);
846 /*
847 * Set the result values now, before _aiodone() is called.
848 * We do this because the application can expect aio_return
849 * and aio_errno to be set to -1 and ECANCELED, respectively,
850 * immediately after a successful return from aiocancel()
851 * or aio_cancel().
852 */
853 _aio_set_result(reqp, -1, ECANCELED);
854 (void) thr_kill(aiowp->work_tid, SIGAIOCANCEL);
855 return (0);
856 }
857 if (!POSIX_AIO(reqp)) {
858 _aio_outstand_cnt--;
859 _aio_set_result(reqp, -1, ECANCELED);
860 _aio_req_free(reqp);
861 return (0);
862 }
863 sig_mutex_unlock(&aiowp->work_qlock1);
864 sig_mutex_unlock(&__aio_mutex);
865 _aiodone(reqp, -1, ECANCELED);
866 sig_mutex_lock(&__aio_mutex);
867 sig_mutex_lock(&aiowp->work_qlock1);
868 return (1);
869 }
870
871 int
872 _aio_create_worker(aio_req_t *reqp, int mode)
873 {
874 aio_worker_t *aiowp, **workers, **nextworker;
875 int *aio_workerscnt;
876 void *(*func)(void *);
877 sigset_t oset;
878 int error;
879
880 /*
881 * Put the new worker thread in the right queue.
882 */
883 switch (mode) {
884 case AIOREAD:
885 case AIOWRITE:
886 case AIOAREAD:
887 case AIOAWRITE:
888 #if !defined(_LP64)
889 case AIOAREAD64:
890 case AIOAWRITE64:
891 #endif
892 workers = &__workers_rw;
893 nextworker = &__nextworker_rw;
894 aio_workerscnt = &__rw_workerscnt;
895 func = _aio_do_request;
896 break;
897 case AIONOTIFY:
898 workers = &__workers_no;
899 nextworker = &__nextworker_no;
900 func = _aio_do_notify;
901 aio_workerscnt = &__no_workerscnt;
902 break;
903 default:
904 aio_panic("_aio_create_worker: invalid mode");
905 break;
906 }
907
908 if ((aiowp = _aio_worker_alloc()) == NULL)
909 return (-1);
910
911 if (reqp) {
912 reqp->req_state = AIO_REQ_QUEUED;
913 reqp->req_worker = aiowp;
914 aiowp->work_head1 = reqp;
915 aiowp->work_tail1 = reqp;
916 aiowp->work_next1 = reqp;
917 aiowp->work_count1 = 1;
918 aiowp->work_minload1 = 1;
919 }
920
921 (void) pthread_sigmask(SIG_SETMASK, &maskset, &oset);
922 error = thr_create(NULL, AIOSTKSIZE, func, aiowp,
923 THR_DAEMON | THR_SUSPENDED, &aiowp->work_tid);
924 (void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
925 if (error) {
926 if (reqp) {
927 reqp->req_state = 0;
928 reqp->req_worker = NULL;
929 }
930 _aio_worker_free(aiowp);
931 return (-1);
932 }
933
934 lmutex_lock(&__aio_mutex);
935 (*aio_workerscnt)++;
936 if (*workers == NULL) {
937 aiowp->work_forw = aiowp;
938 aiowp->work_backw = aiowp;
939 *nextworker = aiowp;
940 *workers = aiowp;
941 } else {
942 aiowp->work_backw = (*workers)->work_backw;
943 aiowp->work_forw = (*workers);
944 (*workers)->work_backw->work_forw = aiowp;
945 (*workers)->work_backw = aiowp;
946 }
947 _aio_worker_cnt++;
948 lmutex_unlock(&__aio_mutex);
949
950 (void) thr_continue(aiowp->work_tid);
951
952 return (0);
953 }
954
955 /*
956 * This is the worker's main routine.
957 * The task of this function is to execute all queued requests;
958 * once the last pending request is executed this function will block
959  * in _aio_idle(). A new incoming request must wake up this thread to
960 * restart the work.
961  * Every worker has its own work queue. The queue lock is required
962 * to synchronize the addition of new requests for this worker or
963 * cancellation of pending/running requests.
964 *
965 * Cancellation scenarios:
966 * The cancellation of a request is being done asynchronously using
967 * _aio_cancel_req() from another thread context.
968  * A queued request can be cancelled in different ways:
969 * a) request is queued but not "in progress" or "done" (AIO_REQ_QUEUED):
970 * - lock the queue -> remove the request -> unlock the queue
971 * - this function/thread does not detect this cancellation process
972  *	b) request is in progress (AIO_REQ_INPROGRESS):
973  *	  - this function first allows the cancellation of the running
974 * request with the flag "work_cancel_flg=1"
975 * see _aio_req_get() -> _aio_cancel_on()
976 * During this phase, it is allowed to interrupt the worker
977 * thread running the request (this thread) using the SIGAIOCANCEL
978 * signal.
979 * Once this thread returns from the kernel (because the request
980  *	    has just completed), it must disable a possible cancellation
981 * and proceed to finish the request. To disable the cancellation
982 * this thread must use _aio_cancel_off() to set "work_cancel_flg=0".
983 * c) request is already done (AIO_REQ_DONE || AIO_REQ_DONEQ):
984 * same procedure as in a)
985 *
986  * Regarding b):
987 * This thread uses sigsetjmp() to define the position in the code, where
988  * it wishes to continue working in the case that a SIGAIOCANCEL signal
989 * is detected.
990 * Normally this thread should get the cancellation signal during the
991 * kernel phase (reading or writing). In that case the signal handler
992 * aiosigcancelhndlr() is activated using the worker thread context,
993  * which in turn uses the siglongjmp() function to break the standard
994 * code flow and jump to the "sigsetjmp" position, provided that
995 * "work_cancel_flg" is set to "1".
996 * Because the "work_cancel_flg" is only manipulated by this worker
997 * thread and it can only run on one CPU at a given time, it is not
998 * necessary to protect that flag with the queue lock.
999 * Returning from the kernel (read or write system call) we must
1000 * first disable the use of the SIGAIOCANCEL signal and accordingly
1001 * the use of the siglongjmp() function to prevent a possible deadlock:
1002  * - It can happen that this worker thread returns from the kernel and
1003 * blocks in "work_qlock1",
1004 * - then a second thread cancels the apparently "in progress" request
1005 * and sends the SIGAIOCANCEL signal to the worker thread,
1006  * - the worker thread gets assigned the "work_qlock1" and returns
1007 * from the kernel,
1008 * - the kernel detects the pending signal and activates the signal
1009 * handler instead,
1010 * - if the "work_cancel_flg" is still set then the signal handler
1011 * should use siglongjmp() to cancel the "in progress" request and
1012 * it would try to acquire the same work_qlock1 in _aio_req_get()
1013 * for a second time => deadlock.
1014 * To avoid that situation we disable the cancellation of the request
1015 * in progress BEFORE we try to acquire the work_qlock1.
1016 * In that case the signal handler will not call siglongjmp() and the
1017 * worker thread will continue running the standard code flow.
1018  * Then this thread must check the AIO_REQ_CANCELED flag to emulate
1019  * the siglongjmp() that would otherwise have been required, freeing
1020  * the work_qlock1 and avoiding a deadlock.
1021 */
1022 void *
1023 _aio_do_request(void *arglist)
1024 {
1025 aio_worker_t *aiowp = (aio_worker_t *)arglist;
1026 ulwp_t *self = curthread;
1027 struct aio_args *arg;
1028 aio_req_t *reqp; /* current AIO request */
1029 ssize_t retval;
1030 int append;
1031 int error;
1032
1033 if (pthread_setspecific(_aio_key, aiowp) != 0)
1034 aio_panic("_aio_do_request, pthread_setspecific()");
1035 (void) pthread_sigmask(SIG_SETMASK, &_worker_set, NULL);
1036 ASSERT(aiowp->work_req == NULL);
1037
1038 /*
1039 * We resume here when an operation is cancelled.
1040 * On first entry, aiowp->work_req == NULL, so all
1041 * we do is block SIGAIOCANCEL.
1042 */
1043 (void) sigsetjmp(aiowp->work_jmp_buf, 0);
1044 ASSERT(self->ul_sigdefer == 0);
1045
1046 sigoff(self); /* block SIGAIOCANCEL */
1047 if (aiowp->work_req != NULL)
1048 _aio_finish_request(aiowp, -1, ECANCELED);
1049
1050 for (;;) {
1051 /*
1052 * Put completed requests on aio_done_list. This has
1053 * to be done as part of the main loop to ensure that
1054 * we don't artificially starve any aiowait'ers.
1055 */
1056 if (aiowp->work_done1)
1057 _aio_work_done(aiowp);
1058
1059 top:
1060 /* consume any deferred SIGAIOCANCEL signal here */
1061 sigon(self);
1062 sigoff(self);
1063
1064 while ((reqp = _aio_req_get(aiowp)) == NULL) {
1065 if (_aio_idle(aiowp) != 0)
1066 goto top;
1067 }
1068 arg = &reqp->req_args;
1069 ASSERT(reqp->req_state == AIO_REQ_INPROGRESS ||
1070 reqp->req_state == AIO_REQ_CANCELED);
1071 error = 0;
1072
1073 switch (reqp->req_op) {
1074 case AIOREAD:
1075 case AIOAREAD:
1076 sigon(self); /* unblock SIGAIOCANCEL */
1077 retval = pread(arg->fd, arg->buf,
1078 arg->bufsz, arg->offset);
1079 if (retval == -1) {
1080 if (errno == ESPIPE) {
1081 retval = read(arg->fd,
1082 arg->buf, arg->bufsz);
1083 if (retval == -1)
1084 error = errno;
1085 } else {
1086 error = errno;
1087 }
1088 }
1089 sigoff(self); /* block SIGAIOCANCEL */
1090 break;
1091 case AIOWRITE:
1092 case AIOAWRITE:
1093 /*
1094 * The SUSv3 POSIX spec for aio_write() states:
1095 * If O_APPEND is set for the file descriptor,
1096 * write operations append to the file in the
1097 * same order as the calls were made.
1098 * but, somewhat inconsistently, it requires pwrite()
1099 * to ignore the O_APPEND setting. So we have to use
1100 * fcntl() to get the open modes and call write() for
1101 * the O_APPEND case.
1102 */
1103 append = (__fcntl(arg->fd, F_GETFL) & O_APPEND);
1104 sigon(self); /* unblock SIGAIOCANCEL */
1105 retval = append?
1106 write(arg->fd, arg->buf, arg->bufsz) :
1107 pwrite(arg->fd, arg->buf, arg->bufsz,
1108 arg->offset);
1109 if (retval == -1) {
1110 if (errno == ESPIPE) {
1111 retval = write(arg->fd,
1112 arg->buf, arg->bufsz);
1113 if (retval == -1)
1114 error = errno;
1115 } else {
1116 error = errno;
1117 }
1118 }
1119 sigoff(self); /* block SIGAIOCANCEL */
1120 break;
1121 #if !defined(_LP64)
1122 case AIOAREAD64:
1123 sigon(self); /* unblock SIGAIOCANCEL */
1124 retval = pread64(arg->fd, arg->buf,
1125 arg->bufsz, arg->offset);
1126 if (retval == -1) {
1127 if (errno == ESPIPE) {
1128 retval = read(arg->fd,
1129 arg->buf, arg->bufsz);
1130 if (retval == -1)
1131 error = errno;
1132 } else {
1133 error = errno;
1134 }
1135 }
1136 sigoff(self); /* block SIGAIOCANCEL */
1137 break;
1138 case AIOAWRITE64:
1139 /*
1140 * The SUSv3 POSIX spec for aio_write() states:
1141 * If O_APPEND is set for the file descriptor,
1142 * write operations append to the file in the
1143 * same order as the calls were made.
1144 * but, somewhat inconsistently, it requires pwrite()
1145 * to ignore the O_APPEND setting. So we have to use
1146 * fcntl() to get the open modes and call write() for
1147 * the O_APPEND case.
1148 */
1149 append = (__fcntl(arg->fd, F_GETFL) & O_APPEND);
1150 sigon(self); /* unblock SIGAIOCANCEL */
1151 retval = append?
1152 write(arg->fd, arg->buf, arg->bufsz) :
1153 pwrite64(arg->fd, arg->buf, arg->bufsz,
1154 arg->offset);
1155 if (retval == -1) {
1156 if (errno == ESPIPE) {
1157 retval = write(arg->fd,
1158 arg->buf, arg->bufsz);
1159 if (retval == -1)
1160 error = errno;
1161 } else {
1162 error = errno;
1163 }
1164 }
1165 sigoff(self); /* block SIGAIOCANCEL */
1166 break;
1167 #endif /* !defined(_LP64) */
1168 case AIOFSYNC:
1169 if (_aio_fsync_del(aiowp, reqp))
1170 goto top;
1171 ASSERT(reqp->req_head == NULL);
1172 /*
1173 * All writes for this fsync request are now
1174 * acknowledged. Now make these writes visible
1175 * and put the final request into the hash table.
1176 */
1177 if (reqp->req_state == AIO_REQ_CANCELED) {
1178 /* EMPTY */;
1179 } else if (arg->offset == O_SYNC) {
1180 if ((retval = __fdsync(arg->fd, FDSYNC_FILE)) ==
1181 -1) {
1182 error = errno;
1183 }
1184 } else {
1185 if ((retval = __fdsync(arg->fd, FDSYNC_DATA)) ==
1186 -1) {
1187 error = errno;
1188 }
1189 }
1190 if (_aio_hash_insert(reqp->req_resultp, reqp) != 0)
1191 aio_panic("_aio_do_request(): AIOFSYNC: "
1192 "request already in hash table");
1193 break;
1194 default:
1195 aio_panic("_aio_do_request, bad op");
1196 }
1197
1198 _aio_finish_request(aiowp, retval, error);
1199 }
1200 /* NOTREACHED */
1201 return (NULL);
1202 }
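
/*
 * Condensed sketch (comment only) of the cancellation pattern used by
 * _aio_do_request() above, with the queue bookkeeping stripped away.
 * It merely restates the control flow described in the block comment
 * preceding the function; it is not additional library code.
 *
 *	(void) sigsetjmp(aiowp->work_jmp_buf, 0);	resume point after cancel
 *	sigoff(self);					SIGAIOCANCEL blocked
 *	for (;;) {
 *		reqp = _aio_req_get(aiowp);		enables cancellation
 *		sigon(self);				handler may siglongjmp()
 *		retval = pread(...);			while we are in here
 *		sigoff(self);				cancellation disabled
 *		_aio_finish_request(aiowp, retval, error);
 *	}
 */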
1203
1204 /*
1205 * Perform the tail processing for _aio_do_request().
1206 * The in-progress request may or may not have been cancelled.
1207 */
1208 static void
1209 _aio_finish_request(aio_worker_t *aiowp, ssize_t retval, int error)
1210 {
1211 aio_req_t *reqp;
1212
1213 sig_mutex_lock(&aiowp->work_qlock1);
1214 if ((reqp = aiowp->work_req) == NULL)
1215 sig_mutex_unlock(&aiowp->work_qlock1);
1216 else {
1217 aiowp->work_req = NULL;
1218 if (reqp->req_state == AIO_REQ_CANCELED) {
1219 retval = -1;
1220 error = ECANCELED;
1221 }
1222 if (!POSIX_AIO(reqp)) {
1223 int notify;
1224 if (reqp->req_state == AIO_REQ_INPROGRESS) {
1225 reqp->req_state = AIO_REQ_DONE;
1226 _aio_set_result(reqp, retval, error);
1227 }
1228 sig_mutex_unlock(&aiowp->work_qlock1);
1229 sig_mutex_lock(&__aio_mutex);
1230 /*
1231 * If it was canceled, this request will not be
1232			 * added to the done list. Just free it.
1233 */
1234 if (error == ECANCELED) {
1235 _aio_outstand_cnt--;
1236 _aio_req_free(reqp);
1237 } else {
1238 _aio_req_done_cnt++;
1239 }
1240 /*
1241 * Notify any thread that may have blocked
1242 * because it saw an outstanding request.
1243 */
1244 notify = 0;
1245 if (_aio_outstand_cnt == 0 && _aiowait_flag) {
1246 notify = 1;
1247 }
1248 sig_mutex_unlock(&__aio_mutex);
1249 if (notify) {
1250 (void) _kaio(AIONOTIFY);
1251 }
1252 } else {
1253 if (reqp->req_state == AIO_REQ_INPROGRESS)
1254 reqp->req_state = AIO_REQ_DONE;
1255 sig_mutex_unlock(&aiowp->work_qlock1);
1256 _aiodone(reqp, retval, error);
1257 }
1258 }
1259 }
1260
1261 void
1262 _aio_req_mark_done(aio_req_t *reqp)
1263 {
1264 #if !defined(_LP64)
1265 if (reqp->req_largefile)
1266 ((aiocb64_t *)reqp->req_aiocbp)->aio_state = USERAIO_DONE;
1267 else
1268 #endif
1269 ((aiocb_t *)reqp->req_aiocbp)->aio_state = USERAIO_DONE;
1270 }
1271
1272 /*
1273 * Sleep for 'ticks' clock ticks to give somebody else a chance to run,
1274 * hopefully to consume one of our queued signals.
1275 */
1276 static void
1277 _aio_delay(int ticks)
1278 {
1279 (void) usleep(ticks * (MICROSEC / hz));
1280 }
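
/*
 * For example (comment only): with the common hz value of 100, one clock
 * tick is MICROSEC / hz = 10000 microseconds, so _aio_delay(1) sleeps for
 * roughly 10 milliseconds.  The actual hz value comes from
 * sysconf(_SC_CLK_TCK) in __uaio_init().
 */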
1281
1282 /*
1283 * Actually send the notifications.
1284 * We could block indefinitely here if the application
1285 * is not listening for the signal or port notifications.
1286 */
1287 static void
1288 send_notification(notif_param_t *npp)
1289 {
1290 extern int __sigqueue(pid_t pid, int signo,
1291 /* const union sigval */ void *value, int si_code, int block);
1292
1293 if (npp->np_signo)
1294 (void) __sigqueue(__pid, npp->np_signo, npp->np_user,
1295 SI_ASYNCIO, 1);
1296 else if (npp->np_port >= 0)
1297 (void) _port_dispatch(npp->np_port, 0, PORT_SOURCE_AIO,
1298 npp->np_event, npp->np_object, npp->np_user);
1299
1300 if (npp->np_lio_signo)
1301 (void) __sigqueue(__pid, npp->np_lio_signo, npp->np_lio_user,
1302 SI_ASYNCIO, 1);
1303 else if (npp->np_lio_port >= 0)
1304 (void) _port_dispatch(npp->np_lio_port, 0, PORT_SOURCE_AIO,
1305 npp->np_lio_event, npp->np_lio_object, npp->np_lio_user);
1306 }
1307
1308 /*
1309 * Asynchronous notification worker.
1310 */
1311 void *
1312 _aio_do_notify(void *arg)
1313 {
1314 aio_worker_t *aiowp = (aio_worker_t *)arg;
1315 aio_req_t *reqp;
1316
1317 /*
1318 * This isn't really necessary. All signals are blocked.
1319 */
1320 if (pthread_setspecific(_aio_key, aiowp) != 0)
1321 aio_panic("_aio_do_notify, pthread_setspecific()");
1322
1323 /*
1324 * Notifications are never cancelled.
1325 * All signals remain blocked, forever.
1326 */
1327 for (;;) {
1328 while ((reqp = _aio_req_get(aiowp)) == NULL) {
1329 if (_aio_idle(aiowp) != 0)
1330 aio_panic("_aio_do_notify: _aio_idle() failed");
1331 }
1332 send_notification(&reqp->req_notify);
1333 _aio_req_free(reqp);
1334 }
1335
1336 /* NOTREACHED */
1337 return (NULL);
1338 }
1339
1340 /*
1341 * Do the completion semantics for a request that was either canceled
1342 * by _aio_cancel_req() or was completed by _aio_do_request().
1343 */
1344 static void
1345 _aiodone(aio_req_t *reqp, ssize_t retval, int error)
1346 {
1347 aio_result_t *resultp = reqp->req_resultp;
1348 int notify = 0;
1349 aio_lio_t *head;
1350 int sigev_none;
1351 int sigev_signal;
1352 int sigev_thread;
1353 int sigev_port;
1354 notif_param_t np;
1355
1356 /*
1357 * We call _aiodone() only for Posix I/O.
1358 */
1359 ASSERT(POSIX_AIO(reqp));
1360
1361 sigev_none = 0;
1362 sigev_signal = 0;
1363 sigev_thread = 0;
1364 sigev_port = 0;
1365 np.np_signo = 0;
1366 np.np_port = -1;
1367 np.np_lio_signo = 0;
1368 np.np_lio_port = -1;
1369
1370 switch (reqp->req_sigevent.sigev_notify) {
1371 case SIGEV_NONE:
1372 sigev_none = 1;
1373 break;
1374 case SIGEV_SIGNAL:
1375 sigev_signal = 1;
1376 break;
1377 case SIGEV_THREAD:
1378 sigev_thread = 1;
1379 break;
1380 case SIGEV_PORT:
1381 sigev_port = 1;
1382 break;
1383 default:
1384 aio_panic("_aiodone: improper sigev_notify");
1385 break;
1386 }
1387
1388 /*
1389 * Figure out the notification parameters while holding __aio_mutex.
1390 * Actually perform the notifications after dropping __aio_mutex.
1391 * This allows us to sleep for a long time (if the notifications
1392 * incur delays) without impeding other async I/O operations.
1393 */
1394
1395 sig_mutex_lock(&__aio_mutex);
1396
1397 if (sigev_signal) {
1398 if ((np.np_signo = reqp->req_sigevent.sigev_signo) != 0)
1399 notify = 1;
1400 np.np_user = reqp->req_sigevent.sigev_value.sival_ptr;
1401 } else if (sigev_thread | sigev_port) {
1402 if ((np.np_port = reqp->req_sigevent.sigev_signo) >= 0)
1403 notify = 1;
1404 np.np_event = reqp->req_op;
1405 if (np.np_event == AIOFSYNC && reqp->req_largefile)
1406 np.np_event = AIOFSYNC64;
1407 np.np_object = (uintptr_t)reqp->req_aiocbp;
1408 np.np_user = reqp->req_sigevent.sigev_value.sival_ptr;
1409 }
1410
1411 if (resultp->aio_errno == EINPROGRESS)
1412 _aio_set_result(reqp, retval, error);
1413
1414 _aio_outstand_cnt--;
1415
1416 head = reqp->req_head;
1417 reqp->req_head = NULL;
1418
1419 if (sigev_none) {
1420 _aio_enq_doneq(reqp);
1421 reqp = NULL;
1422 } else {
1423 (void) _aio_hash_del(resultp);
1424 _aio_req_mark_done(reqp);
1425 }
1426
1427 _aio_waitn_wakeup();
1428
1429 /*
1430 * __aio_waitn() sets AIO_WAIT_INPROGRESS and
1431 * __aio_suspend() increments "_aio_kernel_suspend"
1432 * when they are waiting in the kernel for completed I/Os.
1433 *
1434 * _kaio(AIONOTIFY) awakes the corresponding function
1435 * in the kernel; then the corresponding __aio_waitn() or
1436 * __aio_suspend() function could reap the recently
1437 * completed I/Os (_aiodone()).
1438 */
1439 if ((_aio_flags & AIO_WAIT_INPROGRESS) || _aio_kernel_suspend > 0)
1440 (void) _kaio(AIONOTIFY);
1441
1442 sig_mutex_unlock(&__aio_mutex);
1443
1444 if (head != NULL) {
1445 /*
1446 * If all the lio requests have completed,
1447 * prepare to notify the waiting thread.
1448 */
1449 sig_mutex_lock(&head->lio_mutex);
1450 ASSERT(head->lio_refcnt == head->lio_nent);
1451 if (head->lio_refcnt == 1) {
1452 int waiting = 0;
1453 if (head->lio_mode == LIO_WAIT) {
1454 if ((waiting = head->lio_waiting) != 0)
1455 (void) cond_signal(&head->lio_cond_cv);
1456 } else if (head->lio_port < 0) { /* none or signal */
1457 if ((np.np_lio_signo = head->lio_signo) != 0)
1458 notify = 1;
1459 np.np_lio_user = head->lio_sigval.sival_ptr;
1460 } else { /* thread or port */
1461 notify = 1;
1462 np.np_lio_port = head->lio_port;
1463 np.np_lio_event = head->lio_event;
1464 np.np_lio_object =
1465 (uintptr_t)head->lio_sigevent;
1466 np.np_lio_user = head->lio_sigval.sival_ptr;
1467 }
1468 head->lio_nent = head->lio_refcnt = 0;
1469 sig_mutex_unlock(&head->lio_mutex);
1470 if (waiting == 0)
1471 _aio_lio_free(head);
1472 } else {
1473 head->lio_nent--;
1474 head->lio_refcnt--;
1475 sig_mutex_unlock(&head->lio_mutex);
1476 }
1477 }
1478
1479 /*
1480 * The request is completed; now perform the notifications.
1481 */
1482 if (notify) {
1483 if (reqp != NULL) {
1484 /*
1485 * We usually put the request on the notification
1486 * queue because we don't want to block and delay
1487 * other operations behind us in the work queue.
1488 * Also we must never block on a cancel notification
1489 * because we are being called from an application
1490 * thread in this case and that could lead to deadlock
1491			 * if no other thread is receiving notifications.
1492 */
1493 reqp->req_notify = np;
1494 reqp->req_op = AIONOTIFY;
1495 _aio_req_add(reqp, &__workers_no, AIONOTIFY);
1496 reqp = NULL;
1497 } else {
1498 /*
1499 * We already put the request on the done queue,
1500 * so we can't queue it to the notification queue.
1501 * Just do the notification directly.
1502 */
1503 send_notification(&np);
1504 }
1505 }
1506
1507 if (reqp != NULL)
1508 _aio_req_free(reqp);
1509 }
1510
1511 /*
1512 * Delete fsync requests from list head until there is
1513 * only one left. Return 0 when there is only one,
1514 * otherwise return a non-zero value.
1515 */
1516 static int
1517 _aio_fsync_del(aio_worker_t *aiowp, aio_req_t *reqp)
1518 {
1519 aio_lio_t *head = reqp->req_head;
1520 int rval = 0;
1521
1522 ASSERT(reqp == aiowp->work_req);
1523 sig_mutex_lock(&aiowp->work_qlock1);
1524 sig_mutex_lock(&head->lio_mutex);
1525 if (head->lio_refcnt > 1) {
1526 head->lio_refcnt--;
1527 head->lio_nent--;
1528 aiowp->work_req = NULL;
1529 sig_mutex_unlock(&head->lio_mutex);
1530 sig_mutex_unlock(&aiowp->work_qlock1);
1531 sig_mutex_lock(&__aio_mutex);
1532 _aio_outstand_cnt--;
1533 _aio_waitn_wakeup();
1534 sig_mutex_unlock(&__aio_mutex);
1535 _aio_req_free(reqp);
1536 return (1);
1537 }
1538 ASSERT(head->lio_nent == 1 && head->lio_refcnt == 1);
1539 reqp->req_head = NULL;
1540 if (head->lio_canned)
1541 reqp->req_state = AIO_REQ_CANCELED;
1542 if (head->lio_mode == LIO_DESTROY) {
1543 aiowp->work_req = NULL;
1544 rval = 1;
1545 }
1546 sig_mutex_unlock(&head->lio_mutex);
1547 sig_mutex_unlock(&aiowp->work_qlock1);
1548 head->lio_refcnt--;
1549 head->lio_nent--;
1550 _aio_lio_free(head);
1551 if (rval != 0)
1552 _aio_req_free(reqp);
1553 return (rval);
1554 }
1555
1556 /*
1557 * A worker is set idle when its work queue is empty.
1558 * The worker checks again that it has no more work
1559 * and then goes to sleep waiting for more work.
1560 */
1561 int
1562 _aio_idle(aio_worker_t *aiowp)
1563 {
1564 int error = 0;
1565
1566 sig_mutex_lock(&aiowp->work_qlock1);
1567 if (aiowp->work_count1 == 0) {
1568 ASSERT(aiowp->work_minload1 == 0);
1569 aiowp->work_idleflg = 1;
1570 /*
1571 * A cancellation handler is not needed here.
1572 * aio worker threads are never cancelled via pthread_cancel().
1573 */
1574 error = sig_cond_wait(&aiowp->work_idle_cv,
1575 &aiowp->work_qlock1);
1576 /*
1577		 * The idle flag is normally cleared before the worker is awakened
1578		 * by _aio_req_add(). On error (EINTR), we clear it ourselves.
1579 */
1580 if (error)
1581 aiowp->work_idleflg = 0;
1582 }
1583 sig_mutex_unlock(&aiowp->work_qlock1);
1584 return (error);
1585 }
1586
1587 /*
1588 * A worker's completed AIO requests are placed onto a global
1589 * done queue. The application is only sent a SIGIO signal if
1590 * the process has a handler enabled and it is not waiting via
1591 * aiowait().
1592 */
1593 static void
1594 _aio_work_done(aio_worker_t *aiowp)
1595 {
1596 aio_req_t *reqp;
1597
1598 sig_mutex_lock(&__aio_mutex);
1599 sig_mutex_lock(&aiowp->work_qlock1);
1600 reqp = aiowp->work_prev1;
1601 reqp->req_next = NULL;
1602 aiowp->work_done1 = 0;
1603 aiowp->work_tail1 = aiowp->work_next1;
1604 if (aiowp->work_tail1 == NULL)
1605 aiowp->work_head1 = NULL;
1606 aiowp->work_prev1 = NULL;
1607 _aio_outstand_cnt--;
1608 _aio_req_done_cnt--;
1609 if (reqp->req_state == AIO_REQ_CANCELED) {
1610 /*
1611 * Request got cancelled after it was marked done. This can
1612 * happen because _aio_finish_request() marks it AIO_REQ_DONE
1613 * and drops all locks. Don't add the request to the done
1614 * queue and just discard it.
1615 */
1616 sig_mutex_unlock(&aiowp->work_qlock1);
1617 _aio_req_free(reqp);
1618 if (_aio_outstand_cnt == 0 && _aiowait_flag) {
1619 sig_mutex_unlock(&__aio_mutex);
1620 (void) _kaio(AIONOTIFY);
1621 } else {
1622 sig_mutex_unlock(&__aio_mutex);
1623 }
1624 return;
1625 }
1626 sig_mutex_unlock(&aiowp->work_qlock1);
1627 _aio_donecnt++;
1628 ASSERT(_aio_donecnt > 0 &&
1629 _aio_outstand_cnt >= 0 &&
1630 _aio_req_done_cnt >= 0);
1631 ASSERT(reqp != NULL);
1632
1633 if (_aio_done_tail == NULL) {
1634 _aio_done_head = _aio_done_tail = reqp;
1635 } else {
1636 _aio_done_head->req_next = reqp;
1637 _aio_done_head = reqp;
1638 }
1639
1640 if (_aiowait_flag) {
1641 sig_mutex_unlock(&__aio_mutex);
1642 (void) _kaio(AIONOTIFY);
1643 } else {
1644 sig_mutex_unlock(&__aio_mutex);
1645 if (_sigio_enabled)
1646 (void) kill(__pid, SIGIO);
1647 }
1648 }
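
/*
 * Illustrative sketch (comment only, not library code): an application
 * relying on the SIGIO notification sent above might install a handler
 * and reap completions by polling aiowait() with a zero timeout.  The
 * note_completion() callback is hypothetical.
 *
 *	static void
 *	sigio_handler(int sig)
 *	{
 *		struct timeval zero = { 0, 0 };
 *		aio_result_t *resp;
 *
 *		while ((resp = aiowait(&zero)) != NULL &&
 *		    resp != (aio_result_t *)-1)
 *			note_completion(resp);
 *	}
 *
 *	(void) sigset(SIGIO, sigio_handler);
 */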
1649
1650 /*
1651 * The done queue consists of AIO requests that are in either the
1652 * AIO_REQ_DONE or AIO_REQ_CANCELED state. Requests that were cancelled
1653 * are discarded. If the done queue is empty then NULL is returned.
1654 * Otherwise the address of a done aio_result_t is returned.
1655 */
1656 aio_result_t *
1657 _aio_req_done(void)
1658 {
1659 aio_req_t *reqp;
1660 aio_result_t *resultp;
1661
1662 ASSERT(MUTEX_HELD(&__aio_mutex));
1663
1664 if ((reqp = _aio_done_tail) != NULL) {
1665 if ((_aio_done_tail = reqp->req_next) == NULL)
1666 _aio_done_head = NULL;
1667 ASSERT(_aio_donecnt > 0);
1668 _aio_donecnt--;
1669 (void) _aio_hash_del(reqp->req_resultp);
1670 resultp = reqp->req_resultp;
1671 ASSERT(reqp->req_state == AIO_REQ_DONE);
1672 _aio_req_free(reqp);
1673 return (resultp);
1674 }
1675 /* is queue empty? */
1676 if (reqp == NULL && _aio_outstand_cnt == 0) {
1677 return ((aio_result_t *)-1);
1678 }
1679 return (NULL);
1680 }
1681
1682 /*
1683 * Set the return and errno values for the application's use.
1684 *
1685 * For the Posix interfaces, we must set the return value first followed
1686 * by the errno value because the Posix interfaces allow for a change
1687 * in the errno value from EINPROGRESS to something else to signal
1688 * the completion of the asynchronous request.
1689 *
1690 * The opposite is true for the Solaris interfaces. These allow for
1691 * a change in the return value from AIO_INPROGRESS to something else
1692 * to signal the completion of the asynchronous request.
1693 */
1694 void
1695 _aio_set_result(aio_req_t *reqp, ssize_t retval, int error)
1696 {
1697 aio_result_t *resultp = reqp->req_resultp;
1698
1699 if (POSIX_AIO(reqp)) {
1700 resultp->aio_return = retval;
1701 membar_producer();
1702 resultp->aio_errno = error;
1703 } else {
1704 resultp->aio_errno = error;
1705 membar_producer();
1706 resultp->aio_return = retval;
1707 }
1708 }
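
/*
 * Consumer-side sketch (comment only; an assumption for illustration, not
 * code that exists elsewhere in libc): a reader of a Posix result would
 * check the fields in the opposite order, with a barrier pairing the
 * membar_producer() above, so that once aio_errno leaves EINPROGRESS the
 * previously stored aio_return is guaranteed to be visible:
 *
 *	while ((err = resultp->aio_errno) == EINPROGRESS)
 *		;
 *	membar_consumer();	(pairs with membar_producer())
 *	retval = resultp->aio_return;
 *
 * The Solaris-native interfaces poll aio_return for AIO_INPROGRESS
 * instead, which is why the store order is reversed in the else branch.
 */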
1709
1710 /*
1711 * Add an AIO request onto the next work queue.
1712 * A circular list of workers is used to choose the next worker.
1713 */
1714 void
1715 _aio_req_add(aio_req_t *reqp, aio_worker_t **nextworker, int mode)
1716 {
1717 ulwp_t *self = curthread;
1718 aio_worker_t *aiowp;
1719 aio_worker_t *first;
1720 int load_bal_flg = 1;
1721 int found;
1722
1723 ASSERT(reqp->req_state != AIO_REQ_DONEQ);
1724 reqp->req_next = NULL;
1725 /*
1726 * Try to acquire the next worker's work queue. If it is locked,
1727 * then search the list of workers until a queue is found unlocked,
1728 * or until the list is completely traversed at which point another
1729 * worker will be created.
1730 */
1731 sigoff(self); /* defer SIGIO */
1732 sig_mutex_lock(&__aio_mutex);
1733 first = aiowp = *nextworker;
1734 if (mode != AIONOTIFY)
1735 _aio_outstand_cnt++;
1736 sig_mutex_unlock(&__aio_mutex);
1737
1738 switch (mode) {
1739 case AIOREAD:
1740 case AIOWRITE:
1741 case AIOAREAD:
1742 case AIOAWRITE:
1743 #if !defined(_LP64)
1744 case AIOAREAD64:
1745 case AIOAWRITE64:
1746 #endif
1747 /* try to find an idle worker */
1748 found = 0;
1749 do {
1750 if (sig_mutex_trylock(&aiowp->work_qlock1) == 0) {
1751 if (aiowp->work_idleflg) {
1752 found = 1;
1753 break;
1754 }
1755 sig_mutex_unlock(&aiowp->work_qlock1);
1756 }
1757 } while ((aiowp = aiowp->work_forw) != first);
1758
1759 if (found) {
1760 aiowp->work_minload1++;
1761 break;
1762 }
1763
1764 /* try to acquire some worker's queue lock */
1765 do {
1766 if (sig_mutex_trylock(&aiowp->work_qlock1) == 0) {
1767 found = 1;
1768 break;
1769 }
1770 } while ((aiowp = aiowp->work_forw) != first);
1771
1772 /*
1773 * Create more workers when the workers appear overloaded.
1774 * Either all the workers are busy draining their queues
1775 * or no worker's queue lock could be acquired.
1776 */
1777 if (!found) {
1778 if (_aio_worker_cnt < _max_workers) {
1779 if (_aio_create_worker(reqp, mode))
1780 aio_panic("_aio_req_add: add worker");
1781 sigon(self); /* reenable SIGIO */
1782 return;
1783 }
1784
1785 /*
1786 * No worker available and we have created
1787 * _max_workers, keep going through the
1788 * list slowly until we get a lock
1789 */
1790 while (sig_mutex_trylock(&aiowp->work_qlock1) != 0) {
1791 /*
1792 * give someone else a chance
1793 */
1794 _aio_delay(1);
1795 aiowp = aiowp->work_forw;
1796 }
1797 }
1798
1799 ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
1800 if (_aio_worker_cnt < _max_workers &&
1801 aiowp->work_minload1 >= _minworkload) {
1802 sig_mutex_unlock(&aiowp->work_qlock1);
1803 sig_mutex_lock(&__aio_mutex);
1804 *nextworker = aiowp->work_forw;
1805 sig_mutex_unlock(&__aio_mutex);
1806 if (_aio_create_worker(reqp, mode))
1807 aio_panic("aio_req_add: add worker");
1808 sigon(self); /* reenable SIGIO */
1809 return;
1810 }
1811 aiowp->work_minload1++;
1812 break;
1813 case AIOFSYNC:
1814 case AIONOTIFY:
1815 load_bal_flg = 0;
1816 sig_mutex_lock(&aiowp->work_qlock1);
1817 break;
1818 default:
1819 aio_panic("_aio_req_add: invalid mode");
1820 break;
1821 }
1822 /*
1823 * Put request onto worker's work queue.
1824 */
1825 if (aiowp->work_tail1 == NULL) {
1826 ASSERT(aiowp->work_count1 == 0);
1827 aiowp->work_tail1 = reqp;
1828 aiowp->work_next1 = reqp;
1829 } else {
1830 aiowp->work_head1->req_next = reqp;
1831 if (aiowp->work_next1 == NULL)
1832 aiowp->work_next1 = reqp;
1833 }
1834 reqp->req_state = AIO_REQ_QUEUED;
1835 reqp->req_worker = aiowp;
1836 aiowp->work_head1 = reqp;
1837 /*
1838 * Awaken worker if it is not currently active.
1839 */
1840 if (aiowp->work_count1++ == 0 && aiowp->work_idleflg) {
1841 aiowp->work_idleflg = 0;
1842 (void) cond_signal(&aiowp->work_idle_cv);
1843 }
1844 sig_mutex_unlock(&aiowp->work_qlock1);
1845
1846 if (load_bal_flg) {
1847 sig_mutex_lock(&__aio_mutex);
1848 *nextworker = aiowp->work_forw;
1849 sig_mutex_unlock(&__aio_mutex);
1850 }
1851 sigon(self); /* reenable SIGIO */
1852 }
1853
1854 /*
1855 * Get an AIO request for a specified worker.
1856 * If the work queue is empty, return NULL.
1857 */
1858 aio_req_t *
1859 _aio_req_get(aio_worker_t *aiowp)
1860 {
1861 aio_req_t *reqp;
1862
1863 sig_mutex_lock(&aiowp->work_qlock1);
1864 if ((reqp = aiowp->work_next1) != NULL) {
1865 /*
1866 * Remove a POSIX request from the queue; the
1867		 * request queue is a singly linked list
1868 * with a previous pointer. The request is
1869 * removed by updating the previous pointer.
1870 *
1871 * Non-posix requests are left on the queue
1872 * to eventually be placed on the done queue.
1873 */
1874
1875 if (POSIX_AIO(reqp)) {
1876 if (aiowp->work_prev1 == NULL) {
1877 aiowp->work_tail1 = reqp->req_next;
1878 if (aiowp->work_tail1 == NULL)
1879 aiowp->work_head1 = NULL;
1880 } else {
1881 aiowp->work_prev1->req_next = reqp->req_next;
1882 if (aiowp->work_head1 == reqp)
1883 aiowp->work_head1 = reqp->req_next;
1884 }
1885
1886 } else {
1887 aiowp->work_prev1 = reqp;
1888 ASSERT(aiowp->work_done1 >= 0);
1889 aiowp->work_done1++;
1890 }
1891 ASSERT(reqp != reqp->req_next);
1892 aiowp->work_next1 = reqp->req_next;
1893 ASSERT(aiowp->work_count1 >= 1);
1894 aiowp->work_count1--;
1895 switch (reqp->req_op) {
1896 case AIOREAD:
1897 case AIOWRITE:
1898 case AIOAREAD:
1899 case AIOAWRITE:
1900 #if !defined(_LP64)
1901 case AIOAREAD64:
1902 case AIOAWRITE64:
1903 #endif
1904 ASSERT(aiowp->work_minload1 > 0);
1905 aiowp->work_minload1--;
1906 break;
1907 }
1908 reqp->req_state = AIO_REQ_INPROGRESS;
1909 }
1910 aiowp->work_req = reqp;
1911 ASSERT(reqp != NULL || aiowp->work_count1 == 0);
1912 sig_mutex_unlock(&aiowp->work_qlock1);
1913 return (reqp);
1914 }
1915
1916 static void
1917 _aio_req_del(aio_worker_t *aiowp, aio_req_t *reqp, int ostate)
1918 {
1919 aio_req_t **last;
1920 aio_req_t *lastrp;
1921 aio_req_t *next;
1922
1923 ASSERT(aiowp != NULL);
1924 ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
1925 if (POSIX_AIO(reqp)) {
1926 if (ostate != AIO_REQ_QUEUED)
1927 return;
1928 }
1929 last = &aiowp->work_tail1;
1930 lastrp = aiowp->work_tail1;
1931 ASSERT(ostate == AIO_REQ_QUEUED || ostate == AIO_REQ_INPROGRESS);
1932 while ((next = *last) != NULL) {
1933 if (next == reqp) {
1934 *last = next->req_next;
1935 if (aiowp->work_next1 == next)
1936 aiowp->work_next1 = next->req_next;
1937
1938 /*
1939 * if this is the first request on the queue, move
1940 * the lastrp pointer forward.
1941 */
1942 if (lastrp == next)
1943 lastrp = next->req_next;
1944
1945 /*
1946 * if this request is pointed to by work_head1, then
1947 * make work_head1 point to the last request that is
1948 * present on the queue.
1949 */
1950 if (aiowp->work_head1 == next)
1951 aiowp->work_head1 = lastrp;
1952
1953 /*
1954 * work_prev1 is used only in the non-POSIX case and it
1955 * points to the current AIO_REQ_INPROGRESS request.
1956 * If work_prev1 points to this request which is being
1957 * deleted, make work_prev1 NULL and set work_done1
1958 * to 0.
1959 *
1960 * A worker thread can be processing only one request
1961 * at a time.
1962 */
1963 if (aiowp->work_prev1 == next) {
1964 ASSERT(ostate == AIO_REQ_INPROGRESS &&
1965 !POSIX_AIO(reqp) && aiowp->work_done1 > 0);
1966 aiowp->work_prev1 = NULL;
1967 aiowp->work_done1--;
1968 }
1969
1970 if (ostate == AIO_REQ_QUEUED) {
1971 ASSERT(aiowp->work_count1 >= 1);
1972 aiowp->work_count1--;
1973 ASSERT(aiowp->work_minload1 >= 1);
1974 aiowp->work_minload1--;
1975 }
1976 return;
1977 }
1978 last = &next->req_next;
1979 lastrp = next;
1980 }
1981 /* NOTREACHED */
1982 }
1983
1984 static void
1985 _aio_enq_doneq(aio_req_t *reqp)
1986 {
1987 if (_aio_doneq == NULL) {
1988 _aio_doneq = reqp;
1989 reqp->req_next = reqp->req_prev = reqp;
1990 } else {
1991 reqp->req_next = _aio_doneq;
1992 reqp->req_prev = _aio_doneq->req_prev;
1993 _aio_doneq->req_prev->req_next = reqp;
1994 _aio_doneq->req_prev = reqp;
1995 }
1996 reqp->req_state = AIO_REQ_DONEQ;
1997 _aio_doneq_cnt++;
1998 }
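
/*
 * The done queue is circular and doubly linked: _aio_doneq points at
 * the oldest completion and new entries are linked in just before it,
 * i.e. at the tail, so the queue is consumed in completion (FIFO)
 * order.  A minimal sketch of the same insert-at-tail idiom with
 * hypothetical names:
 *
 *	if (head == NULL) {
 *		head = node;
 *		node->next = node->prev = node;
 *	} else {
 *		node->next = head;
 *		node->prev = head->prev;
 *		head->prev->next = node;
 *		head->prev = node;
 *	}
 */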
1999
2000 /*
2001 * The caller owns __aio_mutex.
2002 */
2003 aio_req_t *
2004 _aio_req_remove(aio_req_t *reqp)
2005 {
2006 if (reqp && reqp->req_state != AIO_REQ_DONEQ)
2007 return (NULL);
2008
2009 if (reqp) {
2010 /* request in done queue */
2011 if (_aio_doneq == reqp)
2012 _aio_doneq = reqp->req_next;
2013 if (_aio_doneq == reqp) {
2014 /* only one request on queue */
2015 _aio_doneq = NULL;
2016 } else {
2017 aio_req_t *tmp = reqp->req_next;
2018 reqp->req_prev->req_next = tmp;
2019 tmp->req_prev = reqp->req_prev;
2020 }
2021 } else if ((reqp = _aio_doneq) != NULL) {
2022 if (reqp == reqp->req_next) {
2023 /* only one request on queue */
2024 _aio_doneq = NULL;
2025 } else {
2026 reqp->req_prev->req_next = _aio_doneq = reqp->req_next;
2027 _aio_doneq->req_prev = reqp->req_prev;
2028 }
2029 }
2030 if (reqp) {
2031 _aio_doneq_cnt--;
2032 reqp->req_next = reqp->req_prev = reqp;
2033 reqp->req_state = AIO_REQ_DONE;
2034 }
2035 return (reqp);
2036 }
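
/*
 * Illustrative sketch (comment only) of the two ways _aio_req_remove()
 * is used while __aio_mutex is held: with a specific request it is
 * unlinked only if it is still in AIO_REQ_DONEQ state (otherwise NULL
 * is returned), and with a NULL argument the oldest completion, the
 * current _aio_doneq head, is popped.
 *
 *	aio_req_t *reqp;
 *
 *	sig_mutex_lock(&__aio_mutex);
 *	while ((reqp = _aio_req_remove(NULL)) != NULL) {
 *		... consume reqp->req_resultp ...
 *	}
 *	sig_mutex_unlock(&__aio_mutex);
 */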
2037
2038 /*
2039 * An AIO request is identified by an aio_result_t pointer. The library
2040 * maps this aio_result_t pointer to its internal representation using a
2041 * hash table. This function adds an aio_result_t pointer to the hash table.
2042 */
2043 static int
2044 _aio_hash_insert(aio_result_t *resultp, aio_req_t *reqp)
2045 {
2046 aio_hash_t *hashp;
2047 aio_req_t **prev;
2048 aio_req_t *next;
2049
2050 hashp = _aio_hash + AIOHASH(resultp);
2051 lmutex_lock(&hashp->hash_lock);
2052 prev = &hashp->hash_ptr;
2053 while ((next = *prev) != NULL) {
2054 if (resultp == next->req_resultp) {
2055 lmutex_unlock(&hashp->hash_lock);
2056 return (-1);
2057 }
2058 prev = &next->req_link;
2059 }
2060 *prev = reqp;
2061 ASSERT(reqp->req_link == NULL);
2062 lmutex_unlock(&hashp->hash_lock);
2063 return (0);
2064 }
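
/*
 * The hash table is an array of aio_hash_t buckets, each with its own
 * hash_lock and a singly linked chain through req_link; AIOHASH()
 * (defined outside this file, presumably in asyncio.h) reduces the
 * aio_result_t pointer to a bucket index.  A purely hypothetical
 * example of such a pointer-hashing macro, not the real definition:
 *
 *	#define	EXAMPLE_HASHSZ		256
 *	#define	EXAMPLE_AIOHASH(resultp) \
 *		((((uintptr_t)(resultp) >> 13) ^ \
 *		((uintptr_t)(resultp) >> 3)) & (EXAMPLE_HASHSZ - 1))
 *
 * A duplicate insertion returns -1, which _aio_rw()/_aio_rw64() treat
 * as fatal when AIO_NO_DUPS is set, since it means the caller reused
 * an aio_result_t that is still outstanding.
 */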
2065
2066 /*
2067 * Remove an entry from the hash table.
2068 */
2069 aio_req_t *
2070 _aio_hash_del(aio_result_t *resultp)
2071 {
2072 aio_hash_t *hashp;
2073 aio_req_t **prev;
2074 aio_req_t *next = NULL;
2075
2076 if (_aio_hash != NULL) {
2077 hashp = _aio_hash + AIOHASH(resultp);
2078 lmutex_lock(&hashp->hash_lock);
2079 prev = &hashp->hash_ptr;
2080 while ((next = *prev) != NULL) {
2081 if (resultp == next->req_resultp) {
2082 *prev = next->req_link;
2083 next->req_link = NULL;
2084 break;
2085 }
2086 prev = &next->req_link;
2087 }
2088 lmutex_unlock(&hashp->hash_lock);
2089 }
2090 return (next);
2091 }
2092
2093 /*
2094 * Find an entry in the hash table.
2095 */
2096 aio_req_t *
2097 _aio_hash_find(aio_result_t *resultp)
2098 {
2099 aio_hash_t *hashp;
2100 aio_req_t **prev;
2101 aio_req_t *next = NULL;
2102
2103 if (_aio_hash != NULL) {
2104 hashp = _aio_hash + AIOHASH(resultp);
2105 lmutex_lock(&hashp->hash_lock);
2106 prev = &hashp->hash_ptr;
2107 while ((next = *prev) != NULL) {
2108 if (resultp == next->req_resultp)
2109 break;
2110 prev = &next->req_link;
2111 }
2112 lmutex_unlock(&hashp->hash_lock);
2113 }
2114 return (next);
2115 }
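
/*
 * Illustrative sketch (comment only, simplified; the real callers also
 * take the appropriate locks) of how the table is used over the life
 * of a request: _aio_hash_insert() is called at submit time (see
 * _aio_rw() below), lookups by the aiocb's result address use
 * _aio_hash_find(), and the entry is removed with _aio_hash_del()
 * once the result has been collected.
 *
 *	aio_req_t *reqp;
 *
 *	reqp = _aio_hash_find(&aiocbp->aio_resultp);
 *	if (reqp != NULL && reqp->req_state == AIO_REQ_DONE)
 *		(void) _aio_hash_del(&aiocbp->aio_resultp);
 */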
2116
2117 /*
2118 * AIO interface for POSIX
2119 */
2120 int
2121 _aio_rw(aiocb_t *aiocbp, aio_lio_t *lio_head, aio_worker_t **nextworker,
2122 int mode, int flg)
2123 {
2124 aio_req_t *reqp;
2125 aio_args_t *ap;
2126 int kerr;
2127
2128 if (aiocbp == NULL) {
2129 errno = EINVAL;
2130 return (-1);
2131 }
2132
2133 /* initialize kaio */
2134 if (!_kaio_ok)
2135 _kaio_init();
2136
2137 aiocbp->aio_state = NOCHECK;
2138
2139 /*
2140 * If we have been called because a list I/O
2141 * kaio() failed, we don't want to repeat the
2142 * system call.
2143 */
2144
2145 if (flg & AIO_KAIO) {
2146 /*
2147 * Try kernel aio first.
2148 * If errno is ENOTSUP/EBADFD,
2149 * fall back to the thread implementation.
2150 */
2151 if (_kaio_ok > 0 && KAIO_SUPPORTED(aiocbp->aio_fildes)) {
2152 aiocbp->aio_resultp.aio_errno = EINPROGRESS;
2153 aiocbp->aio_state = CHECK;
2154 kerr = (int)_kaio(mode, aiocbp);
2155 if (kerr == 0)
2156 return (0);
2157 if (errno != ENOTSUP && errno != EBADFD) {
2158 aiocbp->aio_resultp.aio_errno = errno;
2159 aiocbp->aio_resultp.aio_return = -1;
2160 aiocbp->aio_state = NOCHECK;
2161 return (-1);
2162 }
2163 if (errno == EBADFD)
2164 SET_KAIO_NOT_SUPPORTED(aiocbp->aio_fildes);
2165 }
2166 }
2167
2168 aiocbp->aio_resultp.aio_errno = EINPROGRESS;
2169 aiocbp->aio_state = USERAIO;
2170
2171 if (!__uaio_ok && __uaio_init() == -1)
2172 return (-1);
2173
2174 if ((reqp = _aio_req_alloc()) == NULL) {
2175 errno = EAGAIN;
2176 return (-1);
2177 }
2178
2179 /*
2180 * If an LIO request, add the list head to the aio request
2181 */
2182 reqp->req_head = lio_head;
2183 reqp->req_type = AIO_POSIX_REQ;
2184 reqp->req_op = mode;
2185 reqp->req_largefile = 0;
2186
2187 if (aiocbp->aio_sigevent.sigev_notify == SIGEV_NONE) {
2188 reqp->req_sigevent.sigev_notify = SIGEV_NONE;
2189 } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_SIGNAL) {
2190 reqp->req_sigevent.sigev_notify = SIGEV_SIGNAL;
2191 reqp->req_sigevent.sigev_signo =
2192 aiocbp->aio_sigevent.sigev_signo;
2193 reqp->req_sigevent.sigev_value.sival_ptr =
2194 aiocbp->aio_sigevent.sigev_value.sival_ptr;
2195 } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_PORT) {
2196 port_notify_t *pn = aiocbp->aio_sigevent.sigev_value.sival_ptr;
2197 reqp->req_sigevent.sigev_notify = SIGEV_PORT;
2198 /*
2199 * Reuse the sigevent structure to contain the port number
2200 * and the user value. Same for SIGEV_THREAD, below.
2201 */
2202 reqp->req_sigevent.sigev_signo =
2203 pn->portnfy_port;
2204 reqp->req_sigevent.sigev_value.sival_ptr =
2205 pn->portnfy_user;
2206 } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_THREAD) {
2207 reqp->req_sigevent.sigev_notify = SIGEV_THREAD;
2208 /*
2209 * The sigevent structure contains the port number
2210 * and the user value. Same for SIGEV_PORT, above.
2211 */
2212 reqp->req_sigevent.sigev_signo =
2213 aiocbp->aio_sigevent.sigev_signo;
2214 reqp->req_sigevent.sigev_value.sival_ptr =
2215 aiocbp->aio_sigevent.sigev_value.sival_ptr;
2216 }
2217
2218 reqp->req_resultp = &aiocbp->aio_resultp;
2219 reqp->req_aiocbp = aiocbp;
2220 ap = &reqp->req_args;
2221 ap->fd = aiocbp->aio_fildes;
2222 ap->buf = (caddr_t)aiocbp->aio_buf;
2223 ap->bufsz = aiocbp->aio_nbytes;
2224 ap->offset = aiocbp->aio_offset;
2225
2226 if ((flg & AIO_NO_DUPS) &&
2227 _aio_hash_insert(&aiocbp->aio_resultp, reqp) != 0) {
2228 aio_panic("_aio_rw(): request already in hash table");
2229 }
2230 _aio_req_add(reqp, nextworker, mode);
2231 return (0);
2232 }
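
/*
 * Illustrative application-level usage (comment only; "fd" is assumed
 * to be an open file descriptor).  The POSIX wrappers such as
 * aio_read() funnel into _aio_rw(): kernel async I/O is attempted
 * first when AIO_KAIO is set, and otherwise the request is hashed
 * (AIO_NO_DUPS) and handed to a user-level worker thread.
 *
 *	#include <aio.h>
 *	#include <errno.h>
 *	#include <string.h>
 *
 *	char buf[4096];
 *	struct aiocb cb;
 *	const struct aiocb *list[1];
 *	ssize_t n;
 *
 *	(void) memset(&cb, 0, sizeof (cb));
 *	cb.aio_fildes = fd;
 *	cb.aio_buf = buf;
 *	cb.aio_nbytes = sizeof (buf);
 *	cb.aio_offset = 0;
 *	cb.aio_sigevent.sigev_notify = SIGEV_NONE;
 *	list[0] = &cb;
 *
 *	if (aio_read(&cb) == 0) {
 *		while (aio_error(&cb) == EINPROGRESS)
 *			(void) aio_suspend(list, 1, NULL);
 *		n = aio_return(&cb);
 *	}
 */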
2233
2234 #if !defined(_LP64)
2235 /*
2236 * 64-bit AIO interface for POSIX
2237 */
2238 int
2239 _aio_rw64(aiocb64_t *aiocbp, aio_lio_t *lio_head, aio_worker_t **nextworker,
2240 int mode, int flg)
2241 {
2242 aio_req_t *reqp;
2243 aio_args_t *ap;
2244 int kerr;
2245
2246 if (aiocbp == NULL) {
2247 errno = EINVAL;
2248 return (-1);
2249 }
2250
2251 /* initialize kaio */
2252 if (!_kaio_ok)
2253 _kaio_init();
2254
2255 aiocbp->aio_state = NOCHECK;
2256
2257 /*
2258 * If we have been called because a list I/O
2259 * kaio() failed, we don't want to repeat the
2260 * system call.
2261 */
2262
2263 if (flg & AIO_KAIO) {
2264 /*
2265 * Try kernel aio first.
2266 * If errno is ENOTSUP/EBADFD,
2267 * fall back to the thread implementation.
2268 */
2269 if (_kaio_ok > 0 && KAIO_SUPPORTED(aiocbp->aio_fildes)) {
2270 aiocbp->aio_resultp.aio_errno = EINPROGRESS;
2271 aiocbp->aio_state = CHECK;
2272 kerr = (int)_kaio(mode, aiocbp);
2273 if (kerr == 0)
2274 return (0);
2275 if (errno != ENOTSUP && errno != EBADFD) {
2276 aiocbp->aio_resultp.aio_errno = errno;
2277 aiocbp->aio_resultp.aio_return = -1;
2278 aiocbp->aio_state = NOCHECK;
2279 return (-1);
2280 }
2281 if (errno == EBADFD)
2282 SET_KAIO_NOT_SUPPORTED(aiocbp->aio_fildes);
2283 }
2284 }
2285
2286 aiocbp->aio_resultp.aio_errno = EINPROGRESS;
2287 aiocbp->aio_state = USERAIO;
2288
2289 if (!__uaio_ok && __uaio_init() == -1)
2290 return (-1);
2291
2292 if ((reqp = _aio_req_alloc()) == NULL) {
2293 errno = EAGAIN;
2294 return (-1);
2295 }
2296
2297 /*
2298 * If an LIO request, add the list head to the aio request
2299 */
2300 reqp->req_head = lio_head;
2301 reqp->req_type = AIO_POSIX_REQ;
2302 reqp->req_op = mode;
2303 reqp->req_largefile = 1;
2304
2305 if (aiocbp->aio_sigevent.sigev_notify == SIGEV_NONE) {
2306 reqp->req_sigevent.sigev_notify = SIGEV_NONE;
2307 } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_SIGNAL) {
2308 reqp->req_sigevent.sigev_notify = SIGEV_SIGNAL;
2309 reqp->req_sigevent.sigev_signo =
2310 aiocbp->aio_sigevent.sigev_signo;
2311 reqp->req_sigevent.sigev_value.sival_ptr =
2312 aiocbp->aio_sigevent.sigev_value.sival_ptr;
2313 } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_PORT) {
2314 port_notify_t *pn = aiocbp->aio_sigevent.sigev_value.sival_ptr;
2315 reqp->req_sigevent.sigev_notify = SIGEV_PORT;
2316 reqp->req_sigevent.sigev_signo =
2317 pn->portnfy_port;
2318 reqp->req_sigevent.sigev_value.sival_ptr =
2319 pn->portnfy_user;
2320 } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_THREAD) {
2321 reqp->req_sigevent.sigev_notify = SIGEV_THREAD;
2322 reqp->req_sigevent.sigev_signo =
2323 aiocbp->aio_sigevent.sigev_signo;
2324 reqp->req_sigevent.sigev_value.sival_ptr =
2325 aiocbp->aio_sigevent.sigev_value.sival_ptr;
2326 }
2327
2328 reqp->req_resultp = &aiocbp->aio_resultp;
2329 reqp->req_aiocbp = aiocbp;
2330 ap = &reqp->req_args;
2331 ap->fd = aiocbp->aio_fildes;
2332 ap->buf = (caddr_t)aiocbp->aio_buf;
2333 ap->bufsz = aiocbp->aio_nbytes;
2334 ap->offset = aiocbp->aio_offset;
2335
2336 if ((flg & AIO_NO_DUPS) &&
2337 _aio_hash_insert(&aiocbp->aio_resultp, reqp) != 0) {
2338 aio_panic("_aio_rw64(): request already in hash table");
2339 }
2340 _aio_req_add(reqp, nextworker, mode);
2341 return (0);
2342 }
2343 #endif /* !defined(_LP64) */
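
/*
 * Largefile note (hedged): _aio_rw64() is compiled only for 32-bit
 * (!_LP64) libc, where the transitional interfaces such as
 * aio_read64() submit aiocb64_t control blocks; req_largefile is set
 * to 1 so the worker performs the I/O with a 64-bit offset.  In an
 * LP64 process off_t is already 64 bits wide and only _aio_rw() is
 * built.  A minimal caller-side fragment, assuming an open descriptor
 * "fd", a buffer "buf", and _LARGEFILE64_SOURCE:
 *
 *	struct aiocb64 cb64;
 *
 *	(void) memset(&cb64, 0, sizeof (cb64));
 *	cb64.aio_fildes = fd;
 *	cb64.aio_buf = buf;
 *	cb64.aio_nbytes = sizeof (buf);
 *	cb64.aio_offset = (off64_t)3 * 1024 * 1024 * 1024;
 *	cb64.aio_sigevent.sigev_notify = SIGEV_NONE;
 *
 *	if (aio_read64(&cb64) == 0)
 *		... wait and collect with aio_error64()/aio_return64() ...
 */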
2344