xref: /illumos-gate/usr/src/lib/libc/port/aio/aio.c (revision a194faf8907a6722dcf10ad16c6ca72c9b7bd0ba)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 
22 /*
23  * Copyright 2007 Sun Microsystems, Inc.  All rights reserved.
24  * Use is subject to license terms.
25  */
26 
27 #pragma ident	"%Z%%M%	%I%	%E% SMI"
28 
29 #include "synonyms.h"
30 #include "thr_uberdata.h"
31 #include "asyncio.h"
32 #include <atomic.h>
33 #include <sys/param.h>
34 #include <sys/file.h>
35 #include <sys/port.h>
36 
/*
 * Prototypes for file-local (static) helpers that are referenced
 * before their definitions.
 */
static int _aio_hash_insert(aio_result_t *, aio_req_t *);
static aio_req_t *_aio_req_get(aio_worker_t *);
static void _aio_req_add(aio_req_t *, aio_worker_t **, int);
static void _aio_req_del(aio_worker_t *, aio_req_t *, int);
static void _aio_work_done(aio_worker_t *);
static void _aio_enq_doneq(aio_req_t *);

/* Routines with external linkage that are declared here by hand. */
extern void _aio_lio_free(aio_lio_t *);

extern int __fdsync(int, int);
extern int _port_dispatch(int, int, int, int, uintptr_t, void *);

/* More file-local helpers. */
static int _aio_fsync_del(aio_worker_t *, aio_req_t *);
static void _aiodone(aio_req_t *, ssize_t, int);
static void _aio_cancel_work(aio_worker_t *, int, int *, int *);
static void _aio_finish_request(aio_worker_t *, ssize_t, int);
53 
/*
 * switch for kernel async I/O
 * (set by _kaio_init(): 1 on success, -1 on failure)
 */
int _kaio_ok = 0;		/* 0 = disabled, 1 = on, -1 = error */

/*
 * Key for thread-specific data
 * (the aio worker threads store their aio_worker_t pointer under it)
 */
pthread_key_t _aio_key;

/*
 * Array for determining whether or not a file supports kaio.
 * Allocated by _kaio_supported_init(), which is called from both
 * __uaio_init() and _kaio_init().
 */
uint32_t *_kaio_supported = NULL;

/*
 * workers for read/write requests
 * (__aio_mutex lock protects circular linked list of workers)
 */
aio_worker_t *__workers_rw;	/* circular list of AIO workers */
aio_worker_t *__nextworker_rw;	/* next worker in list of workers */
int __rw_workerscnt;		/* number of read/write workers */

/*
 * worker for notification requests.
 */
aio_worker_t *__workers_no;	/* circular list of AIO workers */
aio_worker_t *__nextworker_no;	/* next worker in list of workers */
int __no_workerscnt;		/* number of notification workers */

aio_req_t *_aio_done_tail;		/* list of done requests */
aio_req_t *_aio_done_head;		/* presumably the other end of that list */

mutex_t __aio_initlock = DEFAULTMUTEX;	/* makes aio initialization atomic */
cond_t __aio_initcv = DEFAULTCV;	/* broadcast when an init finishes */
int __aio_initbusy = 0;			/* nonzero while an init is running */

mutex_t __aio_mutex = DEFAULTMUTEX;	/* protects counts, and linked lists */
cond_t _aio_iowait_cv = DEFAULTCV;	/* wait for userland I/Os */

pid_t __pid = (pid_t)-1;		/* initialize as invalid pid */
int _sigio_enabled = 0;			/* when set, send SIGIO signal */

aio_hash_t *_aio_hash;			/* hash table; mapped in __uaio_init() */

aio_req_t *_aio_doneq;			/* double linked done queue list */

int _aio_donecnt = 0;			/* # of requests on the done list */
int _aio_waitncnt = 0;			/* # of requests for aio_waitn */
int _aio_doneq_cnt = 0;
int _aio_outstand_cnt = 0;		/* # of outstanding requests */
int _kaio_outstand_cnt = 0;		/* # of outstanding kaio requests */
int _aio_req_done_cnt = 0;		/* req. done but not in "done queue" */
int _aio_kernel_suspend = 0;		/* active kernel kaio calls */
int _aio_suscv_cnt = 0;			/* aio_suspend calls waiting on cv's */

int _max_workers = 256;			/* max number of workers permitted */
int _min_workers = 4;			/* min number of workers */
int _minworkload = 2;			/* min number of request in q */
int _aio_worker_cnt = 0;		/* number of workers to do requests */
int __uaio_ok = 0;			/* AIO has been enabled */
sigset_t _worker_set;			/* worker's signal mask */

int _aiowait_flag = 0;			/* when set, aiowait() is inprogress */
int _aio_flags = 0;			/* see asyncio.h for the flag defines */

aio_worker_t *_kaiowp = NULL;		/* points to kaio cleanup thread */

int hz;					/* clock ticks per second */
124 
125 static int
126 _kaio_supported_init(void)
127 {
128 	void *ptr;
129 	size_t size;
130 
131 	if (_kaio_supported != NULL)	/* already initialized */
132 		return (0);
133 
134 	size = MAX_KAIO_FDARRAY_SIZE * sizeof (uint32_t);
135 	ptr = mmap(NULL, size, PROT_READ | PROT_WRITE,
136 	    MAP_PRIVATE | MAP_ANON, -1, (off_t)0);
137 	if (ptr == MAP_FAILED)
138 		return (-1);
139 	_kaio_supported = ptr;
140 	return (0);
141 }
142 
143 /*
144  * The aio subsystem is initialized when an AIO request is made.
145  * Constants are initialized like the max number of workers that
146  * the subsystem can create, and the minimum number of workers
147  * permitted before imposing some restrictions.  Also, some
148  * workers are created.
149  */
150 int
151 __uaio_init(void)
152 {
153 	int ret = -1;
154 	int i;
155 
156 	lmutex_lock(&__aio_initlock);
157 	while (__aio_initbusy)
158 		(void) _cond_wait(&__aio_initcv, &__aio_initlock);
159 	if (__uaio_ok) {	/* already initialized */
160 		lmutex_unlock(&__aio_initlock);
161 		return (0);
162 	}
163 	__aio_initbusy = 1;
164 	lmutex_unlock(&__aio_initlock);
165 
166 	hz = (int)sysconf(_SC_CLK_TCK);
167 	__pid = getpid();
168 
169 	setup_cancelsig(SIGAIOCANCEL);
170 
171 	if (_kaio_supported_init() != 0)
172 		goto out;
173 
174 	/*
175 	 * Allocate and initialize the hash table.
176 	 * Do this only once, even if __uaio_init() is called twice.
177 	 */
178 	if (_aio_hash == NULL) {
179 		/* LINTED pointer cast */
180 		_aio_hash = (aio_hash_t *)mmap(NULL,
181 		    HASHSZ * sizeof (aio_hash_t), PROT_READ | PROT_WRITE,
182 		    MAP_PRIVATE | MAP_ANON, -1, (off_t)0);
183 		if ((void *)_aio_hash == MAP_FAILED) {
184 			_aio_hash = NULL;
185 			goto out;
186 		}
187 		for (i = 0; i < HASHSZ; i++)
188 			(void) mutex_init(&_aio_hash[i].hash_lock,
189 			    USYNC_THREAD, NULL);
190 	}
191 
192 	/*
193 	 * Initialize worker's signal mask to only catch SIGAIOCANCEL.
194 	 */
195 	(void) sigfillset(&_worker_set);
196 	(void) sigdelset(&_worker_set, SIGAIOCANCEL);
197 
198 	/*
199 	 * Create one worker to send asynchronous notifications.
200 	 * Do this only once, even if __uaio_init() is called twice.
201 	 */
202 	if (__no_workerscnt == 0 &&
203 	    (_aio_create_worker(NULL, AIONOTIFY) != 0)) {
204 		errno = EAGAIN;
205 		goto out;
206 	}
207 
208 	/*
209 	 * Create the minimum number of read/write workers.
210 	 * And later check whether atleast one worker is created;
211 	 * lwp_create() calls could fail because of segkp exhaustion.
212 	 */
213 	for (i = 0; i < _min_workers; i++)
214 		(void) _aio_create_worker(NULL, AIOREAD);
215 	if (__rw_workerscnt == 0) {
216 		errno = EAGAIN;
217 		goto out;
218 	}
219 
220 	ret = 0;
221 out:
222 	lmutex_lock(&__aio_initlock);
223 	if (ret == 0)
224 		__uaio_ok = 1;
225 	__aio_initbusy = 0;
226 	(void) cond_broadcast(&__aio_initcv);
227 	lmutex_unlock(&__aio_initlock);
228 	return (ret);
229 }
230 
231 /*
232  * Called from close() before actually performing the real _close().
233  */
234 void
235 _aio_close(int fd)
236 {
237 	if (fd < 0)	/* avoid cancelling everything */
238 		return;
239 	/*
240 	 * Cancel all outstanding aio requests for this file descriptor.
241 	 */
242 	if (__uaio_ok)
243 		(void) aiocancel_all(fd);
244 	/*
245 	 * If we have allocated the bit array, clear the bit for this file.
246 	 * The next open may re-use this file descriptor and the new file
247 	 * may have different kaio() behaviour.
248 	 */
249 	if (_kaio_supported != NULL)
250 		CLEAR_KAIO_SUPPORTED(fd);
251 }
252 
253 /*
254  * special kaio cleanup thread sits in a loop in the
255  * kernel waiting for pending kaio requests to complete.
256  */
257 void *
258 _kaio_cleanup_thread(void *arg)
259 {
260 	if (pthread_setspecific(_aio_key, arg) != 0)
261 		aio_panic("_kaio_cleanup_thread, pthread_setspecific()");
262 	(void) _kaio(AIOSTART);
263 	return (arg);
264 }
265 
266 /*
267  * initialize kaio.
268  */
269 void
270 _kaio_init()
271 {
272 	int error;
273 	sigset_t oset;
274 
275 	lmutex_lock(&__aio_initlock);
276 	while (__aio_initbusy)
277 		(void) _cond_wait(&__aio_initcv, &__aio_initlock);
278 	if (_kaio_ok) {		/* already initialized */
279 		lmutex_unlock(&__aio_initlock);
280 		return;
281 	}
282 	__aio_initbusy = 1;
283 	lmutex_unlock(&__aio_initlock);
284 
285 	if (_kaio_supported_init() != 0)
286 		error = ENOMEM;
287 	else if ((_kaiowp = _aio_worker_alloc()) == NULL)
288 		error = ENOMEM;
289 	else if ((error = (int)_kaio(AIOINIT)) == 0) {
290 		(void) pthread_sigmask(SIG_SETMASK, &maskset, &oset);
291 		error = thr_create(NULL, AIOSTKSIZE, _kaio_cleanup_thread,
292 		    _kaiowp, THR_DAEMON, &_kaiowp->work_tid);
293 		(void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
294 	}
295 	if (error && _kaiowp != NULL) {
296 		_aio_worker_free(_kaiowp);
297 		_kaiowp = NULL;
298 	}
299 
300 	lmutex_lock(&__aio_initlock);
301 	if (error)
302 		_kaio_ok = -1;
303 	else
304 		_kaio_ok = 1;
305 	__aio_initbusy = 0;
306 	(void) cond_broadcast(&__aio_initcv);
307 	lmutex_unlock(&__aio_initlock);
308 }
309 
310 int
311 aioread(int fd, caddr_t buf, int bufsz, off_t offset, int whence,
312     aio_result_t *resultp)
313 {
314 	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOREAD));
315 }
316 
317 int
318 aiowrite(int fd, caddr_t buf, int bufsz, off_t offset, int whence,
319     aio_result_t *resultp)
320 {
321 	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOWRITE));
322 }
323 
#if !defined(_LP64)
/*
 * Large-file variant of aioread(); only built when _LP64 is not
 * defined.  Passes the AIOAREAD64 request code on to _aiorw().
 */
