xref: /illumos-gate/usr/src/lib/libc/port/aio/aio.c (revision 4763305e3243687c189d755d737d52205b2614ed)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 
22 /*
23  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
24  * Use is subject to license terms.
25  */
26 
27 #include "lint.h"
28 #include "thr_uberdata.h"
29 #include "libc.h"
30 #include "asyncio.h"
31 #include <atomic.h>
32 #include <sys/param.h>
33 #include <sys/file.h>
34 #include <sys/port.h>
35 
36 static int _aio_hash_insert(aio_result_t *, aio_req_t *);
37 static aio_req_t *_aio_req_get(aio_worker_t *);
38 static void _aio_req_add(aio_req_t *, aio_worker_t **, int);
39 static void _aio_req_del(aio_worker_t *, aio_req_t *, int);
40 static void _aio_work_done(aio_worker_t *);
41 static void _aio_enq_doneq(aio_req_t *);
42 
43 extern void _aio_lio_free(aio_lio_t *);
44 
45 extern int __fcntl(int, int, ...);
46 extern int _port_dispatch(int, int, int, int, uintptr_t, void *);
47 
48 static int _aio_fsync_del(aio_worker_t *, aio_req_t *);
49 static void _aiodone(aio_req_t *, ssize_t, int);
50 static void _aio_cancel_work(aio_worker_t *, int, int *, int *);
51 static void _aio_finish_request(aio_worker_t *, ssize_t, int);
52 
53 /*
54  * switch for kernel async I/O
55  */
56 int _kaio_ok = 0;		/* 0 = disabled, 1 = on, -1 = error */
57 
58 /*
59  * Key for thread-specific data
60  */
61 pthread_key_t _aio_key;
62 
63 /*
64  * Array for determining whether or not a file supports kaio.
65  * Initialized in _kaio_init().
66  */
67 uint32_t *_kaio_supported = NULL;
68 
69 /*
70  *  workers for read/write requests
71  * (__aio_mutex lock protects circular linked list of workers)
72  */
73 aio_worker_t *__workers_rw;	/* circular list of AIO workers */
74 aio_worker_t *__nextworker_rw;	/* next worker in list of workers */
75 int __rw_workerscnt;		/* number of read/write workers */
76 
77 /*
78  * worker for notification requests.
79  */
80 aio_worker_t *__workers_no;	/* circular list of AIO workers */
81 aio_worker_t *__nextworker_no;	/* next worker in list of workers */
82 int __no_workerscnt;		/* number of notification workers */
83 
84 aio_req_t *_aio_done_tail;		/* list of done requests */
85 aio_req_t *_aio_done_head;
86 
87 mutex_t __aio_initlock = DEFAULTMUTEX;	/* makes aio initialization atomic */
88 cond_t __aio_initcv = DEFAULTCV;
89 int __aio_initbusy = 0;
90 
91 mutex_t __aio_mutex = DEFAULTMUTEX;	/* protects counts, and linked lists */
92 cond_t _aio_iowait_cv = DEFAULTCV;	/* wait for userland I/Os */
93 
94 pid_t __pid = (pid_t)-1;		/* initialize as invalid pid */
95 int _sigio_enabled = 0;			/* when set, send SIGIO signal */
96 
97 aio_hash_t *_aio_hash;
98 
99 aio_req_t *_aio_doneq;			/* double linked done queue list */
100 
101 int _aio_donecnt = 0;
102 int _aio_waitncnt = 0;			/* # of requests for aio_waitn */
103 int _aio_doneq_cnt = 0;
104 int _aio_outstand_cnt = 0;		/* # of outstanding requests */
105 int _kaio_outstand_cnt = 0;		/* # of outstanding kaio requests */
106 int _aio_req_done_cnt = 0;		/* req. done but not in "done queue" */
107 int _aio_kernel_suspend = 0;		/* active kernel kaio calls */
108 int _aio_suscv_cnt = 0;			/* aio_suspend calls waiting on cv's */
109 
110 int _max_workers = 256;			/* max number of workers permitted */
111 int _min_workers = 4;			/* min number of workers */
112 int _minworkload = 2;			/* min number of request in q */
113 int _aio_worker_cnt = 0;		/* number of workers to do requests */
114 int __uaio_ok = 0;			/* AIO has been enabled */
115 sigset_t _worker_set;			/* worker's signal mask */
116 
117 int _aiowait_flag = 0;			/* when set, aiowait() is inprogress */
118 int _aio_flags = 0;			/* flag bits; see defines in asyncio.h */
119 
120 aio_worker_t *_kaiowp = NULL;		/* points to kaio cleanup thread */
121 
122 int hz;					/* clock ticks per second */
123 
124 static int
_kaio_supported_init(void)125 _kaio_supported_init(void)
126 {
127 	void *ptr;
128 	size_t size;
129 
130 	if (_kaio_supported != NULL)	/* already initialized */
131 		return (0);
132 
133 	size = MAX_KAIO_FDARRAY_SIZE * sizeof (uint32_t);
134 	ptr = mmap(NULL, size, PROT_READ | PROT_WRITE,
135 	    MAP_PRIVATE | MAP_ANON, -1, (off_t)0);
136 	if (ptr == MAP_FAILED)
137 		return (-1);
138 	_kaio_supported = ptr;
139 	return (0);
140 }
141 
142 /*
143  * The aio subsystem is initialized when an AIO request is made.
144  * Constants are initialized like the max number of workers that
145  * the subsystem can create, and the minimum number of workers
146  * permitted before imposing some restrictions.  Also, some
147  * workers are created.
148  */
149 int
__uaio_init(void)150 __uaio_init(void)
151 {
152 	int ret = -1;
153 	int i;
154 	int cancel_state;
155 
156 	lmutex_lock(&__aio_initlock);
157 	(void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancel_state);
158 	while (__aio_initbusy)
159 		(void) cond_wait(&__aio_initcv, &__aio_initlock);
160 	(void) pthread_setcancelstate(cancel_state, NULL);
161 	if (__uaio_ok) {	/* already initialized */
162 		lmutex_unlock(&__aio_initlock);
163 		return (0);
164 	}
165 	__aio_initbusy = 1;
166 	lmutex_unlock(&__aio_initlock);
167 
168 	hz = (int)sysconf(_SC_CLK_TCK);
169 	__pid = getpid();
170 
171 	setup_cancelsig(SIGAIOCANCEL);
172 
173 	if (_kaio_supported_init() != 0)
174 		goto out;
175 
176 	/*
177 	 * Allocate and initialize the hash table.
178 	 * Do this only once, even if __uaio_init() is called twice.
179 	 */
180 	if (_aio_hash == NULL) {
181 		/* LINTED pointer cast */
182 		_aio_hash = (aio_hash_t *)mmap(NULL,
183 		    HASHSZ * sizeof (aio_hash_t), PROT_READ | PROT_WRITE,
184 		    MAP_PRIVATE | MAP_ANON, -1, (off_t)0);
185 		if ((void *)_aio_hash == MAP_FAILED) {
186 			_aio_hash = NULL;
187 			goto out;
188 		}
189 		for (i = 0; i < HASHSZ; i++)
190 			(void) mutex_init(&_aio_hash[i].hash_lock,
191 			    USYNC_THREAD, NULL);
192 	}
193 
194 	/*
195 	 * Initialize worker's signal mask to only catch SIGAIOCANCEL.
196 	 */
197 	(void) sigfillset(&_worker_set);
198 	(void) sigdelset(&_worker_set, SIGAIOCANCEL);
199 
200 	/*
201 	 * Create one worker to send asynchronous notifications.
202 	 * Do this only once, even if __uaio_init() is called twice.
203 	 */
204 	if (__no_workerscnt == 0 &&
205 	    (_aio_create_worker(NULL, AIONOTIFY) != 0)) {
206 		errno = EAGAIN;
207 		goto out;
208 	}
209 
210 	/*
211 	 * Create the minimum number of read/write workers.
212 	 * And later check whether atleast one worker is created;
213 	 * lwp_create() calls could fail because of segkp exhaustion.
214 	 */
215 	for (i = 0; i < _min_workers; i++)
216 		(void) _aio_create_worker(NULL, AIOREAD);
217 	if (__rw_workerscnt == 0) {
218 		errno = EAGAIN;
219 		goto out;
220 	}
221 
222 	ret = 0;
223 out:
224 	lmutex_lock(&__aio_initlock);
225 	if (ret == 0)
226 		__uaio_ok = 1;
227 	__aio_initbusy = 0;
228 	(void) cond_broadcast(&__aio_initcv);
229 	lmutex_unlock(&__aio_initlock);
230 	return (ret);
231 }
232 
233 /*
234  * Called from close() before actually performing the real _close().
235  */
236 void
_aio_close(int fd)237 _aio_close(int fd)
238 {
239 	if (fd < 0)	/* avoid cancelling everything */
240 		return;
241 	/*
242 	 * Cancel all outstanding aio requests for this file descriptor.
243 	 */
244 	if (__uaio_ok)
245 		(void) aiocancel_all(fd);
246 	/*
247 	 * If we have allocated the bit array, clear the bit for this file.
248 	 * The next open may re-use this file descriptor and the new file
249 	 * may have different kaio() behaviour.
250 	 */
251 	if (_kaio_supported != NULL)
252 		CLEAR_KAIO_SUPPORTED(fd);
253 }
254 
255 /*
256  * special kaio cleanup thread sits in a loop in the
257  * kernel waiting for pending kaio requests to complete.
258  */
259 void *
_kaio_cleanup_thread(void * arg)260 _kaio_cleanup_thread(void *arg)
261 {
262 	if (pthread_setspecific(_aio_key, arg) != 0)
263 		aio_panic("_kaio_cleanup_thread, pthread_setspecific()");
264 	(void) _kaio(AIOSTART);
265 	return (arg);
266 }
267 
268 /*
269  * initialize kaio.
270  */
271 void
_kaio_init()272 _kaio_init()
273 {
274 	int error;
275 	sigset_t oset;
276 	int cancel_state;
277 
278 	lmutex_lock(&__aio_initlock);
279 	(void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancel_state);
280 	while (__aio_initbusy)
281 		(void) cond_wait(&__aio_initcv, &__aio_initlock);
282 	(void) pthread_setcancelstate(cancel_state, NULL);
283 	if (_kaio_ok) {		/* already initialized */
284 		lmutex_unlock(&__aio_initlock);
285 		return;
286 	}
287 	__aio_initbusy = 1;
288 	lmutex_unlock(&__aio_initlock);
289 
290 	if (_kaio_supported_init() != 0)
291 		error = ENOMEM;
292 	else if ((_kaiowp = _aio_worker_alloc()) == NULL)
293 		error = ENOMEM;
294 	else if ((error = (int)_kaio(AIOINIT)) == 0) {
295 		(void) pthread_sigmask(SIG_SETMASK, &maskset, &oset);
296 		error = thr_create(NULL, AIOSTKSIZE, _kaio_cleanup_thread,
297 		    _kaiowp, THR_DAEMON, &_kaiowp->work_tid);
298 		(void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
299 	}
300 	if (error && _kaiowp != NULL) {
301 		_aio_worker_free(_kaiowp);
302 		_kaiowp = NULL;
303 	}
304 
305 	lmutex_lock(&__aio_initlock);
306 	if (error)
307 		_kaio_ok = -1;
308 	else
309 		_kaio_ok = 1;
310 	__aio_initbusy = 0;
311 	(void) cond_broadcast(&__aio_initcv);
312 	lmutex_unlock(&__aio_initlock);
313 }
314 
315 int
aioread(int fd,caddr_t buf,int bufsz,off_t offset,int whence,aio_result_t * resultp)316 aioread(int fd, caddr_t buf, int bufsz, off_t offset, int whence,
317     aio_result_t *resultp)
318 {
319 	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOREAD));
320 }
321 
322 int
aiowrite(int fd,caddr_t buf,int bufsz,off_t offset,int whence,aio_result_t * resultp)323 aiowrite(int fd, caddr_t buf, int bufsz, off_t offset, int whence,
324     aio_result_t *resultp)
325 {
326 	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOWRITE));
327 }
328 
329 #if !defined(_LP64)
330 int
aioread64(int fd,caddr_t buf,int bufsz,off64_t offset,int whence,aio_result_t * resultp)331 aioread64(int fd, caddr_t buf, int bufsz, off64_t offset, int whence,
332     aio_result_t *resultp)
333 {
334 	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOAREAD64));
335 }
336 
337 int
aiowrite64(int fd,caddr_t buf,int bufsz,off64_t offset,int whence,aio_result_t * resultp)338 aiowrite64(int fd, caddr_t buf, int bufsz, off64_t offset, int whence,
339     aio_result_t *resultp)
340 {
341 	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOAWRITE64));
342 }
343 #endif	/* !defined(_LP64) */
344 
345 int
_aiorw(int fd,caddr_t buf,int bufsz,offset_t offset,int whence,aio_result_t * resultp,int mode)346 _aiorw(int fd, caddr_t buf, int bufsz, offset_t offset, int whence,
347     aio_result_t *resultp, int mode)
348 {
349 	aio_req_t *reqp;
350 	aio_args_t *ap;
351 	offset_t loffset;
352 	struct stat64 stat64;
353 	int error = 0;
354 	int kerr;
355 	int umode;
356 
357 	switch (whence) {
358 
359 	case SEEK_SET:
360 		loffset = offset;
361 		break;
362 	case SEEK_CUR:
363 		if ((loffset = llseek(fd, 0, SEEK_CUR)) == -1)
364 			error = -1;
365 		else
366 			loffset += offset;
367 		break;
368 	case SEEK_END:
369 		if (fstat64(fd, &stat64) == -1)
370 			error = -1;
371 		else
372 			loffset = offset + stat64.st_size;
373 		break;
374 	default:
375 		errno = EINVAL;
376 		error = -1;
377 	}
378 
379 	if (error)
380 		return (error);
381 
382 	/* initialize kaio */
383 	if (!_kaio_ok)
384 		_kaio_init();
385 
386 	/*
387 	 * _aio_do_request() needs the original request code (mode) to be able
388 	 * to choose the appropiate 32/64 bit function.  All other functions
389 	 * only require the difference between READ and WRITE (umode).
390 	 */
391 	if (mode == AIOAREAD64 || mode == AIOAWRITE64)
392 		umode = mode - AIOAREAD64;
393 	else
394 		umode = mode;
395 
396 	/*
397 	 * Try kernel aio first.
398 	 * If errno is ENOTSUP/EBADFD, fall back to the thread implementation.
399 	 */
400 	if (_kaio_ok > 0 && KAIO_SUPPORTED(fd)) {
401 		resultp->aio_errno = 0;
402 		sig_mutex_lock(&__aio_mutex);
403 		_kaio_outstand_cnt++;
404 		sig_mutex_unlock(&__aio_mutex);
405 		kerr = (int)_kaio(((resultp->aio_return == AIO_INPROGRESS) ?
406 		    (umode | AIO_POLL_BIT) : umode),
407 		    fd, buf, bufsz, loffset, resultp);
408 		if (kerr == 0) {
409 			return (0);
410 		}
411 		sig_mutex_lock(&__aio_mutex);
412 		_kaio_outstand_cnt--;
413 		sig_mutex_unlock(&__aio_mutex);
414 		if (errno != ENOTSUP && errno != EBADFD)
415 			return (-1);
416 		if (errno == EBADFD)
417 			SET_KAIO_NOT_SUPPORTED(fd);
418 	}
419 
420 	if (!__uaio_ok && __uaio_init() == -1)
421 		return (-1);
422 
423 	if ((reqp = _aio_req_alloc()) == NULL) {
424 		errno = EAGAIN;
425 		return (-1);
426 	}
427 
428 	/*
429 	 * _aio_do_request() checks reqp->req_op to differentiate
430 	 * between 32 and 64 bit access.
431 	 */
432 	reqp->req_op = mode;
433 	reqp->req_resultp = resultp;
434 	ap = &reqp->req_args;
435 	ap->fd = fd;
436 	ap->buf = buf;
437 	ap->bufsz = bufsz;
438 	ap->offset = loffset;
439 
440 	if (_aio_hash_insert(resultp, reqp) != 0) {
441 		_aio_req_free(reqp);
442 		errno = EINVAL;
443 		return (-1);
444 	}
445 	/*
446 	 * _aio_req_add() only needs the difference between READ and
447 	 * WRITE to choose the right worker queue.
448 	 */
449 	_aio_req_add(reqp, &__nextworker_rw, umode);
450 	return (0);
451 }
452 
453 int
aiocancel(aio_result_t * resultp)454 aiocancel(aio_result_t *resultp)
455 {
456 	aio_req_t *reqp;
457 	aio_worker_t *aiowp;
458 	int ret;
459 	int done = 0;
460 	int canceled = 0;
461 
462 	if (!__uaio_ok) {
463 		errno = EINVAL;
464 		return (-1);
465 	}
466 
467 	sig_mutex_lock(&__aio_mutex);
468 	reqp = _aio_hash_find(resultp);
469 	if (reqp == NULL) {
470 		if (_aio_outstand_cnt == _aio_req_done_cnt)
471 			errno = EINVAL;
472 		else
473 			errno = EACCES;
474 		ret = -1;
475 	} else {
476 		aiowp = reqp->req_worker;
477 		sig_mutex_lock(&aiowp->work_qlock1);
478 		(void) _aio_cancel_req(aiowp, reqp, &canceled, &done);
479 		sig_mutex_unlock(&aiowp->work_qlock1);
480 
481 		if (canceled) {
482 			ret = 0;
483 		} else {
484 			if (_aio_outstand_cnt == 0 ||
485 			    _aio_outstand_cnt == _aio_req_done_cnt)
486 				errno = EINVAL;
487 			else
488 				errno = EACCES;
489 			ret = -1;
490 		}
491 	}
492 	sig_mutex_unlock(&__aio_mutex);
493 	return (ret);
494 }
495 
496 static void
_aiowait_cleanup(void * arg __unused)497 _aiowait_cleanup(void *arg __unused)
498 {
499 	sig_mutex_lock(&__aio_mutex);
500 	_aiowait_flag--;
501 	sig_mutex_unlock(&__aio_mutex);
502 }
503 
504 /*
505  * This must be asynch safe and cancel safe
506  */
507 aio_result_t *
aiowait(struct timeval * uwait)508 aiowait(struct timeval *uwait)
509 {
510 	aio_result_t *uresultp;
511 	aio_result_t *kresultp;
512 	aio_result_t *resultp;
513 	int dontblock;
514 	int timedwait = 0;
515 	int kaio_errno = 0;
516 	struct timeval twait;
517 	struct timeval *wait = NULL;
518 	hrtime_t hrtend;
519 	hrtime_t hres;
520 
521 	if (uwait) {
522 		/*
523 		 * Check for a valid specified wait time.
524 		 * If it is invalid, fail the call right away.
525 		 */
526 		if (uwait->tv_sec < 0 || uwait->tv_usec < 0 ||
527 		    uwait->tv_usec >= MICROSEC) {
528 			errno = EINVAL;
529 			return ((aio_result_t *)-1);
530 		}
531 
532 		if (uwait->tv_sec > 0 || uwait->tv_usec > 0) {
533 			hrtend = gethrtime() +
534 			    (hrtime_t)uwait->tv_sec * NANOSEC +
535 			    (hrtime_t)uwait->tv_usec * (NANOSEC / MICROSEC);
536 			twait = *uwait;
537 			wait = &twait;
538 			timedwait++;
539 		} else {
540 			/* polling */
541 			sig_mutex_lock(&__aio_mutex);
542 			if (_kaio_outstand_cnt == 0) {
543 				kresultp = (aio_result_t *)-1;
544 			} else {
545 				kresultp = (aio_result_t *)_kaio(AIOWAIT,
546 				    (struct timeval *)-1, 1);
547 				if (kresultp != (aio_result_t *)-1 &&
548 				    kresultp != NULL &&
549 				    kresultp != (aio_result_t *)1) {
550 					_kaio_outstand_cnt--;
551 					sig_mutex_unlock(&__aio_mutex);
552 					return (kresultp);
553 				}
554 			}
555 			uresultp = _aio_req_done();
556 			sig_mutex_unlock(&__aio_mutex);
557 			if (uresultp != NULL &&
558 			    uresultp != (aio_result_t *)-1) {
559 				return (uresultp);
560 			}
561 			if (uresultp == (aio_result_t *)-1 &&
562 			    kresultp == (aio_result_t *)-1) {
563 				errno = EINVAL;
564 				return ((aio_result_t *)-1);
565 			} else {
566 				return (NULL);
567 			}
568 		}
569 	}
570 
571 	for (;;) {
572 		sig_mutex_lock(&__aio_mutex);
573 		uresultp = _aio_req_done();
574 		if (uresultp != NULL && uresultp != (aio_result_t *)-1) {
575 			sig_mutex_unlock(&__aio_mutex);
576 			resultp = uresultp;
577 			break;
578 		}
579 		_aiowait_flag++;
580 		dontblock = (uresultp == (aio_result_t *)-1);
581 		if (dontblock && _kaio_outstand_cnt == 0) {
582 			kresultp = (aio_result_t *)-1;
583 			kaio_errno = EINVAL;
584 		} else {
585 			sig_mutex_unlock(&__aio_mutex);
586 			pthread_cleanup_push(_aiowait_cleanup, NULL);
587 			_cancel_prologue();
588 			kresultp = (aio_result_t *)_kaio(AIOWAIT,
589 			    wait, dontblock);
590 			_cancel_epilogue();
591 			pthread_cleanup_pop(0);
592 			sig_mutex_lock(&__aio_mutex);
593 			kaio_errno = errno;
594 		}
595 		_aiowait_flag--;
596 		sig_mutex_unlock(&__aio_mutex);
597 		if (kresultp == (aio_result_t *)1) {
598 			/* aiowait() awakened by an aionotify() */
599 			continue;
600 		} else if (kresultp != NULL &&
601 		    kresultp != (aio_result_t *)-1) {
602 			resultp = kresultp;
603 			sig_mutex_lock(&__aio_mutex);
604 			_kaio_outstand_cnt--;
605 			sig_mutex_unlock(&__aio_mutex);
606 			break;
607 		} else if (kresultp == (aio_result_t *)-1 &&
608 		    kaio_errno == EINVAL &&
609 		    uresultp == (aio_result_t *)-1) {
610 			errno = kaio_errno;
611 			resultp = (aio_result_t *)-1;
612 			break;
613 		} else if (kresultp == (aio_result_t *)-1 &&
614 		    kaio_errno == EINTR) {
615 			errno = kaio_errno;
616 			resultp = (aio_result_t *)-1;
617 			break;
618 		} else if (timedwait) {
619 			hres = hrtend - gethrtime();
620 			if (hres <= 0) {
621 				/* time is up; return */
622 				resultp = NULL;
623 				break;
624 			} else {
625 				/*
626 				 * Some time left.  Round up the remaining time
627 				 * in nanoseconds to microsec.  Retry the call.
628 				 */
629 				hres += (NANOSEC / MICROSEC) - 1;
630 				wait->tv_sec = hres / NANOSEC;
631 				wait->tv_usec =
632 				    (hres % NANOSEC) / (NANOSEC / MICROSEC);
633 			}
634 		} else {
635 			ASSERT(kresultp == NULL && uresultp == NULL);
636 			resultp = NULL;
637 			continue;
638 		}
639 	}
640 	return (resultp);
641 }
642 
643 /*
644  * _aio_get_timedelta calculates the remaining time and stores the result
645  * into timespec_t *wait.
646  */
647 
648 int
_aio_get_timedelta(timespec_t * end,timespec_t * wait)649 _aio_get_timedelta(timespec_t *end, timespec_t *wait)
650 {
651 	int	ret = 0;
652 	struct	timeval cur;
653 	timespec_t curtime;
654 
655 	(void) gettimeofday(&cur, NULL);
656 	curtime.tv_sec = cur.tv_sec;
657 	curtime.tv_nsec = cur.tv_usec * 1000;   /* convert us to ns */
658 
659 	if (end->tv_sec >= curtime.tv_sec) {
660 		wait->tv_sec = end->tv_sec - curtime.tv_sec;
661 		if (end->tv_nsec >= curtime.tv_nsec) {
662 			wait->tv_nsec = end->tv_nsec - curtime.tv_nsec;
663 			if (wait->tv_sec == 0 && wait->tv_nsec == 0)
664 				ret = -1;	/* timer expired */
665 		} else {
666 			if (end->tv_sec > curtime.tv_sec) {
667 				wait->tv_sec -= 1;
668 				wait->tv_nsec = NANOSEC -
669 				    (curtime.tv_nsec - end->tv_nsec);
670 			} else {
671 				ret = -1;	/* timer expired */
672 			}
673 		}
674 	} else {
675 		ret = -1;
676 	}
677 	return (ret);
678 }
679 
680 /*
681  * If closing by file descriptor: we will simply cancel all the outstanding
682  * aio`s and return.  Those aio's in question will have either noticed the
683  * cancellation notice before, during, or after initiating io.
684  */
685 int
aiocancel_all(int fd)686 aiocancel_all(int fd)
687 {
688 	aio_req_t *reqp;
689 	aio_req_t **reqpp, *last;
690 	aio_worker_t *first;
691 	aio_worker_t *next;
692 	int canceled = 0;
693 	int done = 0;
694 	int cancelall = 0;
695 
696 	sig_mutex_lock(&__aio_mutex);
697 
698 	if (_aio_outstand_cnt == 0) {
699 		sig_mutex_unlock(&__aio_mutex);
700 		return (AIO_ALLDONE);
701 	}
702 
703 	/*
704 	 * Cancel requests from the read/write workers' queues.
705 	 */
706 	first = __nextworker_rw;
707 	next = first;
708 	do {
709 		_aio_cancel_work(next, fd, &canceled, &done);
710 	} while ((next = next->work_forw) != first);
711 
712 	/*
713 	 * finally, check if there are requests on the done queue that
714 	 * should be canceled.
715 	 */
716 	if (fd < 0)
717 		cancelall = 1;
718 	reqpp = &_aio_done_tail;
719 	last = _aio_done_tail;
720 	while ((reqp = *reqpp) != NULL) {
721 		if (cancelall || reqp->req_args.fd == fd) {
722 			*reqpp = reqp->req_next;
723 			if (last == reqp) {
724 				last = reqp->req_next;
725 			}
726 			if (_aio_done_head == reqp) {
727 				/* this should be the last req in list */
728 				_aio_done_head = last;
729 			}
730 			_aio_donecnt--;
731 			_aio_set_result(reqp, -1, ECANCELED);
732 			(void) _aio_hash_del(reqp->req_resultp);
733 			_aio_req_free(reqp);
734 		} else {
735 			reqpp = &reqp->req_next;
736 			last = reqp;
737 		}
738 	}
739 
740 	if (cancelall) {
741 		ASSERT(_aio_donecnt == 0);
742 		_aio_done_head = NULL;
743 	}
744 	sig_mutex_unlock(&__aio_mutex);
745 
746 	if (canceled && done == 0)
747 		return (AIO_CANCELED);
748 	else if (done && canceled == 0)
749 		return (AIO_ALLDONE);
750 	else if ((canceled + done == 0) && KAIO_SUPPORTED(fd))
751 		return ((int)_kaio(AIOCANCEL, fd, NULL));
752 	return (AIO_NOTCANCELED);
753 }
754 
755 /*
756  * Cancel requests from a given work queue.  If the file descriptor
757  * parameter, fd, is non-negative, then only cancel those requests
758  * in this queue that are to this file descriptor.  If the fd
759  * parameter is -1, then cancel all requests.
760  */
761 static void
_aio_cancel_work(aio_worker_t * aiowp,int fd,int * canceled,int * done)762 _aio_cancel_work(aio_worker_t *aiowp, int fd, int *canceled, int *done)
763 {
764 	aio_req_t *reqp;
765 
766 	sig_mutex_lock(&aiowp->work_qlock1);
767 	/*
768 	 * cancel queued requests first.
769 	 */
770 	reqp = aiowp->work_tail1;
771 	while (reqp != NULL) {
772 		if (fd < 0 || reqp->req_args.fd == fd) {
773 			if (_aio_cancel_req(aiowp, reqp, canceled, done)) {
774 				/*
775 				 * Callers locks were dropped.
776 				 * reqp is invalid; start traversing
777 				 * the list from the beginning again.
778 				 */
779 				reqp = aiowp->work_tail1;
780 				continue;
781 			}
782 		}
783 		reqp = reqp->req_next;
784 	}
785 	/*
786 	 * Since the queued requests have been canceled, there can
787 	 * only be one inprogress request that should be canceled.
788 	 */
789 	if ((reqp = aiowp->work_req) != NULL &&
790 	    (fd < 0 || reqp->req_args.fd == fd))
791 		(void) _aio_cancel_req(aiowp, reqp, canceled, done);
792 	sig_mutex_unlock(&aiowp->work_qlock1);
793 }
794 
795 /*
796  * Cancel a request.  Return 1 if the callers locks were temporarily
797  * dropped, otherwise return 0.
798  */
799 int
_aio_cancel_req(aio_worker_t * aiowp,aio_req_t * reqp,int * canceled,int * done)800 _aio_cancel_req(aio_worker_t *aiowp, aio_req_t *reqp, int *canceled, int *done)
801 {
802 	int ostate = reqp->req_state;
803 
804 	ASSERT(MUTEX_HELD(&__aio_mutex));
805 	ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
806 	if (ostate == AIO_REQ_CANCELED)
807 		return (0);
808 	if (ostate == AIO_REQ_DONE && !POSIX_AIO(reqp) &&
809 	    aiowp->work_prev1 == reqp) {
810 		ASSERT(aiowp->work_done1 != 0);
811 		/*
812 		 * If not on the done queue yet, just mark it CANCELED,
813 		 * _aio_work_done() will do the necessary clean up.
814 		 * This is required to ensure that aiocancel_all() cancels
815 		 * all the outstanding requests, including this one which
816 		 * is not yet on done queue but has been marked done.
817 		 */
818 		_aio_set_result(reqp, -1, ECANCELED);
819 		(void) _aio_hash_del(reqp->req_resultp);
820 		reqp->req_state = AIO_REQ_CANCELED;
821 		(*canceled)++;
822 		return (0);
823 	}
824 
825 	if (ostate == AIO_REQ_DONE || ostate == AIO_REQ_DONEQ) {
826 		(*done)++;
827 		return (0);
828 	}
829 	if (reqp->req_op == AIOFSYNC && reqp != aiowp->work_req) {
830 		ASSERT(POSIX_AIO(reqp));
831 		/* Cancel the queued aio_fsync() request */
832 		if (!reqp->req_head->lio_canned) {
833 			reqp->req_head->lio_canned = 1;
834 			_aio_outstand_cnt--;
835 			(*canceled)++;
836 		}
837 		return (0);
838 	}
839 	reqp->req_state = AIO_REQ_CANCELED;
840 	_aio_req_del(aiowp, reqp, ostate);
841 	(void) _aio_hash_del(reqp->req_resultp);
842 	(*canceled)++;
843 	if (reqp == aiowp->work_req) {
844 		ASSERT(ostate == AIO_REQ_INPROGRESS);
845 		/*
846 		 * Set the result values now, before _aiodone() is called.
847 		 * We do this because the application can expect aio_return
848 		 * and aio_errno to be set to -1 and ECANCELED, respectively,
849 		 * immediately after a successful return from aiocancel()
850 		 * or aio_cancel().
851 		 */
852 		_aio_set_result(reqp, -1, ECANCELED);
853 		(void) thr_kill(aiowp->work_tid, SIGAIOCANCEL);
854 		return (0);
855 	}
856 	if (!POSIX_AIO(reqp)) {
857 		_aio_outstand_cnt--;
858 		_aio_set_result(reqp, -1, ECANCELED);
859 		_aio_req_free(reqp);
860 		return (0);
861 	}
862 	sig_mutex_unlock(&aiowp->work_qlock1);
863 	sig_mutex_unlock(&__aio_mutex);
864 	_aiodone(reqp, -1, ECANCELED);
865 	sig_mutex_lock(&__aio_mutex);
866 	sig_mutex_lock(&aiowp->work_qlock1);
867 	return (1);
868 }
869 
870 int
_aio_create_worker(aio_req_t * reqp,int mode)871 _aio_create_worker(aio_req_t *reqp, int mode)
872 {
873 	aio_worker_t *aiowp, **workers, **nextworker;
874 	int *aio_workerscnt;
875 	void *(*func)(void *);
876 	sigset_t oset;
877 	int error;
878 
879 	/*
880 	 * Put the new worker thread in the right queue.
881 	 */
882 	switch (mode) {
883 	case AIOREAD:
884 	case AIOWRITE:
885 	case AIOAREAD:
886 	case AIOAWRITE:
887 #if !defined(_LP64)
888 	case AIOAREAD64:
889 	case AIOAWRITE64:
890 #endif
891 		workers = &__workers_rw;
892 		nextworker = &__nextworker_rw;
893 		aio_workerscnt = &__rw_workerscnt;
894 		func = _aio_do_request;
895 		break;
896 	case AIONOTIFY:
897 		workers = &__workers_no;
898 		nextworker = &__nextworker_no;
899 		func = _aio_do_notify;
900 		aio_workerscnt = &__no_workerscnt;
901 		break;
902 	default:
903 		aio_panic("_aio_create_worker: invalid mode");
904 		break;
905 	}
906 
907 	if ((aiowp = _aio_worker_alloc()) == NULL)
908 		return (-1);
909 
910 	if (reqp) {
911 		reqp->req_state = AIO_REQ_QUEUED;
912 		reqp->req_worker = aiowp;
913 		aiowp->work_head1 = reqp;
914 		aiowp->work_tail1 = reqp;
915 		aiowp->work_next1 = reqp;
916 		aiowp->work_count1 = 1;
917 		aiowp->work_minload1 = 1;
918 	}
919 
920 	(void) pthread_sigmask(SIG_SETMASK, &maskset, &oset);
921 	error = thr_create(NULL, AIOSTKSIZE, func, aiowp,
922 	    THR_DAEMON | THR_SUSPENDED, &aiowp->work_tid);
923 	(void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
924 	if (error) {
925 		if (reqp) {
926 			reqp->req_state = 0;
927 			reqp->req_worker = NULL;
928 		}
929 		_aio_worker_free(aiowp);
930 		return (-1);
931 	}
932 
933 	lmutex_lock(&__aio_mutex);
934 	(*aio_workerscnt)++;
935 	if (*workers == NULL) {
936 		aiowp->work_forw = aiowp;
937 		aiowp->work_backw = aiowp;
938 		*nextworker = aiowp;
939 		*workers = aiowp;
940 	} else {
941 		aiowp->work_backw = (*workers)->work_backw;
942 		aiowp->work_forw = (*workers);
943 		(*workers)->work_backw->work_forw = aiowp;
944 		(*workers)->work_backw = aiowp;
945 	}
946 	_aio_worker_cnt++;
947 	lmutex_unlock(&__aio_mutex);
948 
949 	(void) thr_continue(aiowp->work_tid);
950 
951 	return (0);
952 }
953 
954 /*
955  * This is the worker's main routine.
956  * The task of this function is to execute all queued requests;
957  * once the last pending request is executed this function will block
958  * in _aio_idle().  A new incoming request must wakeup this thread to
959  * restart the work.
960  * Every worker has an own work queue.  The queue lock is required
961  * to synchronize the addition of new requests for this worker or
962  * cancellation of pending/running requests.
963  *
964  * Cancellation scenarios:
965  * The cancellation of a request is being done asynchronously using
966  * _aio_cancel_req() from another thread context.
967  * A queued request can be cancelled in different manners :
968  * a) request is queued but not "in progress" or "done" (AIO_REQ_QUEUED):
969  *	- lock the queue -> remove the request -> unlock the queue
970  *	- this function/thread does not detect this cancellation process
971  * b) request is in progress (AIO_REQ_INPROGRESS) :
972  *	- this function first allow the cancellation of the running
973  *	  request with the flag "work_cancel_flg=1"
974  *		see _aio_req_get() -> _aio_cancel_on()
975  *	  During this phase, it is allowed to interrupt the worker
976  *	  thread running the request (this thread) using the SIGAIOCANCEL
977  *	  signal.
978  *	  Once this thread returns from the kernel (because the request
979  *	  is just done), then it must disable a possible cancellation
980  *	  and proceed to finish the request.  To disable the cancellation
981  *	  this thread must use _aio_cancel_off() to set "work_cancel_flg=0".
982  * c) request is already done (AIO_REQ_DONE || AIO_REQ_DONEQ):
983  *	  same procedure as in a)
984  *
985  * To b)
986  *	This thread uses sigsetjmp() to define the position in the code, where
987  *	it wish to continue working in the case that a SIGAIOCANCEL signal
988  *	is detected.
989  *	Normally this thread should get the cancellation signal during the
990  *	kernel phase (reading or writing).  In that case the signal handler
991  *	aiosigcancelhndlr() is activated using the worker thread context,
992  *	which again will use the siglongjmp() function to break the standard
993  *	code flow and jump to the "sigsetjmp" position, provided that
994  *	"work_cancel_flg" is set to "1".
995  *	Because the "work_cancel_flg" is only manipulated by this worker
996  *	thread and it can only run on one CPU at a given time, it is not
997  *	necessary to protect that flag with the queue lock.
998  *	Returning from the kernel (read or write system call) we must
999  *	first disable the use of the SIGAIOCANCEL signal and accordingly
1000  *	the use of the siglongjmp() function to prevent a possible deadlock:
1001  *	- It can happens that this worker thread returns from the kernel and
1002  *	  blocks in "work_qlock1",
1003  *	- then a second thread cancels the apparently "in progress" request
1004  *	  and sends the SIGAIOCANCEL signal to the worker thread,
1005  *	- the worker thread gets assigned the "work_qlock1" and will returns
1006  *	  from the kernel,
1007  *	- the kernel detects the pending signal and activates the signal
1008  *	  handler instead,
1009  *	- if the "work_cancel_flg" is still set then the signal handler
1010  *	  should use siglongjmp() to cancel the "in progress" request and
1011  *	  it would try to acquire the same work_qlock1 in _aio_req_get()
1012  *	  for a second time => deadlock.
1013  *	To avoid that situation we disable the cancellation of the request
1014  *	in progress BEFORE we try to acquire the work_qlock1.
1015  *	In that case the signal handler will not call siglongjmp() and the
1016  *	worker thread will continue running the standard code flow.
1017  *	Then this thread must check the AIO_REQ_CANCELED flag to emulate
1018  *	an eventually required siglongjmp() freeing the work_qlock1 and
1019  *	avoiding a deadlock.
1020  */
1021 void *
_aio_do_request(void * arglist)1022 _aio_do_request(void *arglist)
1023 {
1024 	aio_worker_t *aiowp = (aio_worker_t *)arglist;
1025 	ulwp_t *self = curthread;
1026 	struct aio_args *arg;
1027 	aio_req_t *reqp;		/* current AIO request */
1028 	ssize_t retval;
1029 	int append;
1030 	int error;
1031 
1032 	if (pthread_setspecific(_aio_key, aiowp) != 0)
1033 		aio_panic("_aio_do_request, pthread_setspecific()");
1034 	(void) pthread_sigmask(SIG_SETMASK, &_worker_set, NULL);
1035 	ASSERT(aiowp->work_req == NULL);
1036 
1037 	/*
1038 	 * We resume here when an operation is cancelled.
1039 	 * On first entry, aiowp->work_req == NULL, so all
1040 	 * we do is block SIGAIOCANCEL.
1041 	 */
1042 	(void) sigsetjmp(aiowp->work_jmp_buf, 0);
1043 	ASSERT(self->ul_sigdefer == 0);
1044 
1045 	sigoff(self);	/* block SIGAIOCANCEL */
1046 	if (aiowp->work_req != NULL)
1047 		_aio_finish_request(aiowp, -1, ECANCELED);
1048 
1049 	for (;;) {
1050 		/*
1051 		 * Put completed requests on aio_done_list.  This has
1052 		 * to be done as part of the main loop to ensure that
1053 		 * we don't artificially starve any aiowait'ers.
1054 		 */
1055 		if (aiowp->work_done1)
1056 			_aio_work_done(aiowp);
1057 
1058 top:
1059 		/* consume any deferred SIGAIOCANCEL signal here */
1060 		sigon(self);
1061 		sigoff(self);
1062 
1063 		while ((reqp = _aio_req_get(aiowp)) == NULL) {
1064 			if (_aio_idle(aiowp) != 0)
1065 				goto top;
1066 		}
1067 		arg = &reqp->req_args;
1068 		ASSERT(reqp->req_state == AIO_REQ_INPROGRESS ||
1069 		    reqp->req_state == AIO_REQ_CANCELED);
1070 		error = 0;
1071 
1072 		switch (reqp->req_op) {
1073 		case AIOREAD:
1074 		case AIOAREAD:
1075 			sigon(self);	/* unblock SIGAIOCANCEL */
1076 			retval = pread(arg->fd, arg->buf,
1077 			    arg->bufsz, arg->offset);
1078 			if (retval == -1) {
1079 				if (errno == ESPIPE) {
1080 					retval = read(arg->fd,
1081 					    arg->buf, arg->bufsz);
1082 					if (retval == -1)
1083 						error = errno;
1084 				} else {
1085 					error = errno;
1086 				}
1087 			}
1088 			sigoff(self);	/* block SIGAIOCANCEL */
1089 			break;
1090 		case AIOWRITE:
1091 		case AIOAWRITE:
1092 			/*
1093 			 * The SUSv3 POSIX spec for aio_write() states:
1094 			 *	If O_APPEND is set for the file descriptor,
1095 			 *	write operations append to the file in the
1096 			 *	same order as the calls were made.
1097 			 * but, somewhat inconsistently, it requires pwrite()
1098 			 * to ignore the O_APPEND setting.  So we have to use
1099 			 * fcntl() to get the open modes and call write() for
1100 			 * the O_APPEND case.
1101 			 */
1102 			append = (__fcntl(arg->fd, F_GETFL) & O_APPEND);
1103 			sigon(self);	/* unblock SIGAIOCANCEL */
1104 			retval = append?
1105 			    write(arg->fd, arg->buf, arg->bufsz) :
1106 			    pwrite(arg->fd, arg->buf, arg->bufsz,
1107 			    arg->offset);
1108 			if (retval == -1) {
1109 				if (errno == ESPIPE) {
1110 					retval = write(arg->fd,
1111 					    arg->buf, arg->bufsz);
1112 					if (retval == -1)
1113 						error = errno;
1114 				} else {
1115 					error = errno;
1116 				}
1117 			}
1118 			sigoff(self);	/* block SIGAIOCANCEL */
1119 			break;
1120 #if !defined(_LP64)
1121 		case AIOAREAD64:
1122 			sigon(self);	/* unblock SIGAIOCANCEL */
1123 			retval = pread64(arg->fd, arg->buf,
1124 			    arg->bufsz, arg->offset);
1125 			if (retval == -1) {
1126 				if (errno == ESPIPE) {
1127 					retval = read(arg->fd,
1128 					    arg->buf, arg->bufsz);
1129 					if (retval == -1)
1130 						error = errno;
1131 				} else {
1132 					error = errno;
1133 				}
1134 			}
1135 			sigoff(self);	/* block SIGAIOCANCEL */
1136 			break;
1137 		case AIOAWRITE64:
1138 			/*
1139 			 * The SUSv3 POSIX spec for aio_write() states:
1140 			 *	If O_APPEND is set for the file descriptor,
1141 			 *	write operations append to the file in the
1142 			 *	same order as the calls were made.
1143 			 * but, somewhat inconsistently, it requires pwrite()
1144 			 * to ignore the O_APPEND setting.  So we have to use
1145 			 * fcntl() to get the open modes and call write() for
1146 			 * the O_APPEND case.
1147 			 */
1148 			append = (__fcntl(arg->fd, F_GETFL) & O_APPEND);
1149 			sigon(self);	/* unblock SIGAIOCANCEL */
1150 			retval = append?
1151 			    write(arg->fd, arg->buf, arg->bufsz) :
1152 			    pwrite64(arg->fd, arg->buf, arg->bufsz,
1153 			    arg->offset);
1154 			if (retval == -1) {
1155 				if (errno == ESPIPE) {
1156 					retval = write(arg->fd,
1157 					    arg->buf, arg->bufsz);
1158 					if (retval == -1)
1159 						error = errno;
1160 				} else {
1161 					error = errno;
1162 				}
1163 			}
1164 			sigoff(self);	/* block SIGAIOCANCEL */
1165 			break;
1166 #endif	/* !defined(_LP64) */
1167 		case AIOFSYNC:
1168 			if (_aio_fsync_del(aiowp, reqp))
1169 				goto top;
1170 			ASSERT(reqp->req_head == NULL);
1171 			/*
1172 			 * All writes for this fsync request are now
1173 			 * acknowledged.  Now make these writes visible
1174 			 * and put the final request into the hash table.
1175 			 */
1176 			if (reqp->req_state == AIO_REQ_CANCELED) {
1177 				/* EMPTY */;
1178 			} else if (arg->offset == O_SYNC) {
1179 				if ((retval = __fdsync(arg->fd, FDSYNC_FILE)) ==
1180 				    -1) {
1181 					error = errno;
1182 				}
1183 			} else {
1184 				if ((retval = __fdsync(arg->fd, FDSYNC_DATA)) ==
1185 				    -1) {
1186 					error = errno;
1187 				}
1188 			}
1189 			if (_aio_hash_insert(reqp->req_resultp, reqp) != 0)
1190 				aio_panic("_aio_do_request(): AIOFSYNC: "
1191 				    "request already in hash table");
1192 			break;
1193 		default:
1194 			aio_panic("_aio_do_request, bad op");
1195 		}
1196 
1197 		_aio_finish_request(aiowp, retval, error);
1198 	}
1199 	/* NOTREACHED */
1200 	return (NULL);
1201 }
1202 
1203 /*
1204  * Perform the tail processing for _aio_do_request().
1205  * The in-progress request may or may not have been cancelled.
1206  */
1207 static void
_aio_finish_request(aio_worker_t * aiowp,ssize_t retval,int error)1208 _aio_finish_request(aio_worker_t *aiowp, ssize_t retval, int error)
1209 {
1210 	aio_req_t *reqp;
1211 
1212 	sig_mutex_lock(&aiowp->work_qlock1);
1213 	if ((reqp = aiowp->work_req) == NULL)
1214 		sig_mutex_unlock(&aiowp->work_qlock1);
1215 	else {
1216 		aiowp->work_req = NULL;
1217 		if (reqp->req_state == AIO_REQ_CANCELED) {
1218 			retval = -1;
1219 			error = ECANCELED;
1220 		}
1221 		if (!POSIX_AIO(reqp)) {
1222 			int notify;
1223 			if (reqp->req_state == AIO_REQ_INPROGRESS) {
1224 				reqp->req_state = AIO_REQ_DONE;
1225 				_aio_set_result(reqp, retval, error);
1226 			}
1227 			sig_mutex_unlock(&aiowp->work_qlock1);
1228 			sig_mutex_lock(&__aio_mutex);
1229 			/*
1230 			 * If it was canceled, this request will not be
1231 			 * added to done list. Just free it.
1232 			 */
1233 			if (error == ECANCELED) {
1234 				_aio_outstand_cnt--;
1235 				_aio_req_free(reqp);
1236 			} else {
1237 				_aio_req_done_cnt++;
1238 			}
1239 			/*
1240 			 * Notify any thread that may have blocked
1241 			 * because it saw an outstanding request.
1242 			 */
1243 			notify = 0;
1244 			if (_aio_outstand_cnt == 0 && _aiowait_flag) {
1245 				notify = 1;
1246 			}
1247 			sig_mutex_unlock(&__aio_mutex);
1248 			if (notify) {
1249 				(void) _kaio(AIONOTIFY);
1250 			}
1251 		} else {
1252 			if (reqp->req_state == AIO_REQ_INPROGRESS)
1253 				reqp->req_state = AIO_REQ_DONE;
1254 			sig_mutex_unlock(&aiowp->work_qlock1);
1255 			_aiodone(reqp, retval, error);
1256 		}
1257 	}
1258 }
1259 
1260 void
_aio_req_mark_done(aio_req_t * reqp)1261 _aio_req_mark_done(aio_req_t *reqp)
1262 {
1263 #if !defined(_LP64)
1264 	if (reqp->req_largefile)
1265 		((aiocb64_t *)reqp->req_aiocbp)->aio_state = USERAIO_DONE;
1266 	else
1267 #endif
1268 		((aiocb_t *)reqp->req_aiocbp)->aio_state = USERAIO_DONE;
1269 }
1270 
1271 /*
1272  * Sleep for 'ticks' clock ticks to give somebody else a chance to run,
1273  * hopefully to consume one of our queued signals.
1274  */
1275 static void
_aio_delay(int ticks)1276 _aio_delay(int ticks)
1277 {
1278 	(void) usleep(ticks * (MICROSEC / hz));
1279 }
1280 
1281 /*
1282  * Actually send the notifications.
1283  * We could block indefinitely here if the application
1284  * is not listening for the signal or port notifications.
1285  */
1286 static void
send_notification(notif_param_t * npp)1287 send_notification(notif_param_t *npp)
1288 {
1289 	extern int __sigqueue(pid_t pid, int signo,
1290 	    /* const union sigval */ void *value, int si_code, int block);
1291 
1292 	if (npp->np_signo)
1293 		(void) __sigqueue(__pid, npp->np_signo, npp->np_user,
1294 		    SI_ASYNCIO, 1);
1295 	else if (npp->np_port >= 0)
1296 		(void) _port_dispatch(npp->np_port, 0, PORT_SOURCE_AIO,
1297 		    npp->np_event, npp->np_object, npp->np_user);
1298 
1299 	if (npp->np_lio_signo)
1300 		(void) __sigqueue(__pid, npp->np_lio_signo, npp->np_lio_user,
1301 		    SI_ASYNCIO, 1);
1302 	else if (npp->np_lio_port >= 0)
1303 		(void) _port_dispatch(npp->np_lio_port, 0, PORT_SOURCE_AIO,
1304 		    npp->np_lio_event, npp->np_lio_object, npp->np_lio_user);
1305 }
1306 
1307 /*
1308  * Asynchronous notification worker.
1309  */
1310 void *
_aio_do_notify(void * arg)1311 _aio_do_notify(void *arg)
1312 {
1313 	aio_worker_t *aiowp = (aio_worker_t *)arg;
1314 	aio_req_t *reqp;
1315 
1316 	/*
1317 	 * This isn't really necessary.  All signals are blocked.
1318 	 */
1319 	if (pthread_setspecific(_aio_key, aiowp) != 0)
1320 		aio_panic("_aio_do_notify, pthread_setspecific()");
1321 
1322 	/*
1323 	 * Notifications are never cancelled.
1324 	 * All signals remain blocked, forever.
1325 	 */
1326 	for (;;) {
1327 		while ((reqp = _aio_req_get(aiowp)) == NULL) {
1328 			if (_aio_idle(aiowp) != 0)
1329 				aio_panic("_aio_do_notify: _aio_idle() failed");
1330 		}
1331 		send_notification(&reqp->req_notify);
1332 		_aio_req_free(reqp);
1333 	}
1334 
1335 	/* NOTREACHED */
1336 	return (NULL);
1337 }
1338 
1339 /*
1340  * Do the completion semantics for a request that was either canceled
1341  * by _aio_cancel_req() or was completed by _aio_do_request().
1342  */
1343 static void
_aiodone(aio_req_t * reqp,ssize_t retval,int error)1344 _aiodone(aio_req_t *reqp, ssize_t retval, int error)
1345 {
1346 	aio_result_t *resultp = reqp->req_resultp;
1347 	int notify = 0;
1348 	aio_lio_t *head;
1349 	int sigev_none;
1350 	int sigev_signal;
1351 	int sigev_thread;
1352 	int sigev_port;
1353 	notif_param_t np;
1354 
1355 	/*
1356 	 * We call _aiodone() only for Posix I/O.
1357 	 */
1358 	ASSERT(POSIX_AIO(reqp));
1359 
1360 	sigev_none = 0;
1361 	sigev_signal = 0;
1362 	sigev_thread = 0;
1363 	sigev_port = 0;
1364 	np.np_signo = 0;
1365 	np.np_port = -1;
1366 	np.np_lio_signo = 0;
1367 	np.np_lio_port = -1;
1368 
1369 	switch (reqp->req_sigevent.sigev_notify) {
1370 	case SIGEV_NONE:
1371 		sigev_none = 1;
1372 		break;
1373 	case SIGEV_SIGNAL:
1374 		sigev_signal = 1;
1375 		break;
1376 	case SIGEV_THREAD:
1377 		sigev_thread = 1;
1378 		break;
1379 	case SIGEV_PORT:
1380 		sigev_port = 1;
1381 		break;
1382 	default:
1383 		aio_panic("_aiodone: improper sigev_notify");
1384 		break;
1385 	}
1386 
1387 	/*
1388 	 * Figure out the notification parameters while holding __aio_mutex.
1389 	 * Actually perform the notifications after dropping __aio_mutex.
1390 	 * This allows us to sleep for a long time (if the notifications
1391 	 * incur delays) without impeding other async I/O operations.
1392 	 */
1393 
1394 	sig_mutex_lock(&__aio_mutex);
1395 
1396 	if (sigev_signal) {
1397 		if ((np.np_signo = reqp->req_sigevent.sigev_signo) != 0)
1398 			notify = 1;
1399 		np.np_user = reqp->req_sigevent.sigev_value.sival_ptr;
1400 	} else if (sigev_thread | sigev_port) {
1401 		if ((np.np_port = reqp->req_sigevent.sigev_signo) >= 0)
1402 			notify = 1;
1403 		np.np_event = reqp->req_op;
1404 		if (np.np_event == AIOFSYNC && reqp->req_largefile)
1405 			np.np_event = AIOFSYNC64;
1406 		np.np_object = (uintptr_t)reqp->req_aiocbp;
1407 		np.np_user = reqp->req_sigevent.sigev_value.sival_ptr;
1408 	}
1409 
1410 	if (resultp->aio_errno == EINPROGRESS)
1411 		_aio_set_result(reqp, retval, error);
1412 
1413 	_aio_outstand_cnt--;
1414 
1415 	head = reqp->req_head;
1416 	reqp->req_head = NULL;
1417 
1418 	if (sigev_none) {
1419 		_aio_enq_doneq(reqp);
1420 		reqp = NULL;
1421 	} else {
1422 		(void) _aio_hash_del(resultp);
1423 		_aio_req_mark_done(reqp);
1424 	}
1425 
1426 	_aio_waitn_wakeup();
1427 
1428 	/*
1429 	 * __aio_waitn() sets AIO_WAIT_INPROGRESS and
1430 	 * __aio_suspend() increments "_aio_kernel_suspend"
1431 	 * when they are waiting in the kernel for completed I/Os.
1432 	 *
1433 	 * _kaio(AIONOTIFY) awakes the corresponding function
1434 	 * in the kernel; then the corresponding __aio_waitn() or
1435 	 * __aio_suspend() function could reap the recently
1436 	 * completed I/Os (_aiodone()).
1437 	 */
1438 	if ((_aio_flags & AIO_WAIT_INPROGRESS) || _aio_kernel_suspend > 0)
1439 		(void) _kaio(AIONOTIFY);
1440 
1441 	sig_mutex_unlock(&__aio_mutex);
1442 
1443 	if (head != NULL) {
1444 		/*
1445 		 * If all the lio requests have completed,
1446 		 * prepare to notify the waiting thread.
1447 		 */
1448 		sig_mutex_lock(&head->lio_mutex);
1449 		ASSERT(head->lio_refcnt == head->lio_nent);
1450 		if (head->lio_refcnt == 1) {
1451 			int waiting = 0;
1452 			if (head->lio_mode == LIO_WAIT) {
1453 				if ((waiting = head->lio_waiting) != 0)
1454 					(void) cond_signal(&head->lio_cond_cv);
1455 			} else if (head->lio_port < 0) { /* none or signal */
1456 				if ((np.np_lio_signo = head->lio_signo) != 0)
1457 					notify = 1;
1458 				np.np_lio_user = head->lio_sigval.sival_ptr;
1459 			} else {			/* thread or port */
1460 				notify = 1;
1461 				np.np_lio_port = head->lio_port;
1462 				np.np_lio_event = head->lio_event;
1463 				np.np_lio_object =
1464 				    (uintptr_t)head->lio_sigevent;
1465 				np.np_lio_user = head->lio_sigval.sival_ptr;
1466 			}
1467 			head->lio_nent = head->lio_refcnt = 0;
1468 			sig_mutex_unlock(&head->lio_mutex);
1469 			if (waiting == 0)
1470 				_aio_lio_free(head);
1471 		} else {
1472 			head->lio_nent--;
1473 			head->lio_refcnt--;
1474 			sig_mutex_unlock(&head->lio_mutex);
1475 		}
1476 	}
1477 
1478 	/*
1479 	 * The request is completed; now perform the notifications.
1480 	 */
1481 	if (notify) {
1482 		if (reqp != NULL) {
1483 			/*
1484 			 * We usually put the request on the notification
1485 			 * queue because we don't want to block and delay
1486 			 * other operations behind us in the work queue.
1487 			 * Also we must never block on a cancel notification
1488 			 * because we are being called from an application
1489 			 * thread in this case and that could lead to deadlock
1490 			 * if no other thread is receiving notificatins.
1491 			 */
1492 			reqp->req_notify = np;
1493 			reqp->req_op = AIONOTIFY;
1494 			_aio_req_add(reqp, &__workers_no, AIONOTIFY);
1495 			reqp = NULL;
1496 		} else {
1497 			/*
1498 			 * We already put the request on the done queue,
1499 			 * so we can't queue it to the notification queue.
1500 			 * Just do the notification directly.
1501 			 */
1502 			send_notification(&np);
1503 		}
1504 	}
1505 
1506 	if (reqp != NULL)
1507 		_aio_req_free(reqp);
1508 }
1509 
1510 /*
1511  * Delete fsync requests from list head until there is
1512  * only one left.  Return 0 when there is only one,
1513  * otherwise return a non-zero value.
1514  */
1515 static int
_aio_fsync_del(aio_worker_t * aiowp,aio_req_t * reqp)1516 _aio_fsync_del(aio_worker_t *aiowp, aio_req_t *reqp)
1517 {
1518 	aio_lio_t *head = reqp->req_head;
1519 	int rval = 0;
1520 
1521 	ASSERT(reqp == aiowp->work_req);
1522 	sig_mutex_lock(&aiowp->work_qlock1);
1523 	sig_mutex_lock(&head->lio_mutex);
1524 	if (head->lio_refcnt > 1) {
1525 		head->lio_refcnt--;
1526 		head->lio_nent--;
1527 		aiowp->work_req = NULL;
1528 		sig_mutex_unlock(&head->lio_mutex);
1529 		sig_mutex_unlock(&aiowp->work_qlock1);
1530 		sig_mutex_lock(&__aio_mutex);
1531 		_aio_outstand_cnt--;
1532 		_aio_waitn_wakeup();
1533 		sig_mutex_unlock(&__aio_mutex);
1534 		_aio_req_free(reqp);
1535 		return (1);
1536 	}
1537 	ASSERT(head->lio_nent == 1 && head->lio_refcnt == 1);
1538 	reqp->req_head = NULL;
1539 	if (head->lio_canned)
1540 		reqp->req_state = AIO_REQ_CANCELED;
1541 	if (head->lio_mode == LIO_DESTROY) {
1542 		aiowp->work_req = NULL;
1543 		rval = 1;
1544 	}
1545 	sig_mutex_unlock(&head->lio_mutex);
1546 	sig_mutex_unlock(&aiowp->work_qlock1);
1547 	head->lio_refcnt--;
1548 	head->lio_nent--;
1549 	_aio_lio_free(head);
1550 	if (rval != 0)
1551 		_aio_req_free(reqp);
1552 	return (rval);
1553 }
1554 
1555 /*
1556  * A worker is set idle when its work queue is empty.
1557  * The worker checks again that it has no more work
1558  * and then goes to sleep waiting for more work.
1559  */
1560 int
_aio_idle(aio_worker_t * aiowp)1561 _aio_idle(aio_worker_t *aiowp)
1562 {
1563 	int error = 0;
1564 
1565 	sig_mutex_lock(&aiowp->work_qlock1);
1566 	if (aiowp->work_count1 == 0) {
1567 		ASSERT(aiowp->work_minload1 == 0);
1568 		aiowp->work_idleflg = 1;
1569 		/*
1570 		 * A cancellation handler is not needed here.
1571 		 * aio worker threads are never cancelled via pthread_cancel().
1572 		 */
1573 		error = sig_cond_wait(&aiowp->work_idle_cv,
1574 		    &aiowp->work_qlock1);
1575 		/*
1576 		 * The idle flag is normally cleared before worker is awakened
1577 		 * by aio_req_add().  On error (EINTR), we clear it ourself.
1578 		 */
1579 		if (error)
1580 			aiowp->work_idleflg = 0;
1581 	}
1582 	sig_mutex_unlock(&aiowp->work_qlock1);
1583 	return (error);
1584 }
1585 
1586 /*
1587  * A worker's completed AIO requests are placed onto a global
1588  * done queue.  The application is only sent a SIGIO signal if
1589  * the process has a handler enabled and it is not waiting via
1590  * aiowait().
1591  */
1592 static void
_aio_work_done(aio_worker_t * aiowp)1593 _aio_work_done(aio_worker_t *aiowp)
1594 {
1595 	aio_req_t *reqp;
1596 
1597 	sig_mutex_lock(&__aio_mutex);
1598 	sig_mutex_lock(&aiowp->work_qlock1);
1599 	reqp = aiowp->work_prev1;
1600 	reqp->req_next = NULL;
1601 	aiowp->work_done1 = 0;
1602 	aiowp->work_tail1 = aiowp->work_next1;
1603 	if (aiowp->work_tail1 == NULL)
1604 		aiowp->work_head1 = NULL;
1605 	aiowp->work_prev1 = NULL;
1606 	_aio_outstand_cnt--;
1607 	_aio_req_done_cnt--;
1608 	if (reqp->req_state == AIO_REQ_CANCELED) {
1609 		/*
1610 		 * Request got cancelled after it was marked done. This can
1611 		 * happen because _aio_finish_request() marks it AIO_REQ_DONE
1612 		 * and drops all locks. Don't add the request to the done
1613 		 * queue and just discard it.
1614 		 */
1615 		sig_mutex_unlock(&aiowp->work_qlock1);
1616 		_aio_req_free(reqp);
1617 		if (_aio_outstand_cnt == 0 && _aiowait_flag) {
1618 			sig_mutex_unlock(&__aio_mutex);
1619 			(void) _kaio(AIONOTIFY);
1620 		} else {
1621 			sig_mutex_unlock(&__aio_mutex);
1622 		}
1623 		return;
1624 	}
1625 	sig_mutex_unlock(&aiowp->work_qlock1);
1626 	_aio_donecnt++;
1627 	ASSERT(_aio_donecnt > 0 &&
1628 	    _aio_outstand_cnt >= 0 &&
1629 	    _aio_req_done_cnt >= 0);
1630 	ASSERT(reqp != NULL);
1631 
1632 	if (_aio_done_tail == NULL) {
1633 		_aio_done_head = _aio_done_tail = reqp;
1634 	} else {
1635 		_aio_done_head->req_next = reqp;
1636 		_aio_done_head = reqp;
1637 	}
1638 
1639 	if (_aiowait_flag) {
1640 		sig_mutex_unlock(&__aio_mutex);
1641 		(void) _kaio(AIONOTIFY);
1642 	} else {
1643 		sig_mutex_unlock(&__aio_mutex);
1644 		if (_sigio_enabled)
1645 			(void) kill(__pid, SIGIO);
1646 	}
1647 }
1648 
1649 /*
1650  * The done queue consists of AIO requests that are in either the
1651  * AIO_REQ_DONE or AIO_REQ_CANCELED state.  Requests that were cancelled
1652  * are discarded.  If the done queue is empty then NULL is returned.
1653  * Otherwise the address of a done aio_result_t is returned.
1654  */
1655 aio_result_t *
_aio_req_done(void)1656 _aio_req_done(void)
1657 {
1658 	aio_req_t *reqp;
1659 	aio_result_t *resultp;
1660 
1661 	ASSERT(MUTEX_HELD(&__aio_mutex));
1662 
1663 	if ((reqp = _aio_done_tail) != NULL) {
1664 		if ((_aio_done_tail = reqp->req_next) == NULL)
1665 			_aio_done_head = NULL;
1666 		ASSERT(_aio_donecnt > 0);
1667 		_aio_donecnt--;
1668 		(void) _aio_hash_del(reqp->req_resultp);
1669 		resultp = reqp->req_resultp;
1670 		ASSERT(reqp->req_state == AIO_REQ_DONE);
1671 		_aio_req_free(reqp);
1672 		return (resultp);
1673 	}
1674 	/* is queue empty? */
1675 	if (reqp == NULL && _aio_outstand_cnt == 0) {
1676 		return ((aio_result_t *)-1);
1677 	}
1678 	return (NULL);
1679 }
1680 
1681 /*
1682  * Set the return and errno values for the application's use.
1683  *
1684  * For the Posix interfaces, we must set the return value first followed
1685  * by the errno value because the Posix interfaces allow for a change
1686  * in the errno value from EINPROGRESS to something else to signal
1687  * the completion of the asynchronous request.
1688  *
1689  * The opposite is true for the Solaris interfaces.  These allow for
1690  * a change in the return value from AIO_INPROGRESS to something else
1691  * to signal the completion of the asynchronous request.
1692  */
1693 void
_aio_set_result(aio_req_t * reqp,ssize_t retval,int error)1694 _aio_set_result(aio_req_t *reqp, ssize_t retval, int error)
1695 {
1696 	aio_result_t *resultp = reqp->req_resultp;
1697 
1698 	if (POSIX_AIO(reqp)) {
1699 		resultp->aio_return = retval;
1700 		membar_producer();
1701 		resultp->aio_errno = error;
1702 	} else {
1703 		resultp->aio_errno = error;
1704 		membar_producer();
1705 		resultp->aio_return = retval;
1706 	}
1707 }
1708 
1709 /*
1710  * Add an AIO request onto the next work queue.
1711  * A circular list of workers is used to choose the next worker.
1712  */
1713 void
_aio_req_add(aio_req_t * reqp,aio_worker_t ** nextworker,int mode)1714 _aio_req_add(aio_req_t *reqp, aio_worker_t **nextworker, int mode)
1715 {
1716 	ulwp_t *self = curthread;
1717 	aio_worker_t *aiowp;
1718 	aio_worker_t *first;
1719 	int load_bal_flg = 1;
1720 	int found;
1721 
1722 	ASSERT(reqp->req_state != AIO_REQ_DONEQ);
1723 	reqp->req_next = NULL;
1724 	/*
1725 	 * Try to acquire the next worker's work queue.  If it is locked,
1726 	 * then search the list of workers until a queue is found unlocked,
1727 	 * or until the list is completely traversed at which point another
1728 	 * worker will be created.
1729 	 */
1730 	sigoff(self);		/* defer SIGIO */
1731 	sig_mutex_lock(&__aio_mutex);
1732 	first = aiowp = *nextworker;
1733 	if (mode != AIONOTIFY)
1734 		_aio_outstand_cnt++;
1735 	sig_mutex_unlock(&__aio_mutex);
1736 
1737 	switch (mode) {
1738 	case AIOREAD:
1739 	case AIOWRITE:
1740 	case AIOAREAD:
1741 	case AIOAWRITE:
1742 #if !defined(_LP64)
1743 	case AIOAREAD64:
1744 	case AIOAWRITE64:
1745 #endif
1746 		/* try to find an idle worker */
1747 		found = 0;
1748 		do {
1749 			if (sig_mutex_trylock(&aiowp->work_qlock1) == 0) {
1750 				if (aiowp->work_idleflg) {
1751 					found = 1;
1752 					break;
1753 				}
1754 				sig_mutex_unlock(&aiowp->work_qlock1);
1755 			}
1756 		} while ((aiowp = aiowp->work_forw) != first);
1757 
1758 		if (found) {
1759 			aiowp->work_minload1++;
1760 			break;
1761 		}
1762 
1763 		/* try to acquire some worker's queue lock */
1764 		do {
1765 			if (sig_mutex_trylock(&aiowp->work_qlock1) == 0) {
1766 				found = 1;
1767 				break;
1768 			}
1769 		} while ((aiowp = aiowp->work_forw) != first);
1770 
1771 		/*
1772 		 * Create more workers when the workers appear overloaded.
1773 		 * Either all the workers are busy draining their queues
1774 		 * or no worker's queue lock could be acquired.
1775 		 */
1776 		if (!found) {
1777 			if (_aio_worker_cnt < _max_workers) {
1778 				if (_aio_create_worker(reqp, mode))
1779 					aio_panic("_aio_req_add: add worker");
1780 				sigon(self);	/* reenable SIGIO */
1781 				return;
1782 			}
1783 
1784 			/*
1785 			 * No worker available and we have created
1786 			 * _max_workers, keep going through the
1787 			 * list slowly until we get a lock
1788 			 */
1789 			while (sig_mutex_trylock(&aiowp->work_qlock1) != 0) {
1790 				/*
1791 				 * give someone else a chance
1792 				 */
1793 				_aio_delay(1);
1794 				aiowp = aiowp->work_forw;
1795 			}
1796 		}
1797 
1798 		ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
1799 		if (_aio_worker_cnt < _max_workers &&
1800 		    aiowp->work_minload1 >= _minworkload) {
1801 			sig_mutex_unlock(&aiowp->work_qlock1);
1802 			sig_mutex_lock(&__aio_mutex);
1803 			*nextworker = aiowp->work_forw;
1804 			sig_mutex_unlock(&__aio_mutex);
1805 			if (_aio_create_worker(reqp, mode))
1806 				aio_panic("aio_req_add: add worker");
1807 			sigon(self);	/* reenable SIGIO */
1808 			return;
1809 		}
1810 		aiowp->work_minload1++;
1811 		break;
1812 	case AIOFSYNC:
1813 	case AIONOTIFY:
1814 		load_bal_flg = 0;
1815 		sig_mutex_lock(&aiowp->work_qlock1);
1816 		break;
1817 	default:
1818 		aio_panic("_aio_req_add: invalid mode");
1819 		break;
1820 	}
1821 	/*
1822 	 * Put request onto worker's work queue.
1823 	 */
1824 	if (aiowp->work_tail1 == NULL) {
1825 		ASSERT(aiowp->work_count1 == 0);
1826 		aiowp->work_tail1 = reqp;
1827 		aiowp->work_next1 = reqp;
1828 	} else {
1829 		aiowp->work_head1->req_next = reqp;
1830 		if (aiowp->work_next1 == NULL)
1831 			aiowp->work_next1 = reqp;
1832 	}
1833 	reqp->req_state = AIO_REQ_QUEUED;
1834 	reqp->req_worker = aiowp;
1835 	aiowp->work_head1 = reqp;
1836 	/*
1837 	 * Awaken worker if it is not currently active.
1838 	 */
1839 	if (aiowp->work_count1++ == 0 && aiowp->work_idleflg) {
1840 		aiowp->work_idleflg = 0;
1841 		(void) cond_signal(&aiowp->work_idle_cv);
1842 	}
1843 	sig_mutex_unlock(&aiowp->work_qlock1);
1844 
1845 	if (load_bal_flg) {
1846 		sig_mutex_lock(&__aio_mutex);
1847 		*nextworker = aiowp->work_forw;
1848 		sig_mutex_unlock(&__aio_mutex);
1849 	}
1850 	sigon(self);	/* reenable SIGIO */
1851 }
1852 
1853 /*
1854  * Get an AIO request for a specified worker.
1855  * If the work queue is empty, return NULL.
1856  */
1857 aio_req_t *
_aio_req_get(aio_worker_t * aiowp)1858 _aio_req_get(aio_worker_t *aiowp)
1859 {
1860 	aio_req_t *reqp;
1861 
1862 	sig_mutex_lock(&aiowp->work_qlock1);
1863 	if ((reqp = aiowp->work_next1) != NULL) {
1864 		/*
1865 		 * Remove a POSIX request from the queue; the
1866 		 * request queue is a singularly linked list
1867 		 * with a previous pointer.  The request is
1868 		 * removed by updating the previous pointer.
1869 		 *
1870 		 * Non-posix requests are left on the queue
1871 		 * to eventually be placed on the done queue.
1872 		 */
1873 
1874 		if (POSIX_AIO(reqp)) {
1875 			if (aiowp->work_prev1 == NULL) {
1876 				aiowp->work_tail1 = reqp->req_next;
1877 				if (aiowp->work_tail1 == NULL)
1878 					aiowp->work_head1 = NULL;
1879 			} else {
1880 				aiowp->work_prev1->req_next = reqp->req_next;
1881 				if (aiowp->work_head1 == reqp)
1882 					aiowp->work_head1 = reqp->req_next;
1883 			}
1884 
1885 		} else {
1886 			aiowp->work_prev1 = reqp;
1887 			ASSERT(aiowp->work_done1 >= 0);
1888 			aiowp->work_done1++;
1889 		}
1890 		ASSERT(reqp != reqp->req_next);
1891 		aiowp->work_next1 = reqp->req_next;
1892 		ASSERT(aiowp->work_count1 >= 1);
1893 		aiowp->work_count1--;
1894 		switch (reqp->req_op) {
1895 		case AIOREAD:
1896 		case AIOWRITE:
1897 		case AIOAREAD:
1898 		case AIOAWRITE:
1899 #if !defined(_LP64)
1900 		case AIOAREAD64:
1901 		case AIOAWRITE64:
1902 #endif
1903 			ASSERT(aiowp->work_minload1 > 0);
1904 			aiowp->work_minload1--;
1905 			break;
1906 		}
1907 		reqp->req_state = AIO_REQ_INPROGRESS;
1908 	}
1909 	aiowp->work_req = reqp;
1910 	ASSERT(reqp != NULL || aiowp->work_count1 == 0);
1911 	sig_mutex_unlock(&aiowp->work_qlock1);
1912 	return (reqp);
1913 }
1914 
1915 static void
_aio_req_del(aio_worker_t * aiowp,aio_req_t * reqp,int ostate)1916 _aio_req_del(aio_worker_t *aiowp, aio_req_t *reqp, int ostate)
1917 {
1918 	aio_req_t **last;
1919 	aio_req_t *lastrp;
1920 	aio_req_t *next;
1921 
1922 	ASSERT(aiowp != NULL);
1923 	ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
1924 	if (POSIX_AIO(reqp)) {
1925 		if (ostate != AIO_REQ_QUEUED)
1926 			return;
1927 	}
1928 	last = &aiowp->work_tail1;
1929 	lastrp = aiowp->work_tail1;
1930 	ASSERT(ostate == AIO_REQ_QUEUED || ostate == AIO_REQ_INPROGRESS);
1931 	while ((next = *last) != NULL) {
1932 		if (next == reqp) {
1933 			*last = next->req_next;
1934 			if (aiowp->work_next1 == next)
1935 				aiowp->work_next1 = next->req_next;
1936 
1937 			/*
1938 			 * if this is the first request on the queue, move
1939 			 * the lastrp pointer forward.
1940 			 */
1941 			if (lastrp == next)
1942 				lastrp = next->req_next;
1943 
1944 			/*
1945 			 * if this request is pointed by work_head1, then
1946 			 * make work_head1 point to the last request that is
1947 			 * present on the queue.
1948 			 */
1949 			if (aiowp->work_head1 == next)
1950 				aiowp->work_head1 = lastrp;
1951 
1952 			/*
1953 			 * work_prev1 is used only in non posix case and it
1954 			 * points to the current AIO_REQ_INPROGRESS request.
1955 			 * If work_prev1 points to this request which is being
1956 			 * deleted, make work_prev1 NULL and set  work_done1
1957 			 * to 0.
1958 			 *
1959 			 * A worker thread can be processing only one request
1960 			 * at a time.
1961 			 */
1962 			if (aiowp->work_prev1 == next) {
1963 				ASSERT(ostate == AIO_REQ_INPROGRESS &&
1964 				    !POSIX_AIO(reqp) && aiowp->work_done1 > 0);
1965 					aiowp->work_prev1 = NULL;
1966 					aiowp->work_done1--;
1967 			}
1968 
1969 			if (ostate == AIO_REQ_QUEUED) {
1970 				ASSERT(aiowp->work_count1 >= 1);
1971 				aiowp->work_count1--;
1972 				ASSERT(aiowp->work_minload1 >= 1);
1973 				aiowp->work_minload1--;
1974 			}
1975 			return;
1976 		}
1977 		last = &next->req_next;
1978 		lastrp = next;
1979 	}
1980 	/* NOTREACHED */
1981 }
1982 
1983 static void
_aio_enq_doneq(aio_req_t * reqp)1984 _aio_enq_doneq(aio_req_t *reqp)
1985 {
1986 	if (_aio_doneq == NULL) {
1987 		_aio_doneq = reqp;
1988 		reqp->req_next = reqp->req_prev = reqp;
1989 	} else {
1990 		reqp->req_next = _aio_doneq;
1991 		reqp->req_prev = _aio_doneq->req_prev;
1992 		_aio_doneq->req_prev->req_next = reqp;
1993 		_aio_doneq->req_prev = reqp;
1994 	}
1995 	reqp->req_state = AIO_REQ_DONEQ;
1996 	_aio_doneq_cnt++;
1997 }
1998 
1999 /*
2000  * caller owns the _aio_mutex
2001  */
2002 aio_req_t *
_aio_req_remove(aio_req_t * reqp)2003 _aio_req_remove(aio_req_t *reqp)
2004 {
2005 	if (reqp && reqp->req_state != AIO_REQ_DONEQ)
2006 		return (NULL);
2007 
2008 	if (reqp) {
2009 		/* request in done queue */
2010 		if (_aio_doneq == reqp)
2011 			_aio_doneq = reqp->req_next;
2012 		if (_aio_doneq == reqp) {
2013 			/* only one request on queue */
2014 			_aio_doneq = NULL;
2015 		} else {
2016 			aio_req_t *tmp = reqp->req_next;
2017 			reqp->req_prev->req_next = tmp;
2018 			tmp->req_prev = reqp->req_prev;
2019 		}
2020 	} else if ((reqp = _aio_doneq) != NULL) {
2021 		if (reqp == reqp->req_next) {
2022 			/* only one request on queue */
2023 			_aio_doneq = NULL;
2024 		} else {
2025 			reqp->req_prev->req_next = _aio_doneq = reqp->req_next;
2026 			_aio_doneq->req_prev = reqp->req_prev;
2027 		}
2028 	}
2029 	if (reqp) {
2030 		_aio_doneq_cnt--;
2031 		reqp->req_next = reqp->req_prev = reqp;
2032 		reqp->req_state = AIO_REQ_DONE;
2033 	}
2034 	return (reqp);
2035 }
2036 
2037 /*
2038  * An AIO request is identified by an aio_result_t pointer.  The library
2039  * maps this aio_result_t pointer to its internal representation using a
2040  * hash table.  This function adds an aio_result_t pointer to the hash table.
2041  */
2042 static int
_aio_hash_insert(aio_result_t * resultp,aio_req_t * reqp)2043 _aio_hash_insert(aio_result_t *resultp, aio_req_t *reqp)
2044 {
2045 	aio_hash_t *hashp;
2046 	aio_req_t **prev;
2047 	aio_req_t *next;
2048 
2049 	hashp = _aio_hash + AIOHASH(resultp);
2050 	lmutex_lock(&hashp->hash_lock);
2051 	prev = &hashp->hash_ptr;
2052 	while ((next = *prev) != NULL) {
2053 		if (resultp == next->req_resultp) {
2054 			lmutex_unlock(&hashp->hash_lock);
2055 			return (-1);
2056 		}
2057 		prev = &next->req_link;
2058 	}
2059 	*prev = reqp;
2060 	ASSERT(reqp->req_link == NULL);
2061 	lmutex_unlock(&hashp->hash_lock);
2062 	return (0);
2063 }
2064 
2065 /*
2066  * Remove an entry from the hash table.
2067  */
2068 aio_req_t *
_aio_hash_del(aio_result_t * resultp)2069 _aio_hash_del(aio_result_t *resultp)
2070 {
2071 	aio_hash_t *hashp;
2072 	aio_req_t **prev;
2073 	aio_req_t *next = NULL;
2074 
2075 	if (_aio_hash != NULL) {
2076 		hashp = _aio_hash + AIOHASH(resultp);
2077 		lmutex_lock(&hashp->hash_lock);
2078 		prev = &hashp->hash_ptr;
2079 		while ((next = *prev) != NULL) {
2080 			if (resultp == next->req_resultp) {
2081 				*prev = next->req_link;
2082 				next->req_link = NULL;
2083 				break;
2084 			}
2085 			prev = &next->req_link;
2086 		}
2087 		lmutex_unlock(&hashp->hash_lock);
2088 	}
2089 	return (next);
2090 }
2091 
2092 /*
2093  *  find an entry in the hash table
2094  */
2095 aio_req_t *
_aio_hash_find(aio_result_t * resultp)2096 _aio_hash_find(aio_result_t *resultp)
2097 {
2098 	aio_hash_t *hashp;
2099 	aio_req_t **prev;
2100 	aio_req_t *next = NULL;
2101 
2102 	if (_aio_hash != NULL) {
2103 		hashp = _aio_hash + AIOHASH(resultp);
2104 		lmutex_lock(&hashp->hash_lock);
2105 		prev = &hashp->hash_ptr;
2106 		while ((next = *prev) != NULL) {
2107 			if (resultp == next->req_resultp)
2108 				break;
2109 			prev = &next->req_link;
2110 		}
2111 		lmutex_unlock(&hashp->hash_lock);
2112 	}
2113 	return (next);
2114 }
2115 
2116 /*
2117  * AIO interface for POSIX
2118  */
2119 int
_aio_rw(aiocb_t * aiocbp,aio_lio_t * lio_head,aio_worker_t ** nextworker,int mode,int flg)2120 _aio_rw(aiocb_t *aiocbp, aio_lio_t *lio_head, aio_worker_t **nextworker,
2121     int mode, int flg)
2122 {
2123 	aio_req_t *reqp;
2124 	aio_args_t *ap;
2125 	int kerr;
2126 
2127 	if (aiocbp == NULL) {
2128 		errno = EINVAL;
2129 		return (-1);
2130 	}
2131 
2132 	/* initialize kaio */
2133 	if (!_kaio_ok)
2134 		_kaio_init();
2135 
2136 	aiocbp->aio_state = NOCHECK;
2137 
2138 	/*
2139 	 * If we have been called because a list I/O
2140 	 * kaio() failed, we dont want to repeat the
2141 	 * system call
2142 	 */
2143 
2144 	if (flg & AIO_KAIO) {
2145 		/*
2146 		 * Try kernel aio first.
2147 		 * If errno is ENOTSUP/EBADFD,
2148 		 * fall back to the thread implementation.
2149 		 */
2150 		if (_kaio_ok > 0 && KAIO_SUPPORTED(aiocbp->aio_fildes)) {
2151 			aiocbp->aio_resultp.aio_errno = EINPROGRESS;
2152 			aiocbp->aio_state = CHECK;
2153 			kerr = (int)_kaio(mode, aiocbp);
2154 			if (kerr == 0)
2155 				return (0);
2156 			if (errno != ENOTSUP && errno != EBADFD) {
2157 				aiocbp->aio_resultp.aio_errno = errno;
2158 				aiocbp->aio_resultp.aio_return = -1;
2159 				aiocbp->aio_state = NOCHECK;
2160 				return (-1);
2161 			}
2162 			if (errno == EBADFD)
2163 				SET_KAIO_NOT_SUPPORTED(aiocbp->aio_fildes);
2164 		}
2165 	}
2166 
2167 	aiocbp->aio_resultp.aio_errno = EINPROGRESS;
2168 	aiocbp->aio_state = USERAIO;
2169 
2170 	if (!__uaio_ok && __uaio_init() == -1)
2171 		return (-1);
2172 
2173 	if ((reqp = _aio_req_alloc()) == NULL) {
2174 		errno = EAGAIN;
2175 		return (-1);
2176 	}
2177 
2178 	/*
2179 	 * If an LIO request, add the list head to the aio request
2180 	 */
2181 	reqp->req_head = lio_head;
2182 	reqp->req_type = AIO_POSIX_REQ;
2183 	reqp->req_op = mode;
2184 	reqp->req_largefile = 0;
2185 
2186 	if (aiocbp->aio_sigevent.sigev_notify == SIGEV_NONE) {
2187 		reqp->req_sigevent.sigev_notify = SIGEV_NONE;
2188 	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_SIGNAL) {
2189 		reqp->req_sigevent.sigev_notify = SIGEV_SIGNAL;
2190 		reqp->req_sigevent.sigev_signo =
2191 		    aiocbp->aio_sigevent.sigev_signo;
2192 		reqp->req_sigevent.sigev_value.sival_ptr =
2193 		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
2194 	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_PORT) {
2195 		port_notify_t *pn = aiocbp->aio_sigevent.sigev_value.sival_ptr;
2196 		reqp->req_sigevent.sigev_notify = SIGEV_PORT;
2197 		/*
2198 		 * Reuse the sigevent structure to contain the port number
2199 		 * and the user value.  Same for SIGEV_THREAD, below.
2200 		 */
2201 		reqp->req_sigevent.sigev_signo =
2202 		    pn->portnfy_port;
2203 		reqp->req_sigevent.sigev_value.sival_ptr =
2204 		    pn->portnfy_user;
2205 	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_THREAD) {
2206 		reqp->req_sigevent.sigev_notify = SIGEV_THREAD;
2207 		/*
2208 		 * The sigevent structure contains the port number
2209 		 * and the user value.  Same for SIGEV_PORT, above.
2210 		 */
2211 		reqp->req_sigevent.sigev_signo =
2212 		    aiocbp->aio_sigevent.sigev_signo;
2213 		reqp->req_sigevent.sigev_value.sival_ptr =
2214 		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
2215 	}
2216 
2217 	reqp->req_resultp = &aiocbp->aio_resultp;
2218 	reqp->req_aiocbp = aiocbp;
2219 	ap = &reqp->req_args;
2220 	ap->fd = aiocbp->aio_fildes;
2221 	ap->buf = (caddr_t)aiocbp->aio_buf;
2222 	ap->bufsz = aiocbp->aio_nbytes;
2223 	ap->offset = aiocbp->aio_offset;
2224 
2225 	if ((flg & AIO_NO_DUPS) &&
2226 	    _aio_hash_insert(&aiocbp->aio_resultp, reqp) != 0) {
2227 		aio_panic("_aio_rw(): request already in hash table");
2228 		_aio_req_free(reqp);
2229 		errno = EINVAL;
2230 		return (-1);
2231 	}
2232 	_aio_req_add(reqp, nextworker, mode);
2233 	return (0);
2234 }
2235 
2236 #if !defined(_LP64)
2237 /*
2238  * 64-bit AIO interface for POSIX
2239  */
2240 int
_aio_rw64(aiocb64_t * aiocbp,aio_lio_t * lio_head,aio_worker_t ** nextworker,int mode,int flg)2241 _aio_rw64(aiocb64_t *aiocbp, aio_lio_t *lio_head, aio_worker_t **nextworker,
2242     int mode, int flg)
2243 {
2244 	aio_req_t *reqp;
2245 	aio_args_t *ap;
2246 	int kerr;
2247 
2248 	if (aiocbp == NULL) {
2249 		errno = EINVAL;
2250 		return (-1);
2251 	}
2252 
2253 	/* initialize kaio */
2254 	if (!_kaio_ok)
2255 		_kaio_init();
2256 
2257 	aiocbp->aio_state = NOCHECK;
2258 
2259 	/*
2260 	 * If we have been called because a list I/O
2261 	 * kaio() failed, we dont want to repeat the
2262 	 * system call
2263 	 */
2264 
2265 	if (flg & AIO_KAIO) {
2266 		/*
2267 		 * Try kernel aio first.
2268 		 * If errno is ENOTSUP/EBADFD,
2269 		 * fall back to the thread implementation.
2270 		 */
2271 		if (_kaio_ok > 0 && KAIO_SUPPORTED(aiocbp->aio_fildes)) {
2272 			aiocbp->aio_resultp.aio_errno = EINPROGRESS;
2273 			aiocbp->aio_state = CHECK;
2274 			kerr = (int)_kaio(mode, aiocbp);
2275 			if (kerr == 0)
2276 				return (0);
2277 			if (errno != ENOTSUP && errno != EBADFD) {
2278 				aiocbp->aio_resultp.aio_errno = errno;
2279 				aiocbp->aio_resultp.aio_return = -1;
2280 				aiocbp->aio_state = NOCHECK;
2281 				return (-1);
2282 			}
2283 			if (errno == EBADFD)
2284 				SET_KAIO_NOT_SUPPORTED(aiocbp->aio_fildes);
2285 		}
2286 	}
2287 
2288 	aiocbp->aio_resultp.aio_errno = EINPROGRESS;
2289 	aiocbp->aio_state = USERAIO;
2290 
2291 	if (!__uaio_ok && __uaio_init() == -1)
2292 		return (-1);
2293 
2294 	if ((reqp = _aio_req_alloc()) == NULL) {
2295 		errno = EAGAIN;
2296 		return (-1);
2297 	}
2298 
2299 	/*
2300 	 * If an LIO request, add the list head to the aio request
2301 	 */
2302 	reqp->req_head = lio_head;
2303 	reqp->req_type = AIO_POSIX_REQ;
2304 	reqp->req_op = mode;
2305 	reqp->req_largefile = 1;
2306 
2307 	if (aiocbp->aio_sigevent.sigev_notify == SIGEV_NONE) {
2308 		reqp->req_sigevent.sigev_notify = SIGEV_NONE;
2309 	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_SIGNAL) {
2310 		reqp->req_sigevent.sigev_notify = SIGEV_SIGNAL;
2311 		reqp->req_sigevent.sigev_signo =
2312 		    aiocbp->aio_sigevent.sigev_signo;
2313 		reqp->req_sigevent.sigev_value.sival_ptr =
2314 		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
2315 	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_PORT) {
2316 		port_notify_t *pn = aiocbp->aio_sigevent.sigev_value.sival_ptr;
2317 		reqp->req_sigevent.sigev_notify = SIGEV_PORT;
2318 		reqp->req_sigevent.sigev_signo =
2319 		    pn->portnfy_port;
2320 		reqp->req_sigevent.sigev_value.sival_ptr =
2321 		    pn->portnfy_user;
2322 	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_THREAD) {
2323 		reqp->req_sigevent.sigev_notify = SIGEV_THREAD;
2324 		reqp->req_sigevent.sigev_signo =
2325 		    aiocbp->aio_sigevent.sigev_signo;
2326 		reqp->req_sigevent.sigev_value.sival_ptr =
2327 		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
2328 	}
2329 
2330 	reqp->req_resultp = &aiocbp->aio_resultp;
2331 	reqp->req_aiocbp = aiocbp;
2332 	ap = &reqp->req_args;
2333 	ap->fd = aiocbp->aio_fildes;
2334 	ap->buf = (caddr_t)aiocbp->aio_buf;
2335 	ap->bufsz = aiocbp->aio_nbytes;
2336 	ap->offset = aiocbp->aio_offset;
2337 
2338 	if ((flg & AIO_NO_DUPS) &&
2339 	    _aio_hash_insert(&aiocbp->aio_resultp, reqp) != 0) {
2340 		aio_panic("_aio_rw64(): request already in hash table");
2341 		_aio_req_free(reqp);
2342 		errno = EINVAL;
2343 		return (-1);
2344 	}
2345 	_aio_req_add(reqp, nextworker, mode);
2346 	return (0);
2347 }
2348 #endif	/* !defined(_LP64) */
2349