xref: /titanic_51/usr/src/lib/libc/port/aio/aio.c (revision 52978630c494bee8d54ed3f55387ab291818be9d)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 
22 /*
23  * Copyright 2006 Sun Microsystems, Inc.  All rights reserved.
24  * Use is subject to license terms.
25  */
26 
27 #pragma ident	"%Z%%M%	%I%	%E% SMI"
28 
29 #include "synonyms.h"
30 #include "thr_uberdata.h"
31 #include "asyncio.h"
32 #include <atomic.h>
33 #include <sys/param.h>
34 #include <sys/file.h>
35 #include <sys/port.h>
36 
37 static int _aio_hash_insert(aio_result_t *, aio_req_t *);
38 static aio_req_t *_aio_req_get(aio_worker_t *);
39 static void _aio_req_add(aio_req_t *, aio_worker_t **, int);
40 static void _aio_req_del(aio_worker_t *, aio_req_t *, int);
41 static void _aio_work_done(aio_worker_t *);
42 static void _aio_enq_doneq(aio_req_t *);
43 
44 extern void _aio_lio_free(aio_lio_t *);
45 
46 extern int __fdsync(int, int);
47 extern int _port_dispatch(int, int, int, int, uintptr_t, void *);
48 
49 static int _aio_fsync_del(aio_worker_t *, aio_req_t *);
50 static void _aiodone(aio_req_t *, ssize_t, int);
51 static void _aio_cancel_work(aio_worker_t *, int, int *, int *);
52 static void _aio_finish_request(aio_worker_t *, ssize_t, int);
53 
54 /*
55  * switch for kernel async I/O
56  */
57 int _kaio_ok = 0;		/* 0 = disabled, 1 = on, -1 = error */
58 
59 /*
60  * Key for thread-specific data
61  */
62 pthread_key_t _aio_key;
63 
64 /*
65  * Array for determining whether or not a file supports kaio.
66  * Initialized in _kaio_init().
67  */
68 uint32_t *_kaio_supported = NULL;
69 
70 /*
71  *  workers for read/write requests
72  * (__aio_mutex lock protects circular linked list of workers)
73  */
74 aio_worker_t *__workers_rw;	/* circular list of AIO workers */
75 aio_worker_t *__nextworker_rw;	/* next worker in list of workers */
76 int __rw_workerscnt;		/* number of read/write workers */
77 
78 /*
79  * worker for notification requests.
80  */
81 aio_worker_t *__workers_no;	/* circular list of AIO workers */
82 aio_worker_t *__nextworker_no;	/* next worker in list of workers */
83 int __no_workerscnt;		/* number of write workers */
84 
85 aio_req_t *_aio_done_tail;		/* list of done requests */
86 aio_req_t *_aio_done_head;
87 
88 mutex_t __aio_initlock = DEFAULTMUTEX;	/* makes aio initialization atomic */
89 cond_t __aio_initcv = DEFAULTCV;
90 int __aio_initbusy = 0;
91 
92 mutex_t __aio_mutex = DEFAULTMUTEX;	/* protects counts, and linked lists */
93 cond_t _aio_iowait_cv = DEFAULTCV;	/* wait for userland I/Os */
94 
95 pid_t __pid = (pid_t)-1;		/* initialize as invalid pid */
96 int _sigio_enabled = 0;			/* when set, send SIGIO signal */
97 
98 aio_hash_t *_aio_hash;
99 
100 aio_req_t *_aio_doneq;			/* double linked done queue list */
101 
102 int _aio_donecnt = 0;
103 int _aio_waitncnt = 0;			/* # of requests for aio_waitn */
104 int _aio_doneq_cnt = 0;
105 int _aio_outstand_cnt = 0;		/* # of outstanding requests */
106 int _kaio_outstand_cnt = 0;		/* # of outstanding kaio requests */
107 int _aio_req_done_cnt = 0;		/* req. done but not in "done queue" */
108 int _aio_kernel_suspend = 0;		/* active kernel kaio calls */
109 int _aio_suscv_cnt = 0;			/* aio_suspend calls waiting on cv's */
110 
111 int _max_workers = 256;			/* max number of workers permitted */
112 int _min_workers = 4;			/* min number of workers */
113 int _minworkload = 2;			/* min number of request in q */
114 int _aio_worker_cnt = 0;		/* number of workers to do requests */
115 int __uaio_ok = 0;			/* AIO has been enabled */
116 sigset_t _worker_set;			/* worker's signal mask */
117 
118 int _aiowait_flag = 0;			/* when set, aiowait() is inprogress */
119 int _aio_flags = 0;			/* see asyncio.h defines for */
120 
121 aio_worker_t *_kaiowp = NULL;		/* points to kaio cleanup thread */
122 
123 int hz;					/* clock ticks per second */
124 
125 static int
126 _kaio_supported_init(void)
127 {
128 	void *ptr;
129 	size_t size;
130 
131 	if (_kaio_supported != NULL)	/* already initialized */
132 		return (0);
133 
134 	size = MAX_KAIO_FDARRAY_SIZE * sizeof (uint32_t);
135 	ptr = mmap(NULL, size, PROT_READ | PROT_WRITE,
136 	    MAP_PRIVATE | MAP_ANON, -1, (off_t)0);
137 	if (ptr == MAP_FAILED)
138 		return (-1);
139 	_kaio_supported = ptr;
140 	return (0);
141 }
142 
143 /*
144  * The aio subsystem is initialized when an AIO request is made.
145  * Constants are initialized like the max number of workers that
146  * the subsystem can create, and the minimum number of workers
147  * permitted before imposing some restrictions.  Also, some
148  * workers are created.
149  */
150 int
151 __uaio_init(void)
152 {
153 	int ret = -1;
154 	int i;
155 
156 	lmutex_lock(&__aio_initlock);
157 	while (__aio_initbusy)
158 		(void) _cond_wait(&__aio_initcv, &__aio_initlock);
159 	if (__uaio_ok) {	/* already initialized */
160 		lmutex_unlock(&__aio_initlock);
161 		return (0);
162 	}
163 	__aio_initbusy = 1;
164 	lmutex_unlock(&__aio_initlock);
165 
166 	hz = (int)sysconf(_SC_CLK_TCK);
167 	__pid = getpid();
168 
169 	setup_cancelsig(SIGAIOCANCEL);
170 
171 	if (_kaio_supported_init() != 0)
172 		goto out;
173 
174 	/*
175 	 * Allocate and initialize the hash table.
176 	 * Do this only once, even if __uaio_init() is called twice.
177 	 */
178 	if (_aio_hash == NULL) {
179 		/* LINTED pointer cast */
180 		_aio_hash = (aio_hash_t *)mmap(NULL,
181 		    HASHSZ * sizeof (aio_hash_t), PROT_READ | PROT_WRITE,
182 		    MAP_PRIVATE | MAP_ANON, -1, (off_t)0);
183 		if ((void *)_aio_hash == MAP_FAILED) {
184 			_aio_hash = NULL;
185 			goto out;
186 		}
187 		for (i = 0; i < HASHSZ; i++)
188 			(void) mutex_init(&_aio_hash[i].hash_lock,
189 			    USYNC_THREAD, NULL);
190 	}
191 
192 	/*
193 	 * Initialize worker's signal mask to only catch SIGAIOCANCEL.
194 	 */
195 	(void) sigfillset(&_worker_set);
196 	(void) sigdelset(&_worker_set, SIGAIOCANCEL);
197 
198 	/*
199 	 * Create one worker to send asynchronous notifications.
200 	 * Do this only once, even if __uaio_init() is called twice.
201 	 */
202 	if (__no_workerscnt == 0 &&
203 	    (_aio_create_worker(NULL, AIONOTIFY) != 0)) {
204 		errno = EAGAIN;
205 		goto out;
206 	}
207 
208 	/*
209 	 * Create the minimum number of read/write workers.
210 	 * And later check whether atleast one worker is created;
211 	 * lwp_create() calls could fail because of segkp exhaustion.
212 	 */
213 	for (i = 0; i < _min_workers; i++)
214 		(void) _aio_create_worker(NULL, AIOREAD);
215 	if (__rw_workerscnt == 0) {
216 		errno = EAGAIN;
217 		goto out;
218 	}
219 
220 	ret = 0;
221 out:
222 	lmutex_lock(&__aio_initlock);
223 	if (ret == 0)
224 		__uaio_ok = 1;
225 	__aio_initbusy = 0;
226 	(void) cond_broadcast(&__aio_initcv);
227 	lmutex_unlock(&__aio_initlock);
228 	return (ret);
229 }
230 
231 /*
232  * Called from close() before actually performing the real _close().
233  */
234 void
235 _aio_close(int fd)
236 {
237 	if (fd < 0)	/* avoid cancelling everything */
238 		return;
239 	/*
240 	 * Cancel all outstanding aio requests for this file descriptor.
241 	 */
242 	if (__uaio_ok)
243 		(void) aiocancel_all(fd);
244 	/*
245 	 * If we have allocated the bit array, clear the bit for this file.
246 	 * The next open may re-use this file descriptor and the new file
247 	 * may have different kaio() behaviour.
248 	 */
249 	if (_kaio_supported != NULL)
250 		CLEAR_KAIO_SUPPORTED(fd);
251 }
252 
253 /*
254  * special kaio cleanup thread sits in a loop in the
255  * kernel waiting for pending kaio requests to complete.
256  */
257 void *
258 _kaio_cleanup_thread(void *arg)
259 {
260 	if (pthread_setspecific(_aio_key, arg) != 0)
261 		aio_panic("_kaio_cleanup_thread, pthread_setspecific()");
262 	(void) _kaio(AIOSTART);
263 	return (arg);
264 }
265 
266 /*
267  * initialize kaio.
268  */
269 void
270 _kaio_init()
271 {
272 	int error;
273 	sigset_t oset;
274 
275 	lmutex_lock(&__aio_initlock);
276 	while (__aio_initbusy)
277 		(void) _cond_wait(&__aio_initcv, &__aio_initlock);
278 	if (_kaio_ok) {		/* already initialized */
279 		lmutex_unlock(&__aio_initlock);
280 		return;
281 	}
282 	__aio_initbusy = 1;
283 	lmutex_unlock(&__aio_initlock);
284 
285 	if (_kaio_supported_init() != 0)
286 		error = ENOMEM;
287 	else if ((_kaiowp = _aio_worker_alloc()) == NULL)
288 		error = ENOMEM;
289 	else if ((error = (int)_kaio(AIOINIT)) == 0) {
290 		(void) pthread_sigmask(SIG_SETMASK, &maskset, &oset);
291 		error = thr_create(NULL, AIOSTKSIZE, _kaio_cleanup_thread,
292 		    _kaiowp, THR_DAEMON, &_kaiowp->work_tid);
293 		(void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
294 	}
295 	if (error && _kaiowp != NULL) {
296 		_aio_worker_free(_kaiowp);
297 		_kaiowp = NULL;
298 	}
299 
300 	lmutex_lock(&__aio_initlock);
301 	if (error)
302 		_kaio_ok = -1;
303 	else
304 		_kaio_ok = 1;
305 	__aio_initbusy = 0;
306 	(void) cond_broadcast(&__aio_initcv);
307 	lmutex_unlock(&__aio_initlock);
308 }
309 
310 int
311 aioread(int fd, caddr_t buf, int bufsz, off_t offset, int whence,
312     aio_result_t *resultp)
313 {
314 	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOREAD));
315 }
316 
317 int
318 aiowrite(int fd, caddr_t buf, int bufsz, off_t offset, int whence,
319     aio_result_t *resultp)
320 {
321 	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOWRITE));
322 }
323 
324 #if !defined(_LP64)
325 int
326 aioread64(int fd, caddr_t buf, int bufsz, off64_t offset, int whence,
327     aio_result_t *resultp)
328 {
329 	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOAREAD64));
330 }
331 
332 int
333 aiowrite64(int fd, caddr_t buf, int bufsz, off64_t offset, int whence,
334     aio_result_t *resultp)
335 {
336 	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOAWRITE64));
337 }
338 #endif	/* !defined(_LP64) */
339 
340 int
341 _aiorw(int fd, caddr_t buf, int bufsz, offset_t offset, int whence,
342     aio_result_t *resultp, int mode)
343 {
344 	aio_req_t *reqp;
345 	aio_args_t *ap;
346 	offset_t loffset;
347 	struct stat stat;
348 	int error = 0;
349 	int kerr;
350 	int umode;
351 
352 	switch (whence) {
353 
354 	case SEEK_SET:
355 		loffset = offset;
356 		break;
357 	case SEEK_CUR:
358 		if ((loffset = llseek(fd, 0, SEEK_CUR)) == -1)
359 			error = -1;
360 		else
361 			loffset += offset;
362 		break;
363 	case SEEK_END:
364 		if (fstat(fd, &stat) == -1)
365 			error = -1;
366 		else
367 			loffset = offset + stat.st_size;
368 		break;
369 	default:
370 		errno = EINVAL;
371 		error = -1;
372 	}
373 
374 	if (error)
375 		return (error);
376 
377 	/* initialize kaio */
378 	if (!_kaio_ok)
379 		_kaio_init();
380 
381 	/*
382 	 * _aio_do_request() needs the original request code (mode) to be able
383 	 * to choose the appropiate 32/64 bit function.  All other functions
384 	 * only require the difference between READ and WRITE (umode).
385 	 */
386 	if (mode == AIOAREAD64 || mode == AIOAWRITE64)
387 		umode = mode - AIOAREAD64;
388 	else
389 		umode = mode;
390 
391 	/*
392 	 * Try kernel aio first.
393 	 * If errno is ENOTSUP/EBADFD, fall back to the thread implementation.
394 	 */
395 	if (_kaio_ok > 0 && KAIO_SUPPORTED(fd)) {
396 		resultp->aio_errno = 0;
397 		sig_mutex_lock(&__aio_mutex);
398 		_kaio_outstand_cnt++;
399 		kerr = (int)_kaio(((resultp->aio_return == AIO_INPROGRESS) ?
400 		    (umode | AIO_POLL_BIT) : umode),
401 		    fd, buf, bufsz, loffset, resultp);
402 		if (kerr == 0) {
403 			sig_mutex_unlock(&__aio_mutex);
404 			return (0);
405 		}
406 		_kaio_outstand_cnt--;
407 		sig_mutex_unlock(&__aio_mutex);
408 		if (errno != ENOTSUP && errno != EBADFD)
409 			return (-1);
410 		if (errno == EBADFD)
411 			SET_KAIO_NOT_SUPPORTED(fd);
412 	}
413 
414 	if (!__uaio_ok && __uaio_init() == -1)
415 		return (-1);
416 
417 	if ((reqp = _aio_req_alloc()) == NULL) {
418 		errno = EAGAIN;
419 		return (-1);
420 	}
421 
422 	/*
423 	 * _aio_do_request() checks reqp->req_op to differentiate
424 	 * between 32 and 64 bit access.
425 	 */
426 	reqp->req_op = mode;
427 	reqp->req_resultp = resultp;
428 	ap = &reqp->req_args;
429 	ap->fd = fd;
430 	ap->buf = buf;
431 	ap->bufsz = bufsz;
432 	ap->offset = loffset;
433 
434 	if (_aio_hash_insert(resultp, reqp) != 0) {
435 		_aio_req_free(reqp);
436 		errno = EINVAL;
437 		return (-1);
438 	}
439 	/*
440 	 * _aio_req_add() only needs the difference between READ and
441 	 * WRITE to choose the right worker queue.
442 	 */
443 	_aio_req_add(reqp, &__nextworker_rw, umode);
444 	return (0);
445 }
446 
447 int
448 aiocancel(aio_result_t *resultp)
449 {
450 	aio_req_t *reqp;
451 	aio_worker_t *aiowp;
452 	int ret;
453 	int done = 0;
454 	int canceled = 0;
455 
456 	if (!__uaio_ok) {
457 		errno = EINVAL;
458 		return (-1);
459 	}
460 
461 	sig_mutex_lock(&__aio_mutex);
462 	reqp = _aio_hash_find(resultp);
463 	if (reqp == NULL) {
464 		if (_aio_outstand_cnt == _aio_req_done_cnt)
465 			errno = EINVAL;
466 		else
467 			errno = EACCES;
468 		ret = -1;
469 	} else {
470 		aiowp = reqp->req_worker;
471 		sig_mutex_lock(&aiowp->work_qlock1);
472 		(void) _aio_cancel_req(aiowp, reqp, &canceled, &done);
473 		sig_mutex_unlock(&aiowp->work_qlock1);
474 
475 		if (canceled) {
476 			ret = 0;
477 		} else {
478 			if (_aio_outstand_cnt == 0 ||
479 			    _aio_outstand_cnt == _aio_req_done_cnt)
480 				errno = EINVAL;
481 			else
482 				errno = EACCES;
483 			ret = -1;
484 		}
485 	}
486 	sig_mutex_unlock(&__aio_mutex);
487 	return (ret);
488 }
489 
490 /*
491  * This must be asynch safe
492  */
493 aio_result_t *
494 aiowait(struct timeval *uwait)
495 {
496 	aio_result_t *uresultp;
497 	aio_result_t *kresultp;
498 	aio_result_t *resultp;
499 	int dontblock;
500 	int timedwait = 0;
501 	int kaio_errno = 0;
502 	struct timeval twait;
503 	struct timeval *wait = NULL;
504 	hrtime_t hrtend;
505 	hrtime_t hres;
506 
507 	if (uwait) {
508 		/*
509 		 * Check for a valid specified wait time.
510 		 * If it is invalid, fail the call right away.
511 		 */
512 		if (uwait->tv_sec < 0 || uwait->tv_usec < 0 ||
513 		    uwait->tv_usec >= MICROSEC) {
514 			errno = EINVAL;
515 			return ((aio_result_t *)-1);
516 		}
517 
518 		if (uwait->tv_sec > 0 || uwait->tv_usec > 0) {
519 			hrtend = gethrtime() +
520 				(hrtime_t)uwait->tv_sec * NANOSEC +
521 				(hrtime_t)uwait->tv_usec * (NANOSEC / MICROSEC);
522 			twait = *uwait;
523 			wait = &twait;
524 			timedwait++;
525 		} else {
526 			/* polling */
527 			sig_mutex_lock(&__aio_mutex);
528 			if (_kaio_outstand_cnt == 0) {
529 				kresultp = (aio_result_t *)-1;
530 			} else {
531 				kresultp = (aio_result_t *)_kaio(AIOWAIT,
532 				    (struct timeval *)-1, 1);
533 				if (kresultp != (aio_result_t *)-1 &&
534 				    kresultp != NULL &&
535 				    kresultp != (aio_result_t *)1) {
536 					_kaio_outstand_cnt--;
537 					sig_mutex_unlock(&__aio_mutex);
538 					return (kresultp);
539 				}
540 			}
541 			uresultp = _aio_req_done();
542 			sig_mutex_unlock(&__aio_mutex);
543 			if (uresultp != NULL &&
544 			    uresultp != (aio_result_t *)-1) {
545 				return (uresultp);
546 			}
547 			if (uresultp == (aio_result_t *)-1 &&
548 			    kresultp == (aio_result_t *)-1) {
549 				errno = EINVAL;
550 				return ((aio_result_t *)-1);
551 			} else {
552 				return (NULL);
553 			}
554 		}
555 	}
556 
557 	for (;;) {
558 		sig_mutex_lock(&__aio_mutex);
559 		uresultp = _aio_req_done();
560 		if (uresultp != NULL && uresultp != (aio_result_t *)-1) {
561 			sig_mutex_unlock(&__aio_mutex);
562 			resultp = uresultp;
563 			break;
564 		}
565 		_aiowait_flag++;
566 		dontblock = (uresultp == (aio_result_t *)-1);
567 		if (dontblock && _kaio_outstand_cnt == 0) {
568 			kresultp = (aio_result_t *)-1;
569 			kaio_errno = EINVAL;
570 		} else {
571 			sig_mutex_unlock(&__aio_mutex);
572 			kresultp = (aio_result_t *)_kaio(AIOWAIT,
573 			    wait, dontblock);
574 			sig_mutex_lock(&__aio_mutex);
575 			kaio_errno = errno;
576 		}
577 		_aiowait_flag--;
578 		sig_mutex_unlock(&__aio_mutex);
579 		if (kresultp == (aio_result_t *)1) {
580 			/* aiowait() awakened by an aionotify() */
581 			continue;
582 		} else if (kresultp != NULL &&
583 		    kresultp != (aio_result_t *)-1) {
584 			resultp = kresultp;
585 			sig_mutex_lock(&__aio_mutex);
586 			_kaio_outstand_cnt--;
587 			sig_mutex_unlock(&__aio_mutex);
588 			break;
589 		} else if (kresultp == (aio_result_t *)-1 &&
590 		    kaio_errno == EINVAL &&
591 		    uresultp == (aio_result_t *)-1) {
592 			errno = kaio_errno;
593 			resultp = (aio_result_t *)-1;
594 			break;
595 		} else if (kresultp == (aio_result_t *)-1 &&
596 		    kaio_errno == EINTR) {
597 			errno = kaio_errno;
598 			resultp = (aio_result_t *)-1;
599 			break;
600 		} else if (timedwait) {
601 			hres = hrtend - gethrtime();
602 			if (hres <= 0) {
603 				/* time is up; return */
604 				resultp = NULL;
605 				break;
606 			} else {
607 				/*
608 				 * Some time left.  Round up the remaining time
609 				 * in nanoseconds to microsec.  Retry the call.
610 				 */
611 				hres += (NANOSEC / MICROSEC) - 1;
612 				wait->tv_sec = hres / NANOSEC;
613 				wait->tv_usec =
614 					(hres % NANOSEC) / (NANOSEC / MICROSEC);
615 			}
616 		} else {
617 			ASSERT(kresultp == NULL && uresultp == NULL);
618 			resultp = NULL;
619 			continue;
620 		}
621 	}
622 	return (resultp);
623 }
624 
625 /*
626  * _aio_get_timedelta calculates the remaining time and stores the result
627  * into timespec_t *wait.
628  */
629 
630 int
631 _aio_get_timedelta(timespec_t *end, timespec_t *wait)
632 {
633 	int	ret = 0;
634 	struct	timeval cur;
635 	timespec_t curtime;
636 
637 	(void) gettimeofday(&cur, NULL);
638 	curtime.tv_sec = cur.tv_sec;
639 	curtime.tv_nsec = cur.tv_usec * 1000;   /* convert us to ns */
640 
641 	if (end->tv_sec >= curtime.tv_sec) {
642 		wait->tv_sec = end->tv_sec - curtime.tv_sec;
643 		if (end->tv_nsec >= curtime.tv_nsec) {
644 			wait->tv_nsec = end->tv_nsec - curtime.tv_nsec;
645 			if (wait->tv_sec == 0 && wait->tv_nsec == 0)
646 				ret = -1;	/* timer expired */
647 		} else {
648 			if (end->tv_sec > curtime.tv_sec) {
649 				wait->tv_sec -= 1;
650 				wait->tv_nsec = NANOSEC -
651 				    (curtime.tv_nsec - end->tv_nsec);
652 			} else {
653 				ret = -1;	/* timer expired */
654 			}
655 		}
656 	} else {
657 		ret = -1;
658 	}
659 	return (ret);
660 }
661 
662 /*
663  * If closing by file descriptor: we will simply cancel all the outstanding
664  * aio`s and return.  Those aio's in question will have either noticed the
665  * cancellation notice before, during, or after initiating io.
666  */
667 int
668 aiocancel_all(int fd)
669 {
670 	aio_req_t *reqp;
671 	aio_req_t **reqpp;
672 	aio_worker_t *first;
673 	aio_worker_t *next;
674 	int canceled = 0;
675 	int done = 0;
676 	int cancelall = 0;
677 
678 	sig_mutex_lock(&__aio_mutex);
679 
680 	if (_aio_outstand_cnt == 0) {
681 		sig_mutex_unlock(&__aio_mutex);
682 		return (AIO_ALLDONE);
683 	}
684 
685 	/*
686 	 * Cancel requests from the read/write workers' queues.
687 	 */
688 	first = __nextworker_rw;
689 	next = first;
690 	do {
691 		_aio_cancel_work(next, fd, &canceled, &done);
692 	} while ((next = next->work_forw) != first);
693 
694 	/*
695 	 * finally, check if there are requests on the done queue that
696 	 * should be canceled.
697 	 */
698 	if (fd < 0)
699 		cancelall = 1;
700 	reqpp = &_aio_done_tail;
701 	while ((reqp = *reqpp) != NULL) {
702 		if (cancelall || reqp->req_args.fd == fd) {
703 			*reqpp = reqp->req_next;
704 			_aio_donecnt--;
705 			(void) _aio_hash_del(reqp->req_resultp);
706 			_aio_req_free(reqp);
707 		} else
708 			reqpp = &reqp->req_next;
709 	}
710 	if (cancelall) {
711 		ASSERT(_aio_donecnt == 0);
712 		_aio_done_head = NULL;
713 	}
714 	sig_mutex_unlock(&__aio_mutex);
715 
716 	if (canceled && done == 0)
717 		return (AIO_CANCELED);
718 	else if (done && canceled == 0)
719 		return (AIO_ALLDONE);
720 	else if ((canceled + done == 0) && KAIO_SUPPORTED(fd))
721 		return ((int)_kaio(AIOCANCEL, fd, NULL));
722 	return (AIO_NOTCANCELED);
723 }
724 
725 /*
726  * Cancel requests from a given work queue.  If the file descriptor
727  * parameter, fd, is non-negative, then only cancel those requests
728  * in this queue that are to this file descriptor.  If the fd
729  * parameter is -1, then cancel all requests.
730  */
731 static void
732 _aio_cancel_work(aio_worker_t *aiowp, int fd, int *canceled, int *done)
733 {
734 	aio_req_t *reqp;
735 
736 	sig_mutex_lock(&aiowp->work_qlock1);
737 	/*
738 	 * cancel queued requests first.
739 	 */
740 	reqp = aiowp->work_tail1;
741 	while (reqp != NULL) {
742 		if (fd < 0 || reqp->req_args.fd == fd) {
743 			if (_aio_cancel_req(aiowp, reqp, canceled, done)) {
744 				/*
745 				 * Callers locks were dropped.
746 				 * reqp is invalid; start traversing
747 				 * the list from the beginning again.
748 				 */
749 				reqp = aiowp->work_tail1;
750 				continue;
751 			}
752 		}
753 		reqp = reqp->req_next;
754 	}
755 	/*
756 	 * Since the queued requests have been canceled, there can
757 	 * only be one inprogress request that should be canceled.
758 	 */
759 	if ((reqp = aiowp->work_req) != NULL &&
760 	    (fd < 0 || reqp->req_args.fd == fd))
761 		(void) _aio_cancel_req(aiowp, reqp, canceled, done);
762 	sig_mutex_unlock(&aiowp->work_qlock1);
763 }
764 
765 /*
766  * Cancel a request.  Return 1 if the callers locks were temporarily
767  * dropped, otherwise return 0.
768  */
769 int
770 _aio_cancel_req(aio_worker_t *aiowp, aio_req_t *reqp, int *canceled, int *done)
771 {
772 	int ostate = reqp->req_state;
773 
774 	ASSERT(MUTEX_HELD(&__aio_mutex));
775 	ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
776 	if (ostate == AIO_REQ_CANCELED)
777 		return (0);
778 	if (ostate == AIO_REQ_DONE || ostate == AIO_REQ_DONEQ) {
779 		(*done)++;
780 		return (0);
781 	}
782 	if (reqp->req_op == AIOFSYNC && reqp != aiowp->work_req) {
783 		ASSERT(POSIX_AIO(reqp));
784 		/* Cancel the queued aio_fsync() request */
785 		if (!reqp->req_head->lio_canned) {
786 			reqp->req_head->lio_canned = 1;
787 			_aio_outstand_cnt--;
788 			(*canceled)++;
789 		}
790 		return (0);
791 	}
792 	reqp->req_state = AIO_REQ_CANCELED;
793 	_aio_req_del(aiowp, reqp, ostate);
794 	(void) _aio_hash_del(reqp->req_resultp);
795 	(*canceled)++;
796 	if (reqp == aiowp->work_req) {
797 		ASSERT(ostate == AIO_REQ_INPROGRESS);
798 		/*
799 		 * Set the result values now, before _aiodone() is called.
800 		 * We do this because the application can expect aio_return
801 		 * and aio_errno to be set to -1 and ECANCELED, respectively,
802 		 * immediately after a successful return from aiocancel()
803 		 * or aio_cancel().
804 		 */
805 		_aio_set_result(reqp, -1, ECANCELED);
806 		(void) thr_kill(aiowp->work_tid, SIGAIOCANCEL);
807 		return (0);
808 	}
809 	if (!POSIX_AIO(reqp)) {
810 		_aio_outstand_cnt--;
811 		_aio_set_result(reqp, -1, ECANCELED);
812 		return (0);
813 	}
814 	sig_mutex_unlock(&aiowp->work_qlock1);
815 	sig_mutex_unlock(&__aio_mutex);
816 	_aiodone(reqp, -1, ECANCELED);
817 	sig_mutex_lock(&__aio_mutex);
818 	sig_mutex_lock(&aiowp->work_qlock1);
819 	return (1);
820 }
821 
822 int
823 _aio_create_worker(aio_req_t *reqp, int mode)
824 {
825 	aio_worker_t *aiowp, **workers, **nextworker;
826 	int *aio_workerscnt;
827 	void *(*func)(void *);
828 	sigset_t oset;
829 	int error;
830 
831 	/*
832 	 * Put the new worker thread in the right queue.
833 	 */
834 	switch (mode) {
835 	case AIOREAD:
836 	case AIOWRITE:
837 	case AIOAREAD:
838 	case AIOAWRITE:
839 #if !defined(_LP64)
840 	case AIOAREAD64:
841 	case AIOAWRITE64:
842 #endif
843 		workers = &__workers_rw;
844 		nextworker = &__nextworker_rw;
845 		aio_workerscnt = &__rw_workerscnt;
846 		func = _aio_do_request;
847 		break;
848 	case AIONOTIFY:
849 		workers = &__workers_no;
850 		nextworker = &__nextworker_no;
851 		func = _aio_do_notify;
852 		aio_workerscnt = &__no_workerscnt;
853 		break;
854 	default:
855 		aio_panic("_aio_create_worker: invalid mode");
856 		break;
857 	}
858 
859 	if ((aiowp = _aio_worker_alloc()) == NULL)
860 		return (-1);
861 
862 	if (reqp) {
863 		reqp->req_state = AIO_REQ_QUEUED;
864 		reqp->req_worker = aiowp;
865 		aiowp->work_head1 = reqp;
866 		aiowp->work_tail1 = reqp;
867 		aiowp->work_next1 = reqp;
868 		aiowp->work_count1 = 1;
869 		aiowp->work_minload1 = 1;
870 	}
871 
872 	(void) pthread_sigmask(SIG_SETMASK, &maskset, &oset);
873 	error = thr_create(NULL, AIOSTKSIZE, func, aiowp,
874 		THR_DAEMON | THR_SUSPENDED, &aiowp->work_tid);
875 	(void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
876 	if (error) {
877 		if (reqp) {
878 			reqp->req_state = 0;
879 			reqp->req_worker = NULL;
880 		}
881 		_aio_worker_free(aiowp);
882 		return (-1);
883 	}
884 
885 	lmutex_lock(&__aio_mutex);
886 	(*aio_workerscnt)++;
887 	if (*workers == NULL) {
888 		aiowp->work_forw = aiowp;
889 		aiowp->work_backw = aiowp;
890 		*nextworker = aiowp;
891 		*workers = aiowp;
892 	} else {
893 		aiowp->work_backw = (*workers)->work_backw;
894 		aiowp->work_forw = (*workers);
895 		(*workers)->work_backw->work_forw = aiowp;
896 		(*workers)->work_backw = aiowp;
897 	}
898 	_aio_worker_cnt++;
899 	lmutex_unlock(&__aio_mutex);
900 
901 	(void) thr_continue(aiowp->work_tid);
902 
903 	return (0);
904 }
905 
906 /*
907  * This is the worker's main routine.
908  * The task of this function is to execute all queued requests;
909  * once the last pending request is executed this function will block
910  * in _aio_idle().  A new incoming request must wakeup this thread to
911  * restart the work.
912  * Every worker has an own work queue.  The queue lock is required
913  * to synchronize the addition of new requests for this worker or
914  * cancellation of pending/running requests.
915  *
916  * Cancellation scenarios:
917  * The cancellation of a request is being done asynchronously using
918  * _aio_cancel_req() from another thread context.
919  * A queued request can be cancelled in different manners :
920  * a) request is queued but not "in progress" or "done" (AIO_REQ_QUEUED):
921  *	- lock the queue -> remove the request -> unlock the queue
922  *	- this function/thread does not detect this cancellation process
923  * b) request is in progress (AIO_REQ_INPROGRESS) :
924  *	- this function first allow the cancellation of the running
925  *	  request with the flag "work_cancel_flg=1"
926  * 		see _aio_req_get() -> _aio_cancel_on()
927  *	  During this phase, it is allowed to interrupt the worker
928  *	  thread running the request (this thread) using the SIGAIOCANCEL
929  *	  signal.
930  *	  Once this thread returns from the kernel (because the request
931  *	  is just done), then it must disable a possible cancellation
932  *	  and proceed to finish the request.  To disable the cancellation
933  *	  this thread must use _aio_cancel_off() to set "work_cancel_flg=0".
934  * c) request is already done (AIO_REQ_DONE || AIO_REQ_DONEQ):
935  *	  same procedure as in a)
936  *
937  * To b)
938  *	This thread uses sigsetjmp() to define the position in the code, where
939  *	it wish to continue working in the case that a SIGAIOCANCEL signal
940  *	is detected.
941  *	Normally this thread should get the cancellation signal during the
942  *	kernel phase (reading or writing).  In that case the signal handler
943  *	aiosigcancelhndlr() is activated using the worker thread context,
944  *	which again will use the siglongjmp() function to break the standard
945  *	code flow and jump to the "sigsetjmp" position, provided that
946  *	"work_cancel_flg" is set to "1".
947  *	Because the "work_cancel_flg" is only manipulated by this worker
948  *	thread and it can only run on one CPU at a given time, it is not
949  *	necessary to protect that flag with the queue lock.
950  *	Returning from the kernel (read or write system call) we must
951  *	first disable the use of the SIGAIOCANCEL signal and accordingly
952  *	the use of the siglongjmp() function to prevent a possible deadlock:
953  *	- It can happens that this worker thread returns from the kernel and
954  *	  blocks in "work_qlock1",
955  *	- then a second thread cancels the apparently "in progress" request
956  *	  and sends the SIGAIOCANCEL signal to the worker thread,
957  *	- the worker thread gets assigned the "work_qlock1" and will returns
958  *	  from the kernel,
959  *	- the kernel detects the pending signal and activates the signal
960  *	  handler instead,
961  *	- if the "work_cancel_flg" is still set then the signal handler
962  *	  should use siglongjmp() to cancel the "in progress" request and
963  *	  it would try to acquire the same work_qlock1 in _aio_req_get()
964  *	  for a second time => deadlock.
965  *	To avoid that situation we disable the cancellation of the request
966  *	in progress BEFORE we try to acquire the work_qlock1.
967  *	In that case the signal handler will not call siglongjmp() and the
968  *	worker thread will continue running the standard code flow.
969  *	Then this thread must check the AIO_REQ_CANCELED flag to emulate
970  *	an eventually required siglongjmp() freeing the work_qlock1 and
971  *	avoiding a deadlock.
972  */
973 void *
974 _aio_do_request(void *arglist)
975 {
976 	aio_worker_t *aiowp = (aio_worker_t *)arglist;
977 	ulwp_t *self = curthread;
978 	struct aio_args *arg;
979 	aio_req_t *reqp;		/* current AIO request */
980 	ssize_t retval;
981 	int error;
982 
983 	if (pthread_setspecific(_aio_key, aiowp) != 0)
984 		aio_panic("_aio_do_request, pthread_setspecific()");
985 	(void) pthread_sigmask(SIG_SETMASK, &_worker_set, NULL);
986 	ASSERT(aiowp->work_req == NULL);
987 
988 	/*
989 	 * We resume here when an operation is cancelled.
990 	 * On first entry, aiowp->work_req == NULL, so all
991 	 * we do is block SIGAIOCANCEL.
992 	 */
993 	(void) sigsetjmp(aiowp->work_jmp_buf, 0);
994 	ASSERT(self->ul_sigdefer == 0);
995 
996 	sigoff(self);	/* block SIGAIOCANCEL */
997 	if (aiowp->work_req != NULL)
998 		_aio_finish_request(aiowp, -1, ECANCELED);
999 
1000 	for (;;) {
1001 		/*
1002 		 * Put completed requests on aio_done_list.  This has
1003 		 * to be done as part of the main loop to ensure that
1004 		 * we don't artificially starve any aiowait'ers.
1005 		 */
1006 		if (aiowp->work_done1)
1007 			_aio_work_done(aiowp);
1008 
1009 top:
1010 		/* consume any deferred SIGAIOCANCEL signal here */
1011 		sigon(self);
1012 		sigoff(self);
1013 
1014 		while ((reqp = _aio_req_get(aiowp)) == NULL) {
1015 			if (_aio_idle(aiowp) != 0)
1016 				goto top;
1017 		}
1018 		arg = &reqp->req_args;
1019 		ASSERT(reqp->req_state == AIO_REQ_INPROGRESS ||
1020 		    reqp->req_state == AIO_REQ_CANCELED);
1021 		error = 0;
1022 
1023 		switch (reqp->req_op) {
1024 		case AIOREAD:
1025 		case AIOAREAD:
1026 			sigon(self);	/* unblock SIGAIOCANCEL */
1027 			retval = pread(arg->fd, arg->buf,
1028 			    arg->bufsz, arg->offset);
1029 			if (retval == -1) {
1030 				if (errno == ESPIPE) {
1031 					retval = read(arg->fd,
1032 					    arg->buf, arg->bufsz);
1033 					if (retval == -1)
1034 						error = errno;
1035 				} else {
1036 					error = errno;
1037 				}
1038 			}
1039 			sigoff(self);	/* block SIGAIOCANCEL */
1040 			break;
1041 		case AIOWRITE:
1042 		case AIOAWRITE:
1043 			sigon(self);	/* unblock SIGAIOCANCEL */
1044 			retval = pwrite(arg->fd, arg->buf,
1045 			    arg->bufsz, arg->offset);
1046 			if (retval == -1) {
1047 				if (errno == ESPIPE) {
1048 					retval = write(arg->fd,
1049 					    arg->buf, arg->bufsz);
1050 					if (retval == -1)
1051 						error = errno;
1052 				} else {
1053 					error = errno;
1054 				}
1055 			}
1056 			sigoff(self);	/* block SIGAIOCANCEL */
1057 			break;
1058 #if !defined(_LP64)
1059 		case AIOAREAD64:
1060 			sigon(self);	/* unblock SIGAIOCANCEL */
1061 			retval = pread64(arg->fd, arg->buf,
1062 			    arg->bufsz, arg->offset);
1063 			if (retval == -1) {
1064 				if (errno == ESPIPE) {
1065 					retval = read(arg->fd,
1066 					    arg->buf, arg->bufsz);
1067 					if (retval == -1)
1068 						error = errno;
1069 				} else {
1070 					error = errno;
1071 				}
1072 			}
1073 			sigoff(self);	/* block SIGAIOCANCEL */
1074 			break;
1075 		case AIOAWRITE64:
1076 			sigon(self);	/* unblock SIGAIOCANCEL */
1077 			retval = pwrite64(arg->fd, arg->buf,
1078 			    arg->bufsz, arg->offset);
1079 			if (retval == -1) {
1080 				if (errno == ESPIPE) {
1081 					retval = write(arg->fd,
1082 					    arg->buf, arg->bufsz);
1083 					if (retval == -1)
1084 						error = errno;
1085 				} else {
1086 					error = errno;
1087 				}
1088 			}
1089 			sigoff(self);	/* block SIGAIOCANCEL */
1090 			break;
1091 #endif	/* !defined(_LP64) */
1092 		case AIOFSYNC:
1093 			if (_aio_fsync_del(aiowp, reqp))
1094 				goto top;
1095 			ASSERT(reqp->req_head == NULL);
1096 			/*
1097 			 * All writes for this fsync request are now
1098 			 * acknowledged.  Now make these writes visible
1099 			 * and put the final request into the hash table.
1100 			 */
1101 			if (reqp->req_state == AIO_REQ_CANCELED) {
1102 				/* EMPTY */;
1103 			} else if (arg->offset == O_SYNC) {
1104 				if ((retval = __fdsync(arg->fd, FSYNC)) == -1)
1105 					error = errno;
1106 			} else {
1107 				if ((retval = __fdsync(arg->fd, FDSYNC)) == -1)
1108 					error = errno;
1109 			}
1110 			if (_aio_hash_insert(reqp->req_resultp, reqp) != 0)
1111 				aio_panic("_aio_do_request(): AIOFSYNC: "
1112 				    "request already in hash table");
1113 			break;
1114 		default:
1115 			aio_panic("_aio_do_request, bad op");
1116 		}
1117 
1118 		_aio_finish_request(aiowp, retval, error);
1119 	}
1120 	/* NOTREACHED */
1121 	return (NULL);
1122 }
1123 
1124 /*
1125  * Perform the tail processing for _aio_do_request().
1126  * The in-progress request may or may not have been cancelled.
1127  */
1128 static void
1129 _aio_finish_request(aio_worker_t *aiowp, ssize_t retval, int error)
1130 {
1131 	aio_req_t *reqp;
1132 
1133 	sig_mutex_lock(&aiowp->work_qlock1);
1134 	if ((reqp = aiowp->work_req) == NULL)
1135 		sig_mutex_unlock(&aiowp->work_qlock1);
1136 	else {
1137 		aiowp->work_req = NULL;
1138 		if (reqp->req_state == AIO_REQ_CANCELED) {
1139 			retval = -1;
1140 			error = ECANCELED;
1141 		}
1142 		if (!POSIX_AIO(reqp)) {
1143 			sig_mutex_unlock(&aiowp->work_qlock1);
1144 			sig_mutex_lock(&__aio_mutex);
1145 			if (reqp->req_state == AIO_REQ_INPROGRESS)
1146 				reqp->req_state = AIO_REQ_DONE;
1147 			_aio_req_done_cnt++;
1148 			_aio_set_result(reqp, retval, error);
1149 			if (error == ECANCELED)
1150 				_aio_outstand_cnt--;
1151 			sig_mutex_unlock(&__aio_mutex);
1152 		} else {
1153 			if (reqp->req_state == AIO_REQ_INPROGRESS)
1154 				reqp->req_state = AIO_REQ_DONE;
1155 			sig_mutex_unlock(&aiowp->work_qlock1);
1156 			_aiodone(reqp, retval, error);
1157 		}
1158 	}
1159 }
1160 
1161 void
1162 _aio_req_mark_done(aio_req_t *reqp)
1163 {
1164 #if !defined(_LP64)
1165 	if (reqp->req_largefile)
1166 		((aiocb64_t *)reqp->req_aiocbp)->aio_state = USERAIO_DONE;
1167 	else
1168 #endif
1169 		((aiocb_t *)reqp->req_aiocbp)->aio_state = USERAIO_DONE;
1170 }
1171 
1172 /*
1173  * Sleep for 'ticks' clock ticks to give somebody else a chance to run,
1174  * hopefully to consume one of our queued signals.
1175  */
1176 static void
1177 _aio_delay(int ticks)
1178 {
1179 	(void) usleep(ticks * (MICROSEC / hz));
1180 }
1181 
1182 /*
1183  * Actually send the notifications.
1184  * We could block indefinitely here if the application
1185  * is not listening for the signal or port notifications.
1186  */
1187 static void
1188 send_notification(notif_param_t *npp)
1189 {
1190 	extern int __sigqueue(pid_t pid, int signo,
1191 		/* const union sigval */ void *value, int si_code, int block);
1192 
1193 	if (npp->np_signo)
1194 		(void) __sigqueue(__pid, npp->np_signo, npp->np_user,
1195 		    SI_ASYNCIO, 1);
1196 	else if (npp->np_port >= 0)
1197 		(void) _port_dispatch(npp->np_port, 0, PORT_SOURCE_AIO,
1198 		    npp->np_event, npp->np_object, npp->np_user);
1199 
1200 	if (npp->np_lio_signo)
1201 		(void) __sigqueue(__pid, npp->np_lio_signo, npp->np_lio_user,
1202 		    SI_ASYNCIO, 1);
1203 	else if (npp->np_lio_port >= 0)
1204 		(void) _port_dispatch(npp->np_lio_port, 0, PORT_SOURCE_AIO,
1205 		    npp->np_lio_event, npp->np_lio_object, npp->np_lio_user);
1206 }
1207 
1208 /*
1209  * Asynchronous notification worker.
1210  */
1211 void *
1212 _aio_do_notify(void *arg)
1213 {
1214 	aio_worker_t *aiowp = (aio_worker_t *)arg;
1215 	aio_req_t *reqp;
1216 
1217 	/*
1218 	 * This isn't really necessary.  All signals are blocked.
1219 	 */
1220 	if (pthread_setspecific(_aio_key, aiowp) != 0)
1221 		aio_panic("_aio_do_notify, pthread_setspecific()");
1222 
1223 	/*
1224 	 * Notifications are never cancelled.
1225 	 * All signals remain blocked, forever.
1226 	 */
1227 	for (;;) {
1228 		while ((reqp = _aio_req_get(aiowp)) == NULL) {
1229 			if (_aio_idle(aiowp) != 0)
1230 				aio_panic("_aio_do_notify: _aio_idle() failed");
1231 		}
1232 		send_notification(&reqp->req_notify);
1233 		_aio_req_free(reqp);
1234 	}
1235 
1236 	/* NOTREACHED */
1237 	return (NULL);
1238 }
1239 
1240 /*
1241  * Do the completion semantics for a request that was either canceled
1242  * by _aio_cancel_req() or was completed by _aio_do_request().
1243  */
1244 static void
1245 _aiodone(aio_req_t *reqp, ssize_t retval, int error)
1246 {
1247 	aio_result_t *resultp = reqp->req_resultp;
1248 	int notify = 0;
1249 	aio_lio_t *head;
1250 	int sigev_none;
1251 	int sigev_signal;
1252 	int sigev_thread;
1253 	int sigev_port;
1254 	notif_param_t np;
1255 
1256 	/*
1257 	 * We call _aiodone() only for Posix I/O.
1258 	 */
1259 	ASSERT(POSIX_AIO(reqp));
1260 
1261 	sigev_none = 0;
1262 	sigev_signal = 0;
1263 	sigev_thread = 0;
1264 	sigev_port = 0;
1265 	np.np_signo = 0;
1266 	np.np_port = -1;
1267 	np.np_lio_signo = 0;
1268 	np.np_lio_port = -1;
1269 
1270 	switch (reqp->req_sigevent.sigev_notify) {
1271 	case SIGEV_NONE:
1272 		sigev_none = 1;
1273 		break;
1274 	case SIGEV_SIGNAL:
1275 		sigev_signal = 1;
1276 		break;
1277 	case SIGEV_THREAD:
1278 		sigev_thread = 1;
1279 		break;
1280 	case SIGEV_PORT:
1281 		sigev_port = 1;
1282 		break;
1283 	default:
1284 		aio_panic("_aiodone: improper sigev_notify");
1285 		break;
1286 	}
1287 
1288 	/*
1289 	 * Figure out the notification parameters while holding __aio_mutex.
1290 	 * Actually perform the notifications after dropping __aio_mutex.
1291 	 * This allows us to sleep for a long time (if the notifications
1292 	 * incur delays) without impeding other async I/O operations.
1293 	 */
1294 
1295 	sig_mutex_lock(&__aio_mutex);
1296 
1297 	if (sigev_signal) {
1298 		if ((np.np_signo = reqp->req_sigevent.sigev_signo) != 0)
1299 			notify = 1;
1300 		np.np_user = reqp->req_sigevent.sigev_value.sival_ptr;
1301 	} else if (sigev_thread | sigev_port) {
1302 		if ((np.np_port = reqp->req_sigevent.sigev_signo) >= 0)
1303 			notify = 1;
1304 		np.np_event = reqp->req_op;
1305 		if (np.np_event == AIOFSYNC && reqp->req_largefile)
1306 			np.np_event = AIOFSYNC64;
1307 		np.np_object = (uintptr_t)reqp->req_aiocbp;
1308 		np.np_user = reqp->req_sigevent.sigev_value.sival_ptr;
1309 	}
1310 
1311 	if (resultp->aio_errno == EINPROGRESS)
1312 		_aio_set_result(reqp, retval, error);
1313 
1314 	_aio_outstand_cnt--;
1315 
1316 	head = reqp->req_head;
1317 	reqp->req_head = NULL;
1318 
1319 	if (sigev_none) {
1320 		_aio_enq_doneq(reqp);
1321 		reqp = NULL;
1322 	} else {
1323 		(void) _aio_hash_del(resultp);
1324 		_aio_req_mark_done(reqp);
1325 	}
1326 
1327 	_aio_waitn_wakeup();
1328 
1329 	/*
1330 	 * __aio_waitn() sets AIO_WAIT_INPROGRESS and
1331 	 * __aio_suspend() increments "_aio_kernel_suspend"
1332 	 * when they are waiting in the kernel for completed I/Os.
1333 	 *
1334 	 * _kaio(AIONOTIFY) awakes the corresponding function
1335 	 * in the kernel; then the corresponding __aio_waitn() or
1336 	 * __aio_suspend() function could reap the recently
1337 	 * completed I/Os (_aiodone()).
1338 	 */
1339 	if ((_aio_flags & AIO_WAIT_INPROGRESS) || _aio_kernel_suspend > 0)
1340 		(void) _kaio(AIONOTIFY);
1341 
1342 	sig_mutex_unlock(&__aio_mutex);
1343 
1344 	if (head != NULL) {
1345 		/*
1346 		 * If all the lio requests have completed,
1347 		 * prepare to notify the waiting thread.
1348 		 */
1349 		sig_mutex_lock(&head->lio_mutex);
1350 		ASSERT(head->lio_refcnt == head->lio_nent);
1351 		if (head->lio_refcnt == 1) {
1352 			int waiting = 0;
1353 			if (head->lio_mode == LIO_WAIT) {
1354 				if ((waiting = head->lio_waiting) != 0)
1355 					(void) cond_signal(&head->lio_cond_cv);
1356 			} else if (head->lio_port < 0) { /* none or signal */
1357 				if ((np.np_lio_signo = head->lio_signo) != 0)
1358 					notify = 1;
1359 				np.np_lio_user = head->lio_sigval.sival_ptr;
1360 			} else {			/* thread or port */
1361 				notify = 1;
1362 				np.np_lio_port = head->lio_port;
1363 				np.np_lio_event = head->lio_event;
1364 				np.np_lio_object =
1365 				    (uintptr_t)head->lio_sigevent;
1366 				np.np_lio_user = head->lio_sigval.sival_ptr;
1367 			}
1368 			head->lio_nent = head->lio_refcnt = 0;
1369 			sig_mutex_unlock(&head->lio_mutex);
1370 			if (waiting == 0)
1371 				_aio_lio_free(head);
1372 		} else {
1373 			head->lio_nent--;
1374 			head->lio_refcnt--;
1375 			sig_mutex_unlock(&head->lio_mutex);
1376 		}
1377 	}
1378 
1379 	/*
1380 	 * The request is completed; now perform the notifications.
1381 	 */
1382 	if (notify) {
1383 		if (reqp != NULL) {
1384 			/*
1385 			 * We usually put the request on the notification
1386 			 * queue because we don't want to block and delay
1387 			 * other operations behind us in the work queue.
1388 			 * Also we must never block on a cancel notification
1389 			 * because we are being called from an application
1390 			 * thread in this case and that could lead to deadlock
1391 			 * if no other thread is receiving notificatins.
1392 			 */
1393 			reqp->req_notify = np;
1394 			reqp->req_op = AIONOTIFY;
1395 			_aio_req_add(reqp, &__workers_no, AIONOTIFY);
1396 			reqp = NULL;
1397 		} else {
1398 			/*
1399 			 * We already put the request on the done queue,
1400 			 * so we can't queue it to the notification queue.
1401 			 * Just do the notification directly.
1402 			 */
1403 			send_notification(&np);
1404 		}
1405 	}
1406 
1407 	if (reqp != NULL)
1408 		_aio_req_free(reqp);
1409 }
1410 
1411 /*
1412  * Delete fsync requests from list head until there is
1413  * only one left.  Return 0 when there is only one,
1414  * otherwise return a non-zero value.
1415  */
1416 static int
1417 _aio_fsync_del(aio_worker_t *aiowp, aio_req_t *reqp)
1418 {
1419 	aio_lio_t *head = reqp->req_head;
1420 	int rval = 0;
1421 
1422 	ASSERT(reqp == aiowp->work_req);
1423 	sig_mutex_lock(&aiowp->work_qlock1);
1424 	sig_mutex_lock(&head->lio_mutex);
1425 	if (head->lio_refcnt > 1) {
1426 		head->lio_refcnt--;
1427 		head->lio_nent--;
1428 		aiowp->work_req = NULL;
1429 		sig_mutex_unlock(&head->lio_mutex);
1430 		sig_mutex_unlock(&aiowp->work_qlock1);
1431 		sig_mutex_lock(&__aio_mutex);
1432 		_aio_outstand_cnt--;
1433 		_aio_waitn_wakeup();
1434 		sig_mutex_unlock(&__aio_mutex);
1435 		_aio_req_free(reqp);
1436 		return (1);
1437 	}
1438 	ASSERT(head->lio_nent == 1 && head->lio_refcnt == 1);
1439 	reqp->req_head = NULL;
1440 	if (head->lio_canned)
1441 		reqp->req_state = AIO_REQ_CANCELED;
1442 	if (head->lio_mode == LIO_DESTROY) {
1443 		aiowp->work_req = NULL;
1444 		rval = 1;
1445 	}
1446 	sig_mutex_unlock(&head->lio_mutex);
1447 	sig_mutex_unlock(&aiowp->work_qlock1);
1448 	head->lio_refcnt--;
1449 	head->lio_nent--;
1450 	_aio_lio_free(head);
1451 	if (rval != 0)
1452 		_aio_req_free(reqp);
1453 	return (rval);
1454 }
1455 
1456 /*
1457  * A worker is set idle when its work queue is empty.
1458  * The worker checks again that it has no more work
1459  * and then goes to sleep waiting for more work.
1460  */
1461 int
1462 _aio_idle(aio_worker_t *aiowp)
1463 {
1464 	int error = 0;
1465 
1466 	sig_mutex_lock(&aiowp->work_qlock1);
1467 	if (aiowp->work_count1 == 0) {
1468 		ASSERT(aiowp->work_minload1 == 0);
1469 		aiowp->work_idleflg = 1;
1470 		/*
1471 		 * A cancellation handler is not needed here.
1472 		 * aio worker threads are never cancelled via pthread_cancel().
1473 		 */
1474 		error = sig_cond_wait(&aiowp->work_idle_cv,
1475 		    &aiowp->work_qlock1);
1476 		/*
1477 		 * The idle flag is normally cleared before worker is awakened
1478 		 * by aio_req_add().  On error (EINTR), we clear it ourself.
1479 		 */
1480 		if (error)
1481 			aiowp->work_idleflg = 0;
1482 	}
1483 	sig_mutex_unlock(&aiowp->work_qlock1);
1484 	return (error);
1485 }
1486 
1487 /*
1488  * A worker's completed AIO requests are placed onto a global
1489  * done queue.  The application is only sent a SIGIO signal if
1490  * the process has a handler enabled and it is not waiting via
1491  * aiowait().
1492  */
1493 static void
1494 _aio_work_done(aio_worker_t *aiowp)
1495 {
1496 	aio_req_t *reqp;
1497 
1498 	sig_mutex_lock(&aiowp->work_qlock1);
1499 	reqp = aiowp->work_prev1;
1500 	reqp->req_next = NULL;
1501 	aiowp->work_done1 = 0;
1502 	aiowp->work_tail1 = aiowp->work_next1;
1503 	if (aiowp->work_tail1 == NULL)
1504 		aiowp->work_head1 = NULL;
1505 	aiowp->work_prev1 = NULL;
1506 	sig_mutex_unlock(&aiowp->work_qlock1);
1507 	sig_mutex_lock(&__aio_mutex);
1508 	_aio_donecnt++;
1509 	_aio_outstand_cnt--;
1510 	_aio_req_done_cnt--;
1511 	ASSERT(_aio_donecnt > 0 &&
1512 	    _aio_outstand_cnt >= 0 &&
1513 	    _aio_req_done_cnt >= 0);
1514 	ASSERT(reqp != NULL);
1515 
1516 	if (_aio_done_tail == NULL) {
1517 		_aio_done_head = _aio_done_tail = reqp;
1518 	} else {
1519 		_aio_done_head->req_next = reqp;
1520 		_aio_done_head = reqp;
1521 	}
1522 
1523 	if (_aiowait_flag) {
1524 		sig_mutex_unlock(&__aio_mutex);
1525 		(void) _kaio(AIONOTIFY);
1526 	} else {
1527 		sig_mutex_unlock(&__aio_mutex);
1528 		if (_sigio_enabled)
1529 			(void) kill(__pid, SIGIO);
1530 	}
1531 }
1532 
1533 /*
1534  * The done queue consists of AIO requests that are in either the
1535  * AIO_REQ_DONE or AIO_REQ_CANCELED state.  Requests that were cancelled
1536  * are discarded.  If the done queue is empty then NULL is returned.
1537  * Otherwise the address of a done aio_result_t is returned.
1538  */
1539 aio_result_t *
1540 _aio_req_done(void)
1541 {
1542 	aio_req_t *reqp;
1543 	aio_result_t *resultp;
1544 
1545 	ASSERT(MUTEX_HELD(&__aio_mutex));
1546 
1547 	if ((reqp = _aio_done_tail) != NULL) {
1548 		if ((_aio_done_tail = reqp->req_next) == NULL)
1549 			_aio_done_head = NULL;
1550 		ASSERT(_aio_donecnt > 0);
1551 		_aio_donecnt--;
1552 		(void) _aio_hash_del(reqp->req_resultp);
1553 		resultp = reqp->req_resultp;
1554 		ASSERT(reqp->req_state == AIO_REQ_DONE);
1555 		_aio_req_free(reqp);
1556 		return (resultp);
1557 	}
1558 	/* is queue empty? */
1559 	if (reqp == NULL && _aio_outstand_cnt == 0) {
1560 		return ((aio_result_t *)-1);
1561 	}
1562 	return (NULL);
1563 }
1564 
1565 /*
1566  * Set the return and errno values for the application's use.
1567  *
1568  * For the Posix interfaces, we must set the return value first followed
1569  * by the errno value because the Posix interfaces allow for a change
1570  * in the errno value from EINPROGRESS to something else to signal
1571  * the completion of the asynchronous request.
1572  *
1573  * The opposite is true for the Solaris interfaces.  These allow for
1574  * a change in the return value from AIO_INPROGRESS to something else
1575  * to signal the completion of the asynchronous request.
1576  */
1577 void
1578 _aio_set_result(aio_req_t *reqp, ssize_t retval, int error)
1579 {
1580 	aio_result_t *resultp = reqp->req_resultp;
1581 
1582 	if (POSIX_AIO(reqp)) {
1583 		resultp->aio_return = retval;
1584 		membar_producer();
1585 		resultp->aio_errno = error;
1586 	} else {
1587 		resultp->aio_errno = error;
1588 		membar_producer();
1589 		resultp->aio_return = retval;
1590 	}
1591 }
1592 
1593 /*
1594  * Add an AIO request onto the next work queue.
1595  * A circular list of workers is used to choose the next worker.
1596  */
1597 void
1598 _aio_req_add(aio_req_t *reqp, aio_worker_t **nextworker, int mode)
1599 {
1600 	ulwp_t *self = curthread;
1601 	aio_worker_t *aiowp;
1602 	aio_worker_t *first;
1603 	int load_bal_flg = 1;
1604 	int found;
1605 
1606 	ASSERT(reqp->req_state != AIO_REQ_DONEQ);
1607 	reqp->req_next = NULL;
1608 	/*
1609 	 * Try to acquire the next worker's work queue.  If it is locked,
1610 	 * then search the list of workers until a queue is found unlocked,
1611 	 * or until the list is completely traversed at which point another
1612 	 * worker will be created.
1613 	 */
1614 	sigoff(self);		/* defer SIGIO */
1615 	sig_mutex_lock(&__aio_mutex);
1616 	first = aiowp = *nextworker;
1617 	if (mode != AIONOTIFY)
1618 		_aio_outstand_cnt++;
1619 	sig_mutex_unlock(&__aio_mutex);
1620 
1621 	switch (mode) {
1622 	case AIOREAD:
1623 	case AIOWRITE:
1624 	case AIOAREAD:
1625 	case AIOAWRITE:
1626 #if !defined(_LP64)
1627 	case AIOAREAD64:
1628 	case AIOAWRITE64:
1629 #endif
1630 		/* try to find an idle worker */
1631 		found = 0;
1632 		do {
1633 			if (sig_mutex_trylock(&aiowp->work_qlock1) == 0) {
1634 				if (aiowp->work_idleflg) {
1635 					found = 1;
1636 					break;
1637 				}
1638 				sig_mutex_unlock(&aiowp->work_qlock1);
1639 			}
1640 		} while ((aiowp = aiowp->work_forw) != first);
1641 
1642 		if (found) {
1643 			aiowp->work_minload1++;
1644 			break;
1645 		}
1646 
1647 		/* try to acquire some worker's queue lock */
1648 		do {
1649 			if (sig_mutex_trylock(&aiowp->work_qlock1) == 0) {
1650 				found = 1;
1651 				break;
1652 			}
1653 		} while ((aiowp = aiowp->work_forw) != first);
1654 
1655 		/*
1656 		 * Create more workers when the workers appear overloaded.
1657 		 * Either all the workers are busy draining their queues
1658 		 * or no worker's queue lock could be acquired.
1659 		 */
1660 		if (!found) {
1661 			if (_aio_worker_cnt < _max_workers) {
1662 				if (_aio_create_worker(reqp, mode))
1663 					aio_panic("_aio_req_add: add worker");
1664 				sigon(self);	/* reenable SIGIO */
1665 				return;
1666 			}
1667 
1668 			/*
1669 			 * No worker available and we have created
1670 			 * _max_workers, keep going through the
1671 			 * list slowly until we get a lock
1672 			 */
1673 			while (sig_mutex_trylock(&aiowp->work_qlock1) != 0) {
1674 				/*
1675 				 * give someone else a chance
1676 				 */
1677 				_aio_delay(1);
1678 				aiowp = aiowp->work_forw;
1679 			}
1680 		}
1681 
1682 		ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
1683 		if (_aio_worker_cnt < _max_workers &&
1684 		    aiowp->work_minload1 >= _minworkload) {
1685 			sig_mutex_unlock(&aiowp->work_qlock1);
1686 			sig_mutex_lock(&__aio_mutex);
1687 			*nextworker = aiowp->work_forw;
1688 			sig_mutex_unlock(&__aio_mutex);
1689 			if (_aio_create_worker(reqp, mode))
1690 				aio_panic("aio_req_add: add worker");
1691 			sigon(self);	/* reenable SIGIO */
1692 			return;
1693 		}
1694 		aiowp->work_minload1++;
1695 		break;
1696 	case AIOFSYNC:
1697 	case AIONOTIFY:
1698 		load_bal_flg = 0;
1699 		sig_mutex_lock(&aiowp->work_qlock1);
1700 		break;
1701 	default:
1702 		aio_panic("_aio_req_add: invalid mode");
1703 		break;
1704 	}
1705 	/*
1706 	 * Put request onto worker's work queue.
1707 	 */
1708 	if (aiowp->work_tail1 == NULL) {
1709 		ASSERT(aiowp->work_count1 == 0);
1710 		aiowp->work_tail1 = reqp;
1711 		aiowp->work_next1 = reqp;
1712 	} else {
1713 		aiowp->work_head1->req_next = reqp;
1714 		if (aiowp->work_next1 == NULL)
1715 			aiowp->work_next1 = reqp;
1716 	}
1717 	reqp->req_state = AIO_REQ_QUEUED;
1718 	reqp->req_worker = aiowp;
1719 	aiowp->work_head1 = reqp;
1720 	/*
1721 	 * Awaken worker if it is not currently active.
1722 	 */
1723 	if (aiowp->work_count1++ == 0 && aiowp->work_idleflg) {
1724 		aiowp->work_idleflg = 0;
1725 		(void) cond_signal(&aiowp->work_idle_cv);
1726 	}
1727 	sig_mutex_unlock(&aiowp->work_qlock1);
1728 
1729 	if (load_bal_flg) {
1730 		sig_mutex_lock(&__aio_mutex);
1731 		*nextworker = aiowp->work_forw;
1732 		sig_mutex_unlock(&__aio_mutex);
1733 	}
1734 	sigon(self);	/* reenable SIGIO */
1735 }
1736 
1737 /*
1738  * Get an AIO request for a specified worker.
1739  * If the work queue is empty, return NULL.
1740  */
1741 aio_req_t *
1742 _aio_req_get(aio_worker_t *aiowp)
1743 {
1744 	aio_req_t *reqp;
1745 
1746 	sig_mutex_lock(&aiowp->work_qlock1);
1747 	if ((reqp = aiowp->work_next1) != NULL) {
1748 		/*
1749 		 * Remove a POSIX request from the queue; the
1750 		 * request queue is a singularly linked list
1751 		 * with a previous pointer.  The request is
1752 		 * removed by updating the previous pointer.
1753 		 *
1754 		 * Non-posix requests are left on the queue
1755 		 * to eventually be placed on the done queue.
1756 		 */
1757 
1758 		if (POSIX_AIO(reqp)) {
1759 			if (aiowp->work_prev1 == NULL) {
1760 				aiowp->work_tail1 = reqp->req_next;
1761 				if (aiowp->work_tail1 == NULL)
1762 					aiowp->work_head1 = NULL;
1763 			} else {
1764 				aiowp->work_prev1->req_next = reqp->req_next;
1765 				if (aiowp->work_head1 == reqp)
1766 					aiowp->work_head1 = reqp->req_next;
1767 			}
1768 
1769 		} else {
1770 			aiowp->work_prev1 = reqp;
1771 			ASSERT(aiowp->work_done1 >= 0);
1772 			aiowp->work_done1++;
1773 		}
1774 		ASSERT(reqp != reqp->req_next);
1775 		aiowp->work_next1 = reqp->req_next;
1776 		ASSERT(aiowp->work_count1 >= 1);
1777 		aiowp->work_count1--;
1778 		switch (reqp->req_op) {
1779 		case AIOREAD:
1780 		case AIOWRITE:
1781 		case AIOAREAD:
1782 		case AIOAWRITE:
1783 #if !defined(_LP64)
1784 		case AIOAREAD64:
1785 		case AIOAWRITE64:
1786 #endif
1787 			ASSERT(aiowp->work_minload1 > 0);
1788 			aiowp->work_minload1--;
1789 			break;
1790 		}
1791 		reqp->req_state = AIO_REQ_INPROGRESS;
1792 	}
1793 	aiowp->work_req = reqp;
1794 	ASSERT(reqp != NULL || aiowp->work_count1 == 0);
1795 	sig_mutex_unlock(&aiowp->work_qlock1);
1796 	return (reqp);
1797 }
1798 
1799 static void
1800 _aio_req_del(aio_worker_t *aiowp, aio_req_t *reqp, int ostate)
1801 {
1802 	aio_req_t **last;
1803 	aio_req_t *lastrp;
1804 	aio_req_t *next;
1805 
1806 	ASSERT(aiowp != NULL);
1807 	ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
1808 	if (POSIX_AIO(reqp)) {
1809 		if (ostate != AIO_REQ_QUEUED)
1810 			return;
1811 	}
1812 	last = &aiowp->work_tail1;
1813 	lastrp = aiowp->work_tail1;
1814 	ASSERT(ostate == AIO_REQ_QUEUED || ostate == AIO_REQ_INPROGRESS);
1815 	while ((next = *last) != NULL) {
1816 		if (next == reqp) {
1817 			*last = next->req_next;
1818 			if (aiowp->work_next1 == next)
1819 				aiowp->work_next1 = next->req_next;
1820 
1821 			if ((next->req_next != NULL) ||
1822 			    (aiowp->work_done1 == 0)) {
1823 				if (aiowp->work_head1 == next)
1824 					aiowp->work_head1 = next->req_next;
1825 				if (aiowp->work_prev1 == next)
1826 					aiowp->work_prev1 = next->req_next;
1827 			} else {
1828 				if (aiowp->work_head1 == next)
1829 					aiowp->work_head1 = lastrp;
1830 				if (aiowp->work_prev1 == next)
1831 					aiowp->work_prev1 = lastrp;
1832 			}
1833 
1834 			if (ostate == AIO_REQ_QUEUED) {
1835 				ASSERT(aiowp->work_count1 >= 1);
1836 				aiowp->work_count1--;
1837 				ASSERT(aiowp->work_minload1 >= 1);
1838 				aiowp->work_minload1--;
1839 			} else {
1840 				ASSERT(ostate == AIO_REQ_INPROGRESS &&
1841 				    !POSIX_AIO(reqp));
1842 				aiowp->work_done1--;
1843 			}
1844 			return;
1845 		}
1846 		last = &next->req_next;
1847 		lastrp = next;
1848 	}
1849 	/* NOTREACHED */
1850 }
1851 
1852 static void
1853 _aio_enq_doneq(aio_req_t *reqp)
1854 {
1855 	if (_aio_doneq == NULL) {
1856 		_aio_doneq = reqp;
1857 		reqp->req_next = reqp->req_prev = reqp;
1858 	} else {
1859 		reqp->req_next = _aio_doneq;
1860 		reqp->req_prev = _aio_doneq->req_prev;
1861 		_aio_doneq->req_prev->req_next = reqp;
1862 		_aio_doneq->req_prev = reqp;
1863 	}
1864 	reqp->req_state = AIO_REQ_DONEQ;
1865 	_aio_doneq_cnt++;
1866 }
1867 
1868 /*
1869  * caller owns the _aio_mutex
1870  */
1871 aio_req_t *
1872 _aio_req_remove(aio_req_t *reqp)
1873 {
1874 	if (reqp && reqp->req_state != AIO_REQ_DONEQ)
1875 		return (NULL);
1876 
1877 	if (reqp) {
1878 		/* request in done queue */
1879 		if (_aio_doneq == reqp)
1880 			_aio_doneq = reqp->req_next;
1881 		if (_aio_doneq == reqp) {
1882 			/* only one request on queue */
1883 			_aio_doneq = NULL;
1884 		} else {
1885 			aio_req_t *tmp = reqp->req_next;
1886 			reqp->req_prev->req_next = tmp;
1887 			tmp->req_prev = reqp->req_prev;
1888 		}
1889 	} else if ((reqp = _aio_doneq) != NULL) {
1890 		if (reqp == reqp->req_next) {
1891 			/* only one request on queue */
1892 			_aio_doneq = NULL;
1893 		} else {
1894 			reqp->req_prev->req_next = _aio_doneq = reqp->req_next;
1895 			_aio_doneq->req_prev = reqp->req_prev;
1896 		}
1897 	}
1898 	if (reqp) {
1899 		_aio_doneq_cnt--;
1900 		reqp->req_next = reqp->req_prev = reqp;
1901 		reqp->req_state = AIO_REQ_DONE;
1902 	}
1903 	return (reqp);
1904 }
1905 
1906 /*
1907  * An AIO request is identified by an aio_result_t pointer.  The library
1908  * maps this aio_result_t pointer to its internal representation using a
1909  * hash table.  This function adds an aio_result_t pointer to the hash table.
1910  */
1911 static int
1912 _aio_hash_insert(aio_result_t *resultp, aio_req_t *reqp)
1913 {
1914 	aio_hash_t *hashp;
1915 	aio_req_t **prev;
1916 	aio_req_t *next;
1917 
1918 	hashp = _aio_hash + AIOHASH(resultp);
1919 	lmutex_lock(&hashp->hash_lock);
1920 	prev = &hashp->hash_ptr;
1921 	while ((next = *prev) != NULL) {
1922 		if (resultp == next->req_resultp) {
1923 			lmutex_unlock(&hashp->hash_lock);
1924 			return (-1);
1925 		}
1926 		prev = &next->req_link;
1927 	}
1928 	*prev = reqp;
1929 	ASSERT(reqp->req_link == NULL);
1930 	lmutex_unlock(&hashp->hash_lock);
1931 	return (0);
1932 }
1933 
1934 /*
1935  * Remove an entry from the hash table.
1936  */
1937 aio_req_t *
1938 _aio_hash_del(aio_result_t *resultp)
1939 {
1940 	aio_hash_t *hashp;
1941 	aio_req_t **prev;
1942 	aio_req_t *next = NULL;
1943 
1944 	if (_aio_hash != NULL) {
1945 		hashp = _aio_hash + AIOHASH(resultp);
1946 		lmutex_lock(&hashp->hash_lock);
1947 		prev = &hashp->hash_ptr;
1948 		while ((next = *prev) != NULL) {
1949 			if (resultp == next->req_resultp) {
1950 				*prev = next->req_link;
1951 				next->req_link = NULL;
1952 				break;
1953 			}
1954 			prev = &next->req_link;
1955 		}
1956 		lmutex_unlock(&hashp->hash_lock);
1957 	}
1958 	return (next);
1959 }
1960 
1961 /*
1962  *  find an entry in the hash table
1963  */
1964 aio_req_t *
1965 _aio_hash_find(aio_result_t *resultp)
1966 {
1967 	aio_hash_t *hashp;
1968 	aio_req_t **prev;
1969 	aio_req_t *next = NULL;
1970 
1971 	if (_aio_hash != NULL) {
1972 		hashp = _aio_hash + AIOHASH(resultp);
1973 		lmutex_lock(&hashp->hash_lock);
1974 		prev = &hashp->hash_ptr;
1975 		while ((next = *prev) != NULL) {
1976 			if (resultp == next->req_resultp)
1977 				break;
1978 			prev = &next->req_link;
1979 		}
1980 		lmutex_unlock(&hashp->hash_lock);
1981 	}
1982 	return (next);
1983 }
1984 
1985 /*
1986  * AIO interface for POSIX
1987  */
1988 int
1989 _aio_rw(aiocb_t *aiocbp, aio_lio_t *lio_head, aio_worker_t **nextworker,
1990     int mode, int flg)
1991 {
1992 	aio_req_t *reqp;
1993 	aio_args_t *ap;
1994 	int kerr;
1995 
1996 	if (aiocbp == NULL) {
1997 		errno = EINVAL;
1998 		return (-1);
1999 	}
2000 
2001 	/* initialize kaio */
2002 	if (!_kaio_ok)
2003 		_kaio_init();
2004 
2005 	aiocbp->aio_state = NOCHECK;
2006 
2007 	/*
2008 	 * If we have been called because a list I/O
2009 	 * kaio() failed, we dont want to repeat the
2010 	 * system call
2011 	 */
2012 
2013 	if (flg & AIO_KAIO) {
2014 		/*
2015 		 * Try kernel aio first.
2016 		 * If errno is ENOTSUP/EBADFD,
2017 		 * fall back to the thread implementation.
2018 		 */
2019 		if (_kaio_ok > 0 && KAIO_SUPPORTED(aiocbp->aio_fildes)) {
2020 			aiocbp->aio_resultp.aio_errno = EINPROGRESS;
2021 			aiocbp->aio_state = CHECK;
2022 			kerr = (int)_kaio(mode, aiocbp);
2023 			if (kerr == 0)
2024 				return (0);
2025 			if (errno != ENOTSUP && errno != EBADFD) {
2026 				aiocbp->aio_resultp.aio_errno = errno;
2027 				aiocbp->aio_resultp.aio_return = -1;
2028 				aiocbp->aio_state = NOCHECK;
2029 				return (-1);
2030 			}
2031 			if (errno == EBADFD)
2032 				SET_KAIO_NOT_SUPPORTED(aiocbp->aio_fildes);
2033 		}
2034 	}
2035 
2036 	aiocbp->aio_resultp.aio_errno = EINPROGRESS;
2037 	aiocbp->aio_state = USERAIO;
2038 
2039 	if (!__uaio_ok && __uaio_init() == -1)
2040 		return (-1);
2041 
2042 	if ((reqp = _aio_req_alloc()) == NULL) {
2043 		errno = EAGAIN;
2044 		return (-1);
2045 	}
2046 
2047 	/*
2048 	 * If an LIO request, add the list head to the aio request
2049 	 */
2050 	reqp->req_head = lio_head;
2051 	reqp->req_type = AIO_POSIX_REQ;
2052 	reqp->req_op = mode;
2053 	reqp->req_largefile = 0;
2054 
2055 	if (aiocbp->aio_sigevent.sigev_notify == SIGEV_NONE) {
2056 		reqp->req_sigevent.sigev_notify = SIGEV_NONE;
2057 	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_SIGNAL) {
2058 		reqp->req_sigevent.sigev_notify = SIGEV_SIGNAL;
2059 		reqp->req_sigevent.sigev_signo =
2060 		    aiocbp->aio_sigevent.sigev_signo;
2061 		reqp->req_sigevent.sigev_value.sival_ptr =
2062 		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
2063 	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_PORT) {
2064 		port_notify_t *pn = aiocbp->aio_sigevent.sigev_value.sival_ptr;
2065 		reqp->req_sigevent.sigev_notify = SIGEV_PORT;
2066 		/*
2067 		 * Reuse the sigevent structure to contain the port number
2068 		 * and the user value.  Same for SIGEV_THREAD, below.
2069 		 */
2070 		reqp->req_sigevent.sigev_signo =
2071 		    pn->portnfy_port;
2072 		reqp->req_sigevent.sigev_value.sival_ptr =
2073 		    pn->portnfy_user;
2074 	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_THREAD) {
2075 		reqp->req_sigevent.sigev_notify = SIGEV_THREAD;
2076 		/*
2077 		 * The sigevent structure contains the port number
2078 		 * and the user value.  Same for SIGEV_PORT, above.
2079 		 */
2080 		reqp->req_sigevent.sigev_signo =
2081 		    aiocbp->aio_sigevent.sigev_signo;
2082 		reqp->req_sigevent.sigev_value.sival_ptr =
2083 		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
2084 	}
2085 
2086 	reqp->req_resultp = &aiocbp->aio_resultp;
2087 	reqp->req_aiocbp = aiocbp;
2088 	ap = &reqp->req_args;
2089 	ap->fd = aiocbp->aio_fildes;
2090 	ap->buf = (caddr_t)aiocbp->aio_buf;
2091 	ap->bufsz = aiocbp->aio_nbytes;
2092 	ap->offset = aiocbp->aio_offset;
2093 
2094 	if ((flg & AIO_NO_DUPS) &&
2095 	    _aio_hash_insert(&aiocbp->aio_resultp, reqp) != 0) {
2096 		aio_panic("_aio_rw(): request already in hash table");
2097 		_aio_req_free(reqp);
2098 		errno = EINVAL;
2099 		return (-1);
2100 	}
2101 	_aio_req_add(reqp, nextworker, mode);
2102 	return (0);
2103 }
2104 
2105 #if !defined(_LP64)
2106 /*
2107  * 64-bit AIO interface for POSIX
2108  */
2109 int
2110 _aio_rw64(aiocb64_t *aiocbp, aio_lio_t *lio_head, aio_worker_t **nextworker,
2111     int mode, int flg)
2112 {
2113 	aio_req_t *reqp;
2114 	aio_args_t *ap;
2115 	int kerr;
2116 
2117 	if (aiocbp == NULL) {
2118 		errno = EINVAL;
2119 		return (-1);
2120 	}
2121 
2122 	/* initialize kaio */
2123 	if (!_kaio_ok)
2124 		_kaio_init();
2125 
2126 	aiocbp->aio_state = NOCHECK;
2127 
2128 	/*
2129 	 * If we have been called because a list I/O
2130 	 * kaio() failed, we dont want to repeat the
2131 	 * system call
2132 	 */
2133 
2134 	if (flg & AIO_KAIO) {
2135 		/*
2136 		 * Try kernel aio first.
2137 		 * If errno is ENOTSUP/EBADFD,
2138 		 * fall back to the thread implementation.
2139 		 */
2140 		if (_kaio_ok > 0 && KAIO_SUPPORTED(aiocbp->aio_fildes)) {
2141 			aiocbp->aio_resultp.aio_errno = EINPROGRESS;
2142 			aiocbp->aio_state = CHECK;
2143 			kerr = (int)_kaio(mode, aiocbp);
2144 			if (kerr == 0)
2145 				return (0);
2146 			if (errno != ENOTSUP && errno != EBADFD) {
2147 				aiocbp->aio_resultp.aio_errno = errno;
2148 				aiocbp->aio_resultp.aio_return = -1;
2149 				aiocbp->aio_state = NOCHECK;
2150 				return (-1);
2151 			}
2152 			if (errno == EBADFD)
2153 				SET_KAIO_NOT_SUPPORTED(aiocbp->aio_fildes);
2154 		}
2155 	}
2156 
2157 	aiocbp->aio_resultp.aio_errno = EINPROGRESS;
2158 	aiocbp->aio_state = USERAIO;
2159 
2160 	if (!__uaio_ok && __uaio_init() == -1)
2161 		return (-1);
2162 
2163 	if ((reqp = _aio_req_alloc()) == NULL) {
2164 		errno = EAGAIN;
2165 		return (-1);
2166 	}
2167 
2168 	/*
2169 	 * If an LIO request, add the list head to the aio request
2170 	 */
2171 	reqp->req_head = lio_head;
2172 	reqp->req_type = AIO_POSIX_REQ;
2173 	reqp->req_op = mode;
2174 	reqp->req_largefile = 1;
2175 
2176 	if (aiocbp->aio_sigevent.sigev_notify == SIGEV_NONE) {
2177 		reqp->req_sigevent.sigev_notify = SIGEV_NONE;
2178 	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_SIGNAL) {
2179 		reqp->req_sigevent.sigev_notify = SIGEV_SIGNAL;
2180 		reqp->req_sigevent.sigev_signo =
2181 		    aiocbp->aio_sigevent.sigev_signo;
2182 		reqp->req_sigevent.sigev_value.sival_ptr =
2183 		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
2184 	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_PORT) {
2185 		port_notify_t *pn = aiocbp->aio_sigevent.sigev_value.sival_ptr;
2186 		reqp->req_sigevent.sigev_notify = SIGEV_PORT;
2187 		reqp->req_sigevent.sigev_signo =
2188 		    pn->portnfy_port;
2189 		reqp->req_sigevent.sigev_value.sival_ptr =
2190 		    pn->portnfy_user;
2191 	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_THREAD) {
2192 		reqp->req_sigevent.sigev_notify = SIGEV_THREAD;
2193 		reqp->req_sigevent.sigev_signo =
2194 		    aiocbp->aio_sigevent.sigev_signo;
2195 		reqp->req_sigevent.sigev_value.sival_ptr =
2196 		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
2197 	}
2198 
2199 	reqp->req_resultp = &aiocbp->aio_resultp;
2200 	reqp->req_aiocbp = aiocbp;
2201 	ap = &reqp->req_args;
2202 	ap->fd = aiocbp->aio_fildes;
2203 	ap->buf = (caddr_t)aiocbp->aio_buf;
2204 	ap->bufsz = aiocbp->aio_nbytes;
2205 	ap->offset = aiocbp->aio_offset;
2206 
2207 	if ((flg & AIO_NO_DUPS) &&
2208 	    _aio_hash_insert(&aiocbp->aio_resultp, reqp) != 0) {
2209 		aio_panic("_aio_rw64(): request already in hash table");
2210 		_aio_req_free(reqp);
2211 		errno = EINVAL;
2212 		return (-1);
2213 	}
2214 	_aio_req_add(reqp, nextworker, mode);
2215 	return (0);
2216 }
2217 #endif	/* !defined(_LP64) */
2218