xref: /illumos-gate/usr/src/lib/libc/port/aio/aio.c (revision 60405de4d8688d96dd05157c28db3ade5c9bc234)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 
22 /*
23  * Copyright 2006 Sun Microsystems, Inc.  All rights reserved.
24  * Use is subject to license terms.
25  */
26 
27 #pragma ident	"%Z%%M%	%I%	%E% SMI"
28 
29 #include "synonyms.h"
30 #include "thr_uberdata.h"
31 #include "asyncio.h"
32 #include <atomic.h>
33 #include <sys/param.h>
34 #include <sys/file.h>
35 #include <sys/port.h>
36 
/*
 * Forward declarations of helpers that are private to this file.
 */
static int _aio_hash_insert(aio_result_t *, aio_req_t *);
static aio_req_t *_aio_req_get(aio_worker_t *);
static void _aio_req_add(aio_req_t *, aio_worker_t **, int);
static void _aio_req_del(aio_worker_t *, aio_req_t *, int);
static void _aio_work_done(aio_worker_t *);
static void _aio_enq_doneq(aio_req_t *);

/*
 * Routines defined outside of this file.
 */
extern void _aio_lio_free(aio_lio_t *);

extern int __fdsync(int, int);
extern int _port_dispatch(int, int, int, int, uintptr_t, void *);

/*
 * More file-private helpers.
 */
static int _aio_fsync_del(aio_worker_t *, aio_req_t *);
static void _aiodone(aio_req_t *, ssize_t, int);
static void _aio_cancel_work(aio_worker_t *, int, int *, int *);
static void _aio_finish_request(aio_worker_t *, ssize_t, int);
53 
/*
 * switch for kernel async I/O
 */
int _kaio_ok = 0;		/* 0 = disabled, 1 = on, -1 = error */

/*
 * Key for thread-specific data
 */
pthread_key_t _aio_key;

/*
 * Array for determining whether or not a file supports kaio.
 * Allocated by _kaio_supported_init(), which is called from both
 * _kaio_init() and __uaio_init().
 */
uint32_t *_kaio_supported = NULL;

/*
 * workers for read/write requests
 * (__aio_mutex lock protects circular linked list of workers)
 */
aio_worker_t *__workers_rw;	/* circular list of AIO workers */
aio_worker_t *__nextworker_rw;	/* next worker in list of workers */
int __rw_workerscnt;		/* number of read/write workers */

/*
 * worker for notification requests.
 */
aio_worker_t *__workers_no;	/* circular list of AIO workers */
aio_worker_t *__nextworker_no;	/* next worker in list of workers */
int __no_workerscnt;		/* number of notification workers */

/*
 * Singly linked list (via req_next) of done requests.
 * Traversal starts at _aio_done_tail; _aio_done_head is presumably
 * the last request on the list -- TODO confirm against the code that
 * appends to the list.
 */
aio_req_t *_aio_done_tail;		/* list of done requests */
aio_req_t *_aio_done_head;

mutex_t __aio_initlock = DEFAULTMUTEX;	/* makes aio initialization atomic */
cond_t __aio_initcv = DEFAULTCV;	/* signalled when initialization ends */
int __aio_initbusy = 0;			/* set while an initialization runs */

mutex_t __aio_mutex = DEFAULTMUTEX;	/* protects counts, and linked lists */
cond_t _aio_iowait_cv = DEFAULTCV;	/* wait for userland I/Os */

pid_t __pid = (pid_t)-1;		/* initialize as invalid pid */
int _sigio_enabled = 0;			/* when set, send SIGIO signal */

aio_hash_t *_aio_hash;			/* hash table, see __uaio_init() */

aio_req_t *_aio_doneq;			/* double linked done queue list */

int _aio_donecnt = 0;			/* # of requests on the done list */
int _aio_waitncnt = 0;			/* # of requests for aio_waitn */
int _aio_doneq_cnt = 0;
int _aio_outstand_cnt = 0;		/* # of outstanding requests */
int _kaio_outstand_cnt = 0;		/* # of outstanding kaio requests */
int _aio_req_done_cnt = 0;		/* req. done but not in "done queue" */
int _aio_kernel_suspend = 0;		/* active kernel kaio calls */
int _aio_suscv_cnt = 0;			/* aio_suspend calls waiting on cv's */

int _max_workers = 256;			/* max number of workers permitted */
int _min_workers = 4;			/* min number of workers */
int _minworkload = 2;			/* min number of request in q */
int _aio_worker_cnt = 0;		/* number of workers to do requests */
int __uaio_ok = 0;			/* AIO has been enabled */
sigset_t _worker_set;			/* worker's signal mask */

int _aiowait_flag = 0;			/* when set, aiowait() is inprogress */
int _aio_flags = 0;			/* see asyncio.h for the flag defines */

aio_worker_t *_kaiowp = NULL;		/* points to kaio cleanup thread */

int hz;					/* clock ticks per second */
124 
125 static int
126 _kaio_supported_init(void)
127 {
128 	void *ptr;
129 	size_t size;
130 
131 	if (_kaio_supported != NULL)	/* already initialized */
132 		return (0);
133 
134 	size = MAX_KAIO_FDARRAY_SIZE * sizeof (uint32_t);
135 	ptr = mmap(NULL, size, PROT_READ | PROT_WRITE,
136 	    MAP_PRIVATE | MAP_ANON, -1, (off_t)0);
137 	if (ptr == MAP_FAILED)
138 		return (-1);
139 	_kaio_supported = ptr;
140 	return (0);
141 }
142 
143 /*
144  * The aio subsystem is initialized when an AIO request is made.
145  * Constants are initialized like the max number of workers that
146  * the subsystem can create, and the minimum number of workers
147  * permitted before imposing some restrictions.  Also, some
148  * workers are created.
149  */
150 int
151 __uaio_init(void)
152 {
153 	int ret = -1;
154 	int i;
155 
156 	lmutex_lock(&__aio_initlock);
157 	while (__aio_initbusy)
158 		(void) _cond_wait(&__aio_initcv, &__aio_initlock);
159 	if (__uaio_ok) {	/* already initialized */
160 		lmutex_unlock(&__aio_initlock);
161 		return (0);
162 	}
163 	__aio_initbusy = 1;
164 	lmutex_unlock(&__aio_initlock);
165 
166 	hz = (int)sysconf(_SC_CLK_TCK);
167 	__pid = getpid();
168 
169 	setup_cancelsig(SIGAIOCANCEL);
170 
171 	if (_kaio_supported_init() != 0)
172 		goto out;
173 
174 	/*
175 	 * Allocate and initialize the hash table.
176 	 */
177 	/* LINTED pointer cast */
178 	_aio_hash = (aio_hash_t *)mmap(NULL,
179 	    HASHSZ * sizeof (aio_hash_t), PROT_READ | PROT_WRITE,
180 	    MAP_PRIVATE | MAP_ANON, -1, (off_t)0);
181 	if ((void *)_aio_hash == MAP_FAILED) {
182 		_aio_hash = NULL;
183 		goto out;
184 	}
185 	for (i = 0; i < HASHSZ; i++)
186 		(void) mutex_init(&_aio_hash[i].hash_lock, USYNC_THREAD, NULL);
187 
188 	/*
189 	 * Initialize worker's signal mask to only catch SIGAIOCANCEL.
190 	 */
191 	(void) sigfillset(&_worker_set);
192 	(void) sigdelset(&_worker_set, SIGAIOCANCEL);
193 
194 	/*
195 	 * Create the minimum number of read/write workers.
196 	 */
197 	for (i = 0; i < _min_workers; i++)
198 		(void) _aio_create_worker(NULL, AIOREAD);
199 
200 	/*
201 	 * Create one worker to send asynchronous notifications.
202 	 */
203 	(void) _aio_create_worker(NULL, AIONOTIFY);
204 
205 	ret = 0;
206 out:
207 	lmutex_lock(&__aio_initlock);
208 	if (ret == 0)
209 		__uaio_ok = 1;
210 	__aio_initbusy = 0;
211 	(void) cond_broadcast(&__aio_initcv);
212 	lmutex_unlock(&__aio_initlock);
213 	return (ret);
214 }
215 
216 /*
217  * Called from close() before actually performing the real _close().
218  */
219 void
220 _aio_close(int fd)
221 {
222 	if (fd < 0)	/* avoid cancelling everything */
223 		return;
224 	/*
225 	 * Cancel all outstanding aio requests for this file descriptor.
226 	 */
227 	if (__uaio_ok)
228 		(void) aiocancel_all(fd);
229 	/*
230 	 * If we have allocated the bit array, clear the bit for this file.
231 	 * The next open may re-use this file descriptor and the new file
232 	 * may have different kaio() behaviour.
233 	 */
234 	if (_kaio_supported != NULL)
235 		CLEAR_KAIO_SUPPORTED(fd);
236 }
237 
238 /*
239  * special kaio cleanup thread sits in a loop in the
240  * kernel waiting for pending kaio requests to complete.
241  */
242 void *
243 _kaio_cleanup_thread(void *arg)
244 {
245 	if (pthread_setspecific(_aio_key, arg) != 0)
246 		aio_panic("_kaio_cleanup_thread, pthread_setspecific()");
247 	(void) _kaio(AIOSTART);
248 	return (arg);
249 }
250 
251 /*
252  * initialize kaio.
253  */
254 void
255 _kaio_init()
256 {
257 	int error;
258 	sigset_t oset;
259 
260 	lmutex_lock(&__aio_initlock);
261 	while (__aio_initbusy)
262 		(void) _cond_wait(&__aio_initcv, &__aio_initlock);
263 	if (_kaio_ok) {		/* already initialized */
264 		lmutex_unlock(&__aio_initlock);
265 		return;
266 	}
267 	__aio_initbusy = 1;
268 	lmutex_unlock(&__aio_initlock);
269 
270 	if (_kaio_supported_init() != 0)
271 		error = ENOMEM;
272 	else if ((_kaiowp = _aio_worker_alloc()) == NULL)
273 		error = ENOMEM;
274 	else if ((error = (int)_kaio(AIOINIT)) == 0) {
275 		(void) pthread_sigmask(SIG_SETMASK, &maskset, &oset);
276 		error = thr_create(NULL, AIOSTKSIZE, _kaio_cleanup_thread,
277 		    _kaiowp, THR_DAEMON, &_kaiowp->work_tid);
278 		(void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
279 	}
280 	if (error && _kaiowp != NULL) {
281 		_aio_worker_free(_kaiowp);
282 		_kaiowp = NULL;
283 	}
284 
285 	lmutex_lock(&__aio_initlock);
286 	if (error)
287 		_kaio_ok = -1;
288 	else
289 		_kaio_ok = 1;
290 	__aio_initbusy = 0;
291 	(void) cond_broadcast(&__aio_initcv);
292 	lmutex_unlock(&__aio_initlock);
293 }
294 
/*
 * Start an asynchronous read.
 * Thin wrapper: passes all arguments on to _aiorw() with the
 * AIOREAD request code and returns whatever _aiorw() returns.
 */
int
aioread(int fd, caddr_t buf, int bufsz, off_t offset, int whence,
    aio_result_t *resultp)
{
	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOREAD));
}
301 
/*
 * Start an asynchronous write.
 * Thin wrapper: passes all arguments on to _aiorw() with the
 * AIOWRITE request code and returns whatever _aiorw() returns.
 */