int
aioread64(int fd, caddr_t buf, int bufsz, off64_t offset, int whence,
    aio_result_t *resultp)
{
	int rc;

	rc = _aiorw(fd, buf, bufsz, offset, whence, resultp, AIOAREAD64);
	return (rc);
}

/*
 * Large-file variant of aiowrite(); only built when _LP64 is not
 * defined.  Passes the AIOAWRITE64 request code on to _aiorw().
 */
int
aiowrite64(int fd, caddr_t buf, int bufsz, off64_t offset, int whence,
    aio_result_t *resultp)
{
	int rc;

	rc = _aiorw(fd, buf, bufsz, offset, whence, resultp, AIOAWRITE64);
	return (rc);
}
#endif	/* !defined(_LP64) */
339 
/*
 * Common back end of aioread(), aiowrite() and their 64-bit variants.
 *
 * fd, buf, bufsz	describe the transfer
 * offset, whence	file position, interpreted like lseek() arguments
 * resultp		where the result of the request is reported
 * mode			request code (AIOREAD, AIOWRITE, AIOAREAD64 or
 *			AIOAWRITE64)
 *
 * The request is first offered to kernel aio; if kaio is unavailable
 * or declines it with ENOTSUP/EBADFD, the request is queued for one
 * of the user-level read/write worker threads instead.
 *
 * Returns 0 if the request was accepted, -1 with errno set otherwise.
 */
int
_aiorw(int fd, caddr_t buf, int bufsz, offset_t offset, int whence,
    aio_result_t *resultp, int mode)
{
	aio_req_t *reqp;
	aio_args_t *ap;
	offset_t loffset;
	struct stat64 stat64;
	int error = 0;
	int kerr;
	int umode;

	/* Turn (offset, whence) into the absolute file offset loffset. */
	switch (whence) {

	case SEEK_SET:
		loffset = offset;
		break;
	case SEEK_CUR:
		if ((loffset = llseek(fd, 0, SEEK_CUR)) == -1)
			error = -1;
		else
			loffset += offset;
		break;
	case SEEK_END:
		if (fstat64(fd, &stat64) == -1)
			error = -1;
		else
			loffset = offset + stat64.st_size;
		break;
	default:
		errno = EINVAL;
		error = -1;
	}

	/* errno was set by llseek()/fstat64() or explicitly above */
	if (error)
		return (error);

	/* initialize kaio */
	if (!_kaio_ok)
		_kaio_init();

	/*
	 * _aio_do_request() needs the original request code (mode) to be able
	 * to choose the appropriate 32/64 bit function.  All other functions
	 * only require the difference between READ and WRITE (umode).
	 */
	if (mode == AIOAREAD64 || mode == AIOAWRITE64)
		umode = mode - AIOAREAD64;
	else
		umode = mode;

	/*
	 * Try kernel aio first.
	 * If errno is ENOTSUP/EBADFD, fall back to the thread implementation.
	 */
	if (_kaio_ok > 0 && KAIO_SUPPORTED(fd)) {
		resultp->aio_errno = 0;
		/* count the request before issuing it */
		sig_mutex_lock(&__aio_mutex);
		_kaio_outstand_cnt++;
		sig_mutex_unlock(&__aio_mutex);
		/*
		 * AIO_POLL_BIT is added to the request code when the
		 * caller preset aio_return to AIO_INPROGRESS.
		 */
		kerr = (int)_kaio(((resultp->aio_return == AIO_INPROGRESS) ?
		    (umode | AIO_POLL_BIT) : umode),
		    fd, buf, bufsz, loffset, resultp);
		if (kerr == 0) {
			return (0);
		}
		/* the kernel refused the request; undo the accounting */
		sig_mutex_lock(&__aio_mutex);
		_kaio_outstand_cnt--;
		sig_mutex_unlock(&__aio_mutex);
		/*
		 * NOTE(review): errno from the failed _kaio() call is read
		 * only after the mutex calls above; this presumes they
		 * leave errno untouched -- confirm.
		 */
		if (errno != ENOTSUP && errno != EBADFD)
			return (-1);
		/* remember that kaio does not work for this descriptor */
		if (errno == EBADFD)
			SET_KAIO_NOT_SUPPORTED(fd);
	}

	/* Fall back to the user-level (thread based) implementation. */
	if (!__uaio_ok && __uaio_init() == -1)
		return (-1);

	if ((reqp = _aio_req_alloc()) == NULL) {
		errno = EAGAIN;
		return (-1);
	}

	/*
	 * _aio_do_request() checks reqp->req_op to differentiate
	 * between 32 and 64 bit access.
	 */
	reqp->req_op = mode;
	reqp->req_resultp = resultp;
	ap = &reqp->req_args;
	ap->fd = fd;
	ap->buf = buf;
	ap->bufsz = bufsz;
	ap->offset = loffset;

	/* a failed insert is reported to the caller as EINVAL */
	if (_aio_hash_insert(resultp, reqp) != 0) {
		_aio_req_free(reqp);
		errno = EINVAL;
		return (-1);
	}
	/*
	 * _aio_req_add() only needs the difference between READ and
	 * WRITE to choose the right worker queue.
	 */
	_aio_req_add(reqp, &__nextworker_rw, umode);
	return (0);
}
447 
448 int
449 aiocancel(aio_result_t *resultp)
450 {
451 	aio_req_t *reqp;
452 	aio_worker_t *aiowp;
453 	int ret;
454 	int done = 0;
455 	int canceled = 0;
456 
457 	if (!__uaio_ok) {
458 		errno = EINVAL;
459 		return (-1);
460 	}
461 
462 	sig_mutex_lock(&__aio_mutex);
463 	reqp = _aio_hash_find(resultp);
464 	if (reqp == NULL) {
465 		if (_aio_outstand_cnt == _aio_req_done_cnt)
466 			errno = EINVAL;
467 		else
468 			errno = EACCES;
469 		ret = -1;
470 	} else {
471 		aiowp = reqp->req_worker;
472 		sig_mutex_lock(&aiowp->work_qlock1);
473 		(void) _aio_cancel_req(aiowp, reqp, &canceled, &done);
474 		sig_mutex_unlock(&aiowp->work_qlock1);
475 
476 		if (canceled) {
477 			ret = 0;
478 		} else {
479 			if (_aio_outstand_cnt == 0 ||
480 			    _aio_outstand_cnt == _aio_req_done_cnt)
481 				errno = EINVAL;
482 			else
483 				errno = EACCES;
484 			ret = -1;
485 		}
486 	}
487 	sig_mutex_unlock(&__aio_mutex);
488 	return (ret);
489 }
490 
/*
 * Wait for an asynchronous I/O request to complete.
 *
 * uwait == NULL		wait without a time limit
 * *uwait == {0, 0}	poll: check once and return
 * otherwise		wait at most the given amount of time
 *
 * Returns the aio_result_t pointer of a completed request, NULL if
 * the poll found nothing or the timeout expired, or
 * (aio_result_t *)-1 with errno set to EINVAL (invalid timeout, or
 * both the user-level and the kernel check returned -1) or to EINTR
 * (the kernel wait failed with EINTR).
 *
 * This must be asynch safe
 */
