xref: /titanic_50/usr/src/lib/libc/port/aio/aio.c (revision a574db851cdc636fc3939b68e80d79fe7fbd57f2)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 
22 /*
23  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
24  * Use is subject to license terms.
25  */
26 
27 #pragma ident	"%Z%%M%	%I%	%E% SMI"
28 
29 #include "synonyms.h"
30 #include "thr_uberdata.h"
31 #include "asyncio.h"
32 #include <atomic.h>
33 #include <sys/param.h>
34 #include <sys/file.h>
35 #include <sys/port.h>
36 
/*
 * Forward declarations of helpers private to this file, and of
 * functions provided elsewhere in libc.
 */
static int _aio_hash_insert(aio_result_t *, aio_req_t *);
static aio_req_t *_aio_req_get(aio_worker_t *);
static void _aio_req_add(aio_req_t *, aio_worker_t **, int);
static void _aio_req_del(aio_worker_t *, aio_req_t *, int);
static void _aio_work_done(aio_worker_t *);
static void _aio_enq_doneq(aio_req_t *);

extern void _aio_lio_free(aio_lio_t *);

extern int __fdsync(int, int);
extern int _port_dispatch(int, int, int, int, uintptr_t, void *);

static int _aio_fsync_del(aio_worker_t *, aio_req_t *);
static void _aiodone(aio_req_t *, ssize_t, int);
static void _aio_cancel_work(aio_worker_t *, int, int *, int *);
static void _aio_finish_request(aio_worker_t *, ssize_t, int);

/*
 * switch for kernel async I/O
 */
int _kaio_ok = 0;		/* 0 = disabled, 1 = on, -1 = error */

/*
 * Key for thread-specific data
 */
pthread_key_t _aio_key;

/*
 * Array for determining whether or not a file supports kaio.
 * Allocated by _kaio_supported_init(), which is called from both
 * _kaio_init() and __uaio_init().
 */
uint32_t *_kaio_supported = NULL;

/*
 *  workers for read/write requests
 * (__aio_mutex lock protects circular linked list of workers)
 */
aio_worker_t *__workers_rw;	/* circular list of AIO workers */
aio_worker_t *__nextworker_rw;	/* next worker in list of workers */
int __rw_workerscnt;		/* number of read/write workers */

/*
 * worker for notification requests.
 */
aio_worker_t *__workers_no;	/* circular list of AIO workers */
aio_worker_t *__nextworker_no;	/* next worker in list of workers */
int __no_workerscnt;		/* number of notification workers */

/*
 * List of done requests.  aiocancel_all() walks it starting at
 * _aio_done_tail and following req_next.
 */
aio_req_t *_aio_done_tail;		/* list of done requests */
aio_req_t *_aio_done_head;

mutex_t __aio_initlock = DEFAULTMUTEX;	/* makes aio initialization atomic */
cond_t __aio_initcv = DEFAULTCV;	/* signalled when initialization ends */
int __aio_initbusy = 0;			/* set while an initialization runs */

mutex_t __aio_mutex = DEFAULTMUTEX;	/* protects counts, and linked lists */
cond_t _aio_iowait_cv = DEFAULTCV;	/* wait for userland I/Os */

pid_t __pid = (pid_t)-1;		/* initialize as invalid pid */
int _sigio_enabled = 0;			/* when set, send SIGIO signal */

aio_hash_t *_aio_hash;			/* hash table, mapped by __uaio_init() */

aio_req_t *_aio_doneq;			/* double linked done queue list */

int _aio_donecnt = 0;			/* # of requests on the done list */
int _aio_waitncnt = 0;			/* # of requests for aio_waitn */
int _aio_doneq_cnt = 0;
int _aio_outstand_cnt = 0;		/* # of outstanding requests */
int _kaio_outstand_cnt = 0;		/* # of outstanding kaio requests */
int _aio_req_done_cnt = 0;		/* req. done but not in "done queue" */
int _aio_kernel_suspend = 0;		/* active kernel kaio calls */
int _aio_suscv_cnt = 0;			/* aio_suspend calls waiting on cv's */

int _max_workers = 256;			/* max number of workers permitted */
int _min_workers = 4;			/* min number of workers */
int _minworkload = 2;			/* min number of request in q */
int _aio_worker_cnt = 0;		/* number of workers to do requests */
int __uaio_ok = 0;			/* AIO has been enabled */
sigset_t _worker_set;			/* worker's signal mask */

int _aiowait_flag = 0;			/* when set, aiowait() is inprogress */
int _aio_flags = 0;			/* flag bits; see defines in asyncio.h */

aio_worker_t *_kaiowp = NULL;		/* points to kaio cleanup thread */

