xref: /illumos-gate/usr/src/lib/libc/port/aio/aio.c (revision 3caf57556b91e230a55067f4d0d9c0f06992020d)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 
22 /*
23  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
24  * Use is subject to license terms.
25  * Copyright 2025 MNX Cloud, Inc.
26  */
27 
28 #include "lint.h"
29 #include "thr_uberdata.h"
30 #include "libc.h"
31 #include "asyncio.h"
32 #include <atomic.h>
33 #include <sys/param.h>
34 #include <sys/file.h>
35 #include <sys/port.h>
36 
37 static int _aio_hash_insert(aio_result_t *, aio_req_t *);
38 static aio_req_t *_aio_req_get(aio_worker_t *);
39 static void _aio_req_add(aio_req_t *, aio_worker_t **, int);
40 static void _aio_req_del(aio_worker_t *, aio_req_t *, int);
41 static void _aio_work_done(aio_worker_t *);
42 static void _aio_enq_doneq(aio_req_t *);
43 
44 extern void _aio_lio_free(aio_lio_t *);
45 
46 extern int __fcntl(int, int, ...);
47 extern int _port_dispatch(int, int, int, int, uintptr_t, void *);
48 
49 static int _aio_fsync_del(aio_worker_t *, aio_req_t *);
50 static void _aiodone(aio_req_t *, ssize_t, int);
51 static void _aio_cancel_work(aio_worker_t *, int, int *, int *);
52 static void _aio_finish_request(aio_worker_t *, ssize_t, int);
53 
54 /*
55  * switch for kernel async I/O
56  */
57 int _kaio_ok = 0;		/* 0 = disabled, 1 = on, -1 = error */
58 
59 /*
60  * Key for thread-specific data
61  */
62 pthread_key_t _aio_key;
63 
64 /*
65  * Array for determining whether or not a file supports kaio.
66  * Initialized in _kaio_init().
67  */
68 uint32_t *_kaio_supported = NULL;
69 
70 /*
71  *  workers for read/write requests
72  * (__aio_mutex lock protects circular linked list of workers)
73  */
74 aio_worker_t *__workers_rw;	/* circular list of AIO workers */
75 aio_worker_t *__nextworker_rw;	/* next worker in list of workers */
76 int __rw_workerscnt;		/* number of read/write workers */
77 
78 /*
79  * worker for notification requests.
80  */
81 aio_worker_t *__workers_no;	/* circular list of AIO workers */
82 aio_worker_t *__nextworker_no;	/* next worker in list of workers */
83 int __no_workerscnt;		/* number of notification workers */
84 
85 aio_req_t *_aio_done_tail;		/* list of done requests */
86 aio_req_t *_aio_done_head;
87 
88 mutex_t __aio_initlock = DEFAULTMUTEX;	/* makes aio initialization atomic */
89 cond_t __aio_initcv = DEFAULTCV;
90 int __aio_initbusy = 0;
91 
92 mutex_t __aio_mutex = DEFAULTMUTEX;	/* protects counts, and linked lists */
93 cond_t _aio_iowait_cv = DEFAULTCV;	/* wait for userland I/Os */
94 
95 pid_t __pid = (pid_t)-1;		/* initialize as invalid pid */
96 int _sigio_enabled = 0;			/* when set, send SIGIO signal */
97 
98 aio_hash_t *_aio_hash;
99 
100 aio_req_t *_aio_doneq;			/* doubly-linked done queue list */
101 
102 int _aio_donecnt = 0;
103 int _aio_waitncnt = 0;			/* # of requests for aio_waitn */
104 int _aio_doneq_cnt = 0;
105 int _aio_outstand_cnt = 0;		/* # of outstanding requests */
106 int _kaio_outstand_cnt = 0;		/* # of outstanding kaio requests */
107 int _aio_req_done_cnt = 0;		/* req. done but not in "done queue" */
108 int _aio_kernel_suspend = 0;		/* active kernel kaio calls */
109 int _aio_suscv_cnt = 0;			/* aio_suspend calls waiting on cv's */
110 
111 int _max_workers = 256;			/* max number of workers permitted */
112 int _min_workers = 4;			/* min number of workers */
113 int _minworkload = 2;			/* min number of requests in queue */
114 int _aio_worker_cnt = 0;		/* number of workers to do requests */
115 int __uaio_ok = 0;			/* AIO has been enabled */
116 sigset_t _worker_set;			/* worker's signal mask */
117 
118 int _aiowait_flag = 0;			/* when set, aiowait() is in progress */
119 int _aio_flags = 0;			/* see the flag defines in asyncio.h */
120 
121 aio_worker_t *_kaiowp = NULL;		/* points to kaio cleanup thread */
122 
123 int hz;					/* clock ticks per second */
124 
125 static int
126 _kaio_supported_init(void)
127 {
128 	void *ptr;
129 	size_t size;
130 
131 	if (_kaio_supported != NULL)	/* already initialized */
132 		return (0);
133 
134 	size = MAX_KAIO_FDARRAY_SIZE * sizeof (uint32_t);
135 	ptr = mmap(NULL, size, PROT_READ | PROT_WRITE,
136 	    MAP_PRIVATE | MAP_ANON, -1, (off_t)0);
137 	if (ptr == MAP_FAILED)
138 		return (-1);
139 	_kaio_supported = ptr;
140 	return (0);
141 }
142 
143 /*
144  * The aio subsystem is initialized when the first AIO request is made.
145  * Constants are established, such as the maximum number of workers the
146  * subsystem can create and the minimum number of workers permitted
147  * before restrictions are imposed.  An initial set of workers is also
148  * created.
149  */
150 int
151 __uaio_init(void)
152 {
153 	int ret = -1;
154 	int i;
155 	int cancel_state;
156 
157 	lmutex_lock(&__aio_initlock);
158 	(void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancel_state);
159 	while (__aio_initbusy)
160 		(void) cond_wait(&__aio_initcv, &__aio_initlock);
161 	(void) pthread_setcancelstate(cancel_state, NULL);
162 	if (__uaio_ok) {	/* already initialized */
163 		lmutex_unlock(&__aio_initlock);
164 		return (0);
165 	}
166 	__aio_initbusy = 1;
167 	lmutex_unlock(&__aio_initlock);
168 
169 	hz = (int)sysconf(_SC_CLK_TCK);
170 	__pid = getpid();
171 
172 	setup_cancelsig(SIGAIOCANCEL);
173 
174 	if (_kaio_supported_init() != 0)
175 		goto out;
176 
177 	/*
178 	 * Allocate and initialize the hash table.
179 	 * Do this only once, even if __uaio_init() is called twice.
180 	 */
181 	if (_aio_hash == NULL) {
182 		/* LINTED pointer cast */
183 		_aio_hash = (aio_hash_t *)mmap(NULL,
184 		    HASHSZ * sizeof (aio_hash_t), PROT_READ | PROT_WRITE,
185 		    MAP_PRIVATE | MAP_ANON, -1, (off_t)0);
186 		if ((void *)_aio_hash == MAP_FAILED) {
187 			_aio_hash = NULL;
188 			goto out;
189 		}
190 		for (i = 0; i < HASHSZ; i++)
191 			(void) mutex_init(&_aio_hash[i].hash_lock,
192 			    USYNC_THREAD, NULL);
193 	}
194 
195 	/*
196 	 * Initialize worker's signal mask to only catch SIGAIOCANCEL.
197 	 */
198 	(void) sigfillset(&_worker_set);
199 	(void) sigdelset(&_worker_set, SIGAIOCANCEL);
200 
201 	/*
202 	 * Create one worker to send asynchronous notifications.
203 	 * Do this only once, even if __uaio_init() is called twice.
204 	 */
205 	if (__no_workerscnt == 0 &&
206 	    (_aio_create_worker(NULL, AIONOTIFY) != 0)) {
207 		errno = EAGAIN;
208 		goto out;
209 	}
210 
211 	/*
212 	 * Create the minimum number of read/write workers.
213 	 * And later check whether at least one worker is created;
214 	 * lwp_create() calls could fail because of segkp exhaustion.
215 	 */
216 	for (i = 0; i < _min_workers; i++)
217 		(void) _aio_create_worker(NULL, AIOREAD);
218 	if (__rw_workerscnt == 0) {
219 		errno = EAGAIN;
220 		goto out;
221 	}
222 
223 	ret = 0;
224 out:
225 	lmutex_lock(&__aio_initlock);
226 	if (ret == 0)
227 		__uaio_ok = 1;
228 	__aio_initbusy = 0;
229 	(void) cond_broadcast(&__aio_initcv);
230 	lmutex_unlock(&__aio_initlock);
231 	return (ret);
232 }
233 
234 /*
235  * Called from close() before actually performing the real _close().
236  */
237 void
238 _aio_close(int fd)
239 {
240 	if (fd < 0)	/* avoid cancelling everything */
241 		return;
242 	/*
243 	 * Cancel all outstanding aio requests for this file descriptor.
244 	 */
245 	if (__uaio_ok)
246 		(void) aiocancel_all(fd);
247 	/*
248 	 * If we have allocated the bit array, clear the bit for this file.
249 	 * The next open may re-use this file descriptor and the new file
250 	 * may have different kaio() behaviour.
251 	 */
252 	if (_kaio_supported != NULL)
253 		CLEAR_KAIO_SUPPORTED(fd);
254 }
255 
256 /*
257  * The special kaio cleanup thread sits in a loop in the
258  * kernel waiting for pending kaio requests to complete.
259  */
260 void *
261 _kaio_cleanup_thread(void *arg)
262 {
263 	if (pthread_setspecific(_aio_key, arg) != 0)
264 		aio_panic("_kaio_cleanup_thread, pthread_setspecific()");
265 	(void) _kaio(AIOSTART);
266 	return (arg);
267 }
268 
269 /*
270  * initialize kaio.
271  */
272 void
273 _kaio_init()
274 {
275 	int error;
276 	sigset_t oset;
277 	int cancel_state;
278 
279 	lmutex_lock(&__aio_initlock);
280 	(void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancel_state);
281 	while (__aio_initbusy)
282 		(void) cond_wait(&__aio_initcv, &__aio_initlock);
283 	(void) pthread_setcancelstate(cancel_state, NULL);
284 	if (_kaio_ok) {		/* already initialized */
285 		lmutex_unlock(&__aio_initlock);
286 		return;
287 	}
288 	__aio_initbusy = 1;
289 	lmutex_unlock(&__aio_initlock);
290 
291 	if (_kaio_supported_init() != 0)
292 		error = ENOMEM;
293 	else if ((_kaiowp = _aio_worker_alloc()) == NULL)
294 		error = ENOMEM;
295 	else if ((error = (int)_kaio(AIOINIT)) == 0) {
296 		(void) pthread_sigmask(SIG_SETMASK, &maskset, &oset);
297 		error = thr_create(NULL, AIOSTKSIZE, _kaio_cleanup_thread,
298 		    _kaiowp, THR_DAEMON, &_kaiowp->work_tid);
299 		(void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
300 	}
301 	if (error && _kaiowp != NULL) {
302 		_aio_worker_free(_kaiowp);
303 		_kaiowp = NULL;
304 	}
305 
306 	lmutex_lock(&__aio_initlock);
307 	if (error)
308 		_kaio_ok = -1;
309 	else
310 		_kaio_ok = 1;
311 	__aio_initbusy = 0;
312 	(void) cond_broadcast(&__aio_initcv);
313 	lmutex_unlock(&__aio_initlock);
314 }
315 
316 int
317 aioread(int fd, caddr_t buf, int bufsz, off_t offset, int whence,
318     aio_result_t *resultp)
319 {
320 	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOREAD));
321 }
322 
323 int
324 aiowrite(int fd, caddr_t buf, int bufsz, off_t offset, int whence,
325     aio_result_t *resultp)
326 {
327 	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOWRITE));
328 }
329 
330 #if !defined(_LP64)
331 int
332 aioread64(int fd, caddr_t buf, int bufsz, off64_t offset, int whence,
333     aio_result_t *resultp)
334 {
335 	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOAREAD64));
336 }
337 
338 int
339 aiowrite64(int fd, caddr_t buf, int bufsz, off64_t offset, int whence,
340     aio_result_t *resultp)
341 {
342 	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOAWRITE64));
343 }
344 #endif	/* !defined(_LP64) */
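
/*
 * Illustrative usage sketch (not part of the original file): how an
 * application might drive the entry points above and reap the result with
 * aiowait().  The descriptor "fd" is hypothetical, and the headers shown
 * are the ones an application would normally include.
 *
 *	#include <sys/asynch.h>
 *	#include <stdio.h>
 *
 *	static char buf[8192];
 *	aio_result_t res;
 *
 *	if (aiowrite(fd, buf, sizeof (buf), (off_t)0, SEEK_SET, &res) == -1) {
 *		perror("aiowrite");
 *	} else {
 *		aio_result_t *donep = aiowait(NULL);	(NULL: wait forever)
 *		if (donep == &res && res.aio_return == -1)
 *			errno = res.aio_errno;
 *	}
 */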
345 
346 int
347 _aiorw(int fd, caddr_t buf, int bufsz, offset_t offset, int whence,
348     aio_result_t *resultp, int mode)
349 {
350 	aio_req_t *reqp;
351 	aio_args_t *ap;
352 	offset_t loffset;
353 	struct stat64 stat64;
354 	int error = 0;
355 	int kerr;
356 	int umode;
357 
358 	switch (whence) {
359 
360 	case SEEK_SET:
361 		loffset = offset;
362 		break;
363 	case SEEK_CUR:
364 		if ((loffset = llseek(fd, 0, SEEK_CUR)) == -1)
365 			error = -1;
366 		else
367 			loffset += offset;
368 		break;
369 	case SEEK_END:
370 		if (fstat64(fd, &stat64) == -1)
371 			error = -1;
372 		else
373 			loffset = offset + stat64.st_size;
374 		break;
375 	default:
376 		errno = EINVAL;
377 		error = -1;
378 	}
379 
380 	if (error)
381 		return (error);
382 
383 	/* initialize kaio */
384 	if (!_kaio_ok)
385 		_kaio_init();
386 
387 	/*
388 	 * _aio_do_request() needs the original request code (mode) to be able
389 	 * to choose the appropriate 32/64 bit function.  All other functions
390 	 * only require the difference between READ and WRITE (umode).
391 	 */
392 	if (mode == AIOAREAD64 || mode == AIOAWRITE64)
393 		umode = mode - AIOAREAD64;
394 	else
395 		umode = mode;
396 
397 	/*
398 	 * Try kernel aio first.
399 	 * If errno is ENOTSUP/EBADFD, fall back to the thread implementation.
400 	 */
401 	if (_kaio_ok > 0 && KAIO_SUPPORTED(fd)) {
402 		resultp->aio_errno = 0;
403 		sig_mutex_lock(&__aio_mutex);
404 		_kaio_outstand_cnt++;
405 		sig_mutex_unlock(&__aio_mutex);
406 		kerr = (int)_kaio(((resultp->aio_return == AIO_INPROGRESS) ?
407 		    (umode | AIO_POLL_BIT) : umode),
408 		    fd, buf, bufsz, loffset, resultp);
409 		if (kerr == 0) {
410 			return (0);
411 		}
412 		sig_mutex_lock(&__aio_mutex);
413 		_kaio_outstand_cnt--;
414 		sig_mutex_unlock(&__aio_mutex);
415 		if (errno != ENOTSUP && errno != EBADFD)
416 			return (-1);
417 		if (errno == EBADFD)
418 			SET_KAIO_NOT_SUPPORTED(fd);
419 	}
420 
421 	if (!__uaio_ok && __uaio_init() == -1)
422 		return (-1);
423 
424 	if ((reqp = _aio_req_alloc()) == NULL) {
425 		errno = EAGAIN;
426 		return (-1);
427 	}
428 
429 	/*
430 	 * _aio_do_request() checks reqp->req_op to differentiate
431 	 * between 32 and 64 bit access.
432 	 */
433 	reqp->req_op = mode;
434 	reqp->req_resultp = resultp;
435 	ap = &reqp->req_args;
436 	ap->fd = fd;
437 	ap->buf = buf;
438 	ap->bufsz = bufsz;
439 	ap->offset = loffset;
440 
441 	if (_aio_hash_insert(resultp, reqp) != 0) {
442 		_aio_req_free(reqp);
443 		errno = EINVAL;
444 		return (-1);
445 	}
446 	/*
447 	 * _aio_req_add() only needs the difference between READ and
448 	 * WRITE to choose the right worker queue.
449 	 */
450 	_aio_req_add(reqp, &__nextworker_rw, umode);
451 	return (0);
452 }
453 
454 int
455 aiocancel(aio_result_t *resultp)
456 {
457 	aio_req_t *reqp;
458 	aio_worker_t *aiowp;
459 	int ret;
460 	int done = 0;
461 	int canceled = 0;
462 
463 	if (!__uaio_ok) {
464 		errno = EINVAL;
465 		return (-1);
466 	}
467 
468 	sig_mutex_lock(&__aio_mutex);
469 	reqp = _aio_hash_find(resultp);
470 	if (reqp == NULL) {
471 		if (_aio_outstand_cnt == _aio_req_done_cnt)
472 			errno = EINVAL;
473 		else
474 			errno = EACCES;
475 		ret = -1;
476 	} else {
477 		aiowp = reqp->req_worker;
478 		sig_mutex_lock(&aiowp->work_qlock1);
479 		(void) _aio_cancel_req(aiowp, reqp, &canceled, &done);
480 		sig_mutex_unlock(&aiowp->work_qlock1);
481 
482 		if (canceled) {
483 			ret = 0;
484 		} else {
485 			if (_aio_outstand_cnt == 0 ||
486 			    _aio_outstand_cnt == _aio_req_done_cnt)
487 				errno = EINVAL;
488 			else
489 				errno = EACCES;
490 			ret = -1;
491 		}
492 	}
493 	sig_mutex_unlock(&__aio_mutex);
494 	return (ret);
495 }
496 
497 static void
498 _aiowait_cleanup(void *arg __unused)
499 {
500 	sig_mutex_lock(&__aio_mutex);
501 	_aiowait_flag--;
502 	sig_mutex_unlock(&__aio_mutex);
503 }
504 
505 /*
506  * This must be asynch safe and cancel safe
507  */
508 aio_result_t *
509 aiowait(struct timeval *uwait)
510 {
511 	aio_result_t *uresultp;
512 	aio_result_t *kresultp;
513 	aio_result_t *resultp;
514 	int dontblock;
515 	int timedwait = 0;
516 	int kaio_errno = 0;
517 	struct timeval twait;
518 	struct timeval *wait = NULL;
519 	hrtime_t hrtend;
520 	hrtime_t hres;
521 
522 	if (uwait) {
523 		/*
524 		 * Check for a valid specified wait time.
525 		 * If it is invalid, fail the call right away.
526 		 */
527 		if (uwait->tv_sec < 0 || uwait->tv_usec < 0 ||
528 		    uwait->tv_usec >= MICROSEC) {
529 			errno = EINVAL;
530 			return ((aio_result_t *)-1);
531 		}
532 
533 		if (uwait->tv_sec > 0 || uwait->tv_usec > 0) {
534 			hrtend = gethrtime() +
535 			    (hrtime_t)uwait->tv_sec * NANOSEC +
536 			    (hrtime_t)uwait->tv_usec * (NANOSEC / MICROSEC);
537 			twait = *uwait;
538 			wait = &twait;
539 			timedwait++;
540 		} else {
541 			/* polling */
542 			sig_mutex_lock(&__aio_mutex);
543 			if (_kaio_outstand_cnt == 0) {
544 				kresultp = (aio_result_t *)-1;
545 			} else {
546 				kresultp = (aio_result_t *)_kaio(AIOWAIT,
547 				    (struct timeval *)-1, 1);
548 				if (kresultp != (aio_result_t *)-1 &&
549 				    kresultp != NULL &&
550 				    kresultp != (aio_result_t *)1) {
551 					_kaio_outstand_cnt--;
552 					sig_mutex_unlock(&__aio_mutex);
553 					return (kresultp);
554 				}
555 			}
556 			uresultp = _aio_req_done();
557 			sig_mutex_unlock(&__aio_mutex);
558 			if (uresultp != NULL &&
559 			    uresultp != (aio_result_t *)-1) {
560 				return (uresultp);
561 			}
562 			if (uresultp == (aio_result_t *)-1 &&
563 			    kresultp == (aio_result_t *)-1) {
564 				errno = EINVAL;
565 				return ((aio_result_t *)-1);
566 			} else {
567 				return (NULL);
568 			}
569 		}
570 	}
571 
572 	for (;;) {
573 		sig_mutex_lock(&__aio_mutex);
574 		uresultp = _aio_req_done();
575 		if (uresultp != NULL && uresultp != (aio_result_t *)-1) {
576 			sig_mutex_unlock(&__aio_mutex);
577 			resultp = uresultp;
578 			break;
579 		}
580 		_aiowait_flag++;
581 		dontblock = (uresultp == (aio_result_t *)-1);
582 		if (dontblock && _kaio_outstand_cnt == 0) {
583 			kresultp = (aio_result_t *)-1;
584 			kaio_errno = EINVAL;
585 		} else {
586 			sig_mutex_unlock(&__aio_mutex);
587 			pthread_cleanup_push(_aiowait_cleanup, NULL);
588 			_cancel_prologue();
589 			kresultp = (aio_result_t *)_kaio(AIOWAIT,
590 			    wait, dontblock);
591 			_cancel_epilogue();
592 			pthread_cleanup_pop(0);
593 			sig_mutex_lock(&__aio_mutex);
594 			kaio_errno = errno;
595 		}
596 		_aiowait_flag--;
597 		sig_mutex_unlock(&__aio_mutex);
598 		if (kresultp == (aio_result_t *)1) {
599 			/* aiowait() awakened by an aionotify() */
600 			continue;
601 		} else if (kresultp != NULL &&
602 		    kresultp != (aio_result_t *)-1) {
603 			resultp = kresultp;
604 			sig_mutex_lock(&__aio_mutex);
605 			_kaio_outstand_cnt--;
606 			sig_mutex_unlock(&__aio_mutex);
607 			break;
608 		} else if (kresultp == (aio_result_t *)-1 &&
609 		    kaio_errno == EINVAL &&
610 		    uresultp == (aio_result_t *)-1) {
611 			errno = kaio_errno;
612 			resultp = (aio_result_t *)-1;
613 			break;
614 		} else if (kresultp == (aio_result_t *)-1 &&
615 		    kaio_errno == EINTR) {
616 			errno = kaio_errno;
617 			resultp = (aio_result_t *)-1;
618 			break;
619 		} else if (timedwait) {
620 			hres = hrtend - gethrtime();
621 			if (hres <= 0) {
622 				/* time is up; return */
623 				resultp = NULL;
624 				break;
625 			} else {
626 				/*
627 				 * Some time left.  Round up the remaining time
628 				 * in nanoseconds to microsec.  Retry the call.
629 				 */
630 				hres += (NANOSEC / MICROSEC) - 1;
631 				wait->tv_sec = hres / NANOSEC;
632 				wait->tv_usec =
633 				    (hres % NANOSEC) / (NANOSEC / MICROSEC);
634 			}
635 		} else {
636 			ASSERT(kresultp == NULL && uresultp == NULL);
637 			resultp = NULL;
638 			continue;
639 		}
640 	}
641 	return (resultp);
642 }
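
/*
 * Illustrative sketch (not part of the original file) of the aiowait()
 * return conventions implemented above, using a hypothetical five second
 * timeout; retry_later() and consume() stand in for application code.
 *
 *	struct timeval tv;
 *	aio_result_t *resultp;
 *
 *	tv.tv_sec = 5;
 *	tv.tv_usec = 0;
 *	resultp = aiowait(&tv);
 *	if (resultp == (aio_result_t *)-1) {
 *		perror("aiowait");	errno is EINVAL when nothing is
 *					outstanding, EINTR when interrupted
 *	} else if (resultp == NULL) {
 *		retry_later();		the timeout expired with nothing done
 *	} else {
 *		consume(resultp);	a completed request's aio_result_t
 *	}
 *
 * A zero-valued timeval polls without blocking, and a NULL uwait pointer
 * waits indefinitely, as handled in the code above.
 */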
643 
644 /*
645  * _aio_get_timedelta calculates the remaining time and stores the result
646  * into timespec_t *wait.
647  */
648 
649 int
650 _aio_get_timedelta(timespec_t *end, timespec_t *wait)
651 {
652 	int	ret = 0;
653 	struct	timeval cur;
654 	timespec_t curtime;
655 
656 	(void) gettimeofday(&cur, NULL);
657 	curtime.tv_sec = cur.tv_sec;
658 	curtime.tv_nsec = cur.tv_usec * 1000;   /* convert us to ns */
659 
660 	if (end->tv_sec >= curtime.tv_sec) {
661 		wait->tv_sec = end->tv_sec - curtime.tv_sec;
662 		if (end->tv_nsec >= curtime.tv_nsec) {
663 			wait->tv_nsec = end->tv_nsec - curtime.tv_nsec;
664 			if (wait->tv_sec == 0 && wait->tv_nsec == 0)
665 				ret = -1;	/* timer expired */
666 		} else {
667 			if (end->tv_sec > curtime.tv_sec) {
668 				wait->tv_sec -= 1;
669 				wait->tv_nsec = NANOSEC -
670 				    (curtime.tv_nsec - end->tv_nsec);
671 			} else {
672 				ret = -1;	/* timer expired */
673 			}
674 		}
675 	} else {
676 		ret = -1;
677 	}
678 	return (ret);
679 }
680 
681 /*
682  * If closing by file descriptor, we simply cancel all of the outstanding
683  * aios and return.  Each affected aio will have noticed the cancellation
684  * either before, during, or after initiating its I/O.
685  */
686 int
687 aiocancel_all(int fd)
688 {
689 	aio_req_t *reqp;
690 	aio_req_t **reqpp, *last;
691 	aio_worker_t *first;
692 	aio_worker_t *next;
693 	int canceled = 0;
694 	int done = 0;
695 	int cancelall = 0;
696 
697 	sig_mutex_lock(&__aio_mutex);
698 
699 	if (_aio_outstand_cnt == 0) {
700 		sig_mutex_unlock(&__aio_mutex);
701 		return (AIO_ALLDONE);
702 	}
703 
704 	/*
705 	 * Cancel requests from the read/write workers' queues.
706 	 */
707 	first = __nextworker_rw;
708 	next = first;
709 	do {
710 		_aio_cancel_work(next, fd, &canceled, &done);
711 	} while ((next = next->work_forw) != first);
712 
713 	/*
714 	 * finally, check if there are requests on the done queue that
715 	 * should be canceled.
716 	 */
717 	if (fd < 0)
718 		cancelall = 1;
719 	reqpp = &_aio_done_tail;
720 	last = _aio_done_tail;
721 	while ((reqp = *reqpp) != NULL) {
722 		if (cancelall || reqp->req_args.fd == fd) {
723 			*reqpp = reqp->req_next;
724 			if (last == reqp) {
725 				last = reqp->req_next;
726 			}
727 			if (_aio_done_head == reqp) {
728 				/* this should be the last req in list */
729 				_aio_done_head = last;
730 			}
731 			_aio_donecnt--;
732 			_aio_set_result(reqp, -1, ECANCELED);
733 			(void) _aio_hash_del(reqp->req_resultp);
734 			_aio_req_free(reqp);
735 		} else {
736 			reqpp = &reqp->req_next;
737 			last = reqp;
738 		}
739 	}
740 
741 	if (cancelall) {
742 		ASSERT(_aio_donecnt == 0);
743 		_aio_done_head = NULL;
744 	}
745 	sig_mutex_unlock(&__aio_mutex);
746 
747 	if (canceled && done == 0)
748 		return (AIO_CANCELED);
749 	else if (done && canceled == 0)
750 		return (AIO_ALLDONE);
751 	else if ((canceled + done == 0) && KAIO_SUPPORTED(fd))
752 		return ((int)_kaio(AIOCANCEL, fd, NULL));
753 	return (AIO_NOTCANCELED);
754 }
755 
756 /*
757  * Cancel requests from a given work queue.  If the file descriptor
758  * parameter, fd, is non-negative, then only cancel those requests
759  * in this queue that are to this file descriptor.  If the fd
760  * parameter is -1, then cancel all requests.
761  */
762 static void
763 _aio_cancel_work(aio_worker_t *aiowp, int fd, int *canceled, int *done)
764 {
765 	aio_req_t *reqp;
766 
767 	sig_mutex_lock(&aiowp->work_qlock1);
768 	/*
769 	 * cancel queued requests first.
770 	 */
771 	reqp = aiowp->work_tail1;
772 	while (reqp != NULL) {
773 		if (fd < 0 || reqp->req_args.fd == fd) {
774 			if (_aio_cancel_req(aiowp, reqp, canceled, done)) {
775 				/*
776 				 * Caller's locks were dropped.
777 				 * reqp is invalid; start traversing
778 				 * the list from the beginning again.
779 				 */
780 				reqp = aiowp->work_tail1;
781 				continue;
782 			}
783 		}
784 		reqp = reqp->req_next;
785 	}
786 	/*
787 	 * Since the queued requests have been canceled, there can
788 	 * only be one in-progress request that should be canceled.
789 	 */
790 	if ((reqp = aiowp->work_req) != NULL &&
791 	    (fd < 0 || reqp->req_args.fd == fd))
792 		(void) _aio_cancel_req(aiowp, reqp, canceled, done);
793 	sig_mutex_unlock(&aiowp->work_qlock1);
794 }
795 
796 /*
797  * Cancel a request.  Return 1 if the caller's locks were temporarily
798  * dropped, otherwise return 0.
799  */
800 int
801 _aio_cancel_req(aio_worker_t *aiowp, aio_req_t *reqp, int *canceled, int *done)
802 {
803 	int ostate = reqp->req_state;
804 
805 	ASSERT(MUTEX_HELD(&__aio_mutex));
806 	ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
807 	if (ostate == AIO_REQ_CANCELED)
808 		return (0);
809 	if (ostate == AIO_REQ_DONE && !POSIX_AIO(reqp) &&
810 	    aiowp->work_prev1 == reqp) {
811 		ASSERT(aiowp->work_done1 != 0);
812 		/*
813 		 * If not on the done queue yet, just mark it CANCELED,
814 		 * _aio_work_done() will do the necessary clean up.
815 		 * This is required to ensure that aiocancel_all() cancels
816 		 * all the outstanding requests, including this one which
817 		 * is not yet on done queue but has been marked done.
818 		 */
819 		_aio_set_result(reqp, -1, ECANCELED);
820 		(void) _aio_hash_del(reqp->req_resultp);
821 		reqp->req_state = AIO_REQ_CANCELED;
822 		(*canceled)++;
823 		return (0);
824 	}
825 
826 	if (ostate == AIO_REQ_DONE || ostate == AIO_REQ_DONEQ) {
827 		(*done)++;
828 		return (0);
829 	}
830 	if (reqp->req_op == AIOFSYNC && reqp != aiowp->work_req) {
831 		ASSERT(POSIX_AIO(reqp));
832 		/* Cancel the queued aio_fsync() request */
833 		if (!reqp->req_head->lio_canned) {
834 			reqp->req_head->lio_canned = 1;
835 			_aio_outstand_cnt--;
836 			(*canceled)++;
837 		}
838 		return (0);
839 	}
840 	reqp->req_state = AIO_REQ_CANCELED;
841 	_aio_req_del(aiowp, reqp, ostate);
842 	(void) _aio_hash_del(reqp->req_resultp);
843 	(*canceled)++;
844 	if (reqp == aiowp->work_req) {
845 		ASSERT(ostate == AIO_REQ_INPROGRESS);
846 		/*
847 		 * Set the result values now, before _aiodone() is called.
848 		 * We do this because the application can expect aio_return
849 		 * and aio_errno to be set to -1 and ECANCELED, respectively,
850 		 * immediately after a successful return from aiocancel()
851 		 * or aio_cancel().
852 		 */
853 		_aio_set_result(reqp, -1, ECANCELED);
854 		(void) thr_kill(aiowp->work_tid, SIGAIOCANCEL);
855 		return (0);
856 	}
857 	if (!POSIX_AIO(reqp)) {
858 		_aio_outstand_cnt--;
859 		_aio_set_result(reqp, -1, ECANCELED);
860 		_aio_req_free(reqp);
861 		return (0);
862 	}
863 	sig_mutex_unlock(&aiowp->work_qlock1);
864 	sig_mutex_unlock(&__aio_mutex);
865 	_aiodone(reqp, -1, ECANCELED);
866 	sig_mutex_lock(&__aio_mutex);
867 	sig_mutex_lock(&aiowp->work_qlock1);
868 	return (1);
869 }
870 
871 int
872 _aio_create_worker(aio_req_t *reqp, int mode)
873 {
874 	aio_worker_t *aiowp, **workers, **nextworker;
875 	int *aio_workerscnt;
876 	void *(*func)(void *);
877 	sigset_t oset;
878 	int error;
879 
880 	/*
881 	 * Put the new worker thread in the right queue.
882 	 */
883 	switch (mode) {
884 	case AIOREAD:
885 	case AIOWRITE:
886 	case AIOAREAD:
887 	case AIOAWRITE:
888 #if !defined(_LP64)
889 	case AIOAREAD64:
890 	case AIOAWRITE64:
891 #endif
892 		workers = &__workers_rw;
893 		nextworker = &__nextworker_rw;
894 		aio_workerscnt = &__rw_workerscnt;
895 		func = _aio_do_request;
896 		break;
897 	case AIONOTIFY:
898 		workers = &__workers_no;
899 		nextworker = &__nextworker_no;
900 		func = _aio_do_notify;
901 		aio_workerscnt = &__no_workerscnt;
902 		break;
903 	default:
904 		aio_panic("_aio_create_worker: invalid mode");
905 		break;
906 	}
907 
908 	if ((aiowp = _aio_worker_alloc()) == NULL)
909 		return (-1);
910 
911 	if (reqp) {
912 		reqp->req_state = AIO_REQ_QUEUED;
913 		reqp->req_worker = aiowp;
914 		aiowp->work_head1 = reqp;
915 		aiowp->work_tail1 = reqp;
916 		aiowp->work_next1 = reqp;
917 		aiowp->work_count1 = 1;
918 		aiowp->work_minload1 = 1;
919 	}
920 
921 	(void) pthread_sigmask(SIG_SETMASK, &maskset, &oset);
922 	error = thr_create(NULL, AIOSTKSIZE, func, aiowp,
923 	    THR_DAEMON | THR_SUSPENDED, &aiowp->work_tid);
924 	(void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
925 	if (error) {
926 		if (reqp) {
927 			reqp->req_state = 0;
928 			reqp->req_worker = NULL;
929 		}
930 		_aio_worker_free(aiowp);
931 		return (-1);
932 	}
933 
934 	lmutex_lock(&__aio_mutex);
935 	(*aio_workerscnt)++;
936 	if (*workers == NULL) {
937 		aiowp->work_forw = aiowp;
938 		aiowp->work_backw = aiowp;
939 		*nextworker = aiowp;
940 		*workers = aiowp;
941 	} else {
942 		aiowp->work_backw = (*workers)->work_backw;
943 		aiowp->work_forw = (*workers);
944 		(*workers)->work_backw->work_forw = aiowp;
945 		(*workers)->work_backw = aiowp;
946 	}
947 	_aio_worker_cnt++;
948 	lmutex_unlock(&__aio_mutex);
949 
950 	(void) thr_continue(aiowp->work_tid);
951 
952 	return (0);
953 }
954 
955 /*
956  * This is the worker's main routine.
957  * The task of this function is to execute all queued requests;
958  * once the last pending request is executed this function will block
959  * in _aio_idle().  A new incoming request must wake up this thread to
960  * restart the work.
961  * Every worker has its own work queue.  The queue lock is required
962  * to synchronize the addition of new requests for this worker or
963  * cancellation of pending/running requests.
964  *
965  * Cancellation scenarios:
966  * The cancellation of a request is done asynchronously using
967  * _aio_cancel_req() from another thread context.
968  * A queued request can be cancelled in different ways:
969  * a) request is queued but not "in progress" or "done" (AIO_REQ_QUEUED):
970  *	- lock the queue -> remove the request -> unlock the queue
971  *	- this function/thread does not detect this cancellation process
972  * b) request is in progress (AIO_REQ_INPROGRESS) :
973  *	- this function first allows the cancellation of the running
974  *	  request with the flag "work_cancel_flg=1"
975  *		see _aio_req_get() -> _aio_cancel_on()
976  *	  During this phase, it is allowed to interrupt the worker
977  *	  thread running the request (this thread) using the SIGAIOCANCEL
978  *	  signal.
979  *	  Once this thread returns from the kernel (because the request
980  *	  is just done), then it must disable a possible cancellation
981  *	  and proceed to finish the request.  To disable the cancellation
982  *	  this thread must use _aio_cancel_off() to set "work_cancel_flg=0".
983  * c) request is already done (AIO_REQ_DONE || AIO_REQ_DONEQ):
984  *	  same procedure as in a)
985  *
986  * To b)
987  *	This thread uses sigsetjmp() to define the position in the code where
988  *	it wishes to continue working in the case that a SIGAIOCANCEL signal
989  *	is detected.
990  *	Normally this thread should get the cancellation signal during the
991  *	kernel phase (reading or writing).  In that case the signal handler
992  *	aiosigcancelhndlr() is activated using the worker thread context,
993  *	which again will use the siglongjmp() function to break the standard
994  *	code flow and jump to the "sigsetjmp" position, provided that
995  *	"work_cancel_flg" is set to "1".
996  *	Because the "work_cancel_flg" is only manipulated by this worker
997  *	thread and it can only run on one CPU at a given time, it is not
998  *	necessary to protect that flag with the queue lock.
999  *	Returning from the kernel (read or write system call) we must
1000  *	first disable the use of the SIGAIOCANCEL signal and accordingly
1001  *	the use of the siglongjmp() function to prevent a possible deadlock:
1002  *	- It can happen that this worker thread returns from the kernel and
1003  *	  blocks in "work_qlock1",
1004  *	- then a second thread cancels the apparently "in progress" request
1005  *	  and sends the SIGAIOCANCEL signal to the worker thread,
1006  *	- the worker thread gets assigned the "work_qlock1" and will return
1007  *	  from the kernel,
1008  *	- the kernel detects the pending signal and activates the signal
1009  *	  handler instead,
1010  *	- if the "work_cancel_flg" is still set then the signal handler
1011  *	  should use siglongjmp() to cancel the "in progress" request and
1012  *	  it would try to acquire the same work_qlock1 in _aio_req_get()
1013  *	  for a second time => deadlock.
1014  *	To avoid that situation we disable the cancellation of the request
1015  *	in progress BEFORE we try to acquire the work_qlock1.
1016  *	In that case the signal handler will not call siglongjmp() and the
1017  *	worker thread will continue running the standard code flow.
1018  *	Then this thread must check for the AIO_REQ_CANCELED state to
1019  *	emulate the siglongjmp() that was suppressed, releasing the
1020  *	work_qlock1 and avoiding a deadlock.
1021  */
1022 void *
1023 _aio_do_request(void *arglist)
1024 {
1025 	aio_worker_t *aiowp = (aio_worker_t *)arglist;
1026 	ulwp_t *self = curthread;
1027 	struct aio_args *arg;
1028 	aio_req_t *reqp;		/* current AIO request */
1029 	ssize_t retval;
1030 	int append;
1031 	int error;
1032 
1033 	if (pthread_setspecific(_aio_key, aiowp) != 0)
1034 		aio_panic("_aio_do_request, pthread_setspecific()");
1035 	(void) pthread_sigmask(SIG_SETMASK, &_worker_set, NULL);
1036 	ASSERT(aiowp->work_req == NULL);
1037 
1038 	/*
1039 	 * We resume here when an operation is cancelled.
1040 	 * On first entry, aiowp->work_req == NULL, so all
1041 	 * we do is block SIGAIOCANCEL.
1042 	 */
1043 	(void) sigsetjmp(aiowp->work_jmp_buf, 0);
1044 	ASSERT(self->ul_sigdefer == 0);
1045 
1046 	sigoff(self);	/* block SIGAIOCANCEL */
1047 	if (aiowp->work_req != NULL)
1048 		_aio_finish_request(aiowp, -1, ECANCELED);
1049 
1050 	for (;;) {
1051 		/*
1052 		 * Put completed requests on aio_done_list.  This has
1053 		 * to be done as part of the main loop to ensure that
1054 		 * we don't artificially starve any aiowait'ers.
1055 		 */
1056 		if (aiowp->work_done1)
1057 			_aio_work_done(aiowp);
1058 
1059 top:
1060 		/* consume any deferred SIGAIOCANCEL signal here */
1061 		sigon(self);
1062 		sigoff(self);
1063 
1064 		while ((reqp = _aio_req_get(aiowp)) == NULL) {
1065 			if (_aio_idle(aiowp) != 0)
1066 				goto top;
1067 		}
1068 		arg = &reqp->req_args;
1069 		ASSERT(reqp->req_state == AIO_REQ_INPROGRESS ||
1070 		    reqp->req_state == AIO_REQ_CANCELED);
1071 		error = 0;
1072 
1073 		switch (reqp->req_op) {
1074 		case AIOREAD:
1075 		case AIOAREAD:
1076 			sigon(self);	/* unblock SIGAIOCANCEL */
1077 			retval = pread(arg->fd, arg->buf,
1078 			    arg->bufsz, arg->offset);
1079 			if (retval == -1) {
1080 				if (errno == ESPIPE) {
1081 					retval = read(arg->fd,
1082 					    arg->buf, arg->bufsz);
1083 					if (retval == -1)
1084 						error = errno;
1085 				} else {
1086 					error = errno;
1087 				}
1088 			}
1089 			sigoff(self);	/* block SIGAIOCANCEL */
1090 			break;
1091 		case AIOWRITE:
1092 		case AIOAWRITE:
1093 			/*
1094 			 * The SUSv3 POSIX spec for aio_write() states:
1095 			 *	If O_APPEND is set for the file descriptor,
1096 			 *	write operations append to the file in the
1097 			 *	same order as the calls were made.
1098 			 * but, somewhat inconsistently, it requires pwrite()
1099 			 * to ignore the O_APPEND setting.  So we have to use
1100 			 * fcntl() to get the open modes and call write() for
1101 			 * the O_APPEND case.
1102 			 */
1103 			append = (__fcntl(arg->fd, F_GETFL) & O_APPEND);
1104 			sigon(self);	/* unblock SIGAIOCANCEL */
1105 			retval = append?
1106 			    write(arg->fd, arg->buf, arg->bufsz) :
1107 			    pwrite(arg->fd, arg->buf, arg->bufsz,
1108 			    arg->offset);
1109 			if (retval == -1) {
1110 				if (errno == ESPIPE) {
1111 					retval = write(arg->fd,
1112 					    arg->buf, arg->bufsz);
1113 					if (retval == -1)
1114 						error = errno;
1115 				} else {
1116 					error = errno;
1117 				}
1118 			}
1119 			sigoff(self);	/* block SIGAIOCANCEL */
1120 			break;
1121 #if !defined(_LP64)
1122 		case AIOAREAD64:
1123 			sigon(self);	/* unblock SIGAIOCANCEL */
1124 			retval = pread64(arg->fd, arg->buf,
1125 			    arg->bufsz, arg->offset);
1126 			if (retval == -1) {
1127 				if (errno == ESPIPE) {
1128 					retval = read(arg->fd,
1129 					    arg->buf, arg->bufsz);
1130 					if (retval == -1)
1131 						error = errno;
1132 				} else {
1133 					error = errno;
1134 				}
1135 			}
1136 			sigoff(self);	/* block SIGAIOCANCEL */
1137 			break;
1138 		case AIOAWRITE64:
1139 			/*
1140 			 * The SUSv3 POSIX spec for aio_write() states:
1141 			 *	If O_APPEND is set for the file descriptor,
1142 			 *	write operations append to the file in the
1143 			 *	same order as the calls were made.
1144 			 * but, somewhat inconsistently, it requires pwrite()
1145 			 * to ignore the O_APPEND setting.  So we have to use
1146 			 * fcntl() to get the open modes and call write() for
1147 			 * the O_APPEND case.
1148 			 */
1149 			append = (__fcntl(arg->fd, F_GETFL) & O_APPEND);
1150 			sigon(self);	/* unblock SIGAIOCANCEL */
1151 			retval = append?
1152 			    write(arg->fd, arg->buf, arg->bufsz) :
1153 			    pwrite64(arg->fd, arg->buf, arg->bufsz,
1154 			    arg->offset);
1155 			if (retval == -1) {
1156 				if (errno == ESPIPE) {
1157 					retval = write(arg->fd,
1158 					    arg->buf, arg->bufsz);
1159 					if (retval == -1)
1160 						error = errno;
1161 				} else {
1162 					error = errno;
1163 				}
1164 			}
1165 			sigoff(self);	/* block SIGAIOCANCEL */
1166 			break;
1167 #endif	/* !defined(_LP64) */
1168 		case AIOFSYNC:
1169 			if (_aio_fsync_del(aiowp, reqp))
1170 				goto top;
1171 			ASSERT(reqp->req_head == NULL);
1172 			/*
1173 			 * All writes for this fsync request are now
1174 			 * acknowledged.  Now make these writes visible
1175 			 * and put the final request into the hash table.
1176 			 */
1177 			if (reqp->req_state == AIO_REQ_CANCELED) {
1178 				/* EMPTY */;
1179 			} else if (arg->offset == O_SYNC) {
1180 				if ((retval = __fdsync(arg->fd, FDSYNC_FILE)) ==
1181 				    -1) {
1182 					error = errno;
1183 				}
1184 			} else {
1185 				if ((retval = __fdsync(arg->fd, FDSYNC_DATA)) ==
1186 				    -1) {
1187 					error = errno;
1188 				}
1189 			}
1190 			if (_aio_hash_insert(reqp->req_resultp, reqp) != 0)
1191 				aio_panic("_aio_do_request(): AIOFSYNC: "
1192 				    "request already in hash table");
1193 			break;
1194 		default:
1195 			aio_panic("_aio_do_request, bad op");
1196 		}
1197 
1198 		_aio_finish_request(aiowp, retval, error);
1199 	}
1200 	/* NOTREACHED */
1201 	return (NULL);
1202 }
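
/*
 * Minimal self-contained sketch (not part of the original file) of the
 * cancellation pattern described in the block comment above
 * _aio_do_request(): a jump buffer is armed with sigsetjmp(), the
 * SIGAIOCANCEL handler takes the siglongjmp() only while cancellation is
 * enabled, and cancellation is disabled again before any locks are taken.
 * All names here (cancel_jmp, cancel_ok, cancel_hdlr, cancellable_pread)
 * are hypothetical and simplified relative to the real implementation.
 *
 *	#include <setjmp.h>
 *	#include <signal.h>
 *	#include <unistd.h>
 *
 *	static sigjmp_buf cancel_jmp;
 *	static volatile sig_atomic_t cancel_ok;
 *
 *	static void
 *	cancel_hdlr(int sig)
 *	{
 *		if (cancel_ok)
 *			siglongjmp(cancel_jmp, 1);
 *	}
 *
 *	static ssize_t
 *	cancellable_pread(int fd, void *buf, size_t len, off_t off)
 *	{
 *		ssize_t n;
 *
 *		if (sigsetjmp(cancel_jmp, 0) != 0)
 *			return (-1);	the in-progress request was cancelled
 *		cancel_ok = 1;		allow cancellation around the slow call
 *		n = pread(fd, buf, len, off);
 *		cancel_ok = 0;		disable before acquiring queue locks
 *		return (n);
 *	}
 */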
1203 
1204 /*
1205  * Perform the tail processing for _aio_do_request().
1206  * The in-progress request may or may not have been cancelled.
1207  */
1208 static void
1209 _aio_finish_request(aio_worker_t *aiowp, ssize_t retval, int error)
1210 {
1211 	aio_req_t *reqp;
1212 
1213 	sig_mutex_lock(&aiowp->work_qlock1);
1214 	if ((reqp = aiowp->work_req) == NULL)
1215 		sig_mutex_unlock(&aiowp->work_qlock1);
1216 	else {
1217 		aiowp->work_req = NULL;
1218 		if (reqp->req_state == AIO_REQ_CANCELED) {
1219 			retval = -1;
1220 			error = ECANCELED;
1221 		}
1222 		if (!POSIX_AIO(reqp)) {
1223 			int notify;
1224 			if (reqp->req_state == AIO_REQ_INPROGRESS) {
1225 				reqp->req_state = AIO_REQ_DONE;
1226 				_aio_set_result(reqp, retval, error);
1227 			}
1228 			sig_mutex_unlock(&aiowp->work_qlock1);
1229 			sig_mutex_lock(&__aio_mutex);
1230 			/*
1231 			 * If it was canceled, this request will not be
1232 			 * added to the done list. Just free it.
1233 			 */
1234 			if (error == ECANCELED) {
1235 				_aio_outstand_cnt--;
1236 				_aio_req_free(reqp);
1237 			} else {
1238 				_aio_req_done_cnt++;
1239 			}
1240 			/*
1241 			 * Notify any thread that may have blocked
1242 			 * because it saw an outstanding request.
1243 			 */
1244 			notify = 0;
1245 			if (_aio_outstand_cnt == 0 && _aiowait_flag) {
1246 				notify = 1;
1247 			}
1248 			sig_mutex_unlock(&__aio_mutex);
1249 			if (notify) {
1250 				(void) _kaio(AIONOTIFY);
1251 			}
1252 		} else {
1253 			if (reqp->req_state == AIO_REQ_INPROGRESS)
1254 				reqp->req_state = AIO_REQ_DONE;
1255 			sig_mutex_unlock(&aiowp->work_qlock1);
1256 			_aiodone(reqp, retval, error);
1257 		}
1258 	}
1259 }
1260 
1261 void
1262 _aio_req_mark_done(aio_req_t *reqp)
1263 {
1264 #if !defined(_LP64)
1265 	if (reqp->req_largefile)
1266 		((aiocb64_t *)reqp->req_aiocbp)->aio_state = USERAIO_DONE;
1267 	else
1268 #endif
1269 		((aiocb_t *)reqp->req_aiocbp)->aio_state = USERAIO_DONE;
1270 }
1271 
1272 /*
1273  * Sleep for 'ticks' clock ticks to give somebody else a chance to run,
1274  * hopefully to consume one of our queued signals.
1275  */
1276 static void
1277 _aio_delay(int ticks)
1278 {
1279 	(void) usleep(ticks * (MICROSEC / hz));
1280 }
1281 
1282 /*
1283  * Actually send the notifications.
1284  * We could block indefinitely here if the application
1285  * is not listening for the signal or port notifications.
1286  */
1287 static void
1288 send_notification(notif_param_t *npp)
1289 {
1290 	extern int __sigqueue(pid_t pid, int signo,
1291 	    /* const union sigval */ void *value, int si_code, int block);
1292 
1293 	if (npp->np_signo)
1294 		(void) __sigqueue(__pid, npp->np_signo, npp->np_user,
1295 		    SI_ASYNCIO, 1);
1296 	else if (npp->np_port >= 0)
1297 		(void) _port_dispatch(npp->np_port, 0, PORT_SOURCE_AIO,
1298 		    npp->np_event, npp->np_object, npp->np_user);
1299 
1300 	if (npp->np_lio_signo)
1301 		(void) __sigqueue(__pid, npp->np_lio_signo, npp->np_lio_user,
1302 		    SI_ASYNCIO, 1);
1303 	else if (npp->np_lio_port >= 0)
1304 		(void) _port_dispatch(npp->np_lio_port, 0, PORT_SOURCE_AIO,
1305 		    npp->np_lio_event, npp->np_lio_object, npp->np_lio_user);
1306 }
1307 
1308 /*
1309  * Asynchronous notification worker.
1310  */
1311 void *
1312 _aio_do_notify(void *arg)
1313 {
1314 	aio_worker_t *aiowp = (aio_worker_t *)arg;
1315 	aio_req_t *reqp;
1316 
1317 	/*
1318 	 * This isn't really necessary.  All signals are blocked.
1319 	 */
1320 	if (pthread_setspecific(_aio_key, aiowp) != 0)
1321 		aio_panic("_aio_do_notify, pthread_setspecific()");
1322 
1323 	/*
1324 	 * Notifications are never cancelled.
1325 	 * All signals remain blocked, forever.
1326 	 */
1327 	for (;;) {
1328 		while ((reqp = _aio_req_get(aiowp)) == NULL) {
1329 			if (_aio_idle(aiowp) != 0)
1330 				aio_panic("_aio_do_notify: _aio_idle() failed");
1331 		}
1332 		send_notification(&reqp->req_notify);
1333 		_aio_req_free(reqp);
1334 	}
1335 
1336 	/* NOTREACHED */
1337 	return (NULL);
1338 }
1339 
1340 /*
1341  * Do the completion semantics for a request that was either canceled
1342  * by _aio_cancel_req() or was completed by _aio_do_request().
1343  */
1344 static void
1345 _aiodone(aio_req_t *reqp, ssize_t retval, int error)
1346 {
1347 	aio_result_t *resultp = reqp->req_resultp;
1348 	int notify = 0;
1349 	aio_lio_t *head;
1350 	int sigev_none;
1351 	int sigev_signal;
1352 	int sigev_thread;
1353 	int sigev_port;
1354 	notif_param_t np;
1355 
1356 	/*
1357 	 * We call _aiodone() only for Posix I/O.
1358 	 */
1359 	ASSERT(POSIX_AIO(reqp));
1360 
1361 	sigev_none = 0;
1362 	sigev_signal = 0;
1363 	sigev_thread = 0;
1364 	sigev_port = 0;
1365 	np.np_signo = 0;
1366 	np.np_port = -1;
1367 	np.np_lio_signo = 0;
1368 	np.np_lio_port = -1;
1369 
1370 	switch (reqp->req_sigevent.sigev_notify) {
1371 	case SIGEV_NONE:
1372 		sigev_none = 1;
1373 		break;
1374 	case SIGEV_SIGNAL:
1375 		sigev_signal = 1;
1376 		break;
1377 	case SIGEV_THREAD:
1378 		sigev_thread = 1;
1379 		break;
1380 	case SIGEV_PORT:
1381 		sigev_port = 1;
1382 		break;
1383 	default:
1384 		aio_panic("_aiodone: improper sigev_notify");
1385 		break;
1386 	}
1387 
1388 	/*
1389 	 * Figure out the notification parameters while holding __aio_mutex.
1390 	 * Actually perform the notifications after dropping __aio_mutex.
1391 	 * This allows us to sleep for a long time (if the notifications
1392 	 * incur delays) without impeding other async I/O operations.
1393 	 */
1394 
1395 	sig_mutex_lock(&__aio_mutex);
1396 
1397 	if (sigev_signal) {
1398 		if ((np.np_signo = reqp->req_sigevent.sigev_signo) != 0)
1399 			notify = 1;
1400 		np.np_user = reqp->req_sigevent.sigev_value.sival_ptr;
1401 	} else if (sigev_thread | sigev_port) {
1402 		if ((np.np_port = reqp->req_sigevent.sigev_signo) >= 0)
1403 			notify = 1;
1404 		np.np_event = reqp->req_op;
1405 		if (np.np_event == AIOFSYNC && reqp->req_largefile)
1406 			np.np_event = AIOFSYNC64;
1407 		np.np_object = (uintptr_t)reqp->req_aiocbp;
1408 		np.np_user = reqp->req_sigevent.sigev_value.sival_ptr;
1409 	}
1410 
1411 	if (resultp->aio_errno == EINPROGRESS)
1412 		_aio_set_result(reqp, retval, error);
1413 
1414 	_aio_outstand_cnt--;
1415 
1416 	head = reqp->req_head;
1417 	reqp->req_head = NULL;
1418 
1419 	if (sigev_none) {
1420 		_aio_enq_doneq(reqp);
1421 		reqp = NULL;
1422 	} else {
1423 		(void) _aio_hash_del(resultp);
1424 		_aio_req_mark_done(reqp);
1425 	}
1426 
1427 	_aio_waitn_wakeup();
1428 
1429 	/*
1430 	 * __aio_waitn() sets AIO_WAIT_INPROGRESS and
1431 	 * __aio_suspend() increments "_aio_kernel_suspend"
1432 	 * when they are waiting in the kernel for completed I/Os.
1433 	 *
1434 	 * _kaio(AIONOTIFY) awakes the corresponding function
1435 	 * in the kernel; then the corresponding __aio_waitn() or
1436 	 * __aio_suspend() function could reap the recently
1437 	 * completed I/Os (_aiodone()).
1438 	 */
1439 	if ((_aio_flags & AIO_WAIT_INPROGRESS) || _aio_kernel_suspend > 0)
1440 		(void) _kaio(AIONOTIFY);
1441 
1442 	sig_mutex_unlock(&__aio_mutex);
1443 
1444 	if (head != NULL) {
1445 		/*
1446 		 * If all the lio requests have completed,
1447 		 * prepare to notify the waiting thread.
1448 		 */
1449 		sig_mutex_lock(&head->lio_mutex);
1450 		ASSERT(head->lio_refcnt == head->lio_nent);
1451 		if (head->lio_refcnt == 1) {
1452 			int waiting = 0;
1453 			if (head->lio_mode == LIO_WAIT) {
1454 				if ((waiting = head->lio_waiting) != 0)
1455 					(void) cond_signal(&head->lio_cond_cv);
1456 			} else if (head->lio_port < 0) { /* none or signal */
1457 				if ((np.np_lio_signo = head->lio_signo) != 0)
1458 					notify = 1;
1459 				np.np_lio_user = head->lio_sigval.sival_ptr;
1460 			} else {			/* thread or port */
1461 				notify = 1;
1462 				np.np_lio_port = head->lio_port;
1463 				np.np_lio_event = head->lio_event;
1464 				np.np_lio_object =
1465 				    (uintptr_t)head->lio_sigevent;
1466 				np.np_lio_user = head->lio_sigval.sival_ptr;
1467 			}
1468 			head->lio_nent = head->lio_refcnt = 0;
1469 			sig_mutex_unlock(&head->lio_mutex);
1470 			if (waiting == 0)
1471 				_aio_lio_free(head);
1472 		} else {
1473 			head->lio_nent--;
1474 			head->lio_refcnt--;
1475 			sig_mutex_unlock(&head->lio_mutex);
1476 		}
1477 	}
1478 
1479 	/*
1480 	 * The request is completed; now perform the notifications.
1481 	 */
1482 	if (notify) {
1483 		if (reqp != NULL) {
1484 			/*
1485 			 * We usually put the request on the notification
1486 			 * queue because we don't want to block and delay
1487 			 * other operations behind us in the work queue.
1488 			 * Also we must never block on a cancel notification
1489 			 * because we are being called from an application
1490 			 * thread in this case and that could lead to deadlock
1491 			 * if no other thread is receiving notifications.
1492 			 */
1493 			reqp->req_notify = np;
1494 			reqp->req_op = AIONOTIFY;
1495 			_aio_req_add(reqp, &__workers_no, AIONOTIFY);
1496 			reqp = NULL;
1497 		} else {
1498 			/*
1499 			 * We already put the request on the done queue,
1500 			 * so we can't queue it to the notification queue.
1501 			 * Just do the notification directly.
1502 			 */
1503 			send_notification(&np);
1504 		}
1505 	}
1506 
1507 	if (reqp != NULL)
1508 		_aio_req_free(reqp);
1509 }
1510 
1511 /*
1512  * Delete fsync requests from list head until there is
1513  * only one left.  Return 0 when there is only one,
1514  * otherwise return a non-zero value.
1515  */
1516 static int
1517 _aio_fsync_del(aio_worker_t *aiowp, aio_req_t *reqp)
1518 {
1519 	aio_lio_t *head = reqp->req_head;
1520 	int rval = 0;
1521 
1522 	ASSERT(reqp == aiowp->work_req);
1523 	sig_mutex_lock(&aiowp->work_qlock1);
1524 	sig_mutex_lock(&head->lio_mutex);
1525 	if (head->lio_refcnt > 1) {
1526 		head->lio_refcnt--;
1527 		head->lio_nent--;
1528 		aiowp->work_req = NULL;
1529 		sig_mutex_unlock(&head->lio_mutex);
1530 		sig_mutex_unlock(&aiowp->work_qlock1);
1531 		sig_mutex_lock(&__aio_mutex);
1532 		_aio_outstand_cnt--;
1533 		_aio_waitn_wakeup();
1534 		sig_mutex_unlock(&__aio_mutex);
1535 		_aio_req_free(reqp);
1536 		return (1);
1537 	}
1538 	ASSERT(head->lio_nent == 1 && head->lio_refcnt == 1);
1539 	reqp->req_head = NULL;
1540 	if (head->lio_canned)
1541 		reqp->req_state = AIO_REQ_CANCELED;
1542 	if (head->lio_mode == LIO_DESTROY) {
1543 		aiowp->work_req = NULL;
1544 		rval = 1;
1545 	}
1546 	sig_mutex_unlock(&head->lio_mutex);
1547 	sig_mutex_unlock(&aiowp->work_qlock1);
1548 	head->lio_refcnt--;
1549 	head->lio_nent--;
1550 	_aio_lio_free(head);
1551 	if (rval != 0)
1552 		_aio_req_free(reqp);
1553 	return (rval);
1554 }
1555 
1556 /*
1557  * A worker is set idle when its work queue is empty.
1558  * The worker checks again that it has no more work
1559  * and then goes to sleep waiting for more work.
1560  */
1561 int
1562 _aio_idle(aio_worker_t *aiowp)
1563 {
1564 	int error = 0;
1565 
1566 	sig_mutex_lock(&aiowp->work_qlock1);
1567 	if (aiowp->work_count1 == 0) {
1568 		ASSERT(aiowp->work_minload1 == 0);
1569 		aiowp->work_idleflg = 1;
1570 		/*
1571 		 * A cancellation handler is not needed here.
1572 		 * aio worker threads are never cancelled via pthread_cancel().
1573 		 */
1574 		error = sig_cond_wait(&aiowp->work_idle_cv,
1575 		    &aiowp->work_qlock1);
1576 		/*
1577 		 * The idle flag is normally cleared before the worker is awakened
1578 		 * by _aio_req_add().  On error (EINTR), we clear it ourselves.
1579 		 */
1580 		if (error)
1581 			aiowp->work_idleflg = 0;
1582 	}
1583 	sig_mutex_unlock(&aiowp->work_qlock1);
1584 	return (error);
1585 }
1586 
1587 /*
1588  * A worker's completed AIO requests are placed onto a global
1589  * done queue.  The application is only sent a SIGIO signal if
1590  * the process has a handler enabled and it is not waiting via
1591  * aiowait().
1592  */
1593 static void
1594 _aio_work_done(aio_worker_t *aiowp)
1595 {
1596 	aio_req_t *reqp;
1597 
1598 	sig_mutex_lock(&__aio_mutex);
1599 	sig_mutex_lock(&aiowp->work_qlock1);
1600 	reqp = aiowp->work_prev1;
1601 	reqp->req_next = NULL;
1602 	aiowp->work_done1 = 0;
1603 	aiowp->work_tail1 = aiowp->work_next1;
1604 	if (aiowp->work_tail1 == NULL)
1605 		aiowp->work_head1 = NULL;
1606 	aiowp->work_prev1 = NULL;
1607 	_aio_outstand_cnt--;
1608 	_aio_req_done_cnt--;
1609 	if (reqp->req_state == AIO_REQ_CANCELED) {
1610 		/*
1611 		 * Request got cancelled after it was marked done. This can
1612 		 * happen because _aio_finish_request() marks it AIO_REQ_DONE
1613 		 * and drops all locks. Don't add the request to the done
1614 		 * queue and just discard it.
1615 		 */
1616 		sig_mutex_unlock(&aiowp->work_qlock1);
1617 		_aio_req_free(reqp);
1618 		if (_aio_outstand_cnt == 0 && _aiowait_flag) {
1619 			sig_mutex_unlock(&__aio_mutex);
1620 			(void) _kaio(AIONOTIFY);
1621 		} else {
1622 			sig_mutex_unlock(&__aio_mutex);
1623 		}
1624 		return;
1625 	}
1626 	sig_mutex_unlock(&aiowp->work_qlock1);
1627 	_aio_donecnt++;
1628 	ASSERT(_aio_donecnt > 0 &&
1629 	    _aio_outstand_cnt >= 0 &&
1630 	    _aio_req_done_cnt >= 0);
1631 	ASSERT(reqp != NULL);
1632 
1633 	if (_aio_done_tail == NULL) {
1634 		_aio_done_head = _aio_done_tail = reqp;
1635 	} else {
1636 		_aio_done_head->req_next = reqp;
1637 		_aio_done_head = reqp;
1638 	}
1639 
1640 	if (_aiowait_flag) {
1641 		sig_mutex_unlock(&__aio_mutex);
1642 		(void) _kaio(AIONOTIFY);
1643 	} else {
1644 		sig_mutex_unlock(&__aio_mutex);
1645 		if (_sigio_enabled)
1646 			(void) kill(__pid, SIGIO);
1647 	}
1648 }
1649 
1650 /*
1651  * The done queue consists of AIO requests that are in either the
1652  * AIO_REQ_DONE or AIO_REQ_CANCELED state.  Requests that were cancelled
1653  * are discarded.  If the done queue is empty then NULL is returned.
1654  * Otherwise the address of a done aio_result_t is returned.
1655  */
1656 aio_result_t *
1657 _aio_req_done(void)
1658 {
1659 	aio_req_t *reqp;
1660 	aio_result_t *resultp;
1661 
1662 	ASSERT(MUTEX_HELD(&__aio_mutex));
1663 
1664 	if ((reqp = _aio_done_tail) != NULL) {
1665 		if ((_aio_done_tail = reqp->req_next) == NULL)
1666 			_aio_done_head = NULL;
1667 		ASSERT(_aio_donecnt > 0);
1668 		_aio_donecnt--;
1669 		(void) _aio_hash_del(reqp->req_resultp);
1670 		resultp = reqp->req_resultp;
1671 		ASSERT(reqp->req_state == AIO_REQ_DONE);
1672 		_aio_req_free(reqp);
1673 		return (resultp);
1674 	}
1675 	/* is queue empty? */
1676 	if (reqp == NULL && _aio_outstand_cnt == 0) {
1677 		return ((aio_result_t *)-1);
1678 	}
1679 	return (NULL);
1680 }
1681 
1682 /*
1683  * Set the return and errno values for the application's use.
1684  *
1685  * For the Posix interfaces, we must set the return value first followed
1686  * by the errno value because the Posix interfaces allow for a change
1687  * in the errno value from EINPROGRESS to something else to signal
1688  * the completion of the asynchronous request.
1689  *
1690  * The opposite is true for the Solaris interfaces.  These allow for
1691  * a change in the return value from AIO_INPROGRESS to something else
1692  * to signal the completion of the asynchronous request.
1693  */
1694 void
1695 _aio_set_result(aio_req_t *reqp, ssize_t retval, int error)
1696 {
1697 	aio_result_t *resultp = reqp->req_resultp;
1698 
1699 	if (POSIX_AIO(reqp)) {
1700 		resultp->aio_return = retval;
1701 		membar_producer();
1702 		resultp->aio_errno = error;
1703 	} else {
1704 		resultp->aio_errno = error;
1705 		membar_producer();
1706 		resultp->aio_return = retval;
1707 	}
1708 }
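
/*
 * Illustrative sketch (not part of the original file) of the consumer side
 * of the ordering above for the Solaris-style interfaces: a caller that
 * initialized aio_return to AIO_INPROGRESS (as the kaio poll check in
 * _aiorw() relies on) can poll it, and once it changes, aio_errno is
 * already valid because it was stored first.  "res" is hypothetical and
 * membar_consumer() pairs with the membar_producer() above.
 *
 *	#include <atomic.h>
 *
 *	volatile aio_result_t *rp = &res;
 *
 *	while (rp->aio_return == AIO_INPROGRESS)
 *		;			spin, or do other useful work
 *	membar_consumer();
 *	if (rp->aio_return == -1)
 *		errno = rp->aio_errno;
 */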
1709 
1710 /*
1711  * Add an AIO request onto the next work queue.
1712  * A circular list of workers is used to choose the next worker.
1713  */
1714 void
1715 _aio_req_add(aio_req_t *reqp, aio_worker_t **nextworker, int mode)
1716 {
1717 	ulwp_t *self = curthread;
1718 	aio_worker_t *aiowp;
1719 	aio_worker_t *first;
1720 	int load_bal_flg = 1;
1721 	int found;
1722 
1723 	ASSERT(reqp->req_state != AIO_REQ_DONEQ);
1724 	reqp->req_next = NULL;
1725 	/*
1726 	 * Try to acquire the next worker's work queue.  If it is locked,
1727 	 * then search the list of workers until a queue is found unlocked,
1728 	 * or until the list is completely traversed at which point another
1729 	 * worker will be created.
1730 	 */
1731 	sigoff(self);		/* defer SIGIO */
1732 	sig_mutex_lock(&__aio_mutex);
1733 	first = aiowp = *nextworker;
1734 	if (mode != AIONOTIFY)
1735 		_aio_outstand_cnt++;
1736 	sig_mutex_unlock(&__aio_mutex);
1737 
1738 	switch (mode) {
1739 	case AIOREAD:
1740 	case AIOWRITE:
1741 	case AIOAREAD:
1742 	case AIOAWRITE:
1743 #if !defined(_LP64)
1744 	case AIOAREAD64:
1745 	case AIOAWRITE64:
1746 #endif
1747 		/* try to find an idle worker */
1748 		found = 0;
1749 		do {
1750 			if (sig_mutex_trylock(&aiowp->work_qlock1) == 0) {
1751 				if (aiowp->work_idleflg) {
1752 					found = 1;
1753 					break;
1754 				}
1755 				sig_mutex_unlock(&aiowp->work_qlock1);
1756 			}
1757 		} while ((aiowp = aiowp->work_forw) != first);
1758 
1759 		if (found) {
1760 			aiowp->work_minload1++;
1761 			break;
1762 		}
1763 
1764 		/* try to acquire some worker's queue lock */
1765 		do {
1766 			if (sig_mutex_trylock(&aiowp->work_qlock1) == 0) {
1767 				found = 1;
1768 				break;
1769 			}
1770 		} while ((aiowp = aiowp->work_forw) != first);
1771 
1772 		/*
1773 		 * Create more workers when the workers appear overloaded.
1774 		 * Either all the workers are busy draining their queues
1775 		 * or no worker's queue lock could be acquired.
1776 		 */
1777 		if (!found) {
1778 			if (_aio_worker_cnt < _max_workers) {
1779 				if (_aio_create_worker(reqp, mode))
1780 					aio_panic("_aio_req_add: add worker");
1781 				sigon(self);	/* reenable SIGIO */
1782 				return;
1783 			}
1784 
1785 			/*
1786 			 * No worker is available and we have already created
1787 			 * _max_workers; keep going through the
1788 			 * list slowly until we get a lock.
1789 			 */
1790 			while (sig_mutex_trylock(&aiowp->work_qlock1) != 0) {
1791 				/*
1792 				 * give someone else a chance
1793 				 */
1794 				_aio_delay(1);
1795 				aiowp = aiowp->work_forw;
1796 			}
1797 		}
1798 
1799 		ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
1800 		if (_aio_worker_cnt < _max_workers &&
1801 		    aiowp->work_minload1 >= _minworkload) {
1802 			sig_mutex_unlock(&aiowp->work_qlock1);
1803 			sig_mutex_lock(&__aio_mutex);
1804 			*nextworker = aiowp->work_forw;
1805 			sig_mutex_unlock(&__aio_mutex);
1806 			if (_aio_create_worker(reqp, mode))
1807 				aio_panic("_aio_req_add: add worker");
1808 			sigon(self);	/* reenable SIGIO */
1809 			return;
1810 		}
1811 		aiowp->work_minload1++;
1812 		break;
1813 	case AIOFSYNC:
1814 	case AIONOTIFY:
1815 		load_bal_flg = 0;
1816 		sig_mutex_lock(&aiowp->work_qlock1);
1817 		break;
1818 	default:
1819 		aio_panic("_aio_req_add: invalid mode");
1820 		break;
1821 	}
1822 	/*
1823 	 * Put request onto worker's work queue.
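	 * The queue is FIFO: work_tail1 points to the oldest queued
	 * request, work_head1 to the most recently added one, and
	 * work_next1 to the next request that _aio_req_get() will
	 * hand to a worker.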
1824 	 */
1825 	if (aiowp->work_tail1 == NULL) {
1826 		ASSERT(aiowp->work_count1 == 0);
1827 		aiowp->work_tail1 = reqp;
1828 		aiowp->work_next1 = reqp;
1829 	} else {
1830 		aiowp->work_head1->req_next = reqp;
1831 		if (aiowp->work_next1 == NULL)
1832 			aiowp->work_next1 = reqp;
1833 	}
1834 	reqp->req_state = AIO_REQ_QUEUED;
1835 	reqp->req_worker = aiowp;
1836 	aiowp->work_head1 = reqp;
1837 	/*
1838 	 * Awaken worker if it is not currently active.
1839 	 */
1840 	if (aiowp->work_count1++ == 0 && aiowp->work_idleflg) {
1841 		aiowp->work_idleflg = 0;
1842 		(void) cond_signal(&aiowp->work_idle_cv);
1843 	}
1844 	sig_mutex_unlock(&aiowp->work_qlock1);
1845 
1846 	if (load_bal_flg) {
1847 		sig_mutex_lock(&__aio_mutex);
1848 		*nextworker = aiowp->work_forw;
1849 		sig_mutex_unlock(&__aio_mutex);
1850 	}
1851 	sigon(self);	/* reenable SIGIO */
1852 }
1853 
1854 /*
1855  * Get an AIO request for a specified worker.
1856  * If the work queue is empty, return NULL.
1857  */
1858 aio_req_t *
1859 _aio_req_get(aio_worker_t *aiowp)
1860 {
1861 	aio_req_t *reqp;
1862 
1863 	sig_mutex_lock(&aiowp->work_qlock1);
1864 	if ((reqp = aiowp->work_next1) != NULL) {
1865 		/*
1866 		 * Remove a POSIX request from the queue; the
1867 		 * request queue is a singly linked list
1868 		 * with a previous pointer.  The request is
1869 		 * removed by updating the previous pointer.
1870 		 *
1871 		 * Non-posix requests are left on the queue
1872 		 * Non-POSIX requests are left on the queue
1873 		 */
1874 
1875 		if (POSIX_AIO(reqp)) {
1876 			if (aiowp->work_prev1 == NULL) {
1877 				aiowp->work_tail1 = reqp->req_next;
1878 				if (aiowp->work_tail1 == NULL)
1879 					aiowp->work_head1 = NULL;
1880 			} else {
1881 				aiowp->work_prev1->req_next = reqp->req_next;
1882 				if (aiowp->work_head1 == reqp)
1883 					aiowp->work_head1 = reqp->req_next;
1884 			}
1885 
1886 		} else {
1887 			aiowp->work_prev1 = reqp;
1888 			ASSERT(aiowp->work_done1 >= 0);
1889 			aiowp->work_done1++;
1890 		}
1891 		ASSERT(reqp != reqp->req_next);
1892 		aiowp->work_next1 = reqp->req_next;
1893 		ASSERT(aiowp->work_count1 >= 1);
1894 		aiowp->work_count1--;
1895 		switch (reqp->req_op) {
1896 		case AIOREAD:
1897 		case AIOWRITE:
1898 		case AIOAREAD:
1899 		case AIOAWRITE:
1900 #if !defined(_LP64)
1901 		case AIOAREAD64:
1902 		case AIOAWRITE64:
1903 #endif
1904 			ASSERT(aiowp->work_minload1 > 0);
1905 			aiowp->work_minload1--;
1906 			break;
1907 		}
1908 		reqp->req_state = AIO_REQ_INPROGRESS;
1909 	}
1910 	aiowp->work_req = reqp;
1911 	ASSERT(reqp != NULL || aiowp->work_count1 == 0);
1912 	sig_mutex_unlock(&aiowp->work_qlock1);
1913 	return (reqp);
1914 }
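
/*
 * A minimal sketch (the real worker loop lives elsewhere in this file)
 * of how a worker might drain its queue; retval and error are assumed
 * to be the result of the I/O just performed:
 *
 *	aio_req_t *reqp;
 *
 *	while ((reqp = _aio_req_get(aiowp)) != NULL) {
 *		... perform the I/O described by reqp->req_args ...
 *		_aio_finish_request(aiowp, retval, error);
 *	}
 *
 * _aio_req_get() marks the request AIO_REQ_INPROGRESS and records it
 * in aiowp->work_req before returning it.
 */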
1915 
1916 static void
1917 _aio_req_del(aio_worker_t *aiowp, aio_req_t *reqp, int ostate)
1918 {
1919 	aio_req_t **last;
1920 	aio_req_t *lastrp;
1921 	aio_req_t *next;
1922 
1923 	ASSERT(aiowp != NULL);
1924 	ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
1925 	if (POSIX_AIO(reqp)) {
1926 		if (ostate != AIO_REQ_QUEUED)
1927 			return;
1928 	}
1929 	last = &aiowp->work_tail1;
1930 	lastrp = aiowp->work_tail1;
1931 	ASSERT(ostate == AIO_REQ_QUEUED || ostate == AIO_REQ_INPROGRESS);
1932 	while ((next = *last) != NULL) {
1933 		if (next == reqp) {
1934 			*last = next->req_next;
1935 			if (aiowp->work_next1 == next)
1936 				aiowp->work_next1 = next->req_next;
1937 
1938 			/*
1939 			 * if this is the first request on the queue, move
1940 			 * the lastrp pointer forward.
1941 			 */
1942 			if (lastrp == next)
1943 				lastrp = next->req_next;
1944 
1945 			/*
1946 			 * if this request is pointed to by work_head1, then
1947 			 * make work_head1 point to the last request that is
1948 			 * present on the queue.
1949 			 */
1950 			if (aiowp->work_head1 == next)
1951 				aiowp->work_head1 = lastrp;
1952 
1953 			/*
1954 			 * work_prev1 is used only in the non-POSIX case; it
1955 			 * points to the current AIO_REQ_INPROGRESS request.
1956 			 * If work_prev1 points to this request, which is being
1957 			 * deleted, clear work_prev1 and decrement
1958 			 * work_done1.
1959 			 *
1960 			 * A worker thread can be processing only one request
1961 			 * at a time.
1962 			 */
1963 			if (aiowp->work_prev1 == next) {
1964 				ASSERT(ostate == AIO_REQ_INPROGRESS &&
1965 				    !POSIX_AIO(reqp) && aiowp->work_done1 > 0);
1966 				aiowp->work_prev1 = NULL;
1967 				aiowp->work_done1--;
1968 			}
1969 
1970 			if (ostate == AIO_REQ_QUEUED) {
1971 				ASSERT(aiowp->work_count1 >= 1);
1972 				aiowp->work_count1--;
1973 				ASSERT(aiowp->work_minload1 >= 1);
1974 				aiowp->work_minload1--;
1975 			}
1976 			return;
1977 		}
1978 		last = &next->req_next;
1979 		lastrp = next;
1980 	}
1981 	/* NOTREACHED */
1982 }
1983 
1984 static void
1985 _aio_enq_doneq(aio_req_t *reqp)
1986 {
1987 	if (_aio_doneq == NULL) {
1988 		_aio_doneq = reqp;
1989 		reqp->req_next = reqp->req_prev = reqp;
1990 	} else {
1991 		reqp->req_next = _aio_doneq;
1992 		reqp->req_prev = _aio_doneq->req_prev;
1993 		_aio_doneq->req_prev->req_next = reqp;
1994 		_aio_doneq->req_prev = reqp;
1995 	}
1996 	reqp->req_state = AIO_REQ_DONEQ;
1997 	_aio_doneq_cnt++;
1998 }
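
/*
 * The done queue headed by _aio_doneq is a circular, doubly linked
 * list.  A minimal sketch (assuming __aio_mutex is held) of walking it:
 *
 *	aio_req_t *reqp;
 *
 *	if ((reqp = _aio_doneq) != NULL) {
 *		do {
 *			... examine reqp ...
 *		} while ((reqp = reqp->req_next) != _aio_doneq);
 *	}
 */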
1999 
2000 /*
2001  * caller owns the _aio_mutex
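 *
 * If reqp is non-NULL, remove that specific request from the done
 * queue, or return NULL if it is not on the done queue.  If reqp is
 * NULL, dequeue the request at the head of the done queue, if any.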
2002  */
2003 aio_req_t *
2004 _aio_req_remove(aio_req_t *reqp)
2005 {
2006 	if (reqp && reqp->req_state != AIO_REQ_DONEQ)
2007 		return (NULL);
2008 
2009 	if (reqp) {
2010 		/* request in done queue */
2011 		if (_aio_doneq == reqp)
2012 			_aio_doneq = reqp->req_next;
2013 		if (_aio_doneq == reqp) {
2014 			/* only one request on queue */
2015 			_aio_doneq = NULL;
2016 		} else {
2017 			aio_req_t *tmp = reqp->req_next;
2018 			reqp->req_prev->req_next = tmp;
2019 			tmp->req_prev = reqp->req_prev;
2020 		}
2021 	} else if ((reqp = _aio_doneq) != NULL) {
2022 		if (reqp == reqp->req_next) {
2023 			/* only one request on queue */
2024 			_aio_doneq = NULL;
2025 		} else {
2026 			reqp->req_prev->req_next = _aio_doneq = reqp->req_next;
2027 			_aio_doneq->req_prev = reqp->req_prev;
2028 		}
2029 	}
2030 	if (reqp) {
2031 		_aio_doneq_cnt--;
2032 		reqp->req_next = reqp->req_prev = reqp;
2033 		reqp->req_state = AIO_REQ_DONE;
2034 	}
2035 	return (reqp);
2036 }
2037 
2038 /*
2039  * An AIO request is identified by an aio_result_t pointer.  The library
2040  * maps this aio_result_t pointer to its internal representation using a
2041  * hash table.  This function adds an aio_result_t pointer to the hash table.
2042  */
2043 static int
2044 _aio_hash_insert(aio_result_t *resultp, aio_req_t *reqp)
2045 {
2046 	aio_hash_t *hashp;
2047 	aio_req_t **prev;
2048 	aio_req_t *next;
2049 
2050 	hashp = _aio_hash + AIOHASH(resultp);
2051 	lmutex_lock(&hashp->hash_lock);
2052 	prev = &hashp->hash_ptr;
2053 	while ((next = *prev) != NULL) {
2054 		if (resultp == next->req_resultp) {
2055 			lmutex_unlock(&hashp->hash_lock);
2056 			return (-1);
2057 		}
2058 		prev = &next->req_link;
2059 	}
2060 	*prev = reqp;
2061 	ASSERT(reqp->req_link == NULL);
2062 	lmutex_unlock(&hashp->hash_lock);
2063 	return (0);
2064 }
2065 
2066 /*
2067  * Remove an entry from the hash table.
2068  */
2069 aio_req_t *
2070 _aio_hash_del(aio_result_t *resultp)
2071 {
2072 	aio_hash_t *hashp;
2073 	aio_req_t **prev;
2074 	aio_req_t *next = NULL;
2075 
2076 	if (_aio_hash != NULL) {
2077 		hashp = _aio_hash + AIOHASH(resultp);
2078 		lmutex_lock(&hashp->hash_lock);
2079 		prev = &hashp->hash_ptr;
2080 		while ((next = *prev) != NULL) {
2081 			if (resultp == next->req_resultp) {
2082 				*prev = next->req_link;
2083 				next->req_link = NULL;
2084 				break;
2085 			}
2086 			prev = &next->req_link;
2087 		}
2088 		lmutex_unlock(&hashp->hash_lock);
2089 	}
2090 	return (next);
2091 }
2092 
2093 /*
2094  * Find an entry in the hash table.
2095  */
2096 aio_req_t *
2097 _aio_hash_find(aio_result_t *resultp)
2098 {
2099 	aio_hash_t *hashp;
2100 	aio_req_t **prev;
2101 	aio_req_t *next = NULL;
2102 
2103 	if (_aio_hash != NULL) {
2104 		hashp = _aio_hash + AIOHASH(resultp);
2105 		lmutex_lock(&hashp->hash_lock);
2106 		prev = &hashp->hash_ptr;
2107 		while ((next = *prev) != NULL) {
2108 			if (resultp == next->req_resultp)
2109 				break;
2110 			prev = &next->req_link;
2111 		}
2112 		lmutex_unlock(&hashp->hash_lock);
2113 	}
2114 	return (next);
2115 }
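
/*
 * Illustrative sketch only: the hash operations pair up over the life
 * of a request.  _aio_hash_insert() adds the mapping when the request
 * is submitted (see the AIO_NO_DUPS handling in _aio_rw() below),
 * _aio_hash_find() maps a caller-supplied aio_result_t back to its
 * aio_req_t while the request is outstanding, and _aio_hash_del()
 * removes the mapping when the request is retired, e.g.:
 *
 *	aio_req_t *reqp = _aio_hash_find(&aiocbp->aio_resultp);
 *
 *	if (reqp == NULL)
 *		... not (or no longer) a known outstanding request ...
 */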
2116 
2117 /*
2118  * AIO interface for POSIX
2119  */
2120 int
2121 _aio_rw(aiocb_t *aiocbp, aio_lio_t *lio_head, aio_worker_t **nextworker,
2122     int mode, int flg)
2123 {
2124 	aio_req_t *reqp;
2125 	aio_args_t *ap;
2126 	int kerr;
2127 
2128 	if (aiocbp == NULL) {
2129 		errno = EINVAL;
2130 		return (-1);
2131 	}
2132 
2133 	/* initialize kaio */
2134 	if (!_kaio_ok)
2135 		_kaio_init();
2136 
2137 	aiocbp->aio_state = NOCHECK;
2138 
2139 	/*
2140 	 * If we have been called because a list I/O
2141 	 * kaio() failed, we don't want to repeat the
2142 	 * system call.
2143 	 */
2144 
2145 	if (flg & AIO_KAIO) {
2146 		/*
2147 		 * Try kernel aio first.
2148 		 * If errno is ENOTSUP/EBADFD,
2149 		 * fall back to the thread implementation.
2150 		 */
2151 		if (_kaio_ok > 0 && KAIO_SUPPORTED(aiocbp->aio_fildes)) {
2152 			aiocbp->aio_resultp.aio_errno = EINPROGRESS;
2153 			aiocbp->aio_state = CHECK;
2154 			kerr = (int)_kaio(mode, aiocbp);
2155 			if (kerr == 0)
2156 				return (0);
2157 			if (errno != ENOTSUP && errno != EBADFD) {
2158 				aiocbp->aio_resultp.aio_errno = errno;
2159 				aiocbp->aio_resultp.aio_return = -1;
2160 				aiocbp->aio_state = NOCHECK;
2161 				return (-1);
2162 			}
2163 			if (errno == EBADFD)
2164 				SET_KAIO_NOT_SUPPORTED(aiocbp->aio_fildes);
2165 		}
2166 	}
2167 
2168 	aiocbp->aio_resultp.aio_errno = EINPROGRESS;
2169 	aiocbp->aio_state = USERAIO;
2170 
2171 	if (!__uaio_ok && __uaio_init() == -1)
2172 		return (-1);
2173 
2174 	if ((reqp = _aio_req_alloc()) == NULL) {
2175 		errno = EAGAIN;
2176 		return (-1);
2177 	}
2178 
2179 	/*
2180 	 * If an LIO request, add the list head to the aio request
2181 	 */
2182 	reqp->req_head = lio_head;
2183 	reqp->req_type = AIO_POSIX_REQ;
2184 	reqp->req_op = mode;
2185 	reqp->req_largefile = 0;
2186 
2187 	if (aiocbp->aio_sigevent.sigev_notify == SIGEV_NONE) {
2188 		reqp->req_sigevent.sigev_notify = SIGEV_NONE;
2189 	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_SIGNAL) {
2190 		reqp->req_sigevent.sigev_notify = SIGEV_SIGNAL;
2191 		reqp->req_sigevent.sigev_signo =
2192 		    aiocbp->aio_sigevent.sigev_signo;
2193 		reqp->req_sigevent.sigev_value.sival_ptr =
2194 		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
2195 	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_PORT) {
2196 		port_notify_t *pn = aiocbp->aio_sigevent.sigev_value.sival_ptr;
2197 		reqp->req_sigevent.sigev_notify = SIGEV_PORT;
2198 		/*
2199 		 * Reuse the sigevent structure to contain the port number
2200 		 * and the user value.  Same for SIGEV_THREAD, below.
2201 		 */
2202 		reqp->req_sigevent.sigev_signo =
2203 		    pn->portnfy_port;
2204 		reqp->req_sigevent.sigev_value.sival_ptr =
2205 		    pn->portnfy_user;
2206 	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_THREAD) {
2207 		reqp->req_sigevent.sigev_notify = SIGEV_THREAD;
2208 		/*
2209 		 * The sigevent structure contains the port number
2210 		 * and the user value.  Same for SIGEV_PORT, above.
2211 		 */
2212 		reqp->req_sigevent.sigev_signo =
2213 		    aiocbp->aio_sigevent.sigev_signo;
2214 		reqp->req_sigevent.sigev_value.sival_ptr =
2215 		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
2216 	}
2217 
2218 	reqp->req_resultp = &aiocbp->aio_resultp;
2219 	reqp->req_aiocbp = aiocbp;
2220 	ap = &reqp->req_args;
2221 	ap->fd = aiocbp->aio_fildes;
2222 	ap->buf = (caddr_t)aiocbp->aio_buf;
2223 	ap->bufsz = aiocbp->aio_nbytes;
2224 	ap->offset = aiocbp->aio_offset;
2225 
2226 	if ((flg & AIO_NO_DUPS) &&
2227 	    _aio_hash_insert(&aiocbp->aio_resultp, reqp) != 0) {
2228 		aio_panic("_aio_rw(): request already in hash table");
2229 	}
2230 	_aio_req_add(reqp, nextworker, mode);
2231 	return (0);
2232 }
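
/*
 * A minimal sketch (not the actual wrapper, which lives in the POSIX
 * aio code elsewhere in libc) of how an entry point such as aio_read()
 * is layered on _aio_rw(); my_aio_read and the exact flag combination
 * are illustrative only:
 *
 *	int
 *	my_aio_read(aiocb_t *aiocbp)
 *	{
 *		... validate aiocbp->aio_fildes, aio_offset, etc. ...
 *		return (_aio_rw(aiocbp, NULL, &__nextworker_rw, AIOAREAD,
 *		    (AIO_KAIO | AIO_NO_DUPS)));
 *	}
 */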
2233 
2234 #if !defined(_LP64)
2235 /*
2236  * 64-bit AIO interface for POSIX
2237  */
2238 int
2239 _aio_rw64(aiocb64_t *aiocbp, aio_lio_t *lio_head, aio_worker_t **nextworker,
2240     int mode, int flg)
2241 {
2242 	aio_req_t *reqp;
2243 	aio_args_t *ap;
2244 	int kerr;
2245 
2246 	if (aiocbp == NULL) {
2247 		errno = EINVAL;
2248 		return (-1);
2249 	}
2250 
2251 	/* initialize kaio */
2252 	if (!_kaio_ok)
2253 		_kaio_init();
2254 
2255 	aiocbp->aio_state = NOCHECK;
2256 
2257 	/*
2258 	 * If we have been called because a list I/O
2259 	 * kaio() failed, we don't want to repeat the
2260 	 * system call.
2261 	 */
2262 
2263 	if (flg & AIO_KAIO) {
2264 		/*
2265 		 * Try kernel aio first.
2266 		 * If errno is ENOTSUP/EBADFD,
2267 		 * fall back to the thread implementation.
2268 		 */
2269 		if (_kaio_ok > 0 && KAIO_SUPPORTED(aiocbp->aio_fildes)) {
2270 			aiocbp->aio_resultp.aio_errno = EINPROGRESS;
2271 			aiocbp->aio_state = CHECK;
2272 			kerr = (int)_kaio(mode, aiocbp);
2273 			if (kerr == 0)
2274 				return (0);
2275 			if (errno != ENOTSUP && errno != EBADFD) {
2276 				aiocbp->aio_resultp.aio_errno = errno;
2277 				aiocbp->aio_resultp.aio_return = -1;
2278 				aiocbp->aio_state = NOCHECK;
2279 				return (-1);
2280 			}
2281 			if (errno == EBADFD)
2282 				SET_KAIO_NOT_SUPPORTED(aiocbp->aio_fildes);
2283 		}
2284 	}
2285 
2286 	aiocbp->aio_resultp.aio_errno = EINPROGRESS;
2287 	aiocbp->aio_state = USERAIO;
2288 
2289 	if (!__uaio_ok && __uaio_init() == -1)
2290 		return (-1);
2291 
2292 	if ((reqp = _aio_req_alloc()) == NULL) {
2293 		errno = EAGAIN;
2294 		return (-1);
2295 	}
2296 
2297 	/*
2298 	 * If an LIO request, add the list head to the aio request
2299 	 */
2300 	reqp->req_head = lio_head;
2301 	reqp->req_type = AIO_POSIX_REQ;
2302 	reqp->req_op = mode;
2303 	reqp->req_largefile = 1;
2304 
2305 	if (aiocbp->aio_sigevent.sigev_notify == SIGEV_NONE) {
2306 		reqp->req_sigevent.sigev_notify = SIGEV_NONE;
2307 	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_SIGNAL) {
2308 		reqp->req_sigevent.sigev_notify = SIGEV_SIGNAL;
2309 		reqp->req_sigevent.sigev_signo =
2310 		    aiocbp->aio_sigevent.sigev_signo;
2311 		reqp->req_sigevent.sigev_value.sival_ptr =
2312 		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
2313 	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_PORT) {
2314 		port_notify_t *pn = aiocbp->aio_sigevent.sigev_value.sival_ptr;
2315 		reqp->req_sigevent.sigev_notify = SIGEV_PORT;
2316 		reqp->req_sigevent.sigev_signo =
2317 		    pn->portnfy_port;
2318 		reqp->req_sigevent.sigev_value.sival_ptr =
2319 		    pn->portnfy_user;
2320 	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_THREAD) {
2321 		reqp->req_sigevent.sigev_notify = SIGEV_THREAD;
2322 		reqp->req_sigevent.sigev_signo =
2323 		    aiocbp->aio_sigevent.sigev_signo;
2324 		reqp->req_sigevent.sigev_value.sival_ptr =
2325 		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
2326 	}
2327 
2328 	reqp->req_resultp = &aiocbp->aio_resultp;
2329 	reqp->req_aiocbp = aiocbp;
2330 	ap = &reqp->req_args;
2331 	ap->fd = aiocbp->aio_fildes;
2332 	ap->buf = (caddr_t)aiocbp->aio_buf;
2333 	ap->bufsz = aiocbp->aio_nbytes;
2334 	ap->offset = aiocbp->aio_offset;
2335 
2336 	if ((flg & AIO_NO_DUPS) &&
2337 	    _aio_hash_insert(&aiocbp->aio_resultp, reqp) != 0) {
2338 		aio_panic("_aio_rw64(): request already in hash table");
2339 	}
2340 	_aio_req_add(reqp, nextworker, mode);
2341 	return (0);
2342 }
2343 #endif	/* !defined(_LP64) */
2344