aio_result_t *
aiowait(struct timeval *uwait)
{
	aio_result_t *uresultp;		/* result of the user-level check */
	aio_result_t *kresultp;		/* result of the kernel (kaio) check */
	aio_result_t *resultp;
	int dontblock;
	int timedwait = 0;
	int kaio_errno = 0;
	struct timeval twait;
	struct timeval *wait = NULL;
	hrtime_t hrtend;		/* absolute end of a timed wait */
	hrtime_t hres;

	if (uwait) {
		/*
		 * Check for a valid specified wait time.
		 * If it is invalid, fail the call right away.
		 */
		if (uwait->tv_sec < 0 || uwait->tv_usec < 0 ||
		    uwait->tv_usec >= MICROSEC) {
			errno = EINVAL;
			return ((aio_result_t *)-1);
		}

		if (uwait->tv_sec > 0 || uwait->tv_usec > 0) {
			/*
			 * Timed wait: remember the deadline and work on a
			 * private copy so the caller's timeval is not
			 * modified.
			 */
			hrtend = gethrtime() +
			    (hrtime_t)uwait->tv_sec * NANOSEC +
			    (hrtime_t)uwait->tv_usec * (NANOSEC / MICROSEC);
			twait = *uwait;
			wait = &twait;
			timedwait++;
		} else {
			/* polling */
			sig_mutex_lock(&__aio_mutex);
			if (_kaio_outstand_cnt == 0) {
				kresultp = (aio_result_t *)-1;
			} else {
				kresultp = (aio_result_t *)_kaio(AIOWAIT,
				    (struct timeval *)-1, 1);
				/*
				 * Anything other than -1, NULL and 1 is the
				 * result pointer of a completed kaio request.
				 */
				if (kresultp != (aio_result_t *)-1 &&
				    kresultp != NULL &&
				    kresultp != (aio_result_t *)1) {
					_kaio_outstand_cnt--;
					sig_mutex_unlock(&__aio_mutex);
					return (kresultp);
				}
			}
			uresultp = _aio_req_done();
			sig_mutex_unlock(&__aio_mutex);
			if (uresultp != NULL &&
			    uresultp != (aio_result_t *)-1) {
				return (uresultp);
			}
			if (uresultp == (aio_result_t *)-1 &&
			    kresultp == (aio_result_t *)-1) {
				errno = EINVAL;
				return ((aio_result_t *)-1);
			} else {
				return (NULL);
			}
		}
	}

	for (;;) {
		/* Look for a completed user-level request first. */
		sig_mutex_lock(&__aio_mutex);
		uresultp = _aio_req_done();
		if (uresultp != NULL && uresultp != (aio_result_t *)-1) {
			sig_mutex_unlock(&__aio_mutex);
			resultp = uresultp;
			break;
		}
		_aiowait_flag++;
		/*
		 * When the user-level check returned -1 the kernel is
		 * asked not to block; with no kaio requests outstanding
		 * either, the kernel call is skipped altogether.
		 */
		dontblock = (uresultp == (aio_result_t *)-1);
		if (dontblock && _kaio_outstand_cnt == 0) {
			kresultp = (aio_result_t *)-1;
			kaio_errno = EINVAL;
		} else {
			/* __aio_mutex is dropped across the kernel wait */
			sig_mutex_unlock(&__aio_mutex);
			kresultp = (aio_result_t *)_kaio(AIOWAIT,
			    wait, dontblock);
			sig_mutex_lock(&__aio_mutex);
			kaio_errno = errno;
		}
		_aiowait_flag--;
		sig_mutex_unlock(&__aio_mutex);
		if (kresultp == (aio_result_t *)1) {
			/* aiowait() awakened by an aionotify() */
			continue;
		} else if (kresultp != NULL &&
		    kresultp != (aio_result_t *)-1) {
			/* a kaio request completed */
			resultp = kresultp;
			sig_mutex_lock(&__aio_mutex);
			_kaio_outstand_cnt--;
			sig_mutex_unlock(&__aio_mutex);
			break;
		} else if (kresultp == (aio_result_t *)-1 &&
		    kaio_errno == EINVAL &&
		    uresultp == (aio_result_t *)-1) {
			/* both the kernel and the user-level check failed */
			errno = kaio_errno;
			resultp = (aio_result_t *)-1;
			break;
		} else if (kresultp == (aio_result_t *)-1 &&
		    kaio_errno == EINTR) {
			errno = kaio_errno;
			resultp = (aio_result_t *)-1;
			break;
		} else if (timedwait) {
			hres = hrtend - gethrtime();
			if (hres <= 0) {
				/* time is up; return */
				resultp = NULL;
				break;
			} else {
				/*
				 * Some time left.  Round up the remaining time
				 * in nanoseconds to microsec.  Retry the call.
				 */
				hres += (NANOSEC / MICROSEC) - 1;
				wait->tv_sec = hres / NANOSEC;
				wait->tv_usec =
				    (hres % NANOSEC) / (NANOSEC / MICROSEC);
			}
		} else {
			/* untimed wait that found nothing: try again */
			ASSERT(kresultp == NULL && uresultp == NULL);
			resultp = NULL;
			continue;
		}
	}
	return (resultp);
}
625 
626 /*
627  * _aio_get_timedelta calculates the remaining time and stores the result
628  * into timespec_t *wait.
629  */
630 
631 int
632 _aio_get_timedelta(timespec_t *end, timespec_t *wait)
633 {
634 	int	ret = 0;
635 	struct	timeval cur;
636 	timespec_t curtime;
637 
638 	(void) gettimeofday(&cur, NULL);
639 	curtime.tv_sec = cur.tv_sec;
640 	curtime.tv_nsec = cur.tv_usec * 1000;   /* convert us to ns */
641 
642 	if (end->tv_sec >= curtime.tv_sec) {
643 		wait->tv_sec = end->tv_sec - curtime.tv_sec;
644 		if (end->tv_nsec >= curtime.tv_nsec) {
645 			wait->tv_nsec = end->tv_nsec - curtime.tv_nsec;
646 			if (wait->tv_sec == 0 && wait->tv_nsec == 0)
647 				ret = -1;	/* timer expired */
648 		} else {
649 			if (end->tv_sec > curtime.tv_sec) {
650 				wait->tv_sec -= 1;
651 				wait->tv_nsec = NANOSEC -
652 				    (curtime.tv_nsec - end->tv_nsec);
653 			} else {
654 				ret = -1;	/* timer expired */
655 			}
656 		}
657 	} else {
658 		ret = -1;
659 	}
660 	return (ret);
661 }
662 
663 /*
664  * If closing by file descriptor: we will simply cancel all the outstanding
665  * aio`s and return.  Those aio's in question will have either noticed the
666  * cancellation notice before, during, or after initiating io.
667  */
668 int
669 aiocancel_all(int fd)
670 {
671 	aio_req_t *reqp;
672 	aio_req_t **reqpp;
673 	aio_worker_t *first;
674 	aio_worker_t *next;
675 	int canceled = 0;
676 	int done = 0;
677 	int cancelall = 0;
678 
679 	sig_mutex_lock(&__aio_mutex);
680 
681 	if (_aio_outstand_cnt == 0) {
682 		sig_mutex_unlock(&__aio_mutex);
683 		return (AIO_ALLDONE);
684 	}
685 
686 	/*
687 	 * Cancel requests from the read/write workers' queues.
688 	 */
689 	first = __nextworker_rw;
690 	next = first;
691 	do {
692 		_aio_cancel_work(next, fd, &canceled, &done);
693 	} while ((next = next->work_forw) != first);
694 
695 	/*
696 	 * finally, check if there are requests on the done queue that
697 	 * should be canceled.
698 	 */
699 	if (fd < 0)
700 		cancelall = 1;
701 	reqpp = &_aio_done_tail;
702 	while ((reqp = *reqpp) != NULL) {
703 		if (cancelall || reqp->req_args.fd == fd) {
704 			*reqpp = reqp->req_next;
705 			_aio_donecnt--;
706 			(void) _aio_hash_del(reqp->req_resultp);
707 			_aio_req_free(reqp);
708 		} else
709 			reqpp = &reqp->req_next;
710 	}
711 	if (cancelall) {
712 		ASSERT(_aio_donecnt == 0);
713 		_aio_done_head = NULL;
714 	}
715 	sig_mutex_unlock(&__aio_mutex);
716 
717 	if (canceled && done == 0)
718 		return (AIO_CANCELED);
719 	else if (done && canceled == 0)
720 		return (AIO_ALLDONE);
721 	else if ((canceled + done == 0) && KAIO_SUPPORTED(fd))
722 		return ((int)_kaio(AIOCANCEL, fd, NULL));
723 	return (AIO_NOTCANCELED);
724 }
725 
726 /*
727  * Cancel requests from a given work queue.  If the file descriptor
728  * parameter, fd, is non-negative, then only cancel those requests
729  * in this queue that are to this file descriptor.  If the fd
730  * parameter is -1, then cancel all requests.
731  */
732 static void
733 _aio_cancel_work(aio_worker_t *aiowp, int fd, int *canceled, int *done)
734 {
735 	aio_req_t *reqp;
736 
737 	sig_mutex_lock(&aiowp->work_qlock1);
738 	/*
739 	 * cancel queued requests first.
740 	 */
741 	reqp = aiowp->work_tail1;
742 	while (reqp != NULL) {
743 		if (fd < 0 || reqp->req_args.fd == fd) {
744 			if (_aio_cancel_req(aiowp, reqp, canceled, done)) {
745 				/*
746 				 * Callers locks were dropped.
747 				 * reqp is invalid; start traversing
748 				 * the list from the beginning again.
749 				 */
750 				reqp = aiowp->work_tail1;
751 				continue;
752 			}
753 		}
754 		reqp = reqp->req_next;
755 	}
756 	/*
757 	 * Since the queued requests have been canceled, there can
758 	 * only be one inprogress request that should be canceled.
759 	 */
760 	if ((reqp = aiowp->work_req) != NULL &&
761 	    (fd < 0 || reqp->req_args.fd == fd))
762 		(void) _aio_cancel_req(aiowp, reqp, canceled, done);
763 	sig_mutex_unlock(&aiowp->work_qlock1);
764 }
765 
/*
 * Cancel a request.  Return 1 if the callers locks were temporarily
 * dropped, otherwise return 0.
 *
 * The caller must hold both __aio_mutex and aiowp->work_qlock1.
 * *canceled is incremented when the request gets canceled by this
 * call, *done when the request turns out to be complete already.
 */