int hz;					/* clock ticks per second */
124 
125 static int
126 _kaio_supported_init(void)
127 {
128 	void *ptr;
129 	size_t size;
130 
131 	if (_kaio_supported != NULL)	/* already initialized */
132 		return (0);
133 
134 	size = MAX_KAIO_FDARRAY_SIZE * sizeof (uint32_t);
135 	ptr = mmap(NULL, size, PROT_READ | PROT_WRITE,
136 	    MAP_PRIVATE | MAP_ANON, -1, (off_t)0);
137 	if (ptr == MAP_FAILED)
138 		return (-1);
139 	_kaio_supported = ptr;
140 	return (0);
141 }
142 
143 /*
144  * The aio subsystem is initialized when an AIO request is made.
145  * Constants are initialized like the max number of workers that
146  * the subsystem can create, and the minimum number of workers
147  * permitted before imposing some restrictions.  Also, some
148  * workers are created.
149  */
150 int
151 __uaio_init(void)
152 {
153 	int ret = -1;
154 	int i;
155 	int cancel_state;
156 
157 	lmutex_lock(&__aio_initlock);
158 	(void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancel_state);
159 	while (__aio_initbusy)
160 		(void) cond_wait(&__aio_initcv, &__aio_initlock);
161 	(void) pthread_setcancelstate(cancel_state, NULL);
162 	if (__uaio_ok) {	/* already initialized */
163 		lmutex_unlock(&__aio_initlock);
164 		return (0);
165 	}
166 	__aio_initbusy = 1;
167 	lmutex_unlock(&__aio_initlock);
168 
169 	hz = (int)sysconf(_SC_CLK_TCK);
170 	__pid = getpid();
171 
172 	setup_cancelsig(SIGAIOCANCEL);
173 
174 	if (_kaio_supported_init() != 0)
175 		goto out;
176 
177 	/*
178 	 * Allocate and initialize the hash table.
179 	 * Do this only once, even if __uaio_init() is called twice.
180 	 */
181 	if (_aio_hash == NULL) {
182 		/* LINTED pointer cast */
183 		_aio_hash = (aio_hash_t *)mmap(NULL,
184 		    HASHSZ * sizeof (aio_hash_t), PROT_READ | PROT_WRITE,
185 		    MAP_PRIVATE | MAP_ANON, -1, (off_t)0);
186 		if ((void *)_aio_hash == MAP_FAILED) {
187 			_aio_hash = NULL;
188 			goto out;
189 		}
190 		for (i = 0; i < HASHSZ; i++)
191 			(void) mutex_init(&_aio_hash[i].hash_lock,
192 			    USYNC_THREAD, NULL);
193 	}
194 
195 	/*
196 	 * Initialize worker's signal mask to only catch SIGAIOCANCEL.
197 	 */
198 	(void) sigfillset(&_worker_set);
199 	(void) sigdelset(&_worker_set, SIGAIOCANCEL);
200 
201 	/*
202 	 * Create one worker to send asynchronous notifications.
203 	 * Do this only once, even if __uaio_init() is called twice.
204 	 */
205 	if (__no_workerscnt == 0 &&
206 	    (_aio_create_worker(NULL, AIONOTIFY) != 0)) {
207 		errno = EAGAIN;
208 		goto out;
209 	}
210 
211 	/*
212 	 * Create the minimum number of read/write workers.
213 	 * And later check whether atleast one worker is created;
214 	 * lwp_create() calls could fail because of segkp exhaustion.
215 	 */
216 	for (i = 0; i < _min_workers; i++)
217 		(void) _aio_create_worker(NULL, AIOREAD);
218 	if (__rw_workerscnt == 0) {
219 		errno = EAGAIN;
220 		goto out;
221 	}
222 
223 	ret = 0;
224 out:
225 	lmutex_lock(&__aio_initlock);
226 	if (ret == 0)
227 		__uaio_ok = 1;
228 	__aio_initbusy = 0;
229 	(void) cond_broadcast(&__aio_initcv);
230 	lmutex_unlock(&__aio_initlock);
231 	return (ret);
232 }
233 
234 /*
235  * Called from close() before actually performing the real _close().
236  */
237 void
238 _aio_close(int fd)
239 {
240 	if (fd < 0)	/* avoid cancelling everything */
241 		return;
242 	/*
243 	 * Cancel all outstanding aio requests for this file descriptor.
244 	 */
245 	if (__uaio_ok)
246 		(void) aiocancel_all(fd);
247 	/*
248 	 * If we have allocated the bit array, clear the bit for this file.
249 	 * The next open may re-use this file descriptor and the new file
250 	 * may have different kaio() behaviour.
251 	 */
252 	if (_kaio_supported != NULL)
253 		CLEAR_KAIO_SUPPORTED(fd);
254 }
255 
256 /*
257  * special kaio cleanup thread sits in a loop in the
258  * kernel waiting for pending kaio requests to complete.
259  */
260 void *
261 _kaio_cleanup_thread(void *arg)
262 {
263 	if (pthread_setspecific(_aio_key, arg) != 0)
264 		aio_panic("_kaio_cleanup_thread, pthread_setspecific()");
265 	(void) _kaio(AIOSTART);
266 	return (arg);
267 }
268 
269 /*
270  * initialize kaio.
271  */
272 void
273 _kaio_init()
274 {
275 	int error;
276 	sigset_t oset;
277 	int cancel_state;
278 
279 	lmutex_lock(&__aio_initlock);
280 	(void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancel_state);
281 	while (__aio_initbusy)
282 		(void) cond_wait(&__aio_initcv, &__aio_initlock);
283 	(void) pthread_setcancelstate(cancel_state, NULL);
284 	if (_kaio_ok) {		/* already initialized */
285 		lmutex_unlock(&__aio_initlock);
286 		return;
287 	}
288 	__aio_initbusy = 1;
289 	lmutex_unlock(&__aio_initlock);
290 
291 	if (_kaio_supported_init() != 0)
292 		error = ENOMEM;
293 	else if ((_kaiowp = _aio_worker_alloc()) == NULL)
294 		error = ENOMEM;
295 	else if ((error = (int)_kaio(AIOINIT)) == 0) {
296 		(void) pthread_sigmask(SIG_SETMASK, &maskset, &oset);
297 		error = thr_create(NULL, AIOSTKSIZE, _kaio_cleanup_thread,
298 		    _kaiowp, THR_DAEMON, &_kaiowp->work_tid);
299 		(void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
300 	}
301 	if (error && _kaiowp != NULL) {
302 		_aio_worker_free(_kaiowp);
303 		_kaiowp = NULL;
304 	}
305 
306 	lmutex_lock(&__aio_initlock);
307 	if (error)
308 		_kaio_ok = -1;
309 	else
310 		_kaio_ok = 1;
311 	__aio_initbusy = 0;
312 	(void) cond_broadcast(&__aio_initcv);
313 	lmutex_unlock(&__aio_initlock);
314 }
315 
316 int
317 aioread(int fd, caddr_t buf, int bufsz, off_t offset, int whence,
318     aio_result_t *resultp)
319 {
320 	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOREAD));
321 }
322 
323 int
324 aiowrite(int fd, caddr_t buf, int bufsz, off_t offset, int whence,
325     aio_result_t *resultp)
326 {
327 	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOWRITE));
328 }
329 
330 #if !defined(_LP64)
331 int
332 aioread64(int fd, caddr_t buf, int bufsz, off64_t offset, int whence,
333     aio_result_t *resultp)
334 {
335 	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOAREAD64));
336 }
337 
338 int
339 aiowrite64(int fd, caddr_t buf, int bufsz, off64_t offset, int whence,
340     aio_result_t *resultp)
341 {
342 	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOAWRITE64));
343 }
344 #endif	/* !defined(_LP64) */
345 
346 int
347 _aiorw(int fd, caddr_t buf, int bufsz, offset_t offset, int whence,
348     aio_result_t *resultp, int mode)
349 {
350 	aio_req_t *reqp;
351 	aio_args_t *ap;
352 	offset_t loffset;
353 	struct stat64 stat64;
354 	int error = 0;
355 	int kerr;
356 	int umode;
357 
358 	switch (whence) {
359 
360 	case SEEK_SET:
361 		loffset = offset;
362 		break;
363 	case SEEK_CUR:
364 		if ((loffset = llseek(fd, 0, SEEK_CUR)) == -1)
365 			error = -1;
366 		else
367 			loffset += offset;
368 		break;
369 	case SEEK_END:
370 		if (fstat64(fd, &stat64) == -1)
371 			error = -1;
372 		else
373 			loffset = offset + stat64.st_size;
374 		break;
375 	default:
376 		errno = EINVAL;
377 		error = -1;
378 	}
379 
380 	if (error)
381 		return (error);
382 
383 	/* initialize kaio */
384 	if (!_kaio_ok)
385 		_kaio_init();
386 
387 	/*
388 	 * _aio_do_request() needs the original request code (mode) to be able
389 	 * to choose the appropiate 32/64 bit function.  All other functions
390 	 * only require the difference between READ and WRITE (umode).
391 	 */
392 	if (mode == AIOAREAD64 || mode == AIOAWRITE64)
393 		umode = mode - AIOAREAD64;
394 	else
395 		umode = mode;
396 
397 	/*
398 	 * Try kernel aio first.
399 	 * If errno is ENOTSUP/EBADFD, fall back to the thread implementation.
400 	 */
401 	if (_kaio_ok > 0 && KAIO_SUPPORTED(fd)) {
402 		resultp->aio_errno = 0;
403 		sig_mutex_lock(&__aio_mutex);
404 		_kaio_outstand_cnt++;
405 		sig_mutex_unlock(&__aio_mutex);
406 		kerr = (int)_kaio(((resultp->aio_return == AIO_INPROGRESS) ?
407 		    (umode | AIO_POLL_BIT) : umode),
408 		    fd, buf, bufsz, loffset, resultp);
409 		if (kerr == 0) {
410 			return (0);
411 		}
412 		sig_mutex_lock(&__aio_mutex);
413 		_kaio_outstand_cnt--;
414 		sig_mutex_unlock(&__aio_mutex);
415 		if (errno != ENOTSUP && errno != EBADFD)
416 			return (-1);
417 		if (errno == EBADFD)
418 			SET_KAIO_NOT_SUPPORTED(fd);
419 	}
420 
421 	if (!__uaio_ok && __uaio_init() == -1)
422 		return (-1);
423 
424 	if ((reqp = _aio_req_alloc()) == NULL) {
425 		errno = EAGAIN;
426 		return (-1);
427 	}
428 
429 	/*
430 	 * _aio_do_request() checks reqp->req_op to differentiate
431 	 * between 32 and 64 bit access.
432 	 */
433 	reqp->req_op = mode;
434 	reqp->req_resultp = resultp;
435 	ap = &reqp->req_args;
436 	ap->fd = fd;
437 	ap->buf = buf;
438 	ap->bufsz = bufsz;
439 	ap->offset = loffset;
440 
441 	if (_aio_hash_insert(resultp, reqp) != 0) {
442 		_aio_req_free(reqp);
443 		errno = EINVAL;
444 		return (-1);
445 	}
446 	/*
447 	 * _aio_req_add() only needs the difference between READ and
448 	 * WRITE to choose the right worker queue.
449 	 */
450 	_aio_req_add(reqp, &__nextworker_rw, umode);
451 	return (0);
452 }
453 
454 int
455 aiocancel(aio_result_t *resultp)
456 {
457 	aio_req_t *reqp;
458 	aio_worker_t *aiowp;
459 	int ret;
460 	int done = 0;
461 	int canceled = 0;
462 
463 	if (!__uaio_ok) {
464 		errno = EINVAL;
465 		return (-1);
466 	}
467 
468 	sig_mutex_lock(&__aio_mutex);
469 	reqp = _aio_hash_find(resultp);
470 	if (reqp == NULL) {
471 		if (_aio_outstand_cnt == _aio_req_done_cnt)
472 			errno = EINVAL;
473 		else
474 			errno = EACCES;
475 		ret = -1;
476 	} else {
477 		aiowp = reqp->req_worker;
478 		sig_mutex_lock(&aiowp->work_qlock1);
479 		(void) _aio_cancel_req(aiowp, reqp, &canceled, &done);
480 		sig_mutex_unlock(&aiowp->work_qlock1);
481 
482 		if (canceled) {
483 			ret = 0;
484 		} else {
485 			if (_aio_outstand_cnt == 0 ||
486 			    _aio_outstand_cnt == _aio_req_done_cnt)
487 				errno = EINVAL;
488 			else
489 				errno = EACCES;
490 			ret = -1;
491 		}
492 	}
493 	sig_mutex_unlock(&__aio_mutex);
494 	return (ret);
495 }
496 
497 /* ARGSUSED */
498 static void
499 _aiowait_cleanup(void *arg)
500 {
501 	sig_mutex_lock(&__aio_mutex);
502 	_aiowait_flag--;
503 	sig_mutex_unlock(&__aio_mutex);
504 }
505 
506 /*
507  * This must be asynch safe and cancel safe
508  */
509 aio_result_t *
510 aiowait(struct timeval *uwait)
511 {
512 	aio_result_t *uresultp;
513 	aio_result_t *kresultp;
514 	aio_result_t *resultp;
515 	int dontblock;
516 	int timedwait = 0;
517 	int kaio_errno = 0;
518 	struct timeval twait;
519 	struct timeval *wait = NULL;
520 	hrtime_t hrtend;
521 	hrtime_t hres;
522 
523 	if (uwait) {
524 		/*
525 		 * Check for a valid specified wait time.
526 		 * If it is invalid, fail the call right away.
527 		 */
528 		if (uwait->tv_sec < 0 || uwait->tv_usec < 0 ||
529 		    uwait->tv_usec >= MICROSEC) {
530 			errno = EINVAL;
531 			return ((aio_result_t *)-1);
532 		}
533 
534 		if (uwait->tv_sec > 0 || uwait->tv_usec > 0) {
535 			hrtend = gethrtime() +
536 			    (hrtime_t)uwait->tv_sec * NANOSEC +
537 			    (hrtime_t)uwait->tv_usec * (NANOSEC / MICROSEC);
538 			twait = *uwait;
539 			wait = &twait;
540 			timedwait++;
541 		} else {
542 			/* polling */
543 			sig_mutex_lock(&__aio_mutex);
544 			if (_kaio_outstand_cnt == 0) {
545 				kresultp = (aio_result_t *)-1;
546 			} else {
547 				kresultp = (aio_result_t *)_kaio(AIOWAIT,
548 				    (struct timeval *)-1, 1);
549 				if (kresultp != (aio_result_t *)-1 &&
550 				    kresultp != NULL &&
551 				    kresultp != (aio_result_t *)1) {
552 					_kaio_outstand_cnt--;
553 					sig_mutex_unlock(&__aio_mutex);
554 					return (kresultp);
555 				}
556 			}
557 			uresultp = _aio_req_done();
558 			sig_mutex_unlock(&__aio_mutex);
559 			if (uresultp != NULL &&
560 			    uresultp != (aio_result_t *)-1) {
561 				return (uresultp);
562 			}
563 			if (uresultp == (aio_result_t *)-1 &&
564 			    kresultp == (aio_result_t *)-1) {
565 				errno = EINVAL;
566 				return ((aio_result_t *)-1);
567 			} else {
568 				return (NULL);
569 			}
570 		}
571 	}
572 
573 	for (;;) {
574 		sig_mutex_lock(&__aio_mutex);
575 		uresultp = _aio_req_done();
576 		if (uresultp != NULL && uresultp != (aio_result_t *)-1) {
577 			sig_mutex_unlock(&__aio_mutex);
578 			resultp = uresultp;
579 			break;
580 		}
581 		_aiowait_flag++;
582 		dontblock = (uresultp == (aio_result_t *)-1);
583 		if (dontblock && _kaio_outstand_cnt == 0) {
584 			kresultp = (aio_result_t *)-1;
585 			kaio_errno = EINVAL;
586 		} else {
587 			sig_mutex_unlock(&__aio_mutex);
588 			pthread_cleanup_push(_aiowait_cleanup, NULL);
589 			_cancel_prologue();
590 			kresultp = (aio_result_t *)_kaio(AIOWAIT,
591 			    wait, dontblock);
592 			_cancel_epilogue();
593 			pthread_cleanup_pop(0);
594 			sig_mutex_lock(&__aio_mutex);
595 			kaio_errno = errno;
596 		}
597 		_aiowait_flag--;
598 		sig_mutex_unlock(&__aio_mutex);
599 		if (kresultp == (aio_result_t *)1) {
600 			/* aiowait() awakened by an aionotify() */
601 			continue;
602 		} else if (kresultp != NULL &&
603 		    kresultp != (aio_result_t *)-1) {
604 			resultp = kresultp;
605 			sig_mutex_lock(&__aio_mutex);
606 			_kaio_outstand_cnt--;
607 			sig_mutex_unlock(&__aio_mutex);
608 			break;
609 		} else if (kresultp == (aio_result_t *)-1 &&
610 		    kaio_errno == EINVAL &&
611 		    uresultp == (aio_result_t *)-1) {
612 			errno = kaio_errno;
613 			resultp = (aio_result_t *)-1;
614 			break;
615 		} else if (kresultp == (aio_result_t *)-1 &&
616 		    kaio_errno == EINTR) {
617 			errno = kaio_errno;
618 			resultp = (aio_result_t *)-1;
619 			break;
620 		} else if (timedwait) {
621 			hres = hrtend - gethrtime();
622 			if (hres <= 0) {
623 				/* time is up; return */
624 				resultp = NULL;
625 				break;
626 			} else {
627 				/*
628 				 * Some time left.  Round up the remaining time
629 				 * in nanoseconds to microsec.  Retry the call.
630 				 */
631 				hres += (NANOSEC / MICROSEC) - 1;
632 				wait->tv_sec = hres / NANOSEC;
633 				wait->tv_usec =
634 				    (hres % NANOSEC) / (NANOSEC / MICROSEC);
635 			}
636 		} else {
637 			ASSERT(kresultp == NULL && uresultp == NULL);
638 			resultp = NULL;
639 			continue;
640 		}
641 	}
642 	return (resultp);
643 }
644 
645 /*
646  * _aio_get_timedelta calculates the remaining time and stores the result
647  * into timespec_t *wait.
648  */
649 
650 int
651 _aio_get_timedelta(timespec_t *end, timespec_t *wait)
652 {
653 	int	ret = 0;
654 	struct	timeval cur;
655 	timespec_t curtime;
656 
657 	(void) gettimeofday(&cur, NULL);
658 	curtime.tv_sec = cur.tv_sec;
659 	curtime.tv_nsec = cur.tv_usec * 1000;   /* convert us to ns */
660 
661 	if (end->tv_sec >= curtime.tv_sec) {
662 		wait->tv_sec = end->tv_sec - curtime.tv_sec;
663 		if (end->tv_nsec >= curtime.tv_nsec) {
664 			wait->tv_nsec = end->tv_nsec - curtime.tv_nsec;
665 			if (wait->tv_sec == 0 && wait->tv_nsec == 0)
666 				ret = -1;	/* timer expired */
667 		} else {
668 			if (end->tv_sec > curtime.tv_sec) {
669 				wait->tv_sec -= 1;
670 				wait->tv_nsec = NANOSEC -
671 				    (curtime.tv_nsec - end->tv_nsec);
672 			} else {
673 				ret = -1;	/* timer expired */
674 			}
675 		}
676 	} else {
677 		ret = -1;
678 	}
679 	return (ret);
680 }
681 
682 /*
683  * If closing by file descriptor: we will simply cancel all the outstanding
684  * aio`s and return.  Those aio's in question will have either noticed the
685  * cancellation notice before, during, or after initiating io.
686  */
687 int
688 aiocancel_all(int fd)
689 {
690 	aio_req_t *reqp;
691 	aio_req_t **reqpp;
692 	aio_worker_t *first;
693 	aio_worker_t *next;
694 	int canceled = 0;
695 	int done = 0;
696 	int cancelall = 0;
697 
698 	sig_mutex_lock(&__aio_mutex);
699 
700 	if (_aio_outstand_cnt == 0) {
701 		sig_mutex_unlock(&__aio_mutex);
702 		return (AIO_ALLDONE);
703 	}
704 
705 	/*
706 	 * Cancel requests from the read/write workers' queues.
707 	 */
708 	first = __nextworker_rw;
709 	next = first;
710 	do {
711 		_aio_cancel_work(next, fd, &canceled, &done);
712 	} while ((next = next->work_forw) != first);
713 
714 	/*
715 	 * finally, check if there are requests on the done queue that
716 	 * should be canceled.
717 	 */
718 	if (fd < 0)
719 		cancelall = 1;
720 	reqpp = &_aio_done_tail;
721 	while ((reqp = *reqpp) != NULL) {
722 		if (cancelall || reqp->req_args.fd == fd) {
723 			*reqpp = reqp->req_next;
724 			_aio_donecnt--;
725 			(void) _aio_hash_del(reqp->req_resultp);
726 			_aio_req_free(reqp);
727 		} else
728 			reqpp = &reqp->req_next;
729 	}
730 	if (cancelall) {
731 		ASSERT(_aio_donecnt == 0);
732 		_aio_done_head = NULL;
733 	}
734 	sig_mutex_unlock(&__aio_mutex);
735 
736 	if (canceled && done == 0)
737 		return (AIO_CANCELED);
738 	else if (done && canceled == 0)
739 		return (AIO_ALLDONE);
740 	else if ((canceled + done == 0) && KAIO_SUPPORTED(fd))
741 		return ((int)_kaio(AIOCANCEL, fd, NULL));
742 	return (AIO_NOTCANCELED);
743 }
744 
745 /*
746  * Cancel requests from a given work queue.  If the file descriptor
747  * parameter, fd, is non-negative, then only cancel those requests
748  * in this queue that are to this file descriptor.  If the fd
749  * parameter is -1, then cancel all requests.
750  */
751 static void
752 _aio_cancel_work(aio_worker_t *aiowp, int fd, int *canceled, int *done)
753 {
754 	aio_req_t *reqp;
755 
756 	sig_mutex_lock(&aiowp->work_qlock1);
757 	/*
758 	 * cancel queued requests first.
759 	 */
760 	reqp = aiowp->work_tail1;
761 	while (reqp != NULL) {
762 		if (fd < 0 || reqp->req_args.fd == fd) {
763 			if (_aio_cancel_req(aiowp, reqp, canceled, done)) {
764 				/*
765 				 * Callers locks were dropped.
766 				 * reqp is invalid; start traversing
767 				 * the list from the beginning again.
768 				 */
769 				reqp = aiowp->work_tail1;
770 				continue;
771 			}
772 		}
773 		reqp = reqp->req_next;
774 	}
775 	/*
776 	 * Since the queued requests have been canceled, there can
777 	 * only be one inprogress request that should be canceled.
778 	 */
779 	if ((reqp = aiowp->work_req) != NULL &&
780 	    (fd < 0 || reqp->req_args.fd == fd))
781 		(void) _aio_cancel_req(aiowp, reqp, canceled, done);
782 	sig_mutex_unlock(&aiowp->work_qlock1);
783 }
784 
785 /*
786  * Cancel a request.  Return 1 if the callers locks were temporarily
787  * dropped, otherwise return 0.
788  */
789 int
790 _aio_cancel_req(aio_worker_t *aiowp, aio_req_t *reqp, int *canceled, int *done)
791 {
792 	int ostate = reqp->req_state;
793 
794 	ASSERT(MUTEX_HELD(&__aio_mutex));
795 	ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
796 	if (ostate == AIO_REQ_CANCELED)
797 		return (0);
798 	if (ostate == AIO_REQ_DONE || ostate == AIO_REQ_DONEQ) {
799 		(*done)++;
800 		return (0);
801 	}
802 	if (reqp->req_op == AIOFSYNC && reqp != aiowp->work_req) {
803 		ASSERT(POSIX_AIO(reqp));
804 		/* Cancel the queued aio_fsync() request */
805 		if (!reqp->req_head->lio_canned) {
806 			reqp->req_head->lio_canned = 1;
807 			_aio_outstand_cnt--;
808 			(*canceled)++;
809 		}
810 		return (0);
811 	}
812 	reqp->req_state = AIO_REQ_CANCELED;
813 	_aio_req_del(aiowp, reqp, ostate);
814 	(void) _aio_hash_del(reqp->req_resultp);
815 	(*canceled)++;
816 	if (reqp == aiowp->work_req) {
817 		ASSERT(ostate == AIO_REQ_INPROGRESS);
818 		/*
819 		 * Set the result values now, before _aiodone() is called.
820 		 * We do this because the application can expect aio_return
821 		 * and aio_errno to be set to -1 and ECANCELED, respectively,
822 		 * immediately after a successful return from aiocancel()
823 		 * or aio_cancel().
824 		 */
825 		_aio_set_result(reqp, -1, ECANCELED);
826 		(void) thr_kill(aiowp->work_tid, SIGAIOCANCEL);
827 		return (0);
828 	}
829 	if (!POSIX_AIO(reqp)) {
830 		_aio_outstand_cnt--;
831 		_aio_set_result(reqp, -1, ECANCELED);
832 		return (0);
833 	}
834 	sig_mutex_unlock(&aiowp->work_qlock1);
835 	sig_mutex_unlock(&__aio_mutex);
836 	_aiodone(reqp, -1, ECANCELED);
837 	sig_mutex_lock(&__aio_mutex);
838 	sig_mutex_lock(&aiowp->work_qlock1);
839 	return (1);
840 }
841 
842 int
843 _aio_create_worker(aio_req_t *reqp, int mode)
844 {
845 	aio_worker_t *aiowp, **workers, **nextworker;
846 	int *aio_workerscnt;
847 	void *(*func)(void *);
848 	sigset_t oset;
849 	int error;
850 
851 	/*
852 	 * Put the new worker thread in the right queue.
853 	 */
854 	switch (mode) {
855 	case AIOREAD:
856 	case AIOWRITE:
857 	case AIOAREAD:
858 	case AIOAWRITE:
859 #if !defined(_LP64)
860 	case AIOAREAD64:
861 	case AIOAWRITE64:
862 #endif
863 		workers = &__workers_rw;
864 		nextworker = &__nextworker_rw;
865 		aio_workerscnt = &__rw_workerscnt;
866 		func = _aio_do_request;
867 		break;
868 	case AIONOTIFY:
869 		workers = &__workers_no;
870 		nextworker = &__nextworker_no;
871 		func = _aio_do_notify;
872 		aio_workerscnt = &__no_workerscnt;
873 		break;
874 	default:
875 		aio_panic("_aio_create_worker: invalid mode");
876 		break;
877 	}
878 
879 	if ((aiowp = _aio_worker_alloc()) == NULL)
880 		return (-1);
881 
882 	if (reqp) {
883 		reqp->req_state = AIO_REQ_QUEUED;
884 		reqp->req_worker = aiowp;
885 		aiowp->work_head1 = reqp;
886 		aiowp->work_tail1 = reqp;
887 		aiowp->work_next1 = reqp;
888 		aiowp->work_count1 = 1;
889 		aiowp->work_minload1 = 1;
890 	}
891 
892 	(void) pthread_sigmask(SIG_SETMASK, &maskset, &oset);
893 	error = thr_create(NULL, AIOSTKSIZE, func, aiowp,
894 	    THR_DAEMON | THR_SUSPENDED, &aiowp->work_tid);
895 	(void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
896 	if (error) {
897 		if (reqp) {
898 			reqp->req_state = 0;
899 			reqp->req_worker = NULL;
900 		}
901 		_aio_worker_free(aiowp);
902 		return (-1);
903 	}
904 
905 	lmutex_lock(&__aio_mutex);
906 	(*aio_workerscnt)++;
907 	if (*workers == NULL) {
908 		aiowp->work_forw = aiowp;
909 		aiowp->work_backw = aiowp;
910 		*nextworker = aiowp;
911 		*workers = aiowp;
912 	} else {
913 		aiowp->work_backw = (*workers)->work_backw;
914 		aiowp->work_forw = (*workers);
915 		(*workers)->work_backw->work_forw = aiowp;
916 		(*workers)->work_backw = aiowp;
917 	}
918 	_aio_worker_cnt++;
919 	lmutex_unlock(&__aio_mutex);
920 
921 	(void) thr_continue(aiowp->work_tid);
922 
923 	return (0);
924 }
925 
926 /*
927  * This is the worker's main routine.
928  * The task of this function is to execute all queued requests;
929  * once the last pending request is executed this function will block
930  * in _aio_idle().  A new incoming request must wakeup this thread to
931  * restart the work.
 * Every worker has its own work queue.  The queue lock is required
933  * to synchronize the addition of new requests for this worker or
934  * cancellation of pending/running requests.
935  *
936  * Cancellation scenarios:
937  * The cancellation of a request is being done asynchronously using
938  * _aio_cancel_req() from another thread context.
939  * A queued request can be cancelled in different manners :
940  * a) request is queued but not "in progress" or "done" (AIO_REQ_QUEUED):
941  *	- lock the queue -> remove the request -> unlock the queue
942  *	- this function/thread does not detect this cancellation process
943  * b) request is in progress (AIO_REQ_INPROGRESS) :
 *	- this function first allows the cancellation of the running
945  *	  request with the flag "work_cancel_flg=1"
946  * 		see _aio_req_get() -> _aio_cancel_on()
947  *	  During this phase, it is allowed to interrupt the worker
948  *	  thread running the request (this thread) using the SIGAIOCANCEL
949  *	  signal.
950  *	  Once this thread returns from the kernel (because the request
951  *	  is just done), then it must disable a possible cancellation
952  *	  and proceed to finish the request.  To disable the cancellation
953  *	  this thread must use _aio_cancel_off() to set "work_cancel_flg=0".
954  * c) request is already done (AIO_REQ_DONE || AIO_REQ_DONEQ):
955  *	  same procedure as in a)
956  *
957  * To b)
 *	This thread uses sigsetjmp() to define the position in the code where
 *	it wishes to continue working in the case that a SIGAIOCANCEL signal
 *	is detected.
961  *	Normally this thread should get the cancellation signal during the
962  *	kernel phase (reading or writing).  In that case the signal handler
963  *	aiosigcancelhndlr() is activated using the worker thread context,
964  *	which again will use the siglongjmp() function to break the standard
965  *	code flow and jump to the "sigsetjmp" position, provided that
966  *	"work_cancel_flg" is set to "1".
967  *	Because the "work_cancel_flg" is only manipulated by this worker
968  *	thread and it can only run on one CPU at a given time, it is not
969  *	necessary to protect that flag with the queue lock.
970  *	Returning from the kernel (read or write system call) we must
971  *	first disable the use of the SIGAIOCANCEL signal and accordingly
972  *	the use of the siglongjmp() function to prevent a possible deadlock:
 *	- It can happen that this worker thread returns from the kernel and
 *	  blocks in "work_qlock1",
 *	- then a second thread cancels the apparently "in progress" request
 *	  and sends the SIGAIOCANCEL signal to the worker thread,
 *	- the worker thread gets assigned the "work_qlock1" and returns
 *	  from the kernel,
979  *	- the kernel detects the pending signal and activates the signal
980  *	  handler instead,
981  *	- if the "work_cancel_flg" is still set then the signal handler
982  *	  should use siglongjmp() to cancel the "in progress" request and
983  *	  it would try to acquire the same work_qlock1 in _aio_req_get()
984  *	  for a second time => deadlock.
985  *	To avoid that situation we disable the cancellation of the request
986  *	in progress BEFORE we try to acquire the work_qlock1.
987  *	In that case the signal handler will not call siglongjmp() and the
988  *	worker thread will continue running the standard code flow.
 *	Then this thread must check the AIO_REQ_CANCELED flag to emulate
 *	a possibly required siglongjmp(), freeing the work_qlock1 and
 *	avoiding a deadlock.
992  */
993 void *
994 _aio_do_request(void *arglist)
995 {
996 	aio_worker_t *aiowp = (aio_worker_t *)arglist;
997 	ulwp_t *self = curthread;
998 	struct aio_args *arg;
999 	aio_req_t *reqp;		/* current AIO request */
1000 	ssize_t retval;
1001 	int error;
1002 
1003 	if (pthread_setspecific(_aio_key, aiowp) != 0)
1004 		aio_panic("_aio_do_request, pthread_setspecific()");
1005 	(void) pthread_sigmask(SIG_SETMASK, &_worker_set, NULL);
1006 	ASSERT(aiowp->work_req == NULL);
1007 
1008 	/*
1009 	 * We resume here when an operation is cancelled.
1010 	 * On first entry, aiowp->work_req == NULL, so all
1011 	 * we do is block SIGAIOCANCEL.
1012 	 */
1013 	(void) sigsetjmp(aiowp->work_jmp_buf, 0);
1014 	ASSERT(self->ul_sigdefer == 0);
1015 
1016 	sigoff(self);	/* block SIGAIOCANCEL */
1017 	if (aiowp->work_req != NULL)
1018 		_aio_finish_request(aiowp, -1, ECANCELED);
1019 
1020 	for (;;) {
1021 		/*
1022 		 * Put completed requests on aio_done_list.  This has
1023 		 * to be done as part of the main loop to ensure that
1024 		 * we don't artificially starve any aiowait'ers.
1025 		 */
1026 		if (aiowp->work_done1)
1027 			_aio_work_done(aiowp);
1028 
1029 top:
1030 		/* consume any deferred SIGAIOCANCEL signal here */
1031 		sigon(self);
1032 		sigoff(self);
1033 
1034 		while ((reqp = _aio_req_get(aiowp)) == NULL) {
1035 			if (_aio_idle(aiowp) != 0)
1036 				goto top;
1037 		}
1038 		arg = &reqp->req_args;
1039 		ASSERT(reqp->req_state == AIO_REQ_INPROGRESS ||
1040 		    reqp->req_state == AIO_REQ_CANCELED);
1041 		error = 0;
1042 
1043 		switch (reqp->req_op) {
1044 		case AIOREAD:
1045 		case AIOAREAD:
1046 			sigon(self);	/* unblock SIGAIOCANCEL */
1047 			retval = pread(arg->fd, arg->buf,
1048 			    arg->bufsz, arg->offset);
1049 			if (retval == -1) {
1050 				if (errno == ESPIPE) {
1051 					retval = read(arg->fd,
1052 					    arg->buf, arg->bufsz);
1053 					if (retval == -1)
1054 						error = errno;
1055 				} else {
1056 					error = errno;
1057 				}
1058 			}
1059 			sigoff(self);	/* block SIGAIOCANCEL */
1060 			break;
1061 		case AIOWRITE:
1062 		case AIOAWRITE:
1063 			sigon(self);	/* unblock SIGAIOCANCEL */
1064 			retval = pwrite(arg->fd, arg->buf,
1065 			    arg->bufsz, arg->offset);
1066 			if (retval == -1) {
1067 				if (errno == ESPIPE) {
1068 					retval = write(arg->fd,
1069 					    arg->buf, arg->bufsz);
1070 					if (retval == -1)
1071 						error = errno;
1072 				} else {
1073 					error = errno;
1074 				}
1075 			}
1076 			sigoff(self);	/* block SIGAIOCANCEL */
1077 			break;
1078 #if !defined(_LP64)
1079 		case AIOAREAD64:
1080 			sigon(self);	/* unblock SIGAIOCANCEL */
1081 			retval = pread64(arg->fd, arg->buf,
1082 			    arg->bufsz, arg->offset);
1083 			if (retval == -1) {
1084 				if (errno == ESPIPE) {
1085 					retval = read(arg->fd,
1086 					    arg->buf, arg->bufsz);
1087 					if (retval == -1)
1088 						error = errno;
1089 				} else {
1090 					error = errno;
1091 				}
1092 			}
1093 			sigoff(self);	/* block SIGAIOCANCEL */
1094 			break;
1095 		case AIOAWRITE64:
1096 			sigon(self);	/* unblock SIGAIOCANCEL */
1097 			retval = pwrite64(arg->fd, arg->buf,
1098 			    arg->bufsz, arg->offset);
1099 			if (retval == -1) {
1100 				if (errno == ESPIPE) {
1101 					retval = write(arg->fd,
1102 					    arg->buf, arg->bufsz);
1103 					if (retval == -1)
1104 						error = errno;
1105 				} else {
1106 					error = errno;
1107 				}
1108 			}
1109 			sigoff(self);	/* block SIGAIOCANCEL */
1110 			break;
1111 #endif	/* !defined(_LP64) */
1112 		case AIOFSYNC:
1113 			if (_aio_fsync_del(aiowp, reqp))
1114 				goto top;
1115 			ASSERT(reqp->req_head == NULL);
1116 			/*
1117 			 * All writes for this fsync request are now
1118 			 * acknowledged.  Now make these writes visible
1119 			 * and put the final request into the hash table.
1120 			 */
1121 			if (reqp->req_state == AIO_REQ_CANCELED) {
1122 				/* EMPTY */;
1123 			} else if (arg->offset == O_SYNC) {
1124 				if ((retval = __fdsync(arg->fd, FSYNC)) == -1)
1125 					error = errno;
1126 			} else {
1127 				if ((retval = __fdsync(arg->fd, FDSYNC)) == -1)
1128 					error = errno;
1129 			}
1130 			if (_aio_hash_insert(reqp->req_resultp, reqp) != 0)
1131 				aio_panic("_aio_do_request(): AIOFSYNC: "
1132 				    "request already in hash table");
1133 			break;
1134 		default:
1135 			aio_panic("_aio_do_request, bad op");
1136 		}
1137 
1138 		_aio_finish_request(aiowp, retval, error);
1139 	}
1140 	/* NOTREACHED */
1141 	return (NULL);
1142 }
1143 
1144 /*
1145  * Perform the tail processing for _aio_do_request().
1146  * The in-progress request may or may not have been cancelled.
1147  */
1148 static void
1149 _aio_finish_request(aio_worker_t *aiowp, ssize_t retval, int error)
1150 {
1151 	aio_req_t *reqp;
1152 
1153 	sig_mutex_lock(&aiowp->work_qlock1);
1154 	if ((reqp = aiowp->work_req) == NULL)
1155 		sig_mutex_unlock(&aiowp->work_qlock1);
1156 	else {
1157 		aiowp->work_req = NULL;
1158 		if (reqp->req_state == AIO_REQ_CANCELED) {
1159 			retval = -1;
1160 			error = ECANCELED;
1161 		}
1162 		if (!POSIX_AIO(reqp)) {
1163 			int notify;
1164 			sig_mutex_unlock(&aiowp->work_qlock1);
1165 			sig_mutex_lock(&__aio_mutex);
1166 			if (reqp->req_state == AIO_REQ_INPROGRESS)
1167 				reqp->req_state = AIO_REQ_DONE;
1168 			/*
1169 			 * If it was canceled, this request will not be
1170 			 * added to done list. Just free it.
1171 			 */
1172 			if (error == ECANCELED) {
1173 				_aio_outstand_cnt--;
1174 				_aio_req_free(reqp);
1175 			} else {
1176 				_aio_set_result(reqp, retval, error);
1177 				_aio_req_done_cnt++;
1178 			}
1179 			/*
1180 			 * Notify any thread that may have blocked
1181 			 * because it saw an outstanding request.
1182 			 */
1183 			notify = 0;
1184 			if (_aio_outstand_cnt == 0 && _aiowait_flag) {
1185 				notify = 1;
1186 			}
1187 			sig_mutex_unlock(&__aio_mutex);
1188 			if (notify) {
1189 				(void) _kaio(AIONOTIFY);
1190 			}
1191 		} else {
1192 			if (reqp->req_state == AIO_REQ_INPROGRESS)
1193 				reqp->req_state = AIO_REQ_DONE;
1194 			sig_mutex_unlock(&aiowp->work_qlock1);
1195 			_aiodone(reqp, retval, error);
1196 		}
1197 	}
1198 }
1199 
1200 void
1201 _aio_req_mark_done(aio_req_t *reqp)
1202 {
1203 #if !defined(_LP64)
1204 	if (reqp->req_largefile)
1205 		((aiocb64_t *)reqp->req_aiocbp)->aio_state = USERAIO_DONE;
1206 	else
1207 #endif
1208 		((aiocb_t *)reqp->req_aiocbp)->aio_state = USERAIO_DONE;
1209 }
1210 
1211 /*
1212  * Sleep for 'ticks' clock ticks to give somebody else a chance to run,
1213  * hopefully to consume one of our queued signals.
1214  */
1215 static void
1216 _aio_delay(int ticks)
1217 {
1218 	(void) usleep(ticks * (MICROSEC / hz));
1219 }
1220 
1221 /*
1222  * Actually send the notifications.
1223  * We could block indefinitely here if the application
1224  * is not listening for the signal or port notifications.
1225  */
1226 static void
1227 send_notification(notif_param_t *npp)
1228 {
1229 	extern int __sigqueue(pid_t pid, int signo,
1230 	    /* const union sigval */ void *value, int si_code, int block);
1231 
1232 	if (npp->np_signo)
1233 		(void) __sigqueue(__pid, npp->np_signo, npp->np_user,
1234 		    SI_ASYNCIO, 1);
1235 	else if (npp->np_port >= 0)
1236 		(void) _port_dispatch(npp->np_port, 0, PORT_SOURCE_AIO,
1237 		    npp->np_event, npp->np_object, npp->np_user);
1238 
1239 	if (npp->np_lio_signo)
1240 		(void) __sigqueue(__pid, npp->np_lio_signo, npp->np_lio_user,
1241 		    SI_ASYNCIO, 1);
1242 	else if (npp->np_lio_port >= 0)
1243 		(void) _port_dispatch(npp->np_lio_port, 0, PORT_SOURCE_AIO,
1244 		    npp->np_lio_event, npp->np_lio_object, npp->np_lio_user);
1245 }
1246 
1247 /*
1248  * Asynchronous notification worker.
1249  */
1250 void *
1251 _aio_do_notify(void *arg)
1252 {
1253 	aio_worker_t *aiowp = (aio_worker_t *)arg;
1254 	aio_req_t *reqp;
1255 
1256 	/*
1257 	 * This isn't really necessary.  All signals are blocked.
1258 	 */
1259 	if (pthread_setspecific(_aio_key, aiowp) != 0)
1260 		aio_panic("_aio_do_notify, pthread_setspecific()");
1261 
1262 	/*
1263 	 * Notifications are never cancelled.
1264 	 * All signals remain blocked, forever.
1265 	 */
1266 	for (;;) {
1267 		while ((reqp = _aio_req_get(aiowp)) == NULL) {
1268 			if (_aio_idle(aiowp) != 0)
1269 				aio_panic("_aio_do_notify: _aio_idle() failed");
1270 		}
1271 		send_notification(&reqp->req_notify);
1272 		_aio_req_free(reqp);
1273 	}
1274 
1275 	/* NOTREACHED */
1276 	return (NULL);
1277 }
1278 
1279 /*
1280  * Do the completion semantics for a request that was either canceled
1281  * by _aio_cancel_req() or was completed by _aio_do_request().
1282  */
1283 static void
1284 _aiodone(aio_req_t *reqp, ssize_t retval, int error)
1285 {
1286 	aio_result_t *resultp = reqp->req_resultp;
1287 	int notify = 0;
1288 	aio_lio_t *head;
1289 	int sigev_none;
1290 	int sigev_signal;
1291 	int sigev_thread;
1292 	int sigev_port;
1293 	notif_param_t np;
1294 
1295 	/*
1296 	 * We call _aiodone() only for Posix I/O.
1297 	 */
1298 	ASSERT(POSIX_AIO(reqp));
1299 
1300 	sigev_none = 0;
1301 	sigev_signal = 0;
1302 	sigev_thread = 0;
1303 	sigev_port = 0;
1304 	np.np_signo = 0;
1305 	np.np_port = -1;
1306 	np.np_lio_signo = 0;
1307 	np.np_lio_port = -1;
1308 
1309 	switch (reqp->req_sigevent.sigev_notify) {
1310 	case SIGEV_NONE:
1311 		sigev_none = 1;
1312 		break;
1313 	case SIGEV_SIGNAL:
1314 		sigev_signal = 1;
1315 		break;
1316 	case SIGEV_THREAD:
1317 		sigev_thread = 1;
1318 		break;
1319 	case SIGEV_PORT:
1320 		sigev_port = 1;
1321 		break;
1322 	default:
1323 		aio_panic("_aiodone: improper sigev_notify");
1324 		break;
1325 	}
1326 
1327 	/*
1328 	 * Figure out the notification parameters while holding __aio_mutex.
1329 	 * Actually perform the notifications after dropping __aio_mutex.
1330 	 * This allows us to sleep for a long time (if the notifications
1331 	 * incur delays) without impeding other async I/O operations.
1332 	 */
1333 
1334 	sig_mutex_lock(&__aio_mutex);
1335 
1336 	if (sigev_signal) {
1337 		if ((np.np_signo = reqp->req_sigevent.sigev_signo) != 0)
1338 			notify = 1;
1339 		np.np_user = reqp->req_sigevent.sigev_value.sival_ptr;
1340 	} else if (sigev_thread | sigev_port) {
1341 		if ((np.np_port = reqp->req_sigevent.sigev_signo) >= 0)
1342 			notify = 1;
1343 		np.np_event = reqp->req_op;
1344 		if (np.np_event == AIOFSYNC && reqp->req_largefile)
1345 			np.np_event = AIOFSYNC64;
1346 		np.np_object = (uintptr_t)reqp->req_aiocbp;
1347 		np.np_user = reqp->req_sigevent.sigev_value.sival_ptr;
1348 	}
1349 
1350 	if (resultp->aio_errno == EINPROGRESS)
1351 		_aio_set_result(reqp, retval, error);
1352 
1353 	_aio_outstand_cnt--;
1354 
1355 	head = reqp->req_head;
1356 	reqp->req_head = NULL;
1357 
1358 	if (sigev_none) {
1359 		_aio_enq_doneq(reqp);
1360 		reqp = NULL;
1361 	} else {
1362 		(void) _aio_hash_del(resultp);
1363 		_aio_req_mark_done(reqp);
1364 	}
1365 
1366 	_aio_waitn_wakeup();
1367 
1368 	/*
1369 	 * __aio_waitn() sets AIO_WAIT_INPROGRESS and
1370 	 * __aio_suspend() increments "_aio_kernel_suspend"
1371 	 * when they are waiting in the kernel for completed I/Os.
1372 	 *
1373 	 * _kaio(AIONOTIFY) awakes the corresponding function
1374 	 * in the kernel; then the corresponding __aio_waitn() or
1375 	 * __aio_suspend() function could reap the recently
1376 	 * completed I/Os (_aiodone()).
1377 	 */
1378 	if ((_aio_flags & AIO_WAIT_INPROGRESS) || _aio_kernel_suspend > 0)
1379 		(void) _kaio(AIONOTIFY);
1380 
1381 	sig_mutex_unlock(&__aio_mutex);
1382 
1383 	if (head != NULL) {
1384 		/*
1385 		 * If all the lio requests have completed,
1386 		 * prepare to notify the waiting thread.
1387 		 */
1388 		sig_mutex_lock(&head->lio_mutex);
1389 		ASSERT(head->lio_refcnt == head->lio_nent);
1390 		if (head->lio_refcnt == 1) {
1391 			int waiting = 0;
1392 			if (head->lio_mode == LIO_WAIT) {
1393 				if ((waiting = head->lio_waiting) != 0)
1394 					(void) cond_signal(&head->lio_cond_cv);
1395 			} else if (head->lio_port < 0) { /* none or signal */
1396 				if ((np.np_lio_signo = head->lio_signo) != 0)
1397 					notify = 1;
1398 				np.np_lio_user = head->lio_sigval.sival_ptr;
1399 			} else {			/* thread or port */
1400 				notify = 1;
1401 				np.np_lio_port = head->lio_port;
1402 				np.np_lio_event = head->lio_event;
1403 				np.np_lio_object =
1404 				    (uintptr_t)head->lio_sigevent;
1405 				np.np_lio_user = head->lio_sigval.sival_ptr;
1406 			}
1407 			head->lio_nent = head->lio_refcnt = 0;
1408 			sig_mutex_unlock(&head->lio_mutex);
1409 			if (waiting == 0)
1410 				_aio_lio_free(head);
1411 		} else {
1412 			head->lio_nent--;
1413 			head->lio_refcnt--;
1414 			sig_mutex_unlock(&head->lio_mutex);
1415 		}
1416 	}
1417 
1418 	/*
1419 	 * The request is completed; now perform the notifications.
1420 	 */
1421 	if (notify) {
1422 		if (reqp != NULL) {
1423 			/*
1424 			 * We usually put the request on the notification
1425 			 * queue because we don't want to block and delay
1426 			 * other operations behind us in the work queue.
1427 			 * Also we must never block on a cancel notification
1428 			 * because we are being called from an application
1429 			 * thread in this case and that could lead to deadlock
1430 			 * if no other thread is receiving notificatins.
1431 			 */
1432 			reqp->req_notify = np;
1433 			reqp->req_op = AIONOTIFY;
1434 			_aio_req_add(reqp, &__workers_no, AIONOTIFY);
1435 			reqp = NULL;
1436 		} else {
1437 			/*
1438 			 * We already put the request on the done queue,
1439 			 * so we can't queue it to the notification queue.
1440 			 * Just do the notification directly.
1441 			 */
1442 			send_notification(&np);
1443 		}
1444 	}
1445 
1446 	if (reqp != NULL)
1447 		_aio_req_free(reqp);
1448 }
1449 
1450 /*
1451  * Delete fsync requests from list head until there is
1452  * only one left.  Return 0 when there is only one,
1453  * otherwise return a non-zero value.
1454  */
1455 static int
1456 _aio_fsync_del(aio_worker_t *aiowp, aio_req_t *reqp)
1457 {
1458 	aio_lio_t *head = reqp->req_head;
1459 	int rval = 0;
1460 
1461 	ASSERT(reqp == aiowp->work_req);
1462 	sig_mutex_lock(&aiowp->work_qlock1);
1463 	sig_mutex_lock(&head->lio_mutex);
1464 	if (head->lio_refcnt > 1) {
1465 		head->lio_refcnt--;
1466 		head->lio_nent--;
1467 		aiowp->work_req = NULL;
1468 		sig_mutex_unlock(&head->lio_mutex);
1469 		sig_mutex_unlock(&aiowp->work_qlock1);
1470 		sig_mutex_lock(&__aio_mutex);
1471 		_aio_outstand_cnt--;
1472 		_aio_waitn_wakeup();
1473 		sig_mutex_unlock(&__aio_mutex);
1474 		_aio_req_free(reqp);
1475 		return (1);
1476 	}
1477 	ASSERT(head->lio_nent == 1 && head->lio_refcnt == 1);
1478 	reqp->req_head = NULL;
1479 	if (head->lio_canned)
1480 		reqp->req_state = AIO_REQ_CANCELED;
1481 	if (head->lio_mode == LIO_DESTROY) {
1482 		aiowp->work_req = NULL;
1483 		rval = 1;
1484 	}
1485 	sig_mutex_unlock(&head->lio_mutex);
1486 	sig_mutex_unlock(&aiowp->work_qlock1);
1487 	head->lio_refcnt--;
1488 	head->lio_nent--;
1489 	_aio_lio_free(head);
1490 	if (rval != 0)
1491 		_aio_req_free(reqp);
1492 	return (rval);
1493 }
1494 
1495 /*
1496  * A worker is set idle when its work queue is empty.
1497  * The worker checks again that it has no more work
1498  * and then goes to sleep waiting for more work.
1499  */
1500 int
1501 _aio_idle(aio_worker_t *aiowp)
1502 {
1503 	int error = 0;
1504 
1505 	sig_mutex_lock(&aiowp->work_qlock1);
1506 	if (aiowp->work_count1 == 0) {
1507 		ASSERT(aiowp->work_minload1 == 0);
1508 		aiowp->work_idleflg = 1;
1509 		/*
1510 		 * A cancellation handler is not needed here.
1511 		 * aio worker threads are never cancelled via pthread_cancel().
1512 		 */
1513 		error = sig_cond_wait(&aiowp->work_idle_cv,
1514 		    &aiowp->work_qlock1);
1515 		/*
1516 		 * The idle flag is normally cleared before worker is awakened
1517 		 * by aio_req_add().  On error (EINTR), we clear it ourself.
1518 		 */
1519 		if (error)
1520 			aiowp->work_idleflg = 0;
1521 	}
1522 	sig_mutex_unlock(&aiowp->work_qlock1);
1523 	return (error);
1524 }
1525 
1526 /*
1527  * A worker's completed AIO requests are placed onto a global
1528  * done queue.  The application is only sent a SIGIO signal if
1529  * the process has a handler enabled and it is not waiting via
1530  * aiowait().
1531  */
1532 static void
1533 _aio_work_done(aio_worker_t *aiowp)
1534 {
1535 	aio_req_t *reqp;
1536 
1537 	sig_mutex_lock(&aiowp->work_qlock1);
1538 	reqp = aiowp->work_prev1;
1539 	reqp->req_next = NULL;
1540 	aiowp->work_done1 = 0;
1541 	aiowp->work_tail1 = aiowp->work_next1;
1542 	if (aiowp->work_tail1 == NULL)
1543 		aiowp->work_head1 = NULL;
1544 	aiowp->work_prev1 = NULL;
1545 	sig_mutex_unlock(&aiowp->work_qlock1);
1546 	sig_mutex_lock(&__aio_mutex);
1547 	_aio_donecnt++;
1548 	_aio_outstand_cnt--;
1549 	_aio_req_done_cnt--;
1550 	ASSERT(_aio_donecnt > 0 &&
1551 	    _aio_outstand_cnt >= 0 &&
1552 	    _aio_req_done_cnt >= 0);
1553 	ASSERT(reqp != NULL);
1554 
1555 	if (_aio_done_tail == NULL) {
1556 		_aio_done_head = _aio_done_tail = reqp;
1557 	} else {
1558 		_aio_done_head->req_next = reqp;
1559 		_aio_done_head = reqp;
1560 	}
1561 
1562 	if (_aiowait_flag) {
1563 		sig_mutex_unlock(&__aio_mutex);
1564 		(void) _kaio(AIONOTIFY);
1565 	} else {
1566 		sig_mutex_unlock(&__aio_mutex);
1567 		if (_sigio_enabled)
1568 			(void) kill(__pid, SIGIO);
1569 	}
1570 }
1571 
1572 /*
1573  * The done queue consists of AIO requests that are in either the
1574  * AIO_REQ_DONE or AIO_REQ_CANCELED state.  Requests that were cancelled
1575  * are discarded.  If the done queue is empty then NULL is returned.
1576  * Otherwise the address of a done aio_result_t is returned.
1577  */
1578 aio_result_t *
1579 _aio_req_done(void)
1580 {
1581 	aio_req_t *reqp;
1582 	aio_result_t *resultp;
1583 
1584 	ASSERT(MUTEX_HELD(&__aio_mutex));
1585 
1586 	if ((reqp = _aio_done_tail) != NULL) {
1587 		if ((_aio_done_tail = reqp->req_next) == NULL)
1588 			_aio_done_head = NULL;
1589 		ASSERT(_aio_donecnt > 0);
1590 		_aio_donecnt--;
1591 		(void) _aio_hash_del(reqp->req_resultp);
1592 		resultp = reqp->req_resultp;
1593 		ASSERT(reqp->req_state == AIO_REQ_DONE);
1594 		_aio_req_free(reqp);
1595 		return (resultp);
1596 	}
1597 	/* is queue empty? */
1598 	if (reqp == NULL && _aio_outstand_cnt == 0) {
1599 		return ((aio_result_t *)-1);
1600 	}
1601 	return (NULL);
1602 }
1603 
1604 /*
1605  * Set the return and errno values for the application's use.
1606  *
1607  * For the Posix interfaces, we must set the return value first followed
1608  * by the errno value because the Posix interfaces allow for a change
1609  * in the errno value from EINPROGRESS to something else to signal
1610  * the completion of the asynchronous request.
1611  *
1612  * The opposite is true for the Solaris interfaces.  These allow for
1613  * a change in the return value from AIO_INPROGRESS to something else
1614  * to signal the completion of the asynchronous request.
1615  */
1616 void
1617 _aio_set_result(aio_req_t *reqp, ssize_t retval, int error)
1618 {
1619 	aio_result_t *resultp = reqp->req_resultp;
1620 
1621 	if (POSIX_AIO(reqp)) {
1622 		resultp->aio_return = retval;
1623 		membar_producer();
1624 		resultp->aio_errno = error;
1625 	} else {
1626 		resultp->aio_errno = error;
1627 		membar_producer();
1628 		resultp->aio_return = retval;
1629 	}
1630 }
1631 
/*
 * Add an AIO request onto the next work queue.
 * A circular list of workers is used to choose the next worker.
 *
 * reqp		the request to queue
 * nextworker	address of the pointer to the worker at which the
 *		search starts; for read/write modes it is advanced
 *		past the worker that was chosen
 * mode		the operation; read/write modes are load-balanced
 *		across the workers, AIOFSYNC and AIONOTIFY always go
 *		to *nextworker
 *
 * Instead of queueing the request, this function may pass it to
 * _aio_create_worker() and return, when fewer than _max_workers
 * workers exist and the existing ones look busy.
 */
void
_aio_req_add(aio_req_t *reqp, aio_worker_t **nextworker, int mode)
{
	ulwp_t *self = curthread;
	aio_worker_t *aiowp;
	aio_worker_t *first;
	int load_bal_flg = 1;	/* cleared for AIOFSYNC and AIONOTIFY */
	int found;

	ASSERT(reqp->req_state != AIO_REQ_DONEQ);
	reqp->req_next = NULL;
	/*
	 * Try to acquire the next worker's work queue.  If it is locked,
	 * then search the list of workers until a queue is found unlocked,
	 * or until the list is completely traversed at which point another
	 * worker will be created.
	 */
	sigoff(self);		/* defer SIGIO */
	sig_mutex_lock(&__aio_mutex);
	first = aiowp = *nextworker;
	/* notification requests are not counted as outstanding I/O */
	if (mode != AIONOTIFY)
		_aio_outstand_cnt++;
	sig_mutex_unlock(&__aio_mutex);

	/*
	 * Every path that leaves this switch through its final 'break'
	 * holds aiowp->work_qlock1.  Note that the 'break' statements
	 * inside the do/while loops only leave the loop.
	 */
	switch (mode) {
	case AIOREAD:
	case AIOWRITE:
	case AIOAREAD:
	case AIOAWRITE:
#if !defined(_LP64)
	case AIOAREAD64:
	case AIOAWRITE64:
#endif
		/* try to find an idle worker */
		found = 0;
		do {
			if (sig_mutex_trylock(&aiowp->work_qlock1) == 0) {
				if (aiowp->work_idleflg) {
					/* leave the loop with the lock held */
					found = 1;
					break;
				}
				sig_mutex_unlock(&aiowp->work_qlock1);
			}
		} while ((aiowp = aiowp->work_forw) != first);

		if (found) {
			aiowp->work_minload1++;
			break;
		}

		/* try to acquire some worker's queue lock */
		do {
			if (sig_mutex_trylock(&aiowp->work_qlock1) == 0) {
				found = 1;
				break;
			}
		} while ((aiowp = aiowp->work_forw) != first);

		/*
		 * Create more workers when the workers appear overloaded.
		 * Either all the workers are busy draining their queues
		 * or no worker's queue lock could be acquired.
		 */
		if (!found) {
			if (_aio_worker_cnt < _max_workers) {
				/* the request goes to the new worker */
				if (_aio_create_worker(reqp, mode))
					aio_panic("_aio_req_add: add worker");
				sigon(self);	/* reenable SIGIO */
				return;
			}

			/*
			 * No worker available and we have created
			 * _max_workers, keep going through the
			 * list slowly until we get a lock
			 */
			while (sig_mutex_trylock(&aiowp->work_qlock1) != 0) {
				/*
				 * give someone else a chance
				 */
				_aio_delay(1);
				aiowp = aiowp->work_forw;
			}
		}

		ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
		/*
		 * The chosen worker already carries at least _minworkload
		 * read/write requests and the worker limit has not been
		 * reached: release its queue and start another worker
		 * for this request instead.
		 */
		if (_aio_worker_cnt < _max_workers &&
		    aiowp->work_minload1 >= _minworkload) {
			sig_mutex_unlock(&aiowp->work_qlock1);
			sig_mutex_lock(&__aio_mutex);
			*nextworker = aiowp->work_forw;
			sig_mutex_unlock(&__aio_mutex);
			if (_aio_create_worker(reqp, mode))
				aio_panic("aio_req_add: add worker");
			sigon(self);	/* reenable SIGIO */
			return;
		}
		aiowp->work_minload1++;
		break;
	case AIOFSYNC:
	case AIONOTIFY:
		/* no load balancing: always use the worker we were given */
		load_bal_flg = 0;
		sig_mutex_lock(&aiowp->work_qlock1);
		break;
	default:
		aio_panic("_aio_req_add: invalid mode");
		break;
	}
	/*
	 * Put request onto worker's work queue.
	 * New requests are linked in after work_head1; work_next1
	 * is the next request the worker will pick up.
	 */
	if (aiowp->work_tail1 == NULL) {
		ASSERT(aiowp->work_count1 == 0);
		aiowp->work_tail1 = reqp;
		aiowp->work_next1 = reqp;
	} else {
		aiowp->work_head1->req_next = reqp;
		if (aiowp->work_next1 == NULL)
			aiowp->work_next1 = reqp;
	}
	reqp->req_state = AIO_REQ_QUEUED;
	reqp->req_worker = aiowp;
	aiowp->work_head1 = reqp;
	/*
	 * Awaken worker if it is not currently active.
	 */
	if (aiowp->work_count1++ == 0 && aiowp->work_idleflg) {
		aiowp->work_idleflg = 0;
		(void) cond_signal(&aiowp->work_idle_cv);
	}
	sig_mutex_unlock(&aiowp->work_qlock1);

	/* the next request starts its search at the following worker */
	if (load_bal_flg) {
		sig_mutex_lock(&__aio_mutex);
		*nextworker = aiowp->work_forw;
		sig_mutex_unlock(&__aio_mutex);
	}
	sigon(self);	/* reenable SIGIO */
}
1775 
1776 /*
1777  * Get an AIO request for a specified worker.
1778  * If the work queue is empty, return NULL.
1779  */
1780 aio_req_t *
1781 _aio_req_get(aio_worker_t *aiowp)
1782 {
1783 	aio_req_t *reqp;
1784 
1785 	sig_mutex_lock(&aiowp->work_qlock1);
1786 	if ((reqp = aiowp->work_next1) != NULL) {
1787 		/*
1788 		 * Remove a POSIX request from the queue; the
1789 		 * request queue is a singularly linked list
1790 		 * with a previous pointer.  The request is
1791 		 * removed by updating the previous pointer.
1792 		 *
1793 		 * Non-posix requests are left on the queue
1794 		 * to eventually be placed on the done queue.
1795 		 */
1796 
1797 		if (POSIX_AIO(reqp)) {
1798 			if (aiowp->work_prev1 == NULL) {
1799 				aiowp->work_tail1 = reqp->req_next;
1800 				if (aiowp->work_tail1 == NULL)
1801 					aiowp->work_head1 = NULL;
1802 			} else {
1803 				aiowp->work_prev1->req_next = reqp->req_next;
1804 				if (aiowp->work_head1 == reqp)
1805 					aiowp->work_head1 = reqp->req_next;
1806 			}
1807 
1808 		} else {
1809 			aiowp->work_prev1 = reqp;
1810 			ASSERT(aiowp->work_done1 >= 0);
1811 			aiowp->work_done1++;
1812 		}
1813 		ASSERT(reqp != reqp->req_next);
1814 		aiowp->work_next1 = reqp->req_next;
1815 		ASSERT(aiowp->work_count1 >= 1);
1816 		aiowp->work_count1--;
1817 		switch (reqp->req_op) {
1818 		case AIOREAD:
1819 		case AIOWRITE:
1820 		case AIOAREAD:
1821 		case AIOAWRITE:
1822 #if !defined(_LP64)
1823 		case AIOAREAD64:
1824 		case AIOAWRITE64:
1825 #endif
1826 			ASSERT(aiowp->work_minload1 > 0);
1827 			aiowp->work_minload1--;
1828 			break;
1829 		}
1830 		reqp->req_state = AIO_REQ_INPROGRESS;
1831 	}
1832 	aiowp->work_req = reqp;
1833 	ASSERT(reqp != NULL || aiowp->work_count1 == 0);
1834 	sig_mutex_unlock(&aiowp->work_qlock1);
1835 	return (reqp);
1836 }
1837 
/*
 * Unlink 'reqp' from the work queue of worker 'aiowp' and adjust the
 * worker's counters.  The caller must hold aiowp->work_qlock1.
 *
 * 'ostate' is the state the request was in, AIO_REQ_QUEUED or
 * AIO_REQ_INPROGRESS.  A POSIX request is only looked for when it was
 * still AIO_REQ_QUEUED; for any other state this function returns
 * without touching the queue (_aio_req_get() has already removed it).
 */
static void
_aio_req_del(aio_worker_t *aiowp, aio_req_t *reqp, int ostate)
{
	aio_req_t **last;	/* address of the link that leads to 'next' */
	aio_req_t *lastrp;	/* entry visited before 'next' (see below) */
	aio_req_t *next;	/* entry currently being examined */

	ASSERT(aiowp != NULL);
	ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
	if (POSIX_AIO(reqp)) {
		if (ostate != AIO_REQ_QUEUED)
			return;
	}
	/*
	 * Walk the queue starting at work_tail1.
	 * NOTE(review): lastrp starts out equal to the first queue entry,
	 * so if reqp is that first entry, lastrp == reqp when the match is
	 * found.  Confirm that the else-branch below can never store the
	 * request being removed into work_head1/work_prev1.
	 */
	last = &aiowp->work_tail1;
	lastrp = aiowp->work_tail1;
	ASSERT(ostate == AIO_REQ_QUEUED || ostate == AIO_REQ_INPROGRESS);
	while ((next = *last) != NULL) {
		if (next == reqp) {
			/* unlink it and fix up every pointer that refers to it */
			*last = next->req_next;
			if (aiowp->work_next1 == next)
				aiowp->work_next1 = next->req_next;

			if ((next->req_next != NULL) ||
			    (aiowp->work_done1 == 0)) {
				/* move head/prev forward to the successor */
				if (aiowp->work_head1 == next)
					aiowp->work_head1 = next->req_next;
				if (aiowp->work_prev1 == next)
					aiowp->work_prev1 = next->req_next;
			} else {
				/*
				 * Last entry of the queue while work_done1
				 * is non-zero: fall back to the entry
				 * visited before it.
				 */
				if (aiowp->work_head1 == next)
					aiowp->work_head1 = lastrp;
				if (aiowp->work_prev1 == next)
					aiowp->work_prev1 = lastrp;
			}

			if (ostate == AIO_REQ_QUEUED) {
				/* it had not been picked up by the worker */
				ASSERT(aiowp->work_count1 >= 1);
				aiowp->work_count1--;
				ASSERT(aiowp->work_minload1 >= 1);
				aiowp->work_minload1--;
			} else {
				/* a non-POSIX request left on the queue */
				ASSERT(ostate == AIO_REQ_INPROGRESS &&
				    !POSIX_AIO(reqp));
				aiowp->work_done1--;
			}
			return;
		}
		last = &next->req_next;
		lastrp = next;
	}
	/*
	 * NOTREACHED
	 * (the request is expected to be on the queue; if it is not,
	 * the loop ends and nothing has been changed)
	 */
}
1890 
1891 static void
1892 _aio_enq_doneq(aio_req_t *reqp)
1893 {
1894 	if (_aio_doneq == NULL) {
1895 		_aio_doneq = reqp;
1896 		reqp->req_next = reqp->req_prev = reqp;
1897 	} else {
1898 		reqp->req_next = _aio_doneq;
1899 		reqp->req_prev = _aio_doneq->req_prev;
1900 		_aio_doneq->req_prev->req_next = reqp;
1901 		_aio_doneq->req_prev = reqp;
1902 	}
1903 	reqp->req_state = AIO_REQ_DONEQ;
1904 	_aio_doneq_cnt++;
1905 }
1906 
1907 /*
1908  * caller owns the _aio_mutex
1909  */
1910 aio_req_t *
1911 _aio_req_remove(aio_req_t *reqp)
1912 {
1913 	if (reqp && reqp->req_state != AIO_REQ_DONEQ)
1914 		return (NULL);
1915 
1916 	if (reqp) {
1917 		/* request in done queue */
1918 		if (_aio_doneq == reqp)
1919 			_aio_doneq = reqp->req_next;
1920 		if (_aio_doneq == reqp) {
1921 			/* only one request on queue */
1922 			_aio_doneq = NULL;
1923 		} else {
1924 			aio_req_t *tmp = reqp->req_next;
1925 			reqp->req_prev->req_next = tmp;
1926 			tmp->req_prev = reqp->req_prev;
1927 		}
1928 	} else if ((reqp = _aio_doneq) != NULL) {
1929 		if (reqp == reqp->req_next) {
1930 			/* only one request on queue */
1931 			_aio_doneq = NULL;
1932 		} else {
1933 			reqp->req_prev->req_next = _aio_doneq = reqp->req_next;
1934 			_aio_doneq->req_prev = reqp->req_prev;
1935 		}
1936 	}
1937 	if (reqp) {
1938 		_aio_doneq_cnt--;
1939 		reqp->req_next = reqp->req_prev = reqp;
1940 		reqp->req_state = AIO_REQ_DONE;
1941 	}
1942 	return (reqp);
1943 }
1944 
1945 /*
1946  * An AIO request is identified by an aio_result_t pointer.  The library
1947  * maps this aio_result_t pointer to its internal representation using a
1948  * hash table.  This function adds an aio_result_t pointer to the hash table.
1949  */
1950 static int
1951 _aio_hash_insert(aio_result_t *resultp, aio_req_t *reqp)
1952 {
1953 	aio_hash_t *hashp;
1954 	aio_req_t **prev;
1955 	aio_req_t *next;
1956 
1957 	hashp = _aio_hash + AIOHASH(resultp);
1958 	lmutex_lock(&hashp->hash_lock);
1959 	prev = &hashp->hash_ptr;
1960 	while ((next = *prev) != NULL) {
1961 		if (resultp == next->req_resultp) {
1962 			lmutex_unlock(&hashp->hash_lock);
1963 			return (-1);
1964 		}
1965 		prev = &next->req_link;
1966 	}
1967 	*prev = reqp;
1968 	ASSERT(reqp->req_link == NULL);
1969 	lmutex_unlock(&hashp->hash_lock);
1970 	return (0);
1971 }
1972 
1973 /*
1974  * Remove an entry from the hash table.
1975  */
1976 aio_req_t *
1977 _aio_hash_del(aio_result_t *resultp)
1978 {
1979 	aio_hash_t *hashp;
1980 	aio_req_t **prev;
1981 	aio_req_t *next = NULL;
1982 
1983 	if (_aio_hash != NULL) {
1984 		hashp = _aio_hash + AIOHASH(resultp);
1985 		lmutex_lock(&hashp->hash_lock);
1986 		prev = &hashp->hash_ptr;
1987 		while ((next = *prev) != NULL) {
1988 			if (resultp == next->req_resultp) {
1989 				*prev = next->req_link;
1990 				next->req_link = NULL;
1991 				break;
1992 			}
1993 			prev = &next->req_link;
1994 		}
1995 		lmutex_unlock(&hashp->hash_lock);
1996 	}
1997 	return (next);
1998 }
1999 
2000 /*
2001  *  find an entry in the hash table
2002  */
2003 aio_req_t *
2004 _aio_hash_find(aio_result_t *resultp)
2005 {
2006 	aio_hash_t *hashp;
2007 	aio_req_t **prev;
2008 	aio_req_t *next = NULL;
2009 
2010 	if (_aio_hash != NULL) {
2011 		hashp = _aio_hash + AIOHASH(resultp);
2012 		lmutex_lock(&hashp->hash_lock);
2013 		prev = &hashp->hash_ptr;
2014 		while ((next = *prev) != NULL) {
2015 			if (resultp == next->req_resultp)
2016 				break;
2017 			prev = &next->req_link;
2018 		}
2019 		lmutex_unlock(&hashp->hash_lock);
2020 	}
2021 	return (next);
2022 }
2023 
2024 /*
2025  * AIO interface for POSIX
2026  */
2027 int
2028 _aio_rw(aiocb_t *aiocbp, aio_lio_t *lio_head, aio_worker_t **nextworker,
2029     int mode, int flg)
2030 {
2031 	aio_req_t *reqp;
2032 	aio_args_t *ap;
2033 	int kerr;
2034 
2035 	if (aiocbp == NULL) {
2036 		errno = EINVAL;
2037 		return (-1);
2038 	}
2039 
2040 	/* initialize kaio */
2041 	if (!_kaio_ok)
2042 		_kaio_init();
2043 
2044 	aiocbp->aio_state = NOCHECK;
2045 
2046 	/*
2047 	 * If we have been called because a list I/O
2048 	 * kaio() failed, we dont want to repeat the
2049 	 * system call
2050 	 */
2051 
2052 	if (flg & AIO_KAIO) {
2053 		/*
2054 		 * Try kernel aio first.
2055 		 * If errno is ENOTSUP/EBADFD,
2056 		 * fall back to the thread implementation.
2057 		 */
2058 		if (_kaio_ok > 0 && KAIO_SUPPORTED(aiocbp->aio_fildes)) {
2059 			aiocbp->aio_resultp.aio_errno = EINPROGRESS;
2060 			aiocbp->aio_state = CHECK;
2061 			kerr = (int)_kaio(mode, aiocbp);
2062 			if (kerr == 0)
2063 				return (0);
2064 			if (errno != ENOTSUP && errno != EBADFD) {
2065 				aiocbp->aio_resultp.aio_errno = errno;
2066 				aiocbp->aio_resultp.aio_return = -1;
2067 				aiocbp->aio_state = NOCHECK;
2068 				return (-1);
2069 			}
2070 			if (errno == EBADFD)
2071 				SET_KAIO_NOT_SUPPORTED(aiocbp->aio_fildes);
2072 		}
2073 	}
2074 
2075 	aiocbp->aio_resultp.aio_errno = EINPROGRESS;
2076 	aiocbp->aio_state = USERAIO;
2077 
2078 	if (!__uaio_ok && __uaio_init() == -1)
2079 		return (-1);
2080 
2081 	if ((reqp = _aio_req_alloc()) == NULL) {
2082 		errno = EAGAIN;
2083 		return (-1);
2084 	}
2085 
2086 	/*
2087 	 * If an LIO request, add the list head to the aio request
2088 	 */
2089 	reqp->req_head = lio_head;
2090 	reqp->req_type = AIO_POSIX_REQ;
2091 	reqp->req_op = mode;
2092 	reqp->req_largefile = 0;
2093 
2094 	if (aiocbp->aio_sigevent.sigev_notify == SIGEV_NONE) {
2095 		reqp->req_sigevent.sigev_notify = SIGEV_NONE;
2096 	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_SIGNAL) {
2097 		reqp->req_sigevent.sigev_notify = SIGEV_SIGNAL;
2098 		reqp->req_sigevent.sigev_signo =
2099 		    aiocbp->aio_sigevent.sigev_signo;
2100 		reqp->req_sigevent.sigev_value.sival_ptr =
2101 		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
2102 	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_PORT) {
2103 		port_notify_t *pn = aiocbp->aio_sigevent.sigev_value.sival_ptr;
2104 		reqp->req_sigevent.sigev_notify = SIGEV_PORT;
2105 		/*
2106 		 * Reuse the sigevent structure to contain the port number
2107 		 * and the user value.  Same for SIGEV_THREAD, below.
2108 		 */
2109 		reqp->req_sigevent.sigev_signo =
2110 		    pn->portnfy_port;
2111 		reqp->req_sigevent.sigev_value.sival_ptr =
2112 		    pn->portnfy_user;
2113 	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_THREAD) {
2114 		reqp->req_sigevent.sigev_notify = SIGEV_THREAD;
2115 		/*
2116 		 * The sigevent structure contains the port number
2117 		 * and the user value.  Same for SIGEV_PORT, above.
2118 		 */
2119 		reqp->req_sigevent.sigev_signo =
2120 		    aiocbp->aio_sigevent.sigev_signo;
2121 		reqp->req_sigevent.sigev_value.sival_ptr =
2122 		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
2123 	}
2124 
2125 	reqp->req_resultp = &aiocbp->aio_resultp;
2126 	reqp->req_aiocbp = aiocbp;
2127 	ap = &reqp->req_args;
2128 	ap->fd = aiocbp->aio_fildes;
2129 	ap->buf = (caddr_t)aiocbp->aio_buf;
2130 	ap->bufsz = aiocbp->aio_nbytes;
2131 	ap->offset = aiocbp->aio_offset;
2132 
2133 	if ((flg & AIO_NO_DUPS) &&
2134 	    _aio_hash_insert(&aiocbp->aio_resultp, reqp) != 0) {
2135 		aio_panic("_aio_rw(): request already in hash table");
2136 		_aio_req_free(reqp);
2137 		errno = EINVAL;
2138 		return (-1);
2139 	}
2140 	_aio_req_add(reqp, nextworker, mode);
2141 	return (0);
2142 }
2143 
2144 #if !defined(_LP64)
2145 /*
2146  * 64-bit AIO interface for POSIX
2147  */
2148 int
2149 _aio_rw64(aiocb64_t *aiocbp, aio_lio_t *lio_head, aio_worker_t **nextworker,
2150     int mode, int flg)
2151 {
2152 	aio_req_t *reqp;
2153 	aio_args_t *ap;
2154 	int kerr;
2155 
2156 	if (aiocbp == NULL) {
2157 		errno = EINVAL;
2158 		return (-1);
2159 	}
2160 
2161 	/* initialize kaio */
2162 	if (!_kaio_ok)
2163 		_kaio_init();
2164 
2165 	aiocbp->aio_state = NOCHECK;
2166 
2167 	/*
2168 	 * If we have been called because a list I/O
2169 	 * kaio() failed, we dont want to repeat the
2170 	 * system call
2171 	 */
2172 
2173 	if (flg & AIO_KAIO) {
2174 		/*
2175 		 * Try kernel aio first.
2176 		 * If errno is ENOTSUP/EBADFD,
2177 		 * fall back to the thread implementation.
2178 		 */
2179 		if (_kaio_ok > 0 && KAIO_SUPPORTED(aiocbp->aio_fildes)) {
2180 			aiocbp->aio_resultp.aio_errno = EINPROGRESS;
2181 			aiocbp->aio_state = CHECK;
2182 			kerr = (int)_kaio(mode, aiocbp);
2183 			if (kerr == 0)
2184 				return (0);
2185 			if (errno != ENOTSUP && errno != EBADFD) {
2186 				aiocbp->aio_resultp.aio_errno = errno;
2187 				aiocbp->aio_resultp.aio_return = -1;
2188 				aiocbp->aio_state = NOCHECK;
2189 				return (-1);
2190 			}
2191 			if (errno == EBADFD)
2192 				SET_KAIO_NOT_SUPPORTED(aiocbp->aio_fildes);
2193 		}
2194 	}
2195 
2196 	aiocbp->aio_resultp.aio_errno = EINPROGRESS;
2197 	aiocbp->aio_state = USERAIO;
2198 
2199 	if (!__uaio_ok && __uaio_init() == -1)
2200 		return (-1);
2201 
2202 	if ((reqp = _aio_req_alloc()) == NULL) {
2203 		errno = EAGAIN;
2204 		return (-1);
2205 	}
2206 
2207 	/*
2208 	 * If an LIO request, add the list head to the aio request
2209 	 */
2210 	reqp->req_head = lio_head;
2211 	reqp->req_type = AIO_POSIX_REQ;
2212 	reqp->req_op = mode;
2213 	reqp->req_largefile = 1;
2214 
2215 	if (aiocbp->aio_sigevent.sigev_notify == SIGEV_NONE) {
2216 		reqp->req_sigevent.sigev_notify = SIGEV_NONE;
2217 	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_SIGNAL) {
2218 		reqp->req_sigevent.sigev_notify = SIGEV_SIGNAL;
2219 		reqp->req_sigevent.sigev_signo =
2220 		    aiocbp->aio_sigevent.sigev_signo;
2221 		reqp->req_sigevent.sigev_value.sival_ptr =
2222 		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
2223 	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_PORT) {
2224 		port_notify_t *pn = aiocbp->aio_sigevent.sigev_value.sival_ptr;
2225 		reqp->req_sigevent.sigev_notify = SIGEV_PORT;
2226 		reqp->req_sigevent.sigev_signo =
2227 		    pn->portnfy_port;
2228 		reqp->req_sigevent.sigev_value.sival_ptr =
2229 		    pn->portnfy_user;
2230 	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_THREAD) {
2231 		reqp->req_sigevent.sigev_notify = SIGEV_THREAD;
2232 		reqp->req_sigevent.sigev_signo =
2233 		    aiocbp->aio_sigevent.sigev_signo;
2234 		reqp->req_sigevent.sigev_value.sival_ptr =
2235 		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
2236 	}
2237 
2238 	reqp->req_resultp = &aiocbp->aio_resultp;
2239 	reqp->req_aiocbp = aiocbp;
2240 	ap = &reqp->req_args;
2241 	ap->fd = aiocbp->aio_fildes;
2242 	ap->buf = (caddr_t)aiocbp->aio_buf;
2243 	ap->bufsz = aiocbp->aio_nbytes;
2244 	ap->offset = aiocbp->aio_offset;
2245 
2246 	if ((flg & AIO_NO_DUPS) &&
2247 	    _aio_hash_insert(&aiocbp->aio_resultp, reqp) != 0) {
2248 		aio_panic("_aio_rw64(): request already in hash table");
2249 		_aio_req_free(reqp);
2250 		errno = EINVAL;
2251 		return (-1);
2252 	}
2253 	_aio_req_add(reqp, nextworker, mode);
2254 	return (0);
2255 }
2256 #endif	/* !defined(_LP64) */
2257