int
aiowrite(int fd, caddr_t buf, int bufsz, off_t offset, int whence,
    aio_result_t *resultp)
{
	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOWRITE));
}
308 
309 #if !defined(_LP64)
/*
 * Variant of aioread() taking an off64_t offset; only built when
 * _LP64 is not defined.  Passes the AIOAREAD64 request code to _aiorw().
 */
int
aioread64(int fd, caddr_t buf, int bufsz, off64_t offset, int whence,
    aio_result_t *resultp)
{
	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOAREAD64));
}
316 
/*
 * Variant of aiowrite() taking an off64_t offset; only built when
 * _LP64 is not defined.  Passes the AIOAWRITE64 request code to _aiorw().
 */
int
aiowrite64(int fd, caddr_t buf, int bufsz, off64_t offset, int whence,
    aio_result_t *resultp)
{
	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOAWRITE64));
}
323 #endif	/* !defined(_LP64) */
324 
325 int
326 _aiorw(int fd, caddr_t buf, int bufsz, offset_t offset, int whence,
327     aio_result_t *resultp, int mode)
328 {
329 	aio_req_t *reqp;
330 	aio_args_t *ap;
331 	offset_t loffset;
332 	struct stat stat;
333 	int error = 0;
334 	int kerr;
335 	int umode;
336 
337 	switch (whence) {
338 
339 	case SEEK_SET:
340 		loffset = offset;
341 		break;
342 	case SEEK_CUR:
343 		if ((loffset = llseek(fd, 0, SEEK_CUR)) == -1)
344 			error = -1;
345 		else
346 			loffset += offset;
347 		break;
348 	case SEEK_END:
349 		if (fstat(fd, &stat) == -1)
350 			error = -1;
351 		else
352 			loffset = offset + stat.st_size;
353 		break;
354 	default:
355 		errno = EINVAL;
356 		error = -1;
357 	}
358 
359 	if (error)
360 		return (error);
361 
362 	/* initialize kaio */
363 	if (!_kaio_ok)
364 		_kaio_init();
365 
366 	/*
367 	 * _aio_do_request() needs the original request code (mode) to be able
368 	 * to choose the appropiate 32/64 bit function.  All other functions
369 	 * only require the difference between READ and WRITE (umode).
370 	 */
371 	if (mode == AIOAREAD64 || mode == AIOAWRITE64)
372 		umode = mode - AIOAREAD64;
373 	else
374 		umode = mode;
375 
376 	/*
377 	 * Try kernel aio first.
378 	 * If errno is ENOTSUP/EBADFD, fall back to the thread implementation.
379 	 */
380 	if (_kaio_ok > 0 && KAIO_SUPPORTED(fd)) {
381 		resultp->aio_errno = 0;
382 		sig_mutex_lock(&__aio_mutex);
383 		_kaio_outstand_cnt++;
384 		kerr = (int)_kaio(((resultp->aio_return == AIO_INPROGRESS) ?
385 		    (umode | AIO_POLL_BIT) : umode),
386 		    fd, buf, bufsz, loffset, resultp);
387 		if (kerr == 0) {
388 			sig_mutex_unlock(&__aio_mutex);
389 			return (0);
390 		}
391 		_kaio_outstand_cnt--;
392 		sig_mutex_unlock(&__aio_mutex);
393 		if (errno != ENOTSUP && errno != EBADFD)
394 			return (-1);
395 		if (errno == EBADFD)
396 			SET_KAIO_NOT_SUPPORTED(fd);
397 	}
398 
399 	if (!__uaio_ok && __uaio_init() == -1)
400 		return (-1);
401 
402 	if ((reqp = _aio_req_alloc()) == NULL) {
403 		errno = EAGAIN;
404 		return (-1);
405 	}
406 
407 	/*
408 	 * _aio_do_request() checks reqp->req_op to differentiate
409 	 * between 32 and 64 bit access.
410 	 */
411 	reqp->req_op = mode;
412 	reqp->req_resultp = resultp;
413 	ap = &reqp->req_args;
414 	ap->fd = fd;
415 	ap->buf = buf;
416 	ap->bufsz = bufsz;
417 	ap->offset = loffset;
418 
419 	if (_aio_hash_insert(resultp, reqp) != 0) {
420 		_aio_req_free(reqp);
421 		errno = EINVAL;
422 		return (-1);
423 	}
424 	/*
425 	 * _aio_req_add() only needs the difference between READ and
426 	 * WRITE to choose the right worker queue.
427 	 */
428 	_aio_req_add(reqp, &__nextworker_rw, umode);
429 	return (0);
430 }
431 
432 int
433 aiocancel(aio_result_t *resultp)
434 {
435 	aio_req_t *reqp;
436 	aio_worker_t *aiowp;
437 	int ret;
438 	int done = 0;
439 	int canceled = 0;
440 
441 	if (!__uaio_ok) {
442 		errno = EINVAL;
443 		return (-1);
444 	}
445 
446 	sig_mutex_lock(&__aio_mutex);
447 	reqp = _aio_hash_find(resultp);
448 	if (reqp == NULL) {
449 		if (_aio_outstand_cnt == _aio_req_done_cnt)
450 			errno = EINVAL;
451 		else
452 			errno = EACCES;
453 		ret = -1;
454 	} else {
455 		aiowp = reqp->req_worker;
456 		sig_mutex_lock(&aiowp->work_qlock1);
457 		(void) _aio_cancel_req(aiowp, reqp, &canceled, &done);
458 		sig_mutex_unlock(&aiowp->work_qlock1);
459 
460 		if (canceled) {
461 			ret = 0;
462 		} else {
463 			if (_aio_outstand_cnt == 0 ||
464 			    _aio_outstand_cnt == _aio_req_done_cnt)
465 				errno = EINVAL;
466 			else
467 				errno = EACCES;
468 			ret = -1;
469 		}
470 	}
471 	sig_mutex_unlock(&__aio_mutex);
472 	return (ret);
473 }
474 
/*
 * Wait for an asynchronous request to complete.
 *
 * uwait == NULL	wait until a request completes
 * *uwait == {0, 0}	poll: report a completed request, if any
 * otherwise		wait for at most the given time
 *
 * Returns the aio_result_t pointer of a completed request (either a
 * kernel aio request or a user-level request), NULL when nothing
 * completed within the polling/timed wait, or (aio_result_t *)-1 with
 * errno set to EINVAL (invalid wait time; also when both _aio_req_done()
 * and kaio report -1, presumably because nothing is outstanding -- TODO
 * confirm) or EINTR (the kernel wait failed with EINTR).
 *
 * This must be asynch safe
 */
aio_result_t *
aiowait(struct timeval *uwait)
{
	aio_result_t *uresultp;		/* result from user-level aio */
	aio_result_t *kresultp;		/* result from kernel aio */
	aio_result_t *resultp;		/* what we finally return */
	int dontblock;
	int timedwait = 0;
	int kaio_errno = 0;
	struct timeval twait;		/* private, modifiable copy of *uwait */
	struct timeval *wait = NULL;
	hrtime_t hrtend;		/* absolute end time of a timed wait */
	hrtime_t hres;			/* time remaining, in nanoseconds */

	if (uwait) {
		/*
		 * Check for a valid specified wait time.
		 * If it is invalid, fail the call right away.
		 */
		if (uwait->tv_sec < 0 || uwait->tv_usec < 0 ||
		    uwait->tv_usec >= MICROSEC) {
			errno = EINVAL;
			return ((aio_result_t *)-1);
		}

		if (uwait->tv_sec > 0 || uwait->tv_usec > 0) {
			/* timed wait: remember when the time is up */
			hrtend = gethrtime() +
				(hrtime_t)uwait->tv_sec * NANOSEC +
				(hrtime_t)uwait->tv_usec * (NANOSEC / MICROSEC);
			twait = *uwait;
			wait = &twait;
			timedwait++;
		} else {
			/* polling */
			sig_mutex_lock(&__aio_mutex);
			if (_kaio_outstand_cnt == 0) {
				kresultp = (aio_result_t *)-1;
			} else {
				kresultp = (aio_result_t *)_kaio(AIOWAIT,
				    (struct timeval *)-1, 1);
				/*
				 * Anything other than -1, NULL and 1 is
				 * the result pointer of a finished kaio
				 * request.
				 */
				if (kresultp != (aio_result_t *)-1 &&
				    kresultp != NULL &&
				    kresultp != (aio_result_t *)1) {
					_kaio_outstand_cnt--;
					sig_mutex_unlock(&__aio_mutex);
					return (kresultp);
				}
			}
			/* no kaio result; look for a user-level one */
			uresultp = _aio_req_done();
			sig_mutex_unlock(&__aio_mutex);
			if (uresultp != NULL &&
			    uresultp != (aio_result_t *)-1) {
				return (uresultp);
			}
			if (uresultp == (aio_result_t *)-1 &&
			    kresultp == (aio_result_t *)-1) {
				errno = EINVAL;
				return ((aio_result_t *)-1);
			} else {
				return (NULL);
			}
		}
	}

	for (;;) {
		sig_mutex_lock(&__aio_mutex);
		uresultp = _aio_req_done();
		if (uresultp != NULL && uresultp != (aio_result_t *)-1) {
			/* a user-level request has completed */
			sig_mutex_unlock(&__aio_mutex);
			resultp = uresultp;
			break;
		}
		_aiowait_flag++;
		dontblock = (uresultp == (aio_result_t *)-1);
		if (dontblock && _kaio_outstand_cnt == 0) {
			/* nothing to wait for in the kernel either */
			kresultp = (aio_result_t *)-1;
			kaio_errno = EINVAL;
		} else {
			/* the lock is not held across the kernel wait */
			sig_mutex_unlock(&__aio_mutex);
			kresultp = (aio_result_t *)_kaio(AIOWAIT,
			    wait, dontblock);
			sig_mutex_lock(&__aio_mutex);
			kaio_errno = errno;
		}
		_aiowait_flag--;
		sig_mutex_unlock(&__aio_mutex);
		if (kresultp == (aio_result_t *)1) {
			/* aiowait() awakened by an aionotify() */
			continue;
		} else if (kresultp != NULL &&
		    kresultp != (aio_result_t *)-1) {
			/* a kernel aio request has completed */
			resultp = kresultp;
			sig_mutex_lock(&__aio_mutex);
			_kaio_outstand_cnt--;
			sig_mutex_unlock(&__aio_mutex);
			break;
		} else if (kresultp == (aio_result_t *)-1 &&
		    kaio_errno == EINVAL &&
		    uresultp == (aio_result_t *)-1) {
			/* both the user-level and the kernel side report -1 */
			errno = kaio_errno;
			resultp = (aio_result_t *)-1;
			break;
		} else if (kresultp == (aio_result_t *)-1 &&
		    kaio_errno == EINTR) {
			/* the kernel wait was interrupted */
			errno = kaio_errno;
			resultp = (aio_result_t *)-1;
			break;
		} else if (timedwait) {
			hres = hrtend - gethrtime();
			if (hres <= 0) {
				/* time is up; return */
				resultp = NULL;
				break;
			} else {
				/*
				 * Some time left.  Round up the remaining time
				 * in nanoseconds to microsec.  Retry the call.
				 */
				hres += (NANOSEC / MICROSEC) - 1;
				wait->tv_sec = hres / NANOSEC;
				wait->tv_usec =
					(hres % NANOSEC) / (NANOSEC / MICROSEC);
			}
		} else {
			/* untimed wait and nothing is done yet: try again */
			ASSERT(kresultp == NULL && uresultp == NULL);
			resultp = NULL;
			continue;
		}
	}
	return (resultp);
}
609 
610 /*
611  * _aio_get_timedelta calculates the remaining time and stores the result
612  * into timespec_t *wait.
613  */
614 
/*
 * Compute how much time is left until the absolute time *end, measured
 * against gettimeofday(), and store it in *wait.
 * Returns 0 if there is time left and -1 if the timer has expired, in
 * which case *wait must not be relied upon.
 */