int
_aio_cancel_req(aio_worker_t *aiowp, aio_req_t *reqp, int *canceled, int *done)
{
	int ostate = reqp->req_state;

	ASSERT(MUTEX_HELD(&__aio_mutex));
	ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
	/* already canceled: nothing to do and nothing to count */
	if (ostate == AIO_REQ_CANCELED)
		return (0);
	/* too late to cancel: the request has completed */
	if (ostate == AIO_REQ_DONE || ostate == AIO_REQ_DONEQ) {
		(*done)++;
		return (0);
	}
	if (reqp->req_op == AIOFSYNC && reqp != aiowp->work_req) {
		ASSERT(POSIX_AIO(reqp));
		/* Cancel the queued aio_fsync() request */
		if (!reqp->req_head->lio_canned) {
			/* mark the list head so this is counted only once */
			reqp->req_head->lio_canned = 1;
			_aio_outstand_cnt--;
			(*canceled)++;
		}
		return (0);
	}
	/* Mark the request canceled and take it off its queue and the hash. */
	reqp->req_state = AIO_REQ_CANCELED;
	_aio_req_del(aiowp, reqp, ostate);
	(void) _aio_hash_del(reqp->req_resultp);
	(*canceled)++;
	if (reqp == aiowp->work_req) {
		ASSERT(ostate == AIO_REQ_INPROGRESS);
		/*
		 * Set the result values now, before _aiodone() is called.
		 * We do this because the application can expect aio_return
		 * and aio_errno to be set to -1 and ECANCELED, respectively,
		 * immediately after a successful return from aiocancel()
		 * or aio_cancel().
		 */
		_aio_set_result(reqp, -1, ECANCELED);
		/* interrupt the worker that is processing the request */
		(void) thr_kill(aiowp->work_tid, SIGAIOCANCEL);
		return (0);
	}
	if (!POSIX_AIO(reqp)) {
		/* queued non-POSIX request: finish it right here */
		_aio_outstand_cnt--;
		_aio_set_result(reqp, -1, ECANCELED);
		return (0);
	}
	/*
	 * Queued POSIX request: _aiodone() is called with both locks
	 * released; they are re-acquired in the original order before
	 * returning, and the return value of 1 tells the caller that
	 * the locks were dropped in between.
	 */
	sig_mutex_unlock(&aiowp->work_qlock1);
	sig_mutex_unlock(&__aio_mutex);
	_aiodone(reqp, -1, ECANCELED);
	sig_mutex_lock(&__aio_mutex);
	sig_mutex_lock(&aiowp->work_qlock1);
	return (1);
}
822 
823 int
824 _aio_create_worker(aio_req_t *reqp, int mode)
825 {
826 	aio_worker_t *aiowp, **workers, **nextworker;
827 	int *aio_workerscnt;
828 	void *(*func)(void *);
829 	sigset_t oset;
830 	int error;
831 
832 	/*
833 	 * Put the new worker thread in the right queue.
834 	 */
835 	switch (mode) {
836 	case AIOREAD:
837 	case AIOWRITE:
838 	case AIOAREAD:
839 	case AIOAWRITE:
840 #if !defined(_LP64)
841 	case AIOAREAD64:
842 	case AIOAWRITE64:
843 #endif
844 		workers = &__workers_rw;
845 		nextworker = &__nextworker_rw;
846 		aio_workerscnt = &__rw_workerscnt;
847 		func = _aio_do_request;
848 		break;
849 	case AIONOTIFY:
850 		workers = &__workers_no;
851 		nextworker = &__nextworker_no;
852 		func = _aio_do_notify;
853 		aio_workerscnt = &__no_workerscnt;
854 		break;
855 	default:
856 		aio_panic("_aio_create_worker: invalid mode");
857 		break;
858 	}
859 
860 	if ((aiowp = _aio_worker_alloc()) == NULL)
861 		return (-1);
862 
863 	if (reqp) {
864 		reqp->req_state = AIO_REQ_QUEUED;
865 		reqp->req_worker = aiowp;
866 		aiowp->work_head1 = reqp;
867 		aiowp->work_tail1 = reqp;
868 		aiowp->work_next1 = reqp;
869 		aiowp->work_count1 = 1;
870 		aiowp->work_minload1 = 1;
871 	}
872 
873 	(void) pthread_sigmask(SIG_SETMASK, &maskset, &oset);
874 	error = thr_create(NULL, AIOSTKSIZE, func, aiowp,
875 	    THR_DAEMON | THR_SUSPENDED, &aiowp->work_tid);
876 	(void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
877 	if (error) {
878 		if (reqp) {
879 			reqp->req_state = 0;
880 			reqp->req_worker = NULL;
881 		}
882 		_aio_worker_free(aiowp);
883 		return (-1);
884 	}
885 
886 	lmutex_lock(&__aio_mutex);
887 	(*aio_workerscnt)++;
888 	if (*workers == NULL) {
889 		aiowp->work_forw = aiowp;
890 		aiowp->work_backw = aiowp;
891 		*nextworker = aiowp;
892 		*workers = aiowp;
893 	} else {
894 		aiowp->work_backw = (*workers)->work_backw;
895 		aiowp->work_forw = (*workers);
896 		(*workers)->work_backw->work_forw = aiowp;
897 		(*workers)->work_backw = aiowp;
898 	}
899 	_aio_worker_cnt++;
900 	lmutex_unlock(&__aio_mutex);
901 
902 	(void) thr_continue(aiowp->work_tid);
903 
904 	return (0);
905 }
906 
907 /*
908  * This is the worker's main routine.
909  * The task of this function is to execute all queued requests;
910  * once the last pending request is executed this function will block
911  * in _aio_idle().  A new incoming request must wakeup this thread to
912  * restart the work.
913  * Every worker has an own work queue.  The queue lock is required
914  * to synchronize the addition of new requests for this worker or
915  * cancellation of pending/running requests.
916  *
917  * Cancellation scenarios:
918  * The cancellation of a request is being done asynchronously using
919  * _aio_cancel_req() from another thread context.
920  * A queued request can be cancelled in different manners :
921  * a) request is queued but not "in progress" or "done" (AIO_REQ_QUEUED):
922  *	- lock the queue -> remove the request -> unlock the queue
923  *	- this function/thread does not detect this cancellation process
924  * b) request is in progress (AIO_REQ_INPROGRESS) :
 *	- this function first allows the cancellation of the running
 *	  request by setting the flag "work_cancel_flg=1"
927  * 		see _aio_req_get() -> _aio_cancel_on()
928  *	  During this phase, it is allowed to interrupt the worker
929  *	  thread running the request (this thread) using the SIGAIOCANCEL
930  *	  signal.
931  *	  Once this thread returns from the kernel (because the request
932  *	  is just done), then it must disable a possible cancellation
933  *	  and proceed to finish the request.  To disable the cancellation
934  *	  this thread must use _aio_cancel_off() to set "work_cancel_flg=0".
935  * c) request is already done (AIO_REQ_DONE || AIO_REQ_DONEQ):
936  *	  same procedure as in a)
937  *
938  * To b)
 *	This thread uses sigsetjmp() to define the position in the code where
 *	it wishes to continue working in the case that a SIGAIOCANCEL signal
 *	is detected.
942  *	Normally this thread should get the cancellation signal during the
943  *	kernel phase (reading or writing).  In that case the signal handler
944  *	aiosigcancelhndlr() is activated using the worker thread context,
945  *	which again will use the siglongjmp() function to break the standard
946  *	code flow and jump to the "sigsetjmp" position, provided that
947  *	"work_cancel_flg" is set to "1".
948  *	Because the "work_cancel_flg" is only manipulated by this worker
949  *	thread and it can only run on one CPU at a given time, it is not
950  *	necessary to protect that flag with the queue lock.
951  *	Returning from the kernel (read or write system call) we must
952  *	first disable the use of the SIGAIOCANCEL signal and accordingly
953  *	the use of the siglongjmp() function to prevent a possible deadlock:
 *	- It can happen that this worker thread returns from the kernel and
 *	  blocks in "work_qlock1",
 *	- then a second thread cancels the apparently "in progress" request
 *	  and sends the SIGAIOCANCEL signal to the worker thread,
 *	- the worker thread gets assigned the "work_qlock1" and will return
 *	  from the kernel,
960  *	- the kernel detects the pending signal and activates the signal
961  *	  handler instead,
962  *	- if the "work_cancel_flg" is still set then the signal handler
963  *	  should use siglongjmp() to cancel the "in progress" request and
964  *	  it would try to acquire the same work_qlock1 in _aio_req_get()
965  *	  for a second time => deadlock.
966  *	To avoid that situation we disable the cancellation of the request
967  *	in progress BEFORE we try to acquire the work_qlock1.
968  *	In that case the signal handler will not call siglongjmp() and the
969  *	worker thread will continue running the standard code flow.
970  *	Then this thread must check the AIO_REQ_CANCELED flag to emulate
971  *	an eventually required siglongjmp() freeing the work_qlock1 and
972  *	avoiding a deadlock.
973  */
974 void *
975 _aio_do_request(void *arglist)
976 {
977 	aio_worker_t *aiowp = (aio_worker_t *)arglist;
978 	ulwp_t *self = curthread;
979 	struct aio_args *arg;
980 	aio_req_t *reqp;		/* current AIO request */
981 	ssize_t retval;
982 	int error;
983 
984 	if (pthread_setspecific(_aio_key, aiowp) != 0)
985 		aio_panic("_aio_do_request, pthread_setspecific()");
986 	(void) pthread_sigmask(SIG_SETMASK, &_worker_set, NULL);
987 	ASSERT(aiowp->work_req == NULL);
988 
989 	/*
990 	 * We resume here when an operation is cancelled.
991 	 * On first entry, aiowp->work_req == NULL, so all
992 	 * we do is block SIGAIOCANCEL.
993 	 */
994 	(void) sigsetjmp(aiowp->work_jmp_buf, 0);
995 	ASSERT(self->ul_sigdefer == 0);
996 
997 	sigoff(self);	/* block SIGAIOCANCEL */
998 	if (aiowp->work_req != NULL)
999 		_aio_finish_request(aiowp, -1, ECANCELED);
1000 
1001 	for (;;) {
1002 		/*
1003 		 * Put completed requests on aio_done_list.  This has
1004 		 * to be done as part of the main loop to ensure that
1005 		 * we don't artificially starve any aiowait'ers.
1006 		 */
1007 		if (aiowp->work_done1)
1008 			_aio_work_done(aiowp);
1009 
1010 top:
1011 		/* consume any deferred SIGAIOCANCEL signal here */
1012 		sigon(self);
1013 		sigoff(self);
1014 
1015 		while ((reqp = _aio_req_get(aiowp)) == NULL) {
1016 			if (_aio_idle(aiowp) != 0)
1017 				goto top;
1018 		}
1019 		arg = &reqp->req_args;
1020 		ASSERT(reqp->req_state == AIO_REQ_INPROGRESS ||
1021 		    reqp->req_state == AIO_REQ_CANCELED);
1022 		error = 0;
1023 
1024 		switch (reqp->req_op) {
1025 		case AIOREAD:
1026 		case AIOAREAD:
1027 			sigon(self);	/* unblock SIGAIOCANCEL */
1028 			retval = pread(arg->fd, arg->buf,
1029 			    arg->bufsz, arg->offset);
1030 			if (retval == -1) {
1031 				if (errno == ESPIPE) {
1032 					retval = read(arg->fd,
1033 					    arg->buf, arg->bufsz);
1034 					if (retval == -1)
1035 						error = errno;
1036 				} else {
1037 					error = errno;
1038 				}
1039 			}
1040 			sigoff(self);	/* block SIGAIOCANCEL */
1041 			break;
1042 		case AIOWRITE:
1043 		case AIOAWRITE:
1044 			sigon(self);	/* unblock SIGAIOCANCEL */
1045 			retval = pwrite(arg->fd, arg->buf,
1046 			    arg->bufsz, arg->offset);
1047 			if (retval == -1) {
1048 				if (errno == ESPIPE) {
1049 					retval = write(arg->fd,
1050 					    arg->buf, arg->bufsz);
1051 					if (retval == -1)
1052 						error = errno;
1053 				} else {
1054 					error = errno;
1055 				}
1056 			}
1057 			sigoff(self);	/* block SIGAIOCANCEL */
1058 			break;
1059 #if !defined(_LP64)
1060 		case AIOAREAD64:
1061 			sigon(self);	/* unblock SIGAIOCANCEL */
1062 			retval = pread64(arg->fd, arg->buf,
1063 			    arg->bufsz, arg->offset);
1064 			if (retval == -1) {
1065 				if (errno == ESPIPE) {
1066 					retval = read(arg->fd,
1067 					    arg->buf, arg->bufsz);
1068 					if (retval == -1)
1069 						error = errno;
1070 				} else {
1071 					error = errno;
1072 				}
1073 			}
1074 			sigoff(self);	/* block SIGAIOCANCEL */
1075 			break;
1076 		case AIOAWRITE64:
1077 			sigon(self);	/* unblock SIGAIOCANCEL */
1078 			retval = pwrite64(arg->fd, arg->buf,
1079 			    arg->bufsz, arg->offset);
1080 			if (retval == -1) {
1081 				if (errno == ESPIPE) {
1082 					retval = write(arg->fd,
1083 					    arg->buf, arg->bufsz);
1084 					if (retval == -1)
1085 						error = errno;
1086 				} else {
1087 					error = errno;
1088 				}
1089 			}
1090 			sigoff(self);	/* block SIGAIOCANCEL */
1091 			break;
1092 #endif	/* !defined(_LP64) */
1093 		case AIOFSYNC:
1094 			if (_aio_fsync_del(aiowp, reqp))
1095 				goto top;
1096 			ASSERT(reqp->req_head == NULL);
1097 			/*
1098 			 * All writes for this fsync request are now
1099 			 * acknowledged.  Now make these writes visible
1100 			 * and put the final request into the hash table.
1101 			 */
1102 			if (reqp->req_state == AIO_REQ_CANCELED) {
1103 				/* EMPTY */;
1104 			} else if (arg->offset == O_SYNC) {
1105 				if ((retval = __fdsync(arg->fd, FSYNC)) == -1)
1106 					error = errno;
1107 			} else {
1108 				if ((retval = __fdsync(arg->fd, FDSYNC)) == -1)
1109 					error = errno;
1110 			}
1111 			if (_aio_hash_insert(reqp->req_resultp, reqp) != 0)
1112 				aio_panic("_aio_do_request(): AIOFSYNC: "
1113 				    "request already in hash table");
1114 			break;
1115 		default:
1116 			aio_panic("_aio_do_request, bad op");
1117 		}
1118 
1119 		_aio_finish_request(aiowp, retval, error);
1120 	}
1121 	/* NOTREACHED */
1122 	return (NULL);
1123 }
1124 
1125 /*
1126  * Perform the tail processing for _aio_do_request().
1127  * The in-progress request may or may not have been cancelled.
1128  */
1129 static void
1130 _aio_finish_request(aio_worker_t *aiowp, ssize_t retval, int error)
1131 {
1132 	aio_req_t *reqp;
1133 
1134 	sig_mutex_lock(&aiowp->work_qlock1);
1135 	if ((reqp = aiowp->work_req) == NULL)
1136 		sig_mutex_unlock(&aiowp->work_qlock1);
1137 	else {
1138 		aiowp->work_req = NULL;
1139 		if (reqp->req_state == AIO_REQ_CANCELED) {
1140 			retval = -1;
1141 			error = ECANCELED;
1142 		}
1143 		if (!POSIX_AIO(reqp)) {
1144 			int notify;
1145 			sig_mutex_unlock(&aiowp->work_qlock1);
1146 			sig_mutex_lock(&__aio_mutex);
1147 			if (reqp->req_state == AIO_REQ_INPROGRESS)
1148 				reqp->req_state = AIO_REQ_DONE;
1149 			/*
1150 			 * If it was canceled, this request will not be
1151 			 * added to done list. Just free it.
1152 			 */
1153 			if (error == ECANCELED) {
1154 				_aio_outstand_cnt--;
1155 				_aio_req_free(reqp);
1156 			} else {
1157 				_aio_set_result(reqp, retval, error);
1158 				_aio_req_done_cnt++;
1159 			}
1160 			/*
1161 			 * Notify any thread that may have blocked
1162 			 * because it saw an outstanding request.
1163 			 */
1164 			notify = 0;
1165 			if (_aio_outstand_cnt == 0 && _aiowait_flag) {
1166 				notify = 1;
1167 			}
1168 			sig_mutex_unlock(&__aio_mutex);
1169 			if (notify) {
1170 				(void) _kaio(AIONOTIFY);
1171 			}
1172 		} else {
1173 			if (reqp->req_state == AIO_REQ_INPROGRESS)
1174 				reqp->req_state = AIO_REQ_DONE;
1175 			sig_mutex_unlock(&aiowp->work_qlock1);
1176 			_aiodone(reqp, retval, error);
1177 		}
1178 	}
1179 }
1180 
1181 void
1182 _aio_req_mark_done(aio_req_t *reqp)
1183 {
1184 #if !defined(_LP64)
1185 	if (reqp->req_largefile)
1186 		((aiocb64_t *)reqp->req_aiocbp)->aio_state = USERAIO_DONE;
1187 	else
1188 #endif
1189 		((aiocb_t *)reqp->req_aiocbp)->aio_state = USERAIO_DONE;
1190 }
1191 
1192 /*
1193  * Sleep for 'ticks' clock ticks to give somebody else a chance to run,
1194  * hopefully to consume one of our queued signals.
1195  */
1196 static void
1197 _aio_delay(int ticks)
1198 {
1199 	(void) usleep(ticks * (MICROSEC / hz));
1200 }
1201 
1202 /*
1203  * Actually send the notifications.
1204  * We could block indefinitely here if the application
1205  * is not listening for the signal or port notifications.
1206  */
1207 static void
1208 send_notification(notif_param_t *npp)
1209 {
1210 	extern int __sigqueue(pid_t pid, int signo,
1211 	    /* const union sigval */ void *value, int si_code, int block);
1212 
1213 	if (npp->np_signo)
1214 		(void) __sigqueue(__pid, npp->np_signo, npp->np_user,
1215 		    SI_ASYNCIO, 1);
1216 	else if (npp->np_port >= 0)
1217 		(void) _port_dispatch(npp->np_port, 0, PORT_SOURCE_AIO,
1218 		    npp->np_event, npp->np_object, npp->np_user);
1219 
1220 	if (npp->np_lio_signo)
1221 		(void) __sigqueue(__pid, npp->np_lio_signo, npp->np_lio_user,
1222 		    SI_ASYNCIO, 1);
1223 	else if (npp->np_lio_port >= 0)
1224 		(void) _port_dispatch(npp->np_lio_port, 0, PORT_SOURCE_AIO,
1225 		    npp->np_lio_event, npp->np_lio_object, npp->np_lio_user);
1226 }
1227 
1228 /*
1229  * Asynchronous notification worker.
1230  */
1231 void *
1232 _aio_do_notify(void *arg)
1233 {
1234 	aio_worker_t *aiowp = (aio_worker_t *)arg;
1235 	aio_req_t *reqp;
1236 
1237 	/*
1238 	 * This isn't really necessary.  All signals are blocked.
1239 	 */
1240 	if (pthread_setspecific(_aio_key, aiowp) != 0)
1241 		aio_panic("_aio_do_notify, pthread_setspecific()");
1242 
1243 	/*
1244 	 * Notifications are never cancelled.
1245 	 * All signals remain blocked, forever.
1246 	 */
1247 	for (;;) {
1248 		while ((reqp = _aio_req_get(aiowp)) == NULL) {
1249 			if (_aio_idle(aiowp) != 0)
1250 				aio_panic("_aio_do_notify: _aio_idle() failed");
1251 		}
1252 		send_notification(&reqp->req_notify);
1253 		_aio_req_free(reqp);
1254 	}
1255 
1256 	/* NOTREACHED */
1257 	return (NULL);
1258 }
1259 
1260 /*
1261  * Do the completion semantics for a request that was either canceled
1262  * by _aio_cancel_req() or was completed by _aio_do_request().
1263  */
1264 static void
1265 _aiodone(aio_req_t *reqp, ssize_t retval, int error)
1266 {
1267 	aio_result_t *resultp = reqp->req_resultp;
1268 	int notify = 0;
1269 	aio_lio_t *head;
1270 	int sigev_none;
1271 	int sigev_signal;
1272 	int sigev_thread;
1273 	int sigev_port;
1274 	notif_param_t np;
1275 
1276 	/*
1277 	 * We call _aiodone() only for Posix I/O.
1278 	 */
1279 	ASSERT(POSIX_AIO(reqp));
1280 
1281 	sigev_none = 0;
1282 	sigev_signal = 0;
1283 	sigev_thread = 0;
1284 	sigev_port = 0;
1285 	np.np_signo = 0;
1286 	np.np_port = -1;
1287 	np.np_lio_signo = 0;
1288 	np.np_lio_port = -1;
1289 
1290 	switch (reqp->req_sigevent.sigev_notify) {
1291 	case SIGEV_NONE:
1292 		sigev_none = 1;
1293 		break;
1294 	case SIGEV_SIGNAL:
1295 		sigev_signal = 1;
1296 		break;
1297 	case SIGEV_THREAD:
1298 		sigev_thread = 1;
1299 		break;
1300 	case SIGEV_PORT:
1301 		sigev_port = 1;
1302 		break;
1303 	default:
1304 		aio_panic("_aiodone: improper sigev_notify");
1305 		break;
1306 	}
1307 
1308 	/*
1309 	 * Figure out the notification parameters while holding __aio_mutex.
1310 	 * Actually perform the notifications after dropping __aio_mutex.
1311 	 * This allows us to sleep for a long time (if the notifications
1312 	 * incur delays) without impeding other async I/O operations.
1313 	 */
1314 
1315 	sig_mutex_lock(&__aio_mutex);
1316 
1317 	if (sigev_signal) {
1318 		if ((np.np_signo = reqp->req_sigevent.sigev_signo) != 0)
1319 			notify = 1;
1320 		np.np_user = reqp->req_sigevent.sigev_value.sival_ptr;
1321 	} else if (sigev_thread | sigev_port) {
1322 		if ((np.np_port = reqp->req_sigevent.sigev_signo) >= 0)
1323 			notify = 1;
1324 		np.np_event = reqp->req_op;
1325 		if (np.np_event == AIOFSYNC && reqp->req_largefile)
1326 			np.np_event = AIOFSYNC64;
1327 		np.np_object = (uintptr_t)reqp->req_aiocbp;
1328 		np.np_user = reqp->req_sigevent.sigev_value.sival_ptr;
1329 	}
1330 
1331 	if (resultp->aio_errno == EINPROGRESS)
1332 		_aio_set_result(reqp, retval, error);
1333 
1334 	_aio_outstand_cnt--;
1335 
1336 	head = reqp->req_head;
1337 	reqp->req_head = NULL;
1338 
1339 	if (sigev_none) {
1340 		_aio_enq_doneq(reqp);
1341 		reqp = NULL;
1342 	} else {
1343 		(void) _aio_hash_del(resultp);
1344 		_aio_req_mark_done(reqp);
1345 	}
1346 
1347 	_aio_waitn_wakeup();
1348 
1349 	/*
1350 	 * __aio_waitn() sets AIO_WAIT_INPROGRESS and
1351 	 * __aio_suspend() increments "_aio_kernel_suspend"
1352 	 * when they are waiting in the kernel for completed I/Os.
1353 	 *
1354 	 * _kaio(AIONOTIFY) awakes the corresponding function
1355 	 * in the kernel; then the corresponding __aio_waitn() or
1356 	 * __aio_suspend() function could reap the recently
1357 	 * completed I/Os (_aiodone()).
1358 	 */
1359 	if ((_aio_flags & AIO_WAIT_INPROGRESS) || _aio_kernel_suspend > 0)
1360 		(void) _kaio(AIONOTIFY);
1361 
1362 	sig_mutex_unlock(&__aio_mutex);
1363 
1364 	if (head != NULL) {
1365 		/*
1366 		 * If all the lio requests have completed,
1367 		 * prepare to notify the waiting thread.
1368 		 */
1369 		sig_mutex_lock(&head->lio_mutex);
1370 		ASSERT(head->lio_refcnt == head->lio_nent);
1371 		if (head->lio_refcnt == 1) {
1372 			int waiting = 0;
1373 			if (head->lio_mode == LIO_WAIT) {
1374 				if ((waiting = head->lio_waiting) != 0)
1375 					(void) cond_signal(&head->lio_cond_cv);
1376 			} else if (head->lio_port < 0) { /* none or signal */
1377 				if ((np.np_lio_signo = head->lio_signo) != 0)
1378 					notify = 1;
1379 				np.np_lio_user = head->lio_sigval.sival_ptr;
1380 			} else {			/* thread or port */
1381 				notify = 1;
1382 				np.np_lio_port = head->lio_port;
1383 				np.np_lio_event = head->lio_event;
1384 				np.np_lio_object =
1385 				    (uintptr_t)head->lio_sigevent;
1386 				np.np_lio_user = head->lio_sigval.sival_ptr;
1387 			}
1388 			head->lio_nent = head->lio_refcnt = 0;
1389 			sig_mutex_unlock(&head->lio_mutex);
1390 			if (waiting == 0)
1391 				_aio_lio_free(head);
1392 		} else {
1393 			head->lio_nent--;
1394 			head->lio_refcnt--;
1395 			sig_mutex_unlock(&head->lio_mutex);
1396 		}
1397 	}
1398 
1399 	/*
1400 	 * The request is completed; now perform the notifications.
1401 	 */
1402 	if (notify) {
1403 		if (reqp != NULL) {
1404 			/*
1405 			 * We usually put the request on the notification
1406 			 * queue because we don't want to block and delay
1407 			 * other operations behind us in the work queue.
1408 			 * Also we must never block on a cancel notification
1409 			 * because we are being called from an application
1410 			 * thread in this case and that could lead to deadlock
1411 			 * if no other thread is receiving notificatins.
1412 			 */
1413 			reqp->req_notify = np;
1414 			reqp->req_op = AIONOTIFY;
1415 			_aio_req_add(reqp, &__workers_no, AIONOTIFY);
1416 			reqp = NULL;
1417 		} else {
1418 			/*
1419 			 * We already put the request on the done queue,
1420 			 * so we can't queue it to the notification queue.
1421 			 * Just do the notification directly.
1422 			 */
1423 			send_notification(&np);
1424 		}
1425 	}
1426 
1427 	if (reqp != NULL)
1428 		_aio_req_free(reqp);
1429 }
1430 
1431 /*
1432  * Delete fsync requests from list head until there is
1433  * only one left.  Return 0 when there is only one,
1434  * otherwise return a non-zero value.
1435  */
1436 static int
1437 _aio_fsync_del(aio_worker_t *aiowp, aio_req_t *reqp)
1438 {
1439 	aio_lio_t *head = reqp->req_head;
1440 	int rval = 0;
1441 
1442 	ASSERT(reqp == aiowp->work_req);
1443 	sig_mutex_lock(&aiowp->work_qlock1);
1444 	sig_mutex_lock(&head->lio_mutex);
1445 	if (head->lio_refcnt > 1) {
1446 		head->lio_refcnt--;
1447 		head->lio_nent--;
1448 		aiowp->work_req = NULL;
1449 		sig_mutex_unlock(&head->lio_mutex);
1450 		sig_mutex_unlock(&aiowp->work_qlock1);
1451 		sig_mutex_lock(&__aio_mutex);
1452 		_aio_outstand_cnt--;
1453 		_aio_waitn_wakeup();
1454 		sig_mutex_unlock(&__aio_mutex);
1455 		_aio_req_free(reqp);
1456 		return (1);
1457 	}
1458 	ASSERT(head->lio_nent == 1 && head->lio_refcnt == 1);
1459 	reqp->req_head = NULL;
1460 	if (head->lio_canned)
1461 		reqp->req_state = AIO_REQ_CANCELED;
1462 	if (head->lio_mode == LIO_DESTROY) {
1463 		aiowp->work_req = NULL;
1464 		rval = 1;
1465 	}
1466 	sig_mutex_unlock(&head->lio_mutex);
1467 	sig_mutex_unlock(&aiowp->work_qlock1);
1468 	head->lio_refcnt--;
1469 	head->lio_nent--;
1470 	_aio_lio_free(head);
1471 	if (rval != 0)
1472 		_aio_req_free(reqp);
1473 	return (rval);
1474 }
1475 
1476 /*
1477  * A worker is set idle when its work queue is empty.
1478  * The worker checks again that it has no more work
1479  * and then goes to sleep waiting for more work.
1480  */
1481 int
1482 _aio_idle(aio_worker_t *aiowp)
1483 {
1484 	int error = 0;
1485 
1486 	sig_mutex_lock(&aiowp->work_qlock1);
1487 	if (aiowp->work_count1 == 0) {
1488 		ASSERT(aiowp->work_minload1 == 0);
1489 		aiowp->work_idleflg = 1;
1490 		/*
1491 		 * A cancellation handler is not needed here.
1492 		 * aio worker threads are never cancelled via pthread_cancel().
1493 		 */
1494 		error = sig_cond_wait(&aiowp->work_idle_cv,
1495 		    &aiowp->work_qlock1);
1496 		/*
1497 		 * The idle flag is normally cleared before worker is awakened
1498 		 * by aio_req_add().  On error (EINTR), we clear it ourself.
1499 		 */
1500 		if (error)
1501 			aiowp->work_idleflg = 0;
1502 	}
1503 	sig_mutex_unlock(&aiowp->work_qlock1);
1504 	return (error);
1505 }
1506 
1507 /*
1508  * A worker's completed AIO requests are placed onto a global
1509  * done queue.  The application is only sent a SIGIO signal if
1510  * the process has a handler enabled and it is not waiting via
1511  * aiowait().
1512  */
1513 static void
1514 _aio_work_done(aio_worker_t *aiowp)
1515 {
1516 	aio_req_t *reqp;
1517 
1518 	sig_mutex_lock(&aiowp->work_qlock1);
1519 	reqp = aiowp->work_prev1;
1520 	reqp->req_next = NULL;
1521 	aiowp->work_done1 = 0;
1522 	aiowp->work_tail1 = aiowp->work_next1;
1523 	if (aiowp->work_tail1 == NULL)
1524 		aiowp->work_head1 = NULL;
1525 	aiowp->work_prev1 = NULL;
1526 	sig_mutex_unlock(&aiowp->work_qlock1);
1527 	sig_mutex_lock(&__aio_mutex);
1528 	_aio_donecnt++;
1529 	_aio_outstand_cnt--;
1530 	_aio_req_done_cnt--;
1531 	ASSERT(_aio_donecnt > 0 &&
1532 	    _aio_outstand_cnt >= 0 &&
1533 	    _aio_req_done_cnt >= 0);
1534 	ASSERT(reqp != NULL);
1535 
1536 	if (_aio_done_tail == NULL) {
1537 		_aio_done_head = _aio_done_tail = reqp;
1538 	} else {
1539 		_aio_done_head->req_next = reqp;
1540 		_aio_done_head = reqp;
1541 	}
1542 
1543 	if (_aiowait_flag) {
1544 		sig_mutex_unlock(&__aio_mutex);
1545 		(void) _kaio(AIONOTIFY);
1546 	} else {
1547 		sig_mutex_unlock(&__aio_mutex);
1548 		if (_sigio_enabled)
1549 			(void) kill(__pid, SIGIO);
1550 	}
1551 }
1552 
1553 /*
1554  * The done queue consists of AIO requests that are in either the
1555  * AIO_REQ_DONE or AIO_REQ_CANCELED state.  Requests that were cancelled
1556  * are discarded.  If the done queue is empty then NULL is returned.
1557  * Otherwise the address of a done aio_result_t is returned.
1558  */
1559 aio_result_t *
1560 _aio_req_done(void)
1561 {
1562 	aio_req_t *reqp;
1563 	aio_result_t *resultp;
1564 
1565 	ASSERT(MUTEX_HELD(&__aio_mutex));
1566 
1567 	if ((reqp = _aio_done_tail) != NULL) {
1568 		if ((_aio_done_tail = reqp->req_next) == NULL)
1569 			_aio_done_head = NULL;
1570 		ASSERT(_aio_donecnt > 0);
1571 		_aio_donecnt--;
1572 		(void) _aio_hash_del(reqp->req_resultp);
1573 		resultp = reqp->req_resultp;
1574 		ASSERT(reqp->req_state == AIO_REQ_DONE);
1575 		_aio_req_free(reqp);
1576 		return (resultp);
1577 	}
1578 	/* is queue empty? */
1579 	if (reqp == NULL && _aio_outstand_cnt == 0) {
1580 		return ((aio_result_t *)-1);
1581 	}
1582 	return (NULL);
1583 }
1584 
1585 /*
1586  * Set the return and errno values for the application's use.
1587  *
1588  * For the Posix interfaces, we must set the return value first followed
1589  * by the errno value because the Posix interfaces allow for a change
1590  * in the errno value from EINPROGRESS to something else to signal
1591  * the completion of the asynchronous request.
1592  *
1593  * The opposite is true for the Solaris interfaces.  These allow for
1594  * a change in the return value from AIO_INPROGRESS to something else
1595  * to signal the completion of the asynchronous request.
1596  */
1597 void
1598 _aio_set_result(aio_req_t *reqp, ssize_t retval, int error)
1599 {
1600 	aio_result_t *resultp = reqp->req_resultp;
1601 
1602 	if (POSIX_AIO(reqp)) {
1603 		resultp->aio_return = retval;
1604 		membar_producer();
1605 		resultp->aio_errno = error;
1606 	} else {
1607 		resultp->aio_errno = error;
1608 		membar_producer();
1609 		resultp->aio_return = retval;
1610 	}
1611 }
1612 
1613 /*
1614  * Add an AIO request onto the next work queue.
1615  * A circular list of workers is used to choose the next worker.
1616  */
1617 void
1618 _aio_req_add(aio_req_t *reqp, aio_worker_t **nextworker, int mode)
1619 {
1620 	ulwp_t *self = curthread;
1621 	aio_worker_t *aiowp;
1622 	aio_worker_t *first;
1623 	int load_bal_flg = 1;
1624 	int found;
1625 
1626 	ASSERT(reqp->req_state != AIO_REQ_DONEQ);
1627 	reqp->req_next = NULL;
1628 	/*
1629 	 * Try to acquire the next worker's work queue.  If it is locked,
1630 	 * then search the list of workers until a queue is found unlocked,
1631 	 * or until the list is completely traversed at which point another
1632 	 * worker will be created.
1633 	 */
1634 	sigoff(self);		/* defer SIGIO */
1635 	sig_mutex_lock(&__aio_mutex);
1636 	first = aiowp = *nextworker;
1637 	if (mode != AIONOTIFY)
1638 		_aio_outstand_cnt++;
1639 	sig_mutex_unlock(&__aio_mutex);
1640 
1641 	switch (mode) {
1642 	case AIOREAD:
1643 	case AIOWRITE:
1644 	case AIOAREAD:
1645 	case AIOAWRITE:
1646 #if !defined(_LP64)
1647 	case AIOAREAD64:
1648 	case AIOAWRITE64:
1649 #endif
1650 		/* try to find an idle worker */
1651 		found = 0;
1652 		do {
1653 			if (sig_mutex_trylock(&aiowp->work_qlock1) == 0) {
1654 				if (aiowp->work_idleflg) {
1655 					found = 1;
1656 					break;
1657 				}
1658 				sig_mutex_unlock(&aiowp->work_qlock1);
1659 			}
1660 		} while ((aiowp = aiowp->work_forw) != first);
1661 
1662 		if (found) {
1663 			aiowp->work_minload1++;
1664 			break;
1665 		}
1666 
1667 		/* try to acquire some worker's queue lock */
1668 		do {
1669 			if (sig_mutex_trylock(&aiowp->work_qlock1) == 0) {
1670 				found = 1;
1671 				break;
1672 			}
1673 		} while ((aiowp = aiowp->work_forw) != first);
1674 
1675 		/*
1676 		 * Create more workers when the workers appear overloaded.
1677 		 * Either all the workers are busy draining their queues
1678 		 * or no worker's queue lock could be acquired.
1679 		 */
1680 		if (!found) {
1681 			if (_aio_worker_cnt < _max_workers) {
1682 				if (_aio_create_worker(reqp, mode))
1683 					aio_panic("_aio_req_add: add worker");
1684 				sigon(self);	/* reenable SIGIO */
1685 				return;
1686 			}
1687 
1688 			/*
1689 			 * No worker available and we have created
1690 			 * _max_workers, keep going through the
1691 			 * list slowly until we get a lock
1692 			 */
1693 			while (sig_mutex_trylock(&aiowp->work_qlock1) != 0) {
1694 				/*
1695 				 * give someone else a chance
1696 				 */
1697 				_aio_delay(1);
1698 				aiowp = aiowp->work_forw;
1699 			}
1700 		}
1701 
1702 		ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
1703 		if (_aio_worker_cnt < _max_workers &&
1704 		    aiowp->work_minload1 >= _minworkload) {
1705 			sig_mutex_unlock(&aiowp->work_qlock1);
1706 			sig_mutex_lock(&__aio_mutex);
1707 			*nextworker = aiowp->work_forw;
1708 			sig_mutex_unlock(&__aio_mutex);
1709 			if (_aio_create_worker(reqp, mode))
1710 				aio_panic("aio_req_add: add worker");
1711 			sigon(self);	/* reenable SIGIO */
1712 			return;
1713 		}
1714 		aiowp->work_minload1++;
1715 		break;
1716 	case AIOFSYNC:
1717 	case AIONOTIFY:
1718 		load_bal_flg = 0;
1719 		sig_mutex_lock(&aiowp->work_qlock1);
1720 		break;
1721 	default:
1722 		aio_panic("_aio_req_add: invalid mode");
1723 		break;
1724 	}
1725 	/*
1726 	 * Put request onto worker's work queue.
1727 	 */
1728 	if (aiowp->work_tail1 == NULL) {
1729 		ASSERT(aiowp->work_count1 == 0);
1730 		aiowp->work_tail1 = reqp;
1731 		aiowp->work_next1 = reqp;
1732 	} else {
1733 		aiowp->work_head1->req_next = reqp;
1734 		if (aiowp->work_next1 == NULL)
1735 			aiowp->work_next1 = reqp;
1736 	}
1737 	reqp->req_state = AIO_REQ_QUEUED;
1738 	reqp->req_worker = aiowp;
1739 	aiowp->work_head1 = reqp;
1740 	/*
1741 	 * Awaken worker if it is not currently active.
1742 	 */
1743 	if (aiowp->work_count1++ == 0 && aiowp->work_idleflg) {
1744 		aiowp->work_idleflg = 0;
1745 		(void) cond_signal(&aiowp->work_idle_cv);
1746 	}
1747 	sig_mutex_unlock(&aiowp->work_qlock1);
1748 
1749 	if (load_bal_flg) {
1750 		sig_mutex_lock(&__aio_mutex);
1751 		*nextworker = aiowp->work_forw;
1752 		sig_mutex_unlock(&__aio_mutex);
1753 	}
1754 	sigon(self);	/* reenable SIGIO */
1755 }
1756 
1757 /*
1758  * Get an AIO request for a specified worker.
1759  * If the work queue is empty, return NULL.
1760  */
1761 aio_req_t *
1762 _aio_req_get(aio_worker_t *aiowp)
1763 {
1764 	aio_req_t *reqp;
1765 
1766 	sig_mutex_lock(&aiowp->work_qlock1);
1767 	if ((reqp = aiowp->work_next1) != NULL) {
1768 		/*
1769 		 * Remove a POSIX request from the queue; the
1770 		 * request queue is a singularly linked list
1771 		 * with a previous pointer.  The request is
1772 		 * removed by updating the previous pointer.
1773 		 *
1774 		 * Non-posix requests are left on the queue
1775 		 * to eventually be placed on the done queue.
1776 		 */
1777 
1778 		if (POSIX_AIO(reqp)) {
1779 			if (aiowp->work_prev1 == NULL) {
1780 				aiowp->work_tail1 = reqp->req_next;
1781 				if (aiowp->work_tail1 == NULL)
1782 					aiowp->work_head1 = NULL;
1783 			} else {
1784 				aiowp->work_prev1->req_next = reqp->req_next;
1785 				if (aiowp->work_head1 == reqp)
1786 					aiowp->work_head1 = reqp->req_next;
1787 			}
1788 
1789 		} else {
1790 			aiowp->work_prev1 = reqp;
1791 			ASSERT(aiowp->work_done1 >= 0);
1792 			aiowp->work_done1++;
1793 		}
1794 		ASSERT(reqp != reqp->req_next);
1795 		aiowp->work_next1 = reqp->req_next;
1796 		ASSERT(aiowp->work_count1 >= 1);
1797 		aiowp->work_count1--;
1798 		switch (reqp->req_op) {
1799 		case AIOREAD:
1800 		case AIOWRITE:
1801 		case AIOAREAD:
1802 		case AIOAWRITE:
1803 #if !defined(_LP64)
1804 		case AIOAREAD64:
1805 		case AIOAWRITE64:
1806 #endif
1807 			ASSERT(aiowp->work_minload1 > 0);
1808 			aiowp->work_minload1--;
1809 			break;
1810 		}
1811 		reqp->req_state = AIO_REQ_INPROGRESS;
1812 	}
1813 	aiowp->work_req = reqp;
1814 	ASSERT(reqp != NULL || aiowp->work_count1 == 0);
1815 	sig_mutex_unlock(&aiowp->work_qlock1);
1816 	return (reqp);
1817 }
1818 
1819 static void
1820 _aio_req_del(aio_worker_t *aiowp, aio_req_t *reqp, int ostate)
1821 {
1822 	aio_req_t **last;
1823 	aio_req_t *lastrp;
1824 	aio_req_t *next;
1825 
1826 	ASSERT(aiowp != NULL);
1827 	ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
1828 	if (POSIX_AIO(reqp)) {
1829 		if (ostate != AIO_REQ_QUEUED)
1830 			return;
1831 	}
1832 	last = &aiowp->work_tail1;
1833 	lastrp = aiowp->work_tail1;
1834 	ASSERT(ostate == AIO_REQ_QUEUED || ostate == AIO_REQ_INPROGRESS);
1835 	while ((next = *last) != NULL) {
1836 		if (next == reqp) {
1837 			*last = next->req_next;
1838 			if (aiowp->work_next1 == next)
1839 				aiowp->work_next1 = next->req_next;
1840 
1841 			if ((next->req_next != NULL) ||
1842 			    (aiowp->work_done1 == 0)) {
1843 				if (aiowp->work_head1 == next)
1844 					aiowp->work_head1 = next->req_next;
1845 				if (aiowp->work_prev1 == next)
1846 					aiowp->work_prev1 = next->req_next;
1847 			} else {
1848 				if (aiowp->work_head1 == next)
1849 					aiowp->work_head1 = lastrp;
1850 				if (aiowp->work_prev1 == next)
1851 					aiowp->work_prev1 = lastrp;
1852 			}
1853 
1854 			if (ostate == AIO_REQ_QUEUED) {
1855 				ASSERT(aiowp->work_count1 >= 1);
1856 				aiowp->work_count1--;
1857 				ASSERT(aiowp->work_minload1 >= 1);
1858 				aiowp->work_minload1--;
1859 			} else {
1860 				ASSERT(ostate == AIO_REQ_INPROGRESS &&
1861 				    !POSIX_AIO(reqp));
1862 				aiowp->work_done1--;
1863 			}
1864 			return;
1865 		}
1866 		last = &next->req_next;
1867 		lastrp = next;
1868 	}
1869 	/* NOTREACHED */
1870 }
1871 
1872 static void
1873 _aio_enq_doneq(aio_req_t *reqp)
1874 {
1875 	if (_aio_doneq == NULL) {
1876 		_aio_doneq = reqp;
1877 		reqp->req_next = reqp->req_prev = reqp;
1878 	} else {
1879 		reqp->req_next = _aio_doneq;
1880 		reqp->req_prev = _aio_doneq->req_prev;
1881 		_aio_doneq->req_prev->req_next = reqp;
1882 		_aio_doneq->req_prev = reqp;
1883 	}
1884 	reqp->req_state = AIO_REQ_DONEQ;
1885 	_aio_doneq_cnt++;
1886 }
1887 
1888 /*
1889  * caller owns the _aio_mutex
1890  */
1891 aio_req_t *
1892 _aio_req_remove(aio_req_t *reqp)
1893 {
1894 	if (reqp && reqp->req_state != AIO_REQ_DONEQ)
1895 		return (NULL);
1896 
1897 	if (reqp) {
1898 		/* request in done queue */
1899 		if (_aio_doneq == reqp)
1900 			_aio_doneq = reqp->req_next;
1901 		if (_aio_doneq == reqp) {
1902 			/* only one request on queue */
1903 			_aio_doneq = NULL;
1904 		} else {
1905 			aio_req_t *tmp = reqp->req_next;
1906 			reqp->req_prev->req_next = tmp;
1907 			tmp->req_prev = reqp->req_prev;
1908 		}
1909 	} else if ((reqp = _aio_doneq) != NULL) {
1910 		if (reqp == reqp->req_next) {
1911 			/* only one request on queue */
1912 			_aio_doneq = NULL;
1913 		} else {
1914 			reqp->req_prev->req_next = _aio_doneq = reqp->req_next;
1915 			_aio_doneq->req_prev = reqp->req_prev;
1916 		}
1917 	}
1918 	if (reqp) {
1919 		_aio_doneq_cnt--;
1920 		reqp->req_next = reqp->req_prev = reqp;
1921 		reqp->req_state = AIO_REQ_DONE;
1922 	}
1923 	return (reqp);
1924 }
1925 
1926 /*
1927  * An AIO request is identified by an aio_result_t pointer.  The library
1928  * maps this aio_result_t pointer to its internal representation using a
1929  * hash table.  This function adds an aio_result_t pointer to the hash table.
1930  */
1931 static int
1932 _aio_hash_insert(aio_result_t *resultp, aio_req_t *reqp)
1933 {
1934 	aio_hash_t *hashp;
1935 	aio_req_t **prev;
1936 	aio_req_t *next;
1937 
1938 	hashp = _aio_hash + AIOHASH(resultp);
1939 	lmutex_lock(&hashp->hash_lock);
1940 	prev = &hashp->hash_ptr;
1941 	while ((next = *prev) != NULL) {
1942 		if (resultp == next->req_resultp) {
1943 			lmutex_unlock(&hashp->hash_lock);
1944 			return (-1);
1945 		}
1946 		prev = &next->req_link;
1947 	}
1948 	*prev = reqp;
1949 	ASSERT(reqp->req_link == NULL);
1950 	lmutex_unlock(&hashp->hash_lock);
1951 	return (0);
1952 }
1953 
1954 /*
1955  * Remove an entry from the hash table.
1956  */
1957 aio_req_t *
1958 _aio_hash_del(aio_result_t *resultp)
1959 {
1960 	aio_hash_t *hashp;
1961 	aio_req_t **prev;
1962 	aio_req_t *next = NULL;
1963 
1964 	if (_aio_hash != NULL) {
1965 		hashp = _aio_hash + AIOHASH(resultp);
1966 		lmutex_lock(&hashp->hash_lock);
1967 		prev = &hashp->hash_ptr;
1968 		while ((next = *prev) != NULL) {
1969 			if (resultp == next->req_resultp) {
1970 				*prev = next->req_link;
1971 				next->req_link = NULL;
1972 				break;
1973 			}
1974 			prev = &next->req_link;
1975 		}
1976 		lmutex_unlock(&hashp->hash_lock);
1977 	}
1978 	return (next);
1979 }
1980 
1981 /*
1982  *  find an entry in the hash table
1983  */
1984 aio_req_t *
1985 _aio_hash_find(aio_result_t *resultp)
1986 {
1987 	aio_hash_t *hashp;
1988 	aio_req_t **prev;
1989 	aio_req_t *next = NULL;
1990 
1991 	if (_aio_hash != NULL) {
1992 		hashp = _aio_hash + AIOHASH(resultp);
1993 		lmutex_lock(&hashp->hash_lock);
1994 		prev = &hashp->hash_ptr;
1995 		while ((next = *prev) != NULL) {
1996 			if (resultp == next->req_resultp)
1997 				break;
1998 			prev = &next->req_link;
1999 		}
2000 		lmutex_unlock(&hashp->hash_lock);
2001 	}
2002 	return (next);
2003 }
2004 
2005 /*
2006  * AIO interface for POSIX
2007  */
2008 int
2009 _aio_rw(aiocb_t *aiocbp, aio_lio_t *lio_head, aio_worker_t **nextworker,
2010     int mode, int flg)
2011 {
2012 	aio_req_t *reqp;
2013 	aio_args_t *ap;
2014 	int kerr;
2015 
2016 	if (aiocbp == NULL) {
2017 		errno = EINVAL;
2018 		return (-1);
2019 	}
2020 
2021 	/* initialize kaio */
2022 	if (!_kaio_ok)
2023 		_kaio_init();
2024 
2025 	aiocbp->aio_state = NOCHECK;
2026 
2027 	/*
2028 	 * If we have been called because a list I/O
2029 	 * kaio() failed, we dont want to repeat the
2030 	 * system call
2031 	 */
2032 
2033 	if (flg & AIO_KAIO) {
2034 		/*
2035 		 * Try kernel aio first.
2036 		 * If errno is ENOTSUP/EBADFD,
2037 		 * fall back to the thread implementation.
2038 		 */
2039 		if (_kaio_ok > 0 && KAIO_SUPPORTED(aiocbp->aio_fildes)) {
2040 			aiocbp->aio_resultp.aio_errno = EINPROGRESS;
2041 			aiocbp->aio_state = CHECK;
2042 			kerr = (int)_kaio(mode, aiocbp);
2043 			if (kerr == 0)
2044 				return (0);
2045 			if (errno != ENOTSUP && errno != EBADFD) {
2046 				aiocbp->aio_resultp.aio_errno = errno;
2047 				aiocbp->aio_resultp.aio_return = -1;
2048 				aiocbp->aio_state = NOCHECK;
2049 				return (-1);
2050 			}
2051 			if (errno == EBADFD)
2052 				SET_KAIO_NOT_SUPPORTED(aiocbp->aio_fildes);
2053 		}
2054 	}
2055 
2056 	aiocbp->aio_resultp.aio_errno = EINPROGRESS;
2057 	aiocbp->aio_state = USERAIO;
2058 
2059 	if (!__uaio_ok && __uaio_init() == -1)
2060 		return (-1);
2061 
2062 	if ((reqp = _aio_req_alloc()) == NULL) {
2063 		errno = EAGAIN;
2064 		return (-1);
2065 	}
2066 
2067 	/*
2068 	 * If an LIO request, add the list head to the aio request
2069 	 */
2070 	reqp->req_head = lio_head;
2071 	reqp->req_type = AIO_POSIX_REQ;
2072 	reqp->req_op = mode;
2073 	reqp->req_largefile = 0;
2074 
2075 	if (aiocbp->aio_sigevent.sigev_notify == SIGEV_NONE) {
2076 		reqp->req_sigevent.sigev_notify = SIGEV_NONE;
2077 	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_SIGNAL) {
2078 		reqp->req_sigevent.sigev_notify = SIGEV_SIGNAL;
2079 		reqp->req_sigevent.sigev_signo =
2080 		    aiocbp->aio_sigevent.sigev_signo;
2081 		reqp->req_sigevent.sigev_value.sival_ptr =
2082 		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
2083 	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_PORT) {
2084 		port_notify_t *pn = aiocbp->aio_sigevent.sigev_value.sival_ptr;
2085 		reqp->req_sigevent.sigev_notify = SIGEV_PORT;
2086 		/*
2087 		 * Reuse the sigevent structure to contain the port number
2088 		 * and the user value.  Same for SIGEV_THREAD, below.
2089 		 */
2090 		reqp->req_sigevent.sigev_signo =
2091 		    pn->portnfy_port;
2092 		reqp->req_sigevent.sigev_value.sival_ptr =
2093 		    pn->portnfy_user;
2094 	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_THREAD) {
2095 		reqp->req_sigevent.sigev_notify = SIGEV_THREAD;
2096 		/*
2097 		 * The sigevent structure contains the port number
2098 		 * and the user value.  Same for SIGEV_PORT, above.
2099 		 */
2100 		reqp->req_sigevent.sigev_signo =
2101 		    aiocbp->aio_sigevent.sigev_signo;
2102 		reqp->req_sigevent.sigev_value.sival_ptr =
2103 		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
2104 	}
2105 
2106 	reqp->req_resultp = &aiocbp->aio_resultp;
2107 	reqp->req_aiocbp = aiocbp;
2108 	ap = &reqp->req_args;
2109 	ap->fd = aiocbp->aio_fildes;
2110 	ap->buf = (caddr_t)aiocbp->aio_buf;
2111 	ap->bufsz = aiocbp->aio_nbytes;
2112 	ap->offset = aiocbp->aio_offset;
2113 
2114 	if ((flg & AIO_NO_DUPS) &&
2115 	    _aio_hash_insert(&aiocbp->aio_resultp, reqp) != 0) {
2116 		aio_panic("_aio_rw(): request already in hash table");
2117 		_aio_req_free(reqp);
2118 		errno = EINVAL;
2119 		return (-1);
2120 	}
2121 	_aio_req_add(reqp, nextworker, mode);
2122 	return (0);
2123 }
2124 
2125 #if !defined(_LP64)
2126 /*
2127  * 64-bit AIO interface for POSIX
2128  */
2129 int
2130 _aio_rw64(aiocb64_t *aiocbp, aio_lio_t *lio_head, aio_worker_t **nextworker,
2131     int mode, int flg)
2132 {
2133 	aio_req_t *reqp;
2134 	aio_args_t *ap;
2135 	int kerr;
2136 
2137 	if (aiocbp == NULL) {
2138 		errno = EINVAL;
2139 		return (-1);
2140 	}
2141 
2142 	/* initialize kaio */
2143 	if (!_kaio_ok)
2144 		_kaio_init();
2145 
2146 	aiocbp->aio_state = NOCHECK;
2147 
2148 	/*
2149 	 * If we have been called because a list I/O
2150 	 * kaio() failed, we dont want to repeat the
2151 	 * system call
2152 	 */
2153 
2154 	if (flg & AIO_KAIO) {
2155 		/*
2156 		 * Try kernel aio first.
2157 		 * If errno is ENOTSUP/EBADFD,
2158 		 * fall back to the thread implementation.
2159 		 */
2160 		if (_kaio_ok > 0 && KAIO_SUPPORTED(aiocbp->aio_fildes)) {
2161 			aiocbp->aio_resultp.aio_errno = EINPROGRESS;
2162 			aiocbp->aio_state = CHECK;
2163 			kerr = (int)_kaio(mode, aiocbp);
2164 			if (kerr == 0)
2165 				return (0);
2166 			if (errno != ENOTSUP && errno != EBADFD) {
2167 				aiocbp->aio_resultp.aio_errno = errno;
2168 				aiocbp->aio_resultp.aio_return = -1;
2169 				aiocbp->aio_state = NOCHECK;
2170 				return (-1);
2171 			}
2172 			if (errno == EBADFD)
2173 				SET_KAIO_NOT_SUPPORTED(aiocbp->aio_fildes);
2174 		}
2175 	}
2176 
2177 	aiocbp->aio_resultp.aio_errno = EINPROGRESS;
2178 	aiocbp->aio_state = USERAIO;
2179 
2180 	if (!__uaio_ok && __uaio_init() == -1)
2181 		return (-1);
2182 
2183 	if ((reqp = _aio_req_alloc()) == NULL) {
2184 		errno = EAGAIN;
2185 		return (-1);
2186 	}
2187 
2188 	/*
2189 	 * If an LIO request, add the list head to the aio request
2190 	 */
2191 	reqp->req_head = lio_head;
2192 	reqp->req_type = AIO_POSIX_REQ;
2193 	reqp->req_op = mode;
2194 	reqp->req_largefile = 1;
2195 
2196 	if (aiocbp->aio_sigevent.sigev_notify == SIGEV_NONE) {
2197 		reqp->req_sigevent.sigev_notify = SIGEV_NONE;
2198 	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_SIGNAL) {
2199 		reqp->req_sigevent.sigev_notify = SIGEV_SIGNAL;
2200 		reqp->req_sigevent.sigev_signo =
2201 		    aiocbp->aio_sigevent.sigev_signo;
2202 		reqp->req_sigevent.sigev_value.sival_ptr =
2203 		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
2204 	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_PORT) {
2205 		port_notify_t *pn = aiocbp->aio_sigevent.sigev_value.sival_ptr;
2206 		reqp->req_sigevent.sigev_notify = SIGEV_PORT;
2207 		reqp->req_sigevent.sigev_signo =
2208 		    pn->portnfy_port;
2209 		reqp->req_sigevent.sigev_value.sival_ptr =
2210 		    pn->portnfy_user;
2211 	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_THREAD) {
2212 		reqp->req_sigevent.sigev_notify = SIGEV_THREAD;
2213 		reqp->req_sigevent.sigev_signo =
2214 		    aiocbp->aio_sigevent.sigev_signo;
2215 		reqp->req_sigevent.sigev_value.sival_ptr =
2216 		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
2217 	}
2218 
2219 	reqp->req_resultp = &aiocbp->aio_resultp;
2220 	reqp->req_aiocbp = aiocbp;
2221 	ap = &reqp->req_args;
2222 	ap->fd = aiocbp->aio_fildes;
2223 	ap->buf = (caddr_t)aiocbp->aio_buf;
2224 	ap->bufsz = aiocbp->aio_nbytes;
2225 	ap->offset = aiocbp->aio_offset;
2226 
2227 	if ((flg & AIO_NO_DUPS) &&
2228 	    _aio_hash_insert(&aiocbp->aio_resultp, reqp) != 0) {
2229 		aio_panic("_aio_rw64(): request already in hash table");
2230 		_aio_req_free(reqp);
2231 		errno = EINVAL;
2232 		return (-1);
2233 	}
2234 	_aio_req_add(reqp, nextworker, mode);
2235 	return (0);
2236 }
2237 #endif	/* !defined(_LP64) */
2238