int
_aio_get_timedelta(timespec_t *end, timespec_t *wait)
{
	struct timeval tod;
	timespec_t now;

	(void) gettimeofday(&tod, NULL);
	now.tv_sec = tod.tv_sec;
	now.tv_nsec = tod.tv_usec * 1000;   /* convert us to ns */

	/* The end time lies in an earlier second: expired. */
	if (end->tv_sec < now.tv_sec)
		return (-1);

	wait->tv_sec = end->tv_sec - now.tv_sec;

	if (end->tv_nsec >= now.tv_nsec) {
		/* No borrow needed. */
		wait->tv_nsec = end->tv_nsec - now.tv_nsec;
		if (wait->tv_sec == 0 && wait->tv_nsec == 0)
			return (-1);	/* timer expired */
		return (0);
	}

	/* Same second but fewer nanoseconds: expired. */
	if (end->tv_sec == now.tv_sec)
		return (-1);

	/* Borrow one second for the nanosecond part. */
	wait->tv_sec -= 1;
	wait->tv_nsec = NANOSEC - (now.tv_nsec - end->tv_nsec);
	return (0);
}
646 
647 /*
648  * If closing by file descriptor: we will simply cancel all the outstanding
649  * aio`s and return.  Those aio's in question will have either noticed the
650  * cancellation notice before, during, or after initiating io.
651  */
652 int
653 aiocancel_all(int fd)
654 {
655 	aio_req_t *reqp;
656 	aio_req_t **reqpp;
657 	aio_worker_t *first;
658 	aio_worker_t *next;
659 	int canceled = 0;
660 	int done = 0;
661 	int cancelall = 0;
662 
663 	sig_mutex_lock(&__aio_mutex);
664 
665 	if (_aio_outstand_cnt == 0) {
666 		sig_mutex_unlock(&__aio_mutex);
667 		return (AIO_ALLDONE);
668 	}
669 
670 	/*
671 	 * Cancel requests from the read/write workers' queues.
672 	 */
673 	first = __nextworker_rw;
674 	next = first;
675 	do {
676 		_aio_cancel_work(next, fd, &canceled, &done);
677 	} while ((next = next->work_forw) != first);
678 
679 	/*
680 	 * finally, check if there are requests on the done queue that
681 	 * should be canceled.
682 	 */
683 	if (fd < 0)
684 		cancelall = 1;
685 	reqpp = &_aio_done_tail;
686 	while ((reqp = *reqpp) != NULL) {
687 		if (cancelall || reqp->req_args.fd == fd) {
688 			*reqpp = reqp->req_next;
689 			_aio_donecnt--;
690 			(void) _aio_hash_del(reqp->req_resultp);
691 			_aio_req_free(reqp);
692 		} else
693 			reqpp = &reqp->req_next;
694 	}
695 	if (cancelall) {
696 		ASSERT(_aio_donecnt == 0);
697 		_aio_done_head = NULL;
698 	}
699 	sig_mutex_unlock(&__aio_mutex);
700 
701 	if (canceled && done == 0)
702 		return (AIO_CANCELED);
703 	else if (done && canceled == 0)
704 		return (AIO_ALLDONE);
705 	else if ((canceled + done == 0) && KAIO_SUPPORTED(fd))
706 		return ((int)_kaio(AIOCANCEL, fd, NULL));
707 	return (AIO_NOTCANCELED);
708 }
709 
710 /*
711  * Cancel requests from a given work queue.  If the file descriptor
712  * parameter, fd, is non-negative, then only cancel those requests
713  * in this queue that are to this file descriptor.  If the fd
714  * parameter is -1, then cancel all requests.
715  */
716 static void
717 _aio_cancel_work(aio_worker_t *aiowp, int fd, int *canceled, int *done)
718 {
719 	aio_req_t *reqp;
720 
721 	sig_mutex_lock(&aiowp->work_qlock1);
722 	/*
723 	 * cancel queued requests first.
724 	 */
725 	reqp = aiowp->work_tail1;
726 	while (reqp != NULL) {
727 		if (fd < 0 || reqp->req_args.fd == fd) {
728 			if (_aio_cancel_req(aiowp, reqp, canceled, done)) {
729 				/*
730 				 * Callers locks were dropped.
731 				 * reqp is invalid; start traversing
732 				 * the list from the beginning again.
733 				 */
734 				reqp = aiowp->work_tail1;
735 				continue;
736 			}
737 		}
738 		reqp = reqp->req_next;
739 	}
740 	/*
741 	 * Since the queued requests have been canceled, there can
742 	 * only be one inprogress request that should be canceled.
743 	 */
744 	if ((reqp = aiowp->work_req) != NULL &&
745 	    (fd < 0 || reqp->req_args.fd == fd))
746 		(void) _aio_cancel_req(aiowp, reqp, canceled, done);
747 	sig_mutex_unlock(&aiowp->work_qlock1);
748 }
749 
/*
 * Cancel a request.  Return 1 if the callers locks were temporarily
 * dropped, otherwise return 0.
 *
 * The caller must hold both __aio_mutex and aiowp->work_qlock1.
 * *canceled is incremented when the request gets canceled here and
 * *done is incremented when the request had already completed.
 */
int
_aio_cancel_req(aio_worker_t *aiowp, aio_req_t *reqp, int *canceled, int *done)
{
	int ostate = reqp->req_state;

	ASSERT(MUTEX_HELD(&__aio_mutex));
	ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
	/* Already canceled: nothing to do and nothing to count. */
	if (ostate == AIO_REQ_CANCELED)
		return (0);
	/* Too late to cancel: the request has already completed. */
	if (ostate == AIO_REQ_DONE || ostate == AIO_REQ_DONEQ) {
		(*done)++;
		return (0);
	}
	if (reqp->req_op == AIOFSYNC && reqp != aiowp->work_req) {
		ASSERT(POSIX_AIO(reqp));
		/* Cancel the queued aio_fsync() request */
		if (!reqp->req_head->lio_canned) {
			/* count the cancellation only once per list head */
			reqp->req_head->lio_canned = 1;
			_aio_outstand_cnt--;
			(*canceled)++;
		}
		return (0);
	}
	/* Mark the request and take it off the queue and the hash table. */
	reqp->req_state = AIO_REQ_CANCELED;
	_aio_req_del(aiowp, reqp, ostate);
	(void) _aio_hash_del(reqp->req_resultp);
	(*canceled)++;
	if (reqp == aiowp->work_req) {
		ASSERT(ostate == AIO_REQ_INPROGRESS);
		/*
		 * Set the result values now, before _aiodone() is called.
		 * We do this because the application can expect aio_return
		 * and aio_errno to be set to -1 and ECANCELED, respectively,
		 * immediately after a successful return from aiocancel()
		 * or aio_cancel().
		 */
		_aio_set_result(reqp, -1, ECANCELED);
		/* interrupt the worker that is executing the request */
		(void) thr_kill(aiowp->work_tid, SIGAIOCANCEL);
		return (0);
	}
	if (!POSIX_AIO(reqp)) {
		/* queued non-POSIX request: just record the result */
		_aio_outstand_cnt--;
		_aio_set_result(reqp, -1, ECANCELED);
		return (0);
	}
	/*
	 * Queued POSIX request: _aiodone() is called with both locks
	 * released; they are reacquired in the original order afterwards.
	 * Returning 1 tells the caller that the locks were dropped.
	 */
	sig_mutex_unlock(&aiowp->work_qlock1);
	sig_mutex_unlock(&__aio_mutex);
	_aiodone(reqp, -1, ECANCELED);
	sig_mutex_lock(&__aio_mutex);
	sig_mutex_lock(&aiowp->work_qlock1);
	return (1);
}
806 
807 int
808 _aio_create_worker(aio_req_t *reqp, int mode)
809 {
810 	aio_worker_t *aiowp, **workers, **nextworker;
811 	int *aio_workerscnt;
812 	void *(*func)(void *);
813 	sigset_t oset;
814 	int error;
815 
816 	/*
817 	 * Put the new worker thread in the right queue.
818 	 */
819 	switch (mode) {
820 	case AIOREAD:
821 	case AIOWRITE:
822 	case AIOAREAD:
823 	case AIOAWRITE:
824 #if !defined(_LP64)
825 	case AIOAREAD64:
826 	case AIOAWRITE64:
827 #endif
828 		workers = &__workers_rw;
829 		nextworker = &__nextworker_rw;
830 		aio_workerscnt = &__rw_workerscnt;
831 		func = _aio_do_request;
832 		break;
833 	case AIONOTIFY:
834 		workers = &__workers_no;
835 		nextworker = &__nextworker_no;
836 		func = _aio_do_notify;
837 		aio_workerscnt = &__no_workerscnt;
838 		break;
839 	default:
840 		aio_panic("_aio_create_worker: invalid mode");
841 		break;
842 	}
843 
844 	if ((aiowp = _aio_worker_alloc()) == NULL)
845 		return (-1);
846 
847 	if (reqp) {
848 		reqp->req_state = AIO_REQ_QUEUED;
849 		reqp->req_worker = aiowp;
850 		aiowp->work_head1 = reqp;
851 		aiowp->work_tail1 = reqp;
852 		aiowp->work_next1 = reqp;
853 		aiowp->work_count1 = 1;
854 		aiowp->work_minload1 = 1;
855 	}
856 
857 	(void) pthread_sigmask(SIG_SETMASK, &maskset, &oset);
858 	error = thr_create(NULL, AIOSTKSIZE, func, aiowp,
859 		THR_DAEMON | THR_SUSPENDED, &aiowp->work_tid);
860 	(void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
861 	if (error) {
862 		if (reqp) {
863 			reqp->req_state = 0;
864 			reqp->req_worker = NULL;
865 		}
866 		_aio_worker_free(aiowp);
867 		return (-1);
868 	}
869 
870 	lmutex_lock(&__aio_mutex);
871 	(*aio_workerscnt)++;
872 	if (*workers == NULL) {
873 		aiowp->work_forw = aiowp;
874 		aiowp->work_backw = aiowp;
875 		*nextworker = aiowp;
876 		*workers = aiowp;
877 	} else {
878 		aiowp->work_backw = (*workers)->work_backw;
879 		aiowp->work_forw = (*workers);
880 		(*workers)->work_backw->work_forw = aiowp;
881 		(*workers)->work_backw = aiowp;
882 	}
883 	_aio_worker_cnt++;
884 	lmutex_unlock(&__aio_mutex);
885 
886 	(void) thr_continue(aiowp->work_tid);
887 
888 	return (0);
889 }
890 
891 /*
892  * This is the worker's main routine.
893  * The task of this function is to execute all queued requests;
894  * once the last pending request is executed this function will block
895  * in _aio_idle().  A new incoming request must wakeup this thread to
896  * restart the work.
897  * Every worker has an own work queue.  The queue lock is required
898  * to synchronize the addition of new requests for this worker or
899  * cancellation of pending/running requests.
900  *
901  * Cancellation scenarios:
902  * The cancellation of a request is being done asynchronously using
903  * _aio_cancel_req() from another thread context.
904  * A queued request can be cancelled in different manners :
905  * a) request is queued but not "in progress" or "done" (AIO_REQ_QUEUED):
906  *	- lock the queue -> remove the request -> unlock the queue
907  *	- this function/thread does not detect this cancellation process
908  * b) request is in progress (AIO_REQ_INPROGRESS) :
909  *	- this function first allow the cancellation of the running
910  *	  request with the flag "work_cancel_flg=1"
911  * 		see _aio_req_get() -> _aio_cancel_on()
912  *	  During this phase, it is allowed to interrupt the worker
913  *	  thread running the request (this thread) using the SIGAIOCANCEL
914  *	  signal.
915  *	  Once this thread returns from the kernel (because the request
916  *	  is just done), then it must disable a possible cancellation
917  *	  and proceed to finish the request.  To disable the cancellation
918  *	  this thread must use _aio_cancel_off() to set "work_cancel_flg=0".
919  * c) request is already done (AIO_REQ_DONE || AIO_REQ_DONEQ):
920  *	  same procedure as in a)
921  *
922  * To b)
923  *	This thread uses sigsetjmp() to define the position in the code, where
924  *	it wish to continue working in the case that a SIGAIOCANCEL signal
925  *	is detected.
926  *	Normally this thread should get the cancellation signal during the
927  *	kernel phase (reading or writing).  In that case the signal handler
928  *	aiosigcancelhndlr() is activated using the worker thread context,
929  *	which again will use the siglongjmp() function to break the standard
930  *	code flow and jump to the "sigsetjmp" position, provided that
931  *	"work_cancel_flg" is set to "1".
932  *	Because the "work_cancel_flg" is only manipulated by this worker
933  *	thread and it can only run on one CPU at a given time, it is not
934  *	necessary to protect that flag with the queue lock.
935  *	Returning from the kernel (read or write system call) we must
936  *	first disable the use of the SIGAIOCANCEL signal and accordingly
937  *	the use of the siglongjmp() function to prevent a possible deadlock:
 *	- It can happen that this worker thread returns from the kernel and
 *	  blocks in "work_qlock1",
 *	- then a second thread cancels the apparently "in progress" request
 *	  and sends the SIGAIOCANCEL signal to the worker thread,
 *	- the worker thread gets assigned the "work_qlock1" and will return
 *	  from the kernel,
944  *	- the kernel detects the pending signal and activates the signal
945  *	  handler instead,
946  *	- if the "work_cancel_flg" is still set then the signal handler
947  *	  should use siglongjmp() to cancel the "in progress" request and
948  *	  it would try to acquire the same work_qlock1 in _aio_req_get()
949  *	  for a second time => deadlock.
950  *	To avoid that situation we disable the cancellation of the request
951  *	in progress BEFORE we try to acquire the work_qlock1.
952  *	In that case the signal handler will not call siglongjmp() and the
953  *	worker thread will continue running the standard code flow.
954  *	Then this thread must check the AIO_REQ_CANCELED flag to emulate
955  *	an eventually required siglongjmp() freeing the work_qlock1 and
956  *	avoiding a deadlock.
957  */
958 void *
959 _aio_do_request(void *arglist)
960 {
961 	aio_worker_t *aiowp = (aio_worker_t *)arglist;
962 	ulwp_t *self = curthread;
963 	struct aio_args *arg;
964 	aio_req_t *reqp;		/* current AIO request */
965 	ssize_t retval;
966 	int error;
967 
968 	if (pthread_setspecific(_aio_key, aiowp) != 0)
969 		aio_panic("_aio_do_request, pthread_setspecific()");
970 	(void) pthread_sigmask(SIG_SETMASK, &_worker_set, NULL);
971 	ASSERT(aiowp->work_req == NULL);
972 
973 	/*
974 	 * We resume here when an operation is cancelled.
975 	 * On first entry, aiowp->work_req == NULL, so all
976 	 * we do is block SIGAIOCANCEL.
977 	 */
978 	(void) sigsetjmp(aiowp->work_jmp_buf, 0);
979 	ASSERT(self->ul_sigdefer == 0);
980 
981 	sigoff(self);	/* block SIGAIOCANCEL */
982 	if (aiowp->work_req != NULL)
983 		_aio_finish_request(aiowp, -1, ECANCELED);
984 
985 	for (;;) {
986 		/*
987 		 * Put completed requests on aio_done_list.  This has
988 		 * to be done as part of the main loop to ensure that
989 		 * we don't artificially starve any aiowait'ers.
990 		 */
991 		if (aiowp->work_done1)
992 			_aio_work_done(aiowp);
993 
994 top:
995 		/* consume any deferred SIGAIOCANCEL signal here */
996 		sigon(self);
997 		sigoff(self);
998 
999 		while ((reqp = _aio_req_get(aiowp)) == NULL) {
1000 			if (_aio_idle(aiowp) != 0)
1001 				goto top;
1002 		}
1003 		arg = &reqp->req_args;
1004 		ASSERT(reqp->req_state == AIO_REQ_INPROGRESS ||
1005 		    reqp->req_state == AIO_REQ_CANCELED);
1006 		error = 0;
1007 
1008 		switch (reqp->req_op) {
1009 		case AIOREAD:
1010 		case AIOAREAD:
1011 			sigon(self);	/* unblock SIGAIOCANCEL */
1012 			retval = pread(arg->fd, arg->buf,
1013 			    arg->bufsz, arg->offset);
1014 			if (retval == -1) {
1015 				if (errno == ESPIPE) {
1016 					retval = read(arg->fd,
1017 					    arg->buf, arg->bufsz);
1018 					if (retval == -1)
1019 						error = errno;
1020 				} else {
1021 					error = errno;
1022 				}
1023 			}
1024 			sigoff(self);	/* block SIGAIOCANCEL */
1025 			break;
1026 		case AIOWRITE:
1027 		case AIOAWRITE:
1028 			sigon(self);	/* unblock SIGAIOCANCEL */
1029 			retval = pwrite(arg->fd, arg->buf,
1030 			    arg->bufsz, arg->offset);
1031 			if (retval == -1) {
1032 				if (errno == ESPIPE) {
1033 					retval = write(arg->fd,
1034 					    arg->buf, arg->bufsz);
1035 					if (retval == -1)
1036 						error = errno;
1037 				} else {
1038 					error = errno;
1039 				}
1040 			}
1041 			sigoff(self);	/* block SIGAIOCANCEL */
1042 			break;
1043 #if !defined(_LP64)
1044 		case AIOAREAD64:
1045 			sigon(self);	/* unblock SIGAIOCANCEL */
1046 			retval = pread64(arg->fd, arg->buf,
1047 			    arg->bufsz, arg->offset);
1048 			if (retval == -1) {
1049 				if (errno == ESPIPE) {
1050 					retval = read(arg->fd,
1051 					    arg->buf, arg->bufsz);
1052 					if (retval == -1)
1053 						error = errno;
1054 				} else {
1055 					error = errno;
1056 				}
1057 			}
1058 			sigoff(self);	/* block SIGAIOCANCEL */
1059 			break;
1060 		case AIOAWRITE64:
1061 			sigon(self);	/* unblock SIGAIOCANCEL */
1062 			retval = pwrite64(arg->fd, arg->buf,
1063 			    arg->bufsz, arg->offset);
1064 			if (retval == -1) {
1065 				if (errno == ESPIPE) {
1066 					retval = write(arg->fd,
1067 					    arg->buf, arg->bufsz);
1068 					if (retval == -1)
1069 						error = errno;
1070 				} else {
1071 					error = errno;
1072 				}
1073 			}
1074 			sigoff(self);	/* block SIGAIOCANCEL */
1075 			break;
1076 #endif	/* !defined(_LP64) */
1077 		case AIOFSYNC:
1078 			if (_aio_fsync_del(aiowp, reqp))
1079 				goto top;
1080 			ASSERT(reqp->req_head == NULL);
1081 			/*
1082 			 * All writes for this fsync request are now
1083 			 * acknowledged.  Now make these writes visible
1084 			 * and put the final request into the hash table.
1085 			 */
1086 			if (reqp->req_state == AIO_REQ_CANCELED) {
1087 				/* EMPTY */;
1088 			} else if (arg->offset == O_SYNC) {
1089 				if ((retval = __fdsync(arg->fd, FSYNC)) == -1)
1090 					error = errno;
1091 			} else {
1092 				if ((retval = __fdsync(arg->fd, FDSYNC)) == -1)
1093 					error = errno;
1094 			}
1095 			if (_aio_hash_insert(reqp->req_resultp, reqp) != 0)
1096 				aio_panic("_aio_do_request(): AIOFSYNC: "
1097 				    "request already in hash table");
1098 			break;
1099 		default:
1100 			aio_panic("_aio_do_request, bad op");
1101 		}
1102 
1103 		_aio_finish_request(aiowp, retval, error);
1104 	}
1105 	/* NOTREACHED */
1106 	return (NULL);
1107 }
1108 
1109 /*
1110  * Perform the tail processing for _aio_do_request().
1111  * The in-progress request may or may not have been cancelled.
1112  */
1113 static void
1114 _aio_finish_request(aio_worker_t *aiowp, ssize_t retval, int error)
1115 {
1116 	aio_req_t *reqp;
1117 
1118 	sig_mutex_lock(&aiowp->work_qlock1);
1119 	if ((reqp = aiowp->work_req) == NULL)
1120 		sig_mutex_unlock(&aiowp->work_qlock1);
1121 	else {
1122 		aiowp->work_req = NULL;
1123 		if (reqp->req_state == AIO_REQ_CANCELED) {
1124 			retval = -1;
1125 			error = ECANCELED;
1126 		}
1127 		if (!POSIX_AIO(reqp)) {
1128 			sig_mutex_unlock(&aiowp->work_qlock1);
1129 			sig_mutex_lock(&__aio_mutex);
1130 			if (reqp->req_state == AIO_REQ_INPROGRESS)
1131 				reqp->req_state = AIO_REQ_DONE;
1132 			_aio_req_done_cnt++;
1133 			_aio_set_result(reqp, retval, error);
1134 			if (error == ECANCELED)
1135 				_aio_outstand_cnt--;
1136 			sig_mutex_unlock(&__aio_mutex);
1137 		} else {
1138 			if (reqp->req_state == AIO_REQ_INPROGRESS)
1139 				reqp->req_state = AIO_REQ_DONE;
1140 			sig_mutex_unlock(&aiowp->work_qlock1);
1141 			_aiodone(reqp, retval, error);
1142 		}
1143 	}
1144 }
1145 
1146 void
1147 _aio_req_mark_done(aio_req_t *reqp)
1148 {
1149 #if !defined(_LP64)
1150 	if (reqp->req_largefile)
1151 		((aiocb64_t *)reqp->req_aiocbp)->aio_state = USERAIO_DONE;
1152 	else
1153 #endif
1154 		((aiocb_t *)reqp->req_aiocbp)->aio_state = USERAIO_DONE;
1155 }
1156 
1157 /*
1158  * Sleep for 'ticks' clock ticks to give somebody else a chance to run,
1159  * hopefully to consume one of our queued signals.
1160  */
1161 static void
1162 _aio_delay(int ticks)
1163 {
1164 	(void) usleep(ticks * (MICROSEC / hz));
1165 }
1166 
1167 /*
1168  * Actually send the notifications.
1169  * We could block indefinitely here if the application
1170  * is not listening for the signal or port notifications.
1171  */
1172 static void
1173 send_notification(notif_param_t *npp)
1174 {
1175 	extern int __sigqueue(pid_t pid, int signo,
1176 		/* const union sigval */ void *value, int si_code, int block);
1177 
1178 	if (npp->np_signo)
1179 		(void) __sigqueue(__pid, npp->np_signo, npp->np_user,
1180 		    SI_ASYNCIO, 1);
1181 	else if (npp->np_port >= 0)
1182 		(void) _port_dispatch(npp->np_port, 0, PORT_SOURCE_AIO,
1183 		    npp->np_event, npp->np_object, npp->np_user);
1184 
1185 	if (npp->np_lio_signo)
1186 		(void) __sigqueue(__pid, npp->np_lio_signo, npp->np_lio_user,
1187 		    SI_ASYNCIO, 1);
1188 	else if (npp->np_lio_port >= 0)
1189 		(void) _port_dispatch(npp->np_lio_port, 0, PORT_SOURCE_AIO,
1190 		    npp->np_lio_event, npp->np_lio_object, npp->np_lio_user);
1191 }
1192 
1193 /*
1194  * Asynchronous notification worker.
1195  */
1196 void *
1197 _aio_do_notify(void *arg)
1198 {
1199 	aio_worker_t *aiowp = (aio_worker_t *)arg;
1200 	aio_req_t *reqp;
1201 
1202 	/*
1203 	 * This isn't really necessary.  All signals are blocked.
1204 	 */
1205 	if (pthread_setspecific(_aio_key, aiowp) != 0)
1206 		aio_panic("_aio_do_notify, pthread_setspecific()");
1207 
1208 	/*
1209 	 * Notifications are never cancelled.
1210 	 * All signals remain blocked, forever.
1211 	 */
1212 	for (;;) {
1213 		while ((reqp = _aio_req_get(aiowp)) == NULL) {
1214 			if (_aio_idle(aiowp) != 0)
1215 				aio_panic("_aio_do_notify: _aio_idle() failed");
1216 		}
1217 		send_notification(&reqp->req_notify);
1218 		_aio_req_free(reqp);
1219 	}
1220 
1221 	/* NOTREACHED */
1222 	return (NULL);
1223 }
1224 
/*
 * Do the completion semantics for a request that was either canceled
 * by _aio_cancel_req() or was completed by _aio_do_request().
 *
 * reqp   - the POSIX request being completed (asserted below)
 * retval - value to store as the request's return value
 * error  - value to store as the request's errno
 *
 * The result is stored only if the request's aio_errno is still
 * EINPROGRESS.  Depending on the request's sigev_notify setting, the
 * request is either placed on the done queue (SIGEV_NONE), handed to
 * the notification workers, or freed.
 */
static void
_aiodone(aio_req_t *reqp, ssize_t retval, int error)
{
	aio_result_t *resultp = reqp->req_resultp;
	int notify = 0;
	aio_lio_t *head;
	int sigev_none;
	int sigev_signal;
	int sigev_thread;
	int sigev_port;
	notif_param_t np;

	/*
	 * We call _aiodone() only for Posix I/O.
	 */
	ASSERT(POSIX_AIO(reqp));

	sigev_none = 0;
	sigev_signal = 0;
	sigev_thread = 0;
	sigev_port = 0;
	/* 0 and -1 mean "no signal" and "no port" to send_notification() */
	np.np_signo = 0;
	np.np_port = -1;
	np.np_lio_signo = 0;
	np.np_lio_port = -1;

	/* decode the notification type into exactly one flag */
	switch (reqp->req_sigevent.sigev_notify) {
	case SIGEV_NONE:
		sigev_none = 1;
		break;
	case SIGEV_SIGNAL:
		sigev_signal = 1;
		break;
	case SIGEV_THREAD:
		sigev_thread = 1;
		break;
	case SIGEV_PORT:
		sigev_port = 1;
		break;
	default:
		aio_panic("_aiodone: improper sigev_notify");
		break;
	}

	/*
	 * Figure out the notification parameters while holding __aio_mutex.
	 * Actually perform the notifications after dropping __aio_mutex.
	 * This allows us to sleep for a long time (if the notifications
	 * incur delays) without impeding other async I/O operations.
	 */

	sig_mutex_lock(&__aio_mutex);

	if (sigev_signal) {
		if ((np.np_signo = reqp->req_sigevent.sigev_signo) != 0)
			notify = 1;
		np.np_user = reqp->req_sigevent.sigev_value.sival_ptr;
	} else if (sigev_thread | sigev_port) {
		/* for these two types sigev_signo holds the port number */
		if ((np.np_port = reqp->req_sigevent.sigev_signo) >= 0)
			notify = 1;
		np.np_event = reqp->req_op;
		if (np.np_event == AIOFSYNC && reqp->req_largefile)
			np.np_event = AIOFSYNC64;
		np.np_object = (uintptr_t)reqp->req_aiocbp;
		np.np_user = reqp->req_sigevent.sigev_value.sival_ptr;
	}

	/* do not overwrite a result that has already been stored */
	if (resultp->aio_errno == EINPROGRESS)
		_aio_set_result(reqp, retval, error);

	_aio_outstand_cnt--;

	/* detach the request from its list I/O head, if it has one */
	head = reqp->req_head;
	reqp->req_head = NULL;

	if (sigev_none) {
		/* the done queue now owns the request; forget our pointer */
		_aio_enq_doneq(reqp);
		reqp = NULL;
	} else {
		(void) _aio_hash_del(resultp);
		_aio_req_mark_done(reqp);
	}

	_aio_waitn_wakeup();

	/*
	 * __aio_waitn() sets AIO_WAIT_INPROGRESS and
	 * __aio_suspend() increments "_aio_kernel_suspend"
	 * when they are waiting in the kernel for completed I/Os.
	 *
	 * _kaio(AIONOTIFY) awakes the corresponding function
	 * in the kernel; then the corresponding __aio_waitn() or
	 * __aio_suspend() function could reap the recently
	 * completed I/Os (_aiodone()).
	 */
	if ((_aio_flags & AIO_WAIT_INPROGRESS) || _aio_kernel_suspend > 0)
		(void) _kaio(AIONOTIFY);

	sig_mutex_unlock(&__aio_mutex);

	if (head != NULL) {
		/*
		 * If all the lio requests have completed,
		 * prepare to notify the waiting thread.
		 */
		sig_mutex_lock(&head->lio_mutex);
		ASSERT(head->lio_refcnt == head->lio_nent);
		if (head->lio_refcnt == 1) {
			/* this was the last outstanding request of the list */
			int waiting = 0;
			if (head->lio_mode == LIO_WAIT) {
				if ((waiting = head->lio_waiting) != 0)
					(void) cond_signal(&head->lio_cond_cv);
			} else if (head->lio_port < 0) { /* none or signal */
				if ((np.np_lio_signo = head->lio_signo) != 0)
					notify = 1;
				np.np_lio_user = head->lio_sigval.sival_ptr;
			} else {			/* thread or port */
				notify = 1;
				np.np_lio_port = head->lio_port;
				np.np_lio_event = head->lio_event;
				np.np_lio_object =
				    (uintptr_t)head->lio_sigevent;
				np.np_lio_user = head->lio_sigval.sival_ptr;
			}
			head->lio_nent = head->lio_refcnt = 0;
			sig_mutex_unlock(&head->lio_mutex);
			/*
			 * The head is freed here only when no thread was
			 * recorded as waiting on it (lio_waiting == 0).
			 */
			if (waiting == 0)
				_aio_lio_free(head);
		} else {
			head->lio_nent--;
			head->lio_refcnt--;
			sig_mutex_unlock(&head->lio_mutex);
		}
	}

	/*
	 * The request is completed; now perform the notifications.
	 */
	if (notify) {
		if (reqp != NULL) {
			/*
			 * We usually put the request on the notification
			 * queue because we don't want to block and delay
			 * other operations behind us in the work queue.
			 * Also we must never block on a cancel notification
			 * because we are being called from an application
			 * thread in this case and that could lead to deadlock
			 * if no other thread is receiving notifications.
			 */
			reqp->req_notify = np;
			reqp->req_op = AIONOTIFY;
			_aio_req_add(reqp, &__workers_no, AIONOTIFY);
			reqp = NULL;
		} else {
			/*
			 * We already put the request on the done queue,
			 * so we can't queue it to the notification queue.
			 * Just do the notification directly.
			 */
			send_notification(&np);
		}
	}

	/* still ours and not queued anywhere: release it */
	if (reqp != NULL)
		_aio_req_free(reqp);
}
1395 
/*
 * Delete fsync requests from list head until there is
 * only one left.
 *
 * Returns 1 when the request was consumed here (it has been freed and
 * aiowp->work_req cleared): either other requests still reference the
 * list head, or this was the last one and the head's mode is
 * LIO_DESTROY.  Returns 0 when this is the last request and the caller
 * should go on to process it; in that case req_head has been cleared
 * and the list head freed.
 */
static int
_aio_fsync_del(aio_worker_t *aiowp, aio_req_t *reqp)
{
	aio_lio_t *head = reqp->req_head;
	int rval = 0;

	ASSERT(reqp == aiowp->work_req);
	/* lock order: the worker's queue lock first, then the list head */
	sig_mutex_lock(&aiowp->work_qlock1);
	sig_mutex_lock(&head->lio_mutex);
	if (head->lio_refcnt > 1) {
		/* not the last one: drop our reference and discard the request */
		head->lio_refcnt--;
		head->lio_nent--;
		aiowp->work_req = NULL;
		sig_mutex_unlock(&head->lio_mutex);
		sig_mutex_unlock(&aiowp->work_qlock1);
		sig_mutex_lock(&__aio_mutex);
		_aio_outstand_cnt--;
		_aio_waitn_wakeup();
		sig_mutex_unlock(&__aio_mutex);
		_aio_req_free(reqp);
		return (1);
	}
	ASSERT(head->lio_nent == 1 && head->lio_refcnt == 1);
	reqp->req_head = NULL;
	/* propagate a cancellation recorded on the list head */
	if (head->lio_canned)
		reqp->req_state = AIO_REQ_CANCELED;
	if (head->lio_mode == LIO_DESTROY) {
		aiowp->work_req = NULL;
		rval = 1;
	}
	sig_mutex_unlock(&head->lio_mutex);
	sig_mutex_unlock(&aiowp->work_qlock1);
	/*
	 * The counts are dropped after the lock is released; the ASSERT
	 * above indicates this request held the only remaining reference.
	 */
	head->lio_refcnt--;
	head->lio_nent--;
	_aio_lio_free(head);
	if (rval != 0)
		_aio_req_free(reqp);
	return (rval);
}
1440 
1441 /*
1442  * A worker is set idle when its work queue is empty.
1443  * The worker checks again that it has no more work
1444  * and then goes to sleep waiting for more work.
1445  */
1446 int
1447 _aio_idle(aio_worker_t *aiowp)
1448 {
1449 	int error = 0;
1450 
1451 	sig_mutex_lock(&aiowp->work_qlock1);
1452 	if (aiowp->work_count1 == 0) {
1453 		ASSERT(aiowp->work_minload1 == 0);
1454 		aiowp->work_idleflg = 1;
1455 		/*
1456 		 * A cancellation handler is not needed here.
1457 		 * aio worker threads are never cancelled via pthread_cancel().
1458 		 */
1459 		error = sig_cond_wait(&aiowp->work_idle_cv,
1460 		    &aiowp->work_qlock1);
1461 		/*
1462 		 * The idle flag is normally cleared before worker is awakened
1463 		 * by aio_req_add().  On error (EINTR), we clear it ourself.
1464 		 */
1465 		if (error)
1466 			aiowp->work_idleflg = 0;
1467 	}
1468 	sig_mutex_unlock(&aiowp->work_qlock1);
1469 	return (error);
1470 }
1471 
1472 /*
1473  * A worker's completed AIO requests are placed onto a global
1474  * done queue.  The application is only sent a SIGIO signal if
1475  * the process has a handler enabled and it is not waiting via
1476  * aiowait().
1477  */
1478 static void
1479 _aio_work_done(aio_worker_t *aiowp)
1480 {
1481 	aio_req_t *reqp;
1482 
1483 	sig_mutex_lock(&aiowp->work_qlock1);
1484 	reqp = aiowp->work_prev1;
1485 	reqp->req_next = NULL;
1486 	aiowp->work_done1 = 0;
1487 	aiowp->work_tail1 = aiowp->work_next1;
1488 	if (aiowp->work_tail1 == NULL)
1489 		aiowp->work_head1 = NULL;
1490 	aiowp->work_prev1 = NULL;
1491 	sig_mutex_unlock(&aiowp->work_qlock1);
1492 	sig_mutex_lock(&__aio_mutex);
1493 	_aio_donecnt++;
1494 	_aio_outstand_cnt--;
1495 	_aio_req_done_cnt--;
1496 	ASSERT(_aio_donecnt > 0 &&
1497 	    _aio_outstand_cnt >= 0 &&
1498 	    _aio_req_done_cnt >= 0);
1499 	ASSERT(reqp != NULL);
1500 
1501 	if (_aio_done_tail == NULL) {
1502 		_aio_done_head = _aio_done_tail = reqp;
1503 	} else {
1504 		_aio_done_head->req_next = reqp;
1505 		_aio_done_head = reqp;
1506 	}
1507 
1508 	if (_aiowait_flag) {
1509 		sig_mutex_unlock(&__aio_mutex);
1510 		(void) _kaio(AIONOTIFY);
1511 	} else {
1512 		sig_mutex_unlock(&__aio_mutex);
1513 		if (_sigio_enabled)
1514 			(void) kill(__pid, SIGIO);
1515 	}
1516 }
1517 
1518 /*
1519  * The done queue consists of AIO requests that are in either the
1520  * AIO_REQ_DONE or AIO_REQ_CANCELED state.  Requests that were cancelled
1521  * are discarded.  If the done queue is empty then NULL is returned.
1522  * Otherwise the address of a done aio_result_t is returned.
1523  */
1524 aio_result_t *
1525 _aio_req_done(void)
1526 {
1527 	aio_req_t *reqp;
1528 	aio_result_t *resultp;
1529 
1530 	ASSERT(MUTEX_HELD(&__aio_mutex));
1531 
1532 	if ((reqp = _aio_done_tail) != NULL) {
1533 		if ((_aio_done_tail = reqp->req_next) == NULL)
1534 			_aio_done_head = NULL;
1535 		ASSERT(_aio_donecnt > 0);
1536 		_aio_donecnt--;
1537 		(void) _aio_hash_del(reqp->req_resultp);
1538 		resultp = reqp->req_resultp;
1539 		ASSERT(reqp->req_state == AIO_REQ_DONE);
1540 		_aio_req_free(reqp);
1541 		return (resultp);
1542 	}
1543 	/* is queue empty? */
1544 	if (reqp == NULL && _aio_outstand_cnt == 0) {
1545 		return ((aio_result_t *)-1);
1546 	}
1547 	return (NULL);
1548 }
1549 
1550 /*
1551  * Set the return and errno values for the application's use.
1552  *
1553  * For the Posix interfaces, we must set the return value first followed
1554  * by the errno value because the Posix interfaces allow for a change
1555  * in the errno value from EINPROGRESS to something else to signal
1556  * the completion of the asynchronous request.
1557  *
1558  * The opposite is true for the Solaris interfaces.  These allow for
1559  * a change in the return value from AIO_INPROGRESS to something else
1560  * to signal the completion of the asynchronous request.
1561  */
1562 void
1563 _aio_set_result(aio_req_t *reqp, ssize_t retval, int error)
1564 {
1565 	aio_result_t *resultp = reqp->req_resultp;
1566 
1567 	if (POSIX_AIO(reqp)) {
1568 		resultp->aio_return = retval;
1569 		membar_producer();
1570 		resultp->aio_errno = error;
1571 	} else {
1572 		resultp->aio_errno = error;
1573 		membar_producer();
1574 		resultp->aio_return = retval;
1575 	}
1576 }
1577 
/*
 * Add an AIO request onto the next work queue.
 * A circular list of workers is used to choose the next worker.
 *
 * reqp       - request to queue; must not be in state AIO_REQ_DONEQ
 * nextworker - address of the pointer to the worker to try first;
 *              advanced to the following worker for read/write modes
 * mode       - operation being queued; read/write modes are load
 *              balanced across workers, AIOFSYNC and AIONOTIFY always
 *              go to *nextworker
 *
 * For read/write modes the request may instead be handed to a newly
 * created worker via _aio_create_worker(), in which case this function
 * returns without queueing it itself.
 */
void
_aio_req_add(aio_req_t *reqp, aio_worker_t **nextworker, int mode)
{
	ulwp_t *self = curthread;
	aio_worker_t *aiowp;
	aio_worker_t *first;
	int load_bal_flg = 1;
	int found;

	ASSERT(reqp->req_state != AIO_REQ_DONEQ);
	reqp->req_next = NULL;
	/*
	 * Try to acquire the next worker's work queue.  If it is locked,
	 * then search the list of workers until a queue is found unlocked,
	 * or until the list is completely traversed at which point another
	 * worker will be created.
	 */
	sigoff(self);		/* defer SIGIO */
	sig_mutex_lock(&__aio_mutex);
	first = aiowp = *nextworker;
	/* notification requests are not counted as outstanding I/O */
	if (mode != AIONOTIFY)
		_aio_outstand_cnt++;
	sig_mutex_unlock(&__aio_mutex);

	switch (mode) {
	case AIOREAD:
	case AIOWRITE:
	case AIOAREAD:
	case AIOAWRITE:
#if !defined(_LP64)
	case AIOAREAD64:
	case AIOAWRITE64:
#endif
		/* try to find an idle worker */
		found = 0;
		do {
			if (sig_mutex_trylock(&aiowp->work_qlock1) == 0) {
				if (aiowp->work_idleflg) {
					found = 1;
					break;
				}
				sig_mutex_unlock(&aiowp->work_qlock1);
			}
		} while ((aiowp = aiowp->work_forw) != first);

		/* an idle worker was found; its queue lock is held here */
		if (found) {
			aiowp->work_minload1++;
			break;
		}

		/* try to acquire some worker's queue lock */
		do {
			if (sig_mutex_trylock(&aiowp->work_qlock1) == 0) {
				found = 1;
				break;
			}
		} while ((aiowp = aiowp->work_forw) != first);

		/*
		 * Create more workers when the workers appear overloaded.
		 * Either all the workers are busy draining their queues
		 * or no worker's queue lock could be acquired.
		 */
		if (!found) {
			if (_aio_worker_cnt < _max_workers) {
				if (_aio_create_worker(reqp, mode))
					aio_panic("_aio_req_add: add worker");
				sigon(self);	/* reenable SIGIO */
				return;
			}

			/*
			 * No worker available and we have created
			 * _max_workers, keep going through the
			 * list slowly until we get a lock
			 */
			while (sig_mutex_trylock(&aiowp->work_qlock1) != 0) {
				/*
				 * give someone else a chance
				 */
				_aio_delay(1);
				aiowp = aiowp->work_forw;
			}
		}

		ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
		/*
		 * The chosen worker already carries at least _minworkload
		 * queued read/write requests and the worker limit has not
		 * been reached: release it and start a new worker instead.
		 */
		if (_aio_worker_cnt < _max_workers &&
		    aiowp->work_minload1 >= _minworkload) {
			sig_mutex_unlock(&aiowp->work_qlock1);
			sig_mutex_lock(&__aio_mutex);
			*nextworker = aiowp->work_forw;
			sig_mutex_unlock(&__aio_mutex);
			if (_aio_create_worker(reqp, mode))
				aio_panic("aio_req_add: add worker");
			sigon(self);	/* reenable SIGIO */
			return;
		}
		aiowp->work_minload1++;
		break;
	case AIOFSYNC:
	case AIONOTIFY:
		/* no load balancing: always use *nextworker, blocking on its lock */
		load_bal_flg = 0;
		sig_mutex_lock(&aiowp->work_qlock1);
		break;
	default:
		aio_panic("_aio_req_add: invalid mode");
		break;
	}
	/*
	 * Put request onto worker's work queue.
	 * New requests are linked after work_head1, which then becomes
	 * the new request; work_next1 is the next request to be serviced.
	 */
	if (aiowp->work_tail1 == NULL) {
		ASSERT(aiowp->work_count1 == 0);
		aiowp->work_tail1 = reqp;
		aiowp->work_next1 = reqp;
	} else {
		aiowp->work_head1->req_next = reqp;
		if (aiowp->work_next1 == NULL)
			aiowp->work_next1 = reqp;
	}
	reqp->req_state = AIO_REQ_QUEUED;
	reqp->req_worker = aiowp;
	aiowp->work_head1 = reqp;
	/*
	 * Awaken worker if it is not currently active.
	 */
	if (aiowp->work_count1++ == 0 && aiowp->work_idleflg) {
		aiowp->work_idleflg = 0;
		(void) cond_signal(&aiowp->work_idle_cv);
	}
	sig_mutex_unlock(&aiowp->work_qlock1);

	/* rotate the starting point so the next request tries another worker */
	if (load_bal_flg) {
		sig_mutex_lock(&__aio_mutex);
		*nextworker = aiowp->work_forw;
		sig_mutex_unlock(&__aio_mutex);
	}
	sigon(self);	/* reenable SIGIO */
}
1721 
1722 /*
1723  * Get an AIO request for a specified worker.
1724  * If the work queue is empty, return NULL.
1725  */
1726 aio_req_t *
1727 _aio_req_get(aio_worker_t *aiowp)
1728 {
1729 	aio_req_t *reqp;
1730 
1731 	sig_mutex_lock(&aiowp->work_qlock1);
1732 	if ((reqp = aiowp->work_next1) != NULL) {
1733 		/*
1734 		 * Remove a POSIX request from the queue; the
1735 		 * request queue is a singularly linked list
1736 		 * with a previous pointer.  The request is
1737 		 * removed by updating the previous pointer.
1738 		 *
1739 		 * Non-posix requests are left on the queue
1740 		 * to eventually be placed on the done queue.
1741 		 */
1742 
1743 		if (POSIX_AIO(reqp)) {
1744 			if (aiowp->work_prev1 == NULL) {
1745 				aiowp->work_tail1 = reqp->req_next;
1746 				if (aiowp->work_tail1 == NULL)
1747 					aiowp->work_head1 = NULL;
1748 			} else {
1749 				aiowp->work_prev1->req_next = reqp->req_next;
1750 				if (aiowp->work_head1 == reqp)
1751 					aiowp->work_head1 = reqp->req_next;
1752 			}
1753 
1754 		} else {
1755 			aiowp->work_prev1 = reqp;
1756 			ASSERT(aiowp->work_done1 >= 0);
1757 			aiowp->work_done1++;
1758 		}
1759 		ASSERT(reqp != reqp->req_next);
1760 		aiowp->work_next1 = reqp->req_next;
1761 		ASSERT(aiowp->work_count1 >= 1);
1762 		aiowp->work_count1--;
1763 		switch (reqp->req_op) {
1764 		case AIOREAD:
1765 		case AIOWRITE:
1766 		case AIOAREAD:
1767 		case AIOAWRITE:
1768 #if !defined(_LP64)
1769 		case AIOAREAD64:
1770 		case AIOAWRITE64:
1771 #endif
1772 			ASSERT(aiowp->work_minload1 > 0);
1773 			aiowp->work_minload1--;
1774 			break;
1775 		}
1776 		reqp->req_state = AIO_REQ_INPROGRESS;
1777 	}
1778 	aiowp->work_req = reqp;
1779 	ASSERT(reqp != NULL || aiowp->work_count1 == 0);
1780 	sig_mutex_unlock(&aiowp->work_qlock1);
1781 	return (reqp);
1782 }
1783 
/*
 * Unlink a request from a worker's queue.
 * The caller must hold aiowp->work_qlock1 (asserted).
 *
 * aiowp  - worker whose queue holds the request
 * reqp   - request to remove
 * ostate - state the request was in; only AIO_REQ_QUEUED and
 *          AIO_REQ_INPROGRESS are expected (asserted)
 *
 * A POSIX request is only touched when ostate is AIO_REQ_QUEUED;
 * for any other ostate this function returns without doing anything.
 */
static void
_aio_req_del(aio_worker_t *aiowp, aio_req_t *reqp, int ostate)
{
	aio_req_t **last;
	aio_req_t *lastrp;
	aio_req_t *next;

	ASSERT(aiowp != NULL);
	ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
	if (POSIX_AIO(reqp)) {
		if (ostate != AIO_REQ_QUEUED)
			return;
	}
	/*
	 * Walk the list from work_tail1.  'last' is the address of the
	 * link that points at the current element; 'lastrp' trails the
	 * walk as the element before it (it starts out as the first
	 * element itself).
	 */
	last = &aiowp->work_tail1;
	lastrp = aiowp->work_tail1;
	ASSERT(ostate == AIO_REQ_QUEUED || ostate == AIO_REQ_INPROGRESS);
	while ((next = *last) != NULL) {
		if (next == reqp) {
			/* splice the request out of the list */
			*last = next->req_next;
			if (aiowp->work_next1 == next)
				aiowp->work_next1 = next->req_next;

			/*
			 * Repoint work_head1/work_prev1 if they referred to
			 * the removed request: forward to its successor, or
			 * back to lastrp when it was the final element
			 * and work_done1 is non-zero.
			 */
			if ((next->req_next != NULL) ||
			    (aiowp->work_done1 == 0)) {
				if (aiowp->work_head1 == next)
					aiowp->work_head1 = next->req_next;
				if (aiowp->work_prev1 == next)
					aiowp->work_prev1 = next->req_next;
			} else {
				if (aiowp->work_head1 == next)
					aiowp->work_head1 = lastrp;
				if (aiowp->work_prev1 == next)
					aiowp->work_prev1 = lastrp;
			}

			/* undo the accounting that matches the old state */
			if (ostate == AIO_REQ_QUEUED) {
				ASSERT(aiowp->work_count1 >= 1);
				aiowp->work_count1--;
				ASSERT(aiowp->work_minload1 >= 1);
				aiowp->work_minload1--;
			} else {
				ASSERT(ostate == AIO_REQ_INPROGRESS &&
				    !POSIX_AIO(reqp));
				aiowp->work_done1--;
			}
			return;
		}
		last = &next->req_next;
		lastrp = next;
	}
	/* NOTREACHED */
}
1836 
1837 static void
1838 _aio_enq_doneq(aio_req_t *reqp)
1839 {
1840 	if (_aio_doneq == NULL) {
1841 		_aio_doneq = reqp;
1842 		reqp->req_next = reqp->req_prev = reqp;
1843 	} else {
1844 		reqp->req_next = _aio_doneq;
1845 		reqp->req_prev = _aio_doneq->req_prev;
1846 		_aio_doneq->req_prev->req_next = reqp;
1847 		_aio_doneq->req_prev = reqp;
1848 	}
1849 	reqp->req_state = AIO_REQ_DONEQ;
1850 	_aio_doneq_cnt++;
1851 }
1852 
1853 /*
1854  * caller owns the _aio_mutex
1855  */
1856 aio_req_t *
1857 _aio_req_remove(aio_req_t *reqp)
1858 {
1859 	if (reqp && reqp->req_state != AIO_REQ_DONEQ)
1860 		return (NULL);
1861 
1862 	if (reqp) {
1863 		/* request in done queue */
1864 		if (_aio_doneq == reqp)
1865 			_aio_doneq = reqp->req_next;
1866 		if (_aio_doneq == reqp) {
1867 			/* only one request on queue */
1868 			_aio_doneq = NULL;
1869 		} else {
1870 			aio_req_t *tmp = reqp->req_next;
1871 			reqp->req_prev->req_next = tmp;
1872 			tmp->req_prev = reqp->req_prev;
1873 		}
1874 	} else if ((reqp = _aio_doneq) != NULL) {
1875 		if (reqp == reqp->req_next) {
1876 			/* only one request on queue */
1877 			_aio_doneq = NULL;
1878 		} else {
1879 			reqp->req_prev->req_next = _aio_doneq = reqp->req_next;
1880 			_aio_doneq->req_prev = reqp->req_prev;
1881 		}
1882 	}
1883 	if (reqp) {
1884 		_aio_doneq_cnt--;
1885 		reqp->req_next = reqp->req_prev = reqp;
1886 		reqp->req_state = AIO_REQ_DONE;
1887 	}
1888 	return (reqp);
1889 }
1890 
1891 /*
1892  * An AIO request is identified by an aio_result_t pointer.  The library
1893  * maps this aio_result_t pointer to its internal representation using a
1894  * hash table.  This function adds an aio_result_t pointer to the hash table.
1895  */
1896 static int
1897 _aio_hash_insert(aio_result_t *resultp, aio_req_t *reqp)
1898 {
1899 	aio_hash_t *hashp;
1900 	aio_req_t **prev;
1901 	aio_req_t *next;
1902 
1903 	hashp = _aio_hash + AIOHASH(resultp);
1904 	lmutex_lock(&hashp->hash_lock);
1905 	prev = &hashp->hash_ptr;
1906 	while ((next = *prev) != NULL) {
1907 		if (resultp == next->req_resultp) {
1908 			lmutex_unlock(&hashp->hash_lock);
1909 			return (-1);
1910 		}
1911 		prev = &next->req_link;
1912 	}
1913 	*prev = reqp;
1914 	ASSERT(reqp->req_link == NULL);
1915 	lmutex_unlock(&hashp->hash_lock);
1916 	return (0);
1917 }
1918 
1919 /*
1920  * Remove an entry from the hash table.
1921  */
1922 aio_req_t *
1923 _aio_hash_del(aio_result_t *resultp)
1924 {
1925 	aio_hash_t *hashp;
1926 	aio_req_t **prev;
1927 	aio_req_t *next = NULL;
1928 
1929 	if (_aio_hash != NULL) {
1930 		hashp = _aio_hash + AIOHASH(resultp);
1931 		lmutex_lock(&hashp->hash_lock);
1932 		prev = &hashp->hash_ptr;
1933 		while ((next = *prev) != NULL) {
1934 			if (resultp == next->req_resultp) {
1935 				*prev = next->req_link;
1936 				next->req_link = NULL;
1937 				break;
1938 			}
1939 			prev = &next->req_link;
1940 		}
1941 		lmutex_unlock(&hashp->hash_lock);
1942 	}
1943 	return (next);
1944 }
1945 
1946 /*
1947  *  find an entry in the hash table
1948  */
1949 aio_req_t *
1950 _aio_hash_find(aio_result_t *resultp)
1951 {
1952 	aio_hash_t *hashp;
1953 	aio_req_t **prev;
1954 	aio_req_t *next = NULL;
1955 
1956 	if (_aio_hash != NULL) {
1957 		hashp = _aio_hash + AIOHASH(resultp);
1958 		lmutex_lock(&hashp->hash_lock);
1959 		prev = &hashp->hash_ptr;
1960 		while ((next = *prev) != NULL) {
1961 			if (resultp == next->req_resultp)
1962 				break;
1963 			prev = &next->req_link;
1964 		}
1965 		lmutex_unlock(&hashp->hash_lock);
1966 	}
1967 	return (next);
1968 }
1969 
1970 /*
1971  * AIO interface for POSIX
1972  */
1973 int
1974 _aio_rw(aiocb_t *aiocbp, aio_lio_t *lio_head, aio_worker_t **nextworker,
1975     int mode, int flg)
1976 {
1977 	aio_req_t *reqp;
1978 	aio_args_t *ap;
1979 	int kerr;
1980 
1981 	if (aiocbp == NULL) {
1982 		errno = EINVAL;
1983 		return (-1);
1984 	}
1985 
1986 	/* initialize kaio */
1987 	if (!_kaio_ok)
1988 		_kaio_init();
1989 
1990 	aiocbp->aio_state = NOCHECK;
1991 
1992 	/*
1993 	 * If we have been called because a list I/O
1994 	 * kaio() failed, we dont want to repeat the
1995 	 * system call
1996 	 */
1997 
1998 	if (flg & AIO_KAIO) {
1999 		/*
2000 		 * Try kernel aio first.
2001 		 * If errno is ENOTSUP/EBADFD,
2002 		 * fall back to the thread implementation.
2003 		 */
2004 		if (_kaio_ok > 0 && KAIO_SUPPORTED(aiocbp->aio_fildes)) {
2005 			aiocbp->aio_resultp.aio_errno = EINPROGRESS;
2006 			aiocbp->aio_state = CHECK;
2007 			kerr = (int)_kaio(mode, aiocbp);
2008 			if (kerr == 0)
2009 				return (0);
2010 			if (errno != ENOTSUP && errno != EBADFD) {
2011 				aiocbp->aio_resultp.aio_errno = errno;
2012 				aiocbp->aio_resultp.aio_return = -1;
2013 				aiocbp->aio_state = NOCHECK;
2014 				return (-1);
2015 			}
2016 			if (errno == EBADFD)
2017 				SET_KAIO_NOT_SUPPORTED(aiocbp->aio_fildes);
2018 		}
2019 	}
2020 
2021 	aiocbp->aio_resultp.aio_errno = EINPROGRESS;
2022 	aiocbp->aio_state = USERAIO;
2023 
2024 	if (!__uaio_ok && __uaio_init() == -1)
2025 		return (-1);
2026 
2027 	if ((reqp = _aio_req_alloc()) == NULL) {
2028 		errno = EAGAIN;
2029 		return (-1);
2030 	}
2031 
2032 	/*
2033 	 * If an LIO request, add the list head to the aio request
2034 	 */
2035 	reqp->req_head = lio_head;
2036 	reqp->req_type = AIO_POSIX_REQ;
2037 	reqp->req_op = mode;
2038 	reqp->req_largefile = 0;
2039 
2040 	if (aiocbp->aio_sigevent.sigev_notify == SIGEV_NONE) {
2041 		reqp->req_sigevent.sigev_notify = SIGEV_NONE;
2042 	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_SIGNAL) {
2043 		reqp->req_sigevent.sigev_notify = SIGEV_SIGNAL;
2044 		reqp->req_sigevent.sigev_signo =
2045 		    aiocbp->aio_sigevent.sigev_signo;
2046 		reqp->req_sigevent.sigev_value.sival_ptr =
2047 		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
2048 	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_PORT) {
2049 		port_notify_t *pn = aiocbp->aio_sigevent.sigev_value.sival_ptr;
2050 		reqp->req_sigevent.sigev_notify = SIGEV_PORT;
2051 		/*
2052 		 * Reuse the sigevent structure to contain the port number
2053 		 * and the user value.  Same for SIGEV_THREAD, below.
2054 		 */
2055 		reqp->req_sigevent.sigev_signo =
2056 		    pn->portnfy_port;
2057 		reqp->req_sigevent.sigev_value.sival_ptr =
2058 		    pn->portnfy_user;
2059 	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_THREAD) {
2060 		reqp->req_sigevent.sigev_notify = SIGEV_THREAD;
2061 		/*
2062 		 * The sigevent structure contains the port number
2063 		 * and the user value.  Same for SIGEV_PORT, above.
2064 		 */
2065 		reqp->req_sigevent.sigev_signo =
2066 		    aiocbp->aio_sigevent.sigev_signo;
2067 		reqp->req_sigevent.sigev_value.sival_ptr =
2068 		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
2069 	}
2070 
2071 	reqp->req_resultp = &aiocbp->aio_resultp;
2072 	reqp->req_aiocbp = aiocbp;
2073 	ap = &reqp->req_args;
2074 	ap->fd = aiocbp->aio_fildes;
2075 	ap->buf = (caddr_t)aiocbp->aio_buf;
2076 	ap->bufsz = aiocbp->aio_nbytes;
2077 	ap->offset = aiocbp->aio_offset;
2078 
2079 	if ((flg & AIO_NO_DUPS) &&
2080 	    _aio_hash_insert(&aiocbp->aio_resultp, reqp) != 0) {
2081 		aio_panic("_aio_rw(): request already in hash table");
2082 		_aio_req_free(reqp);
2083 		errno = EINVAL;
2084 		return (-1);
2085 	}
2086 	_aio_req_add(reqp, nextworker, mode);
2087 	return (0);
2088 }
2089 
2090 #if !defined(_LP64)
2091 /*
2092  * 64-bit AIO interface for POSIX
2093  */
2094 int
2095 _aio_rw64(aiocb64_t *aiocbp, aio_lio_t *lio_head, aio_worker_t **nextworker,
2096     int mode, int flg)
2097 {
2098 	aio_req_t *reqp;
2099 	aio_args_t *ap;
2100 	int kerr;
2101 
2102 	if (aiocbp == NULL) {
2103 		errno = EINVAL;
2104 		return (-1);
2105 	}
2106 
2107 	/* initialize kaio */
2108 	if (!_kaio_ok)
2109 		_kaio_init();
2110 
2111 	aiocbp->aio_state = NOCHECK;
2112 
2113 	/*
2114 	 * If we have been called because a list I/O
2115 	 * kaio() failed, we dont want to repeat the
2116 	 * system call
2117 	 */
2118 
2119 	if (flg & AIO_KAIO) {
2120 		/*
2121 		 * Try kernel aio first.
2122 		 * If errno is ENOTSUP/EBADFD,
2123 		 * fall back to the thread implementation.
2124 		 */
2125 		if (_kaio_ok > 0 && KAIO_SUPPORTED(aiocbp->aio_fildes)) {
2126 			aiocbp->aio_resultp.aio_errno = EINPROGRESS;
2127 			aiocbp->aio_state = CHECK;
2128 			kerr = (int)_kaio(mode, aiocbp);
2129 			if (kerr == 0)
2130 				return (0);
2131 			if (errno != ENOTSUP && errno != EBADFD) {
2132 				aiocbp->aio_resultp.aio_errno = errno;
2133 				aiocbp->aio_resultp.aio_return = -1;
2134 				aiocbp->aio_state = NOCHECK;
2135 				return (-1);
2136 			}
2137 			if (errno == EBADFD)
2138 				SET_KAIO_NOT_SUPPORTED(aiocbp->aio_fildes);
2139 		}
2140 	}
2141 
2142 	aiocbp->aio_resultp.aio_errno = EINPROGRESS;
2143 	aiocbp->aio_state = USERAIO;
2144 
2145 	if (!__uaio_ok && __uaio_init() == -1)
2146 		return (-1);
2147 
2148 	if ((reqp = _aio_req_alloc()) == NULL) {
2149 		errno = EAGAIN;
2150 		return (-1);
2151 	}
2152 
2153 	/*
2154 	 * If an LIO request, add the list head to the aio request
2155 	 */
2156 	reqp->req_head = lio_head;
2157 	reqp->req_type = AIO_POSIX_REQ;
2158 	reqp->req_op = mode;
2159 	reqp->req_largefile = 1;
2160 
2161 	if (aiocbp->aio_sigevent.sigev_notify == SIGEV_NONE) {
2162 		reqp->req_sigevent.sigev_notify = SIGEV_NONE;
2163 	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_SIGNAL) {
2164 		reqp->req_sigevent.sigev_notify = SIGEV_SIGNAL;
2165 		reqp->req_sigevent.sigev_signo =
2166 		    aiocbp->aio_sigevent.sigev_signo;
2167 		reqp->req_sigevent.sigev_value.sival_ptr =
2168 		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
2169 	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_PORT) {
2170 		port_notify_t *pn = aiocbp->aio_sigevent.sigev_value.sival_ptr;
2171 		reqp->req_sigevent.sigev_notify = SIGEV_PORT;
2172 		reqp->req_sigevent.sigev_signo =
2173 		    pn->portnfy_port;
2174 		reqp->req_sigevent.sigev_value.sival_ptr =
2175 		    pn->portnfy_user;
2176 	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_THREAD) {
2177 		reqp->req_sigevent.sigev_notify = SIGEV_THREAD;
2178 		reqp->req_sigevent.sigev_signo =
2179 		    aiocbp->aio_sigevent.sigev_signo;
2180 		reqp->req_sigevent.sigev_value.sival_ptr =
2181 		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
2182 	}
2183 
2184 	reqp->req_resultp = &aiocbp->aio_resultp;
2185 	reqp->req_aiocbp = aiocbp;
2186 	ap = &reqp->req_args;
2187 	ap->fd = aiocbp->aio_fildes;
2188 	ap->buf = (caddr_t)aiocbp->aio_buf;
2189 	ap->bufsz = aiocbp->aio_nbytes;
2190 	ap->offset = aiocbp->aio_offset;
2191 
2192 	if ((flg & AIO_NO_DUPS) &&
2193 	    _aio_hash_insert(&aiocbp->aio_resultp, reqp) != 0) {
2194 		aio_panic("_aio_rw64(): request already in hash table");
2195 		_aio_req_free(reqp);
2196 		errno = EINVAL;
2197 		return (-1);
2198 	}
2199 	_aio_req_add(reqp, nextworker, mode);
2200 	return (0);
2201 }
2202 #endif	/* !defined(_LP64) */
2203