xref: /freebsd/sys/kern/vfs_aio.c (revision 87569f75a91f298c52a71823c04d41cf53c88889)
1 /*-
2  * Copyright (c) 1997 John S. Dyson.  All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions
6  * are met:
7  * 1. Redistributions of source code must retain the above copyright
8  *    notice, this list of conditions and the following disclaimer.
9  * 2. John S. Dyson's name may not be used to endorse or promote products
10  *    derived from this software without specific prior written permission.
11  *
12  * DISCLAIMER:  This code isn't warranted to do anything useful.  Anything
13  * bad that happens because of using this software isn't the responsibility
14  * of the author.  This software is distributed AS-IS.
15  */
16 
17 /*
18  * This file contains support for the POSIX 1003.1B AIO/LIO facility.
19  */
20 
21 #include <sys/cdefs.h>
22 __FBSDID("$FreeBSD$");
23 
24 #include <sys/param.h>
25 #include <sys/systm.h>
26 #include <sys/malloc.h>
27 #include <sys/bio.h>
28 #include <sys/buf.h>
29 #include <sys/eventhandler.h>
30 #include <sys/sysproto.h>
31 #include <sys/filedesc.h>
32 #include <sys/kernel.h>
33 #include <sys/module.h>
34 #include <sys/kthread.h>
35 #include <sys/fcntl.h>
36 #include <sys/file.h>
37 #include <sys/limits.h>
38 #include <sys/lock.h>
39 #include <sys/mutex.h>
40 #include <sys/unistd.h>
41 #include <sys/proc.h>
42 #include <sys/resourcevar.h>
43 #include <sys/signalvar.h>
44 #include <sys/protosw.h>
45 #include <sys/sema.h>
46 #include <sys/socket.h>
47 #include <sys/socketvar.h>
48 #include <sys/syscall.h>
49 #include <sys/sysent.h>
50 #include <sys/sysctl.h>
51 #include <sys/sx.h>
52 #include <sys/taskqueue.h>
53 #include <sys/vnode.h>
54 #include <sys/conf.h>
55 #include <sys/event.h>
56 
57 #include <machine/atomic.h>
58 
59 #include <posix4/posix4.h>
60 #include <vm/vm.h>
61 #include <vm/vm_extern.h>
62 #include <vm/pmap.h>
63 #include <vm/vm_map.h>
64 #include <vm/uma.h>
65 #include <sys/aio.h>
66 
67 #include "opt_vfs_aio.h"
68 
69 /*
70  * Counter for allocating reference ids to new jobs.  Wrapped to 1 on
71  * overflow.
72  */
73 static	long jobrefid;
74 
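/* Job states stored in aiocblist.jobstate. */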
75 #define JOBST_NULL		0x0
76 #define JOBST_JOBQSOCK		0x1
77 #define JOBST_JOBQGLOBAL	0x2
78 #define JOBST_JOBRUNNING	0x3
79 #define JOBST_JOBFINISHED	0x4
80 #define	JOBST_JOBQBUF		0x5
81 
82 #ifndef MAX_AIO_PER_PROC
83 #define MAX_AIO_PER_PROC	32
84 #endif
85 
86 #ifndef MAX_AIO_QUEUE_PER_PROC
87 #define MAX_AIO_QUEUE_PER_PROC	256 /* Bigger than AIO_LISTIO_MAX */
88 #endif
89 
90 #ifndef MAX_AIO_PROCS
91 #define MAX_AIO_PROCS		32
92 #endif
93 
94 #ifndef MAX_AIO_QUEUE
95 #define	MAX_AIO_QUEUE		1024 /* Bigger than AIO_LISTIO_MAX */
96 #endif
97 
98 #ifndef TARGET_AIO_PROCS
99 #define TARGET_AIO_PROCS	4
100 #endif
101 
102 #ifndef MAX_BUF_AIO
103 #define MAX_BUF_AIO		16
104 #endif
105 
106 #ifndef AIOD_TIMEOUT_DEFAULT
107 #define	AIOD_TIMEOUT_DEFAULT	(10 * hz)
108 #endif
109 
110 #ifndef AIOD_LIFETIME_DEFAULT
111 #define AIOD_LIFETIME_DEFAULT	(30 * hz)
112 #endif
113 
114 static SYSCTL_NODE(_vfs, OID_AUTO, aio, CTLFLAG_RW, 0, "Async IO management");
115 
116 static int max_aio_procs = MAX_AIO_PROCS;
117 SYSCTL_INT(_vfs_aio, OID_AUTO, max_aio_procs,
118 	CTLFLAG_RW, &max_aio_procs, 0,
119 	"Maximum number of kernel threads to use for handling async IO");
120 
121 static int num_aio_procs = 0;
122 SYSCTL_INT(_vfs_aio, OID_AUTO, num_aio_procs,
123 	CTLFLAG_RD, &num_aio_procs, 0,
124 	"Number of presently active kernel threads for async IO");
125 
126 /*
127  * The code will adjust the actual number of AIO processes towards this
128  * number when it gets a chance.
129  */
130 static int target_aio_procs = TARGET_AIO_PROCS;
131 SYSCTL_INT(_vfs_aio, OID_AUTO, target_aio_procs, CTLFLAG_RW, &target_aio_procs,
132 	0, "Preferred number of ready kernel threads for async IO");
133 
134 static int max_queue_count = MAX_AIO_QUEUE;
135 SYSCTL_INT(_vfs_aio, OID_AUTO, max_aio_queue, CTLFLAG_RW, &max_queue_count, 0,
136     "Maximum number of aio requests to queue, globally");
137 
138 static int num_queue_count = 0;
139 SYSCTL_INT(_vfs_aio, OID_AUTO, num_queue_count, CTLFLAG_RD, &num_queue_count, 0,
140     "Number of queued aio requests");
141 
142 static int num_buf_aio = 0;
143 SYSCTL_INT(_vfs_aio, OID_AUTO, num_buf_aio, CTLFLAG_RD, &num_buf_aio, 0,
144     "Number of aio requests presently handled by the buf subsystem");
145 
146 /* Number of async I/O threads in the process of being started */
147 /* XXX This should be local to aio_aqueue() */
148 static int num_aio_resv_start = 0;
149 
150 static int aiod_timeout;
151 SYSCTL_INT(_vfs_aio, OID_AUTO, aiod_timeout, CTLFLAG_RW, &aiod_timeout, 0,
152     "Timeout value for synchronous aio operations");
153 
154 static int aiod_lifetime;
155 SYSCTL_INT(_vfs_aio, OID_AUTO, aiod_lifetime, CTLFLAG_RW, &aiod_lifetime, 0,
156     "Maximum lifetime for idle aiod");
157 
158 static int unloadable = 0;
159 SYSCTL_INT(_vfs_aio, OID_AUTO, unloadable, CTLFLAG_RW, &unloadable, 0,
160     "Allow unload of aio (not recommended)");
161 
162 
163 static int max_aio_per_proc = MAX_AIO_PER_PROC;
164 SYSCTL_INT(_vfs_aio, OID_AUTO, max_aio_per_proc, CTLFLAG_RW, &max_aio_per_proc,
165     0, "Maximum active aio requests per process (stored in the process)");
166 
167 static int max_aio_queue_per_proc = MAX_AIO_QUEUE_PER_PROC;
168 SYSCTL_INT(_vfs_aio, OID_AUTO, max_aio_queue_per_proc, CTLFLAG_RW,
169     &max_aio_queue_per_proc, 0,
170     "Maximum queued aio requests per process (stored in the process)");
171 
172 static int max_buf_aio = MAX_BUF_AIO;
173 SYSCTL_INT(_vfs_aio, OID_AUTO, max_buf_aio, CTLFLAG_RW, &max_buf_aio, 0,
174     "Maximum buf aio requests per process (stored in the process)");
175 
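/*
 * Old-style (pre-sigevent) aiocb layout, as used by the o*() compatibility
 * system calls (oaio_read, oaio_write, olio_listio).
 */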
176 typedef struct oaiocb {
177 	int	aio_fildes;		/* File descriptor */
178 	off_t	aio_offset;		/* File offset for I/O */
179 	volatile void *aio_buf;         /* I/O buffer in process space */
180 	size_t	aio_nbytes;		/* Number of bytes for I/O */
181 	struct	osigevent aio_sigevent;	/* Signal to deliver */
182 	int	aio_lio_opcode;		/* LIO opcode */
183 	int	aio_reqprio;		/* Request priority -- ignored */
184 	struct	__aiocb_private	_aiocb_private;
185 } oaiocb_t;
186 
187 /*
188  * Below is a key of the locks used to protect each member of struct
189  * aiocblist, aioliojob, and kaioinfo, and any backends.
190  *
191  * * - need not be protected
192  * a - locked by proc mtx
193  * b - locked by backend lock; the backend lock can be NULL in some cases,
194  *     for example, BIO belongs to this type, in which case the proc lock
195  *     is reused.
196  * c - locked by aio_job_mtx, the lock for the generic file I/O backend.
197  */
198 
199 /*
200  * Currently, there are only two backends: BIO and generic file I/O.
201  * Socket I/O is served by the generic file I/O backend; this is not a good
202  * idea, since disk file I/O and any other types without the O_NONBLOCK flag
203  * can block daemon threads.  If there is no thread to serve socket I/O, the
204  * socket I/O will be delayed too long or starved.  We should create some
205  * threads dedicated to sockets to do non-blocking I/O; the same goes for
206  * pipes and fifos.  For these I/O systems we really need a non-blocking
207  * interface; fiddling with O_NONBLOCK in the file structure is not safe
208  * because there is a race between userland and the aio
209  */
210 
211 struct aiocblist {
212 	TAILQ_ENTRY(aiocblist) list;	/* (b) internal list for backend */
213 	TAILQ_ENTRY(aiocblist) plist;	/* (a) list of jobs for each backend */
214 	TAILQ_ENTRY(aiocblist) allist;  /* (a) list of all jobs in proc */
215 	int	jobflags;		/* (a) job flags */
216 	int	jobstate;		/* (b) job state */
217 	int	inputcharge;		/* (*) input blocks */
218 	int	outputcharge;		/* (*) output blocks */
219 	struct	buf *bp;		/* (*) private to BIO backend,
220 				  	 * buffer pointer
221 					 */
222 	struct	proc *userproc;		/* (*) user process */
223 	struct  ucred *cred;		/* (*) active credential when created */
224 	struct	file *fd_file;		/* (*) pointer to file structure */
225 	struct	aioliojob *lio;		/* (*) optional lio job */
226 	struct	aiocb *uuaiocb;		/* (*) pointer in userspace of aiocb */
227 	struct	knlist klist;		/* (a) list of knotes */
228 	struct	aiocb uaiocb;		/* (*) kernel I/O control block */
229 	ksiginfo_t ksi;			/* (a) realtime signal info */
230 	struct task	biotask;	/* (*) private to BIO backend */
231 };
232 
233 /* jobflags */
234 #define AIOCBLIST_RUNDOWN	0x04
235 #define AIOCBLIST_DONE		0x10
236 #define AIOCBLIST_BUFDONE	0x20
237 
238 /*
239  * AIO process info
240  */
241 #define AIOP_FREE	0x1			/* proc on free queue */
242 
243 struct aiothreadlist {
244 	int aiothreadflags;			/* (c) AIO proc flags */
245 	TAILQ_ENTRY(aiothreadlist) list;	/* (c) list of processes */
246 	struct thread *aiothread;		/* (*) the AIO thread */
247 };
248 
249 /*
250  * data-structure for lio signal management
251  */
252 struct aioliojob {
253 	int	lioj_flags;			/* (a) listio flags */
254 	int	lioj_count;			/* (a) count of jobs in this lio */
255 	int	lioj_finished_count;		/* (a) count of finished jobs */
256 	struct	sigevent lioj_signal;		/* (a) signal on all I/O done */
257 	TAILQ_ENTRY(aioliojob) lioj_list;	/* (a) lio list */
258 	struct  knlist klist;			/* (a) list of knotes */
259 	ksiginfo_t lioj_ksi;			/* (a) Realtime signal info */
260 };
261 
262 #define	LIOJ_SIGNAL		0x1	/* signal on all done (lio) */
263 #define	LIOJ_SIGNAL_POSTED	0x2	/* signal has been posted */
264 #define LIOJ_KEVENT_POSTED	0x4	/* kevent triggered */
265 
266 /*
267  * per process aio data structure
268  */
269 struct kaioinfo {
270 	int	kaio_flags;		/* (a) per process kaio flags */
271 	int	kaio_maxactive_count;	/* (*) maximum number of AIOs */
272 	int	kaio_active_count;	/* (c) number of currently used AIOs */
273 	int	kaio_qallowed_count;	/* (*) maximum size of AIO queue */
274 	int	kaio_count;		/* (a) size of AIO queue */
275 	int	kaio_ballowed_count;	/* (*) maximum number of buffers */
276 	int	kaio_buffer_count;	/* (a) number of physio buffers */
277 	TAILQ_HEAD(,aiocblist) kaio_all;	/* (a) all AIOs in the process */
278 	TAILQ_HEAD(,aiocblist) kaio_done;	/* (a) done queue for process */
279 	TAILQ_HEAD(,aioliojob) kaio_liojoblist; /* (a) list of lio jobs */
280 	TAILQ_HEAD(,aiocblist) kaio_jobqueue;	/* (a) job queue for process */
281 	TAILQ_HEAD(,aiocblist) kaio_bufqueue;	/* (a) buffer job queue for process */
282 	TAILQ_HEAD(,aiocblist) kaio_sockqueue;  /* (a) queue for aios waiting on sockets,
283 						 *  not used yet.
284 						 */
285 };
286 
287 #define KAIO_RUNDOWN	0x1	/* process is being run down */
288 #define KAIO_WAKEUP	0x2	/* wakeup process when there is a significant event */
289 
290 static TAILQ_HEAD(,aiothreadlist) aio_freeproc;		/* (c) Idle daemons */
291 static struct sema aio_newproc_sem;
292 static struct mtx aio_job_mtx;
293 static struct mtx aio_sock_mtx;
294 static TAILQ_HEAD(,aiocblist) aio_jobs;			/* (c) Async job list */
295 static struct unrhdr *aiod_unr;
296 
297 static void	aio_init_aioinfo(struct proc *p);
298 static void	aio_onceonly(void);
299 static int	aio_free_entry(struct aiocblist *aiocbe);
300 static void	aio_process(struct aiocblist *aiocbe);
301 static int	aio_newproc(int *);
302 static int	aio_aqueue(struct thread *td, struct aiocb *job,
303 			struct aioliojob *lio, int type, int osigev);
304 static void	aio_physwakeup(struct buf *bp);
305 static void	aio_proc_rundown(void *arg, struct proc *p);
306 static int	aio_qphysio(struct proc *p, struct aiocblist *iocb);
307 static void	biohelper(void *, int);
308 static void	aio_daemon(void *param);
309 static void	aio_swake_cb(struct socket *, struct sockbuf *);
310 static int	aio_unload(void);
311 static int	filt_aioattach(struct knote *kn);
312 static void	filt_aiodetach(struct knote *kn);
313 static int	filt_aio(struct knote *kn, long hint);
314 static int	filt_lioattach(struct knote *kn);
315 static void	filt_liodetach(struct knote *kn);
316 static int	filt_lio(struct knote *kn, long hint);
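/* Completion types passed to aio_bio_done_notify(). */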
317 #define DONE_BUF 1
318 #define DONE_QUEUE 2
319 static void	aio_bio_done_notify(struct proc *userp, struct aiocblist *aiocbe, int type);
320 static int	do_lio_listio(struct thread *td, struct lio_listio_args *uap,
321 			int oldsigev);
322 
323 /*
324  * Zones for:
325  * 	kaio	Per process async io info
326  *	aiop	async io thread data
327  *	aiocb	async io jobs
328  *	aiol	list io job pointer - internal to aio_suspend XXX
329  *	aiolio	list io jobs
330  */
331 static uma_zone_t kaio_zone, aiop_zone, aiocb_zone, aiol_zone, aiolio_zone;
332 
333 /* kqueue filters for aio */
334 static struct filterops aio_filtops =
335 	{ 0, filt_aioattach, filt_aiodetach, filt_aio };
336 static struct filterops lio_filtops =
337 	{ 0, filt_lioattach, filt_liodetach, filt_lio };
338 
339 static eventhandler_tag exit_tag, exec_tag;
340 
341 TASKQUEUE_DEFINE_THREAD(aiod_bio);
342 
343 /*
344  * Main operations function for use as a kernel module.
345  */
346 static int
347 aio_modload(struct module *module, int cmd, void *arg)
348 {
349 	int error = 0;
350 
351 	switch (cmd) {
352 	case MOD_LOAD:
353 		aio_onceonly();
354 		break;
355 	case MOD_UNLOAD:
356 		error = aio_unload();
357 		break;
358 	case MOD_SHUTDOWN:
359 		break;
360 	default:
361 		error = EINVAL;
362 		break;
363 	}
364 	return (error);
365 }
366 
367 static moduledata_t aio_mod = {
368 	"aio",
369 	&aio_modload,
370 	NULL
371 };
372 
373 SYSCALL_MODULE_HELPER(aio_return);
374 SYSCALL_MODULE_HELPER(aio_suspend);
375 SYSCALL_MODULE_HELPER(aio_cancel);
376 SYSCALL_MODULE_HELPER(aio_error);
377 SYSCALL_MODULE_HELPER(aio_read);
378 SYSCALL_MODULE_HELPER(aio_write);
379 SYSCALL_MODULE_HELPER(aio_waitcomplete);
380 SYSCALL_MODULE_HELPER(lio_listio);
381 SYSCALL_MODULE_HELPER(oaio_read);
382 SYSCALL_MODULE_HELPER(oaio_write);
383 SYSCALL_MODULE_HELPER(olio_listio);
384 
385 DECLARE_MODULE(aio, aio_mod,
386 	SI_SUB_VFS, SI_ORDER_ANY);
387 MODULE_VERSION(aio, 1);
388 
389 /*
390  * Startup initialization
391  */
392 static void
393 aio_onceonly(void)
394 {
395 
396 	/* XXX: should probably just use so->callback */
397 	aio_swake = &aio_swake_cb;
398 	exit_tag = EVENTHANDLER_REGISTER(process_exit, aio_proc_rundown, NULL,
399 	    EVENTHANDLER_PRI_ANY);
400 	exec_tag = EVENTHANDLER_REGISTER(process_exec, aio_proc_rundown, NULL,
401 	    EVENTHANDLER_PRI_ANY);
402 	kqueue_add_filteropts(EVFILT_AIO, &aio_filtops);
403 	kqueue_add_filteropts(EVFILT_LIO, &lio_filtops);
404 	TAILQ_INIT(&aio_freeproc);
405 	sema_init(&aio_newproc_sem, 0, "aio_new_proc");
406 	mtx_init(&aio_job_mtx, "aio_job", NULL, MTX_DEF);
407 	mtx_init(&aio_sock_mtx, "aio_sock", NULL, MTX_DEF);
408 	TAILQ_INIT(&aio_jobs);
409 	aiod_unr = new_unrhdr(1, INT_MAX, NULL);
410 	kaio_zone = uma_zcreate("AIO", sizeof(struct kaioinfo), NULL, NULL,
411 	    NULL, NULL, UMA_ALIGN_PTR, UMA_ZONE_NOFREE);
412 	aiop_zone = uma_zcreate("AIOP", sizeof(struct aiothreadlist), NULL,
413 	    NULL, NULL, NULL, UMA_ALIGN_PTR, UMA_ZONE_NOFREE);
414 	aiocb_zone = uma_zcreate("AIOCB", sizeof(struct aiocblist), NULL, NULL,
415 	    NULL, NULL, UMA_ALIGN_PTR, UMA_ZONE_NOFREE);
416 	aiol_zone = uma_zcreate("AIOL", AIO_LISTIO_MAX*sizeof(intptr_t) , NULL,
417 	    NULL, NULL, NULL, UMA_ALIGN_PTR, UMA_ZONE_NOFREE);
418 	aiolio_zone = uma_zcreate("AIOLIO", sizeof(struct aioliojob), NULL,
419 	    NULL, NULL, NULL, UMA_ALIGN_PTR, UMA_ZONE_NOFREE);
420 	aiod_timeout = AIOD_TIMEOUT_DEFAULT;
421 	aiod_lifetime = AIOD_LIFETIME_DEFAULT;
422 	jobrefid = 1;
423 	async_io_version = _POSIX_VERSION;
424 	p31b_setcfg(CTL_P1003_1B_AIO_LISTIO_MAX, AIO_LISTIO_MAX);
425 	p31b_setcfg(CTL_P1003_1B_AIO_MAX, MAX_AIO_QUEUE);
426 	p31b_setcfg(CTL_P1003_1B_AIO_PRIO_DELTA_MAX, 0);
427 }
428 
429 /*
430  * Callback for unload of AIO when used as a module.
431  */
432 static int
433 aio_unload(void)
434 {
435 	int error;
436 
437 	/*
438 	 * Perhaps we could do it if we locked out callers and then
439 	 * perhaps we could do it if locked out callers and then
440 	 * did an aio_proc_rundown() on each process.
441 	 *
442 	 * jhb: aio_proc_rundown() needs to run on curproc though,
443 	 * so I don't think that would fly.
444 	 */
445 	if (!unloadable)
446 		return (EOPNOTSUPP);
447 
448 	error = kqueue_del_filteropts(EVFILT_AIO);
449 	if (error)
450 		return (error);
451 	error = kqueue_del_filteropts(EVFILT_LIO);
452 	if (error)
453 		return (error);
454 	async_io_version = 0;
455 	aio_swake = NULL;
456 	taskqueue_free(taskqueue_aiod_bio);
457 	delete_unrhdr(aiod_unr);
458 	uma_zdestroy(kaio_zone);
459 	uma_zdestroy(aiop_zone);
460 	uma_zdestroy(aiocb_zone);
461 	uma_zdestroy(aiol_zone);
462 	uma_zdestroy(aiolio_zone);
463 	EVENTHANDLER_DEREGISTER(process_exit, exit_tag);
464 	EVENTHANDLER_DEREGISTER(process_exec, exec_tag);
465 	mtx_destroy(&aio_job_mtx);
466 	mtx_destroy(&aio_sock_mtx);
467 	sema_destroy(&aio_newproc_sem);
468 	p31b_setcfg(CTL_P1003_1B_AIO_LISTIO_MAX, -1);
469 	p31b_setcfg(CTL_P1003_1B_AIO_MAX, -1);
470 	p31b_setcfg(CTL_P1003_1B_AIO_PRIO_DELTA_MAX, -1);
471 	return (0);
472 }
473 
474 /*
475  * Init the per-process aioinfo structure.  The aioinfo limits are set
476  * per-process for user limit (resource) management.
477  */
478 static void
479 aio_init_aioinfo(struct proc *p)
480 {
481 	struct kaioinfo *ki;
482 
483 	ki = uma_zalloc(kaio_zone, M_WAITOK);
484 	ki->kaio_flags = 0;
485 	ki->kaio_maxactive_count = max_aio_per_proc;
486 	ki->kaio_active_count = 0;
487 	ki->kaio_qallowed_count = max_aio_queue_per_proc;
488 	ki->kaio_count = 0;
489 	ki->kaio_ballowed_count = max_buf_aio;
490 	ki->kaio_buffer_count = 0;
491 	TAILQ_INIT(&ki->kaio_all);
492 	TAILQ_INIT(&ki->kaio_done);
493 	TAILQ_INIT(&ki->kaio_jobqueue);
494 	TAILQ_INIT(&ki->kaio_bufqueue);
495 	TAILQ_INIT(&ki->kaio_liojoblist);
496 	TAILQ_INIT(&ki->kaio_sockqueue);
497 	PROC_LOCK(p);
498 	if (p->p_aioinfo == NULL) {
499 		p->p_aioinfo = ki;
500 		PROC_UNLOCK(p);
501 	} else {
502 		PROC_UNLOCK(p);
503 		uma_zfree(kaio_zone, ki);
504 	}
505 
506 	while (num_aio_procs < target_aio_procs)
507 		aio_newproc(NULL);
508 }
509 
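/*
 * Send the completion signal for an aio request to its owner, unless the
 * ksiginfo for this job is already queued.
 */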
510 static int
511 aio_sendsig(struct proc *p, struct sigevent *sigev, ksiginfo_t *ksi)
512 {
513 	PROC_LOCK_ASSERT(p, MA_OWNED);
514 	if (!KSI_ONQ(ksi)) {
515 		ksi->ksi_code = SI_ASYNCIO;
516 		ksi->ksi_flags |= KSI_EXT | KSI_INS;
517 		return (psignal_event(p, sigev, ksi));
518 	}
519 	return (0);
520 }
521 
522 /*
523  * Free a completed job entry.  The caller holds the proc lock, and the job
524  * must be in the JOBST_JOBFINISHED state; notification state is torn down
525  * and the entry's resources are released.
526  */
527 static int
528 aio_free_entry(struct aiocblist *aiocbe)
529 {
530 	struct kaioinfo *ki;
531 	struct aioliojob *lj;
532 	struct proc *p;
533 
534 	p = aiocbe->userproc;
535 
536 	PROC_LOCK_ASSERT(p, MA_OWNED);
537 	MPASS(curproc == p);
538 	MPASS(aiocbe->jobstate == JOBST_JOBFINISHED);
539 
540 	ki = p->p_aioinfo;
541 	MPASS(ki != NULL);
542 
543 	atomic_subtract_int(&num_queue_count, 1);
544 
545 	ki->kaio_count--;
546 	MPASS(ki->kaio_count >= 0);
547 
548 	TAILQ_REMOVE(&ki->kaio_done, aiocbe, plist);
549 	TAILQ_REMOVE(&ki->kaio_all, aiocbe, allist);
550 
551 	lj = aiocbe->lio;
552 	if (lj) {
553 		lj->lioj_count--;
554 		lj->lioj_finished_count--;
555 
556 		if (lj->lioj_count == 0) {
557 			TAILQ_REMOVE(&ki->kaio_liojoblist, lj, lioj_list);
558 			/* lio is going away, we need to destroy any knotes */
559 			knlist_delete(&lj->klist, curthread, 1);
560 			sigqueue_take(&lj->lioj_ksi);
561 			uma_zfree(aiolio_zone, lj);
562 		}
563 	}
564 
565 	/* aiocbe is going away, we need to destroy any knotes */
566 	knlist_delete(&aiocbe->klist, curthread, 1);
567 	sigqueue_take(&aiocbe->ksi);
568 
569  	MPASS(aiocbe->bp == NULL);
570 	aiocbe->jobstate = JOBST_NULL;
571 	PROC_UNLOCK(p);
572 
573 	/*
574 	 * The thread argument here is used to find the owning process
575 	 * and is also passed to fo_close() which may pass it to various
576 	 * places such as devsw close() routines.  Because of that, we
577 	 * need a thread pointer from the process owning the job that is
578 	 * persistent and won't disappear out from under us or move to
579 	 * another process.
580 	 *
581 	 * Currently, all the callers of this function call it to remove
582 	 * an aiocblist from the current process' job list either via a
583 	 * syscall or due to the current process calling exit() or
584 	 * execve().  Thus, we know that p == curproc.  We also know that
585 	 * curthread can't exit since we are curthread.
586 	 *
587 	 * Therefore, we use curthread as the thread to pass to
588 	 * knlist_delete().  This does mean that it is possible for the
589 	 * thread pointer at close time to differ from the thread pointer
590 	 * at open time, but this is already true of file descriptors in
591 	 * a multithreaded process.
592 	 */
593 	fdrop(aiocbe->fd_file, curthread);
594 	crfree(aiocbe->cred);
595 	uma_zfree(aiocb_zone, aiocbe);
596 	PROC_LOCK(p);
597 
598 	return (0);
599 }
600 
601 /*
602  * Rundown the jobs for a given process.
603  */
604 static void
605 aio_proc_rundown(void *arg, struct proc *p)
606 {
607 	struct kaioinfo *ki;
608 	struct aioliojob *lj;
609 	struct aiocblist *cbe, *cbn;
610 	struct file *fp;
611 	struct socket *so;
612 	int remove;
613 
614 	KASSERT(curthread->td_proc == p,
615 	    ("%s: called on non-curproc", __func__));
616 	ki = p->p_aioinfo;
617 	if (ki == NULL)
618 		return;
619 
620 	PROC_LOCK(p);
621 	ki->kaio_flags |= KAIO_RUNDOWN;
622 
623 restart:
624 
625 	/*
626 	 * Try to cancel all pending requests. This code simulates
627 	 * aio_cancel on all pending I/O requests.
628 	 */
629 	TAILQ_FOREACH_SAFE(cbe, &ki->kaio_jobqueue, plist, cbn) {
630 		remove = 0;
631 		mtx_lock(&aio_job_mtx);
632 		if (cbe->jobstate == JOBST_JOBQGLOBAL) {
633 			TAILQ_REMOVE(&aio_jobs, cbe, list);
634 			remove = 1;
635 		} else if (cbe->jobstate == JOBST_JOBQSOCK) {
636 			fp = cbe->fd_file;
637 			MPASS(fp->f_type == DTYPE_SOCKET);
638 			so = fp->f_data;
639 			TAILQ_REMOVE(&so->so_aiojobq, cbe, list);
640 			remove = 1;
641 		}
642 		mtx_unlock(&aio_job_mtx);
643 
644 		if (remove) {
645 			cbe->jobstate = JOBST_JOBFINISHED;
646 			cbe->uaiocb._aiocb_private.status = -1;
647 			cbe->uaiocb._aiocb_private.error = ECANCELED;
648 			TAILQ_REMOVE(&ki->kaio_jobqueue, cbe, plist);
649 			aio_bio_done_notify(p, cbe, DONE_QUEUE);
650 		}
651 	}
652 
653 	/* Wait for all running I/O to be finished */
654 	if (TAILQ_FIRST(&ki->kaio_bufqueue) ||
655 	    TAILQ_FIRST(&ki->kaio_jobqueue)) {
656 		ki->kaio_flags |= KAIO_WAKEUP;
657 		msleep(&p->p_aioinfo, &p->p_mtx, PRIBIO, "aioprn", hz);
658 		goto restart;
659 	}
660 
661 	/* Free all completed I/O requests. */
662 	while ((cbe = TAILQ_FIRST(&ki->kaio_done)) != NULL)
663 		aio_free_entry(cbe);
664 
665 	while ((lj = TAILQ_FIRST(&ki->kaio_liojoblist)) != NULL) {
666 		if (lj->lioj_count == 0) {
667 			TAILQ_REMOVE(&ki->kaio_liojoblist, lj, lioj_list);
668 			knlist_delete(&lj->klist, curthread, 1);
669 			sigqueue_take(&lj->lioj_ksi);
670 			uma_zfree(aiolio_zone, lj);
671 		} else {
672 			panic("LIO job not cleaned up: C:%d, FC:%d\n",
673 			    lj->lioj_count, lj->lioj_finished_count);
674 		}
675 	}
676 
677 	uma_zfree(kaio_zone, ki);
678 	p->p_aioinfo = NULL;
679 	PROC_UNLOCK(p);
680 }
681 
682 /*
683  * Select a job to run (called by an AIO daemon).
684  */
685 static struct aiocblist *
686 aio_selectjob(struct aiothreadlist *aiop)
687 {
688 	struct aiocblist *aiocbe;
689 	struct kaioinfo *ki;
690 	struct proc *userp;
691 
692 	mtx_assert(&aio_job_mtx, MA_OWNED);
693 	TAILQ_FOREACH(aiocbe, &aio_jobs, list) {
694 		userp = aiocbe->userproc;
695 		ki = userp->p_aioinfo;
696 
697 		if (ki->kaio_active_count < ki->kaio_maxactive_count) {
698 			TAILQ_REMOVE(&aio_jobs, aiocbe, list);
699 			/* Account for currently active jobs. */
700 			ki->kaio_active_count++;
701 			aiocbe->jobstate = JOBST_JOBRUNNING;
702 			break;
703 		}
704 	}
705 	return (aiocbe);
706 }
707 
708 /*
709  * The AIO processing activity.  This is the code that does the I/O request for
710  * the non-physio version of the operations.  The normal vn operations are used,
711  * and this code should work in all instances for every type of file, including
712  * pipes, sockets, fifos, and regular files.
713  *
714  * XXX I don't think it works well for socket, pipe, and fifo.
715  */
716 static void
717 aio_process(struct aiocblist *aiocbe)
718 {
719 	struct ucred *td_savedcred;
720 	struct thread *td;
721 	struct proc *mycp;
722 	struct aiocb *cb;
723 	struct file *fp;
724 	struct socket *so;
725 	struct uio auio;
726 	struct iovec aiov;
727 	int cnt;
728 	int error;
729 	int oublock_st, oublock_end;
730 	int inblock_st, inblock_end;
731 
732 	td = curthread;
733 	td_savedcred = td->td_ucred;
734 	td->td_ucred = aiocbe->cred;
735 	mycp = td->td_proc;
736 	cb = &aiocbe->uaiocb;
737 	fp = aiocbe->fd_file;
738 
739 	aiov.iov_base = (void *)(uintptr_t)cb->aio_buf;
740 	aiov.iov_len = cb->aio_nbytes;
741 
742 	auio.uio_iov = &aiov;
743 	auio.uio_iovcnt = 1;
744 	auio.uio_offset = cb->aio_offset;
745 	auio.uio_resid = cb->aio_nbytes;
746 	cnt = cb->aio_nbytes;
747 	auio.uio_segflg = UIO_USERSPACE;
748 	auio.uio_td = td;
749 
750 	inblock_st = mycp->p_stats->p_ru.ru_inblock;
751 	oublock_st = mycp->p_stats->p_ru.ru_oublock;
752 	/*
753 	 * aio_aqueue() acquires a reference to the file that is
754 	 * released in aio_free_entry().
755 	 */
756 	if (cb->aio_lio_opcode == LIO_READ) {
757 		auio.uio_rw = UIO_READ;
758 		error = fo_read(fp, &auio, fp->f_cred, FOF_OFFSET, td);
759 	} else {
760 		if (fp->f_type == DTYPE_VNODE)
761 			bwillwrite();
762 		auio.uio_rw = UIO_WRITE;
763 		error = fo_write(fp, &auio, fp->f_cred, FOF_OFFSET, td);
764 	}
765 	inblock_end = mycp->p_stats->p_ru.ru_inblock;
766 	oublock_end = mycp->p_stats->p_ru.ru_oublock;
767 
768 	aiocbe->inputcharge = inblock_end - inblock_st;
769 	aiocbe->outputcharge = oublock_end - oublock_st;
770 
771 	if ((error) && (auio.uio_resid != cnt)) {
772 		if (error == ERESTART || error == EINTR || error == EWOULDBLOCK)
773 			error = 0;
774 		if ((error == EPIPE) && (cb->aio_lio_opcode == LIO_WRITE)) {
775 			int sigpipe = 1;
776 			if (fp->f_type == DTYPE_SOCKET) {
777 				so = fp->f_data;
778 				if (so->so_options & SO_NOSIGPIPE)
779 					sigpipe = 0;
780 			}
781 			if (sigpipe) {
782 				PROC_LOCK(aiocbe->userproc);
783 				psignal(aiocbe->userproc, SIGPIPE);
784 				PROC_UNLOCK(aiocbe->userproc);
785 			}
786 		}
787 	}
788 
789 	cnt -= auio.uio_resid;
790 	cb->_aiocb_private.error = error;
791 	cb->_aiocb_private.status = cnt;
792 	td->td_ucred = td_savedcred;
793 }
794 
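/*
 * Move a finished request onto the per-process done queue and deliver any
 * requested completion notification: per-job signal or kevent, lio kevent
 * or signal once the whole lio completes, and a wakeup for sleepers in
 * aio_suspend() or the process rundown code.
 */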
795 static void
796 aio_bio_done_notify(struct proc *userp, struct aiocblist *aiocbe, int type)
797 {
798 	struct aioliojob *lj;
799 	struct kaioinfo *ki;
800 	int lj_done;
801 
802 	PROC_LOCK_ASSERT(userp, MA_OWNED);
803 	ki = userp->p_aioinfo;
804 	lj = aiocbe->lio;
805 	lj_done = 0;
806 	if (lj) {
807 		lj->lioj_finished_count++;
808 		if (lj->lioj_count == lj->lioj_finished_count)
809 			lj_done = 1;
810 	}
811 	if (type == DONE_QUEUE) {
812 		aiocbe->jobflags |= AIOCBLIST_DONE;
813 	} else {
814 		aiocbe->jobflags |= AIOCBLIST_BUFDONE;
815 	}
816 	TAILQ_INSERT_TAIL(&ki->kaio_done, aiocbe, plist);
817 	aiocbe->jobstate = JOBST_JOBFINISHED;
818 
819 	if (ki->kaio_flags & KAIO_RUNDOWN)
820 		goto notification_done;
821 
822 	if (aiocbe->uaiocb.aio_sigevent.sigev_notify == SIGEV_SIGNAL ||
823 	    aiocbe->uaiocb.aio_sigevent.sigev_notify == SIGEV_THREAD_ID)
824 		aio_sendsig(userp, &aiocbe->uaiocb.aio_sigevent, &aiocbe->ksi);
825 
826 	KNOTE_LOCKED(&aiocbe->klist, 1);
827 
828 	if (lj_done) {
829 		if (lj->lioj_signal.sigev_notify == SIGEV_KEVENT) {
830 			lj->lioj_flags |= LIOJ_KEVENT_POSTED;
831 			KNOTE_LOCKED(&lj->klist, 1);
832 		}
833 		if ((lj->lioj_flags & (LIOJ_SIGNAL|LIOJ_SIGNAL_POSTED))
834 		    == LIOJ_SIGNAL
835 		    && (lj->lioj_signal.sigev_notify == SIGEV_SIGNAL ||
836 		        lj->lioj_signal.sigev_notify == SIGEV_THREAD_ID)) {
837 			aio_sendsig(userp, &lj->lioj_signal, &lj->lioj_ksi);
838 			lj->lioj_flags |= LIOJ_SIGNAL_POSTED;
839 		}
840 	}
841 
842 notification_done:
843 	if (ki->kaio_flags & KAIO_WAKEUP) {
844 		ki->kaio_flags &= ~KAIO_WAKEUP;
845 		wakeup(&userp->p_aioinfo);
846 	}
847 }
848 
849 /*
850  * The AIO daemon, most of the actual work is done in aio_process,
851  * but the setup (and address space mgmt) is done in this routine.
852  */
853 static void
854 aio_daemon(void *_id)
855 {
856 	struct aiocblist *aiocbe;
857 	struct aiothreadlist *aiop;
858 	struct kaioinfo *ki;
859 	struct proc *curcp, *mycp, *userp;
860 	struct vmspace *myvm, *tmpvm;
861 	struct thread *td = curthread;
862 	int id = (intptr_t)_id;
863 
864 	/*
865 	 * Local copies of curproc (mycp) and vmspace (myvm).
866 	 */
867 	mycp = td->td_proc;
868 	myvm = mycp->p_vmspace;
869 
870 	KASSERT(mycp->p_textvp == NULL, ("kthread has a textvp"));
871 
872 	/*
873 	 * Allocate and ready the aio control info.  There is one aiop structure
874 	 * per daemon.
875 	 */
876 	aiop = uma_zalloc(aiop_zone, M_WAITOK);
877 	aiop->aiothread = td;
878 	aiop->aiothreadflags = AIOP_FREE;
879 
880 	/*
881 	 * Place thread (lightweight process) onto the AIO free thread list.
882 	 */
883 	mtx_lock(&aio_job_mtx);
884 	TAILQ_INSERT_HEAD(&aio_freeproc, aiop, list);
885 	mtx_unlock(&aio_job_mtx);
886 
887 	/*
888 	 * Get rid of our current file descriptors.  AIODs don't need any
889 	 * file descriptors, except those temporarily inherited from the client.
890 	 */
891 	fdfree(td);
892 
893 	/* The daemon resides in its own pgrp. */
894 	setsid(td, NULL);
895 
896 	/*
897 	 * Wake up the parent process.  (The parent sleeps to keep from blasting away
898 	 * and creating too many daemons.)
899 	 */
900 	sema_post(&aio_newproc_sem);
901 
902 	mtx_lock(&aio_job_mtx);
903 	for (;;) {
904 		/*
905 		 * curcp is the current daemon process context.
906 		 * userp is the current user process context.
907 		 */
908 		curcp = mycp;
909 
910 		/*
911 		 * Take daemon off of free queue
912 		 */
913 		if (aiop->aiothreadflags & AIOP_FREE) {
914 			TAILQ_REMOVE(&aio_freeproc, aiop, list);
915 			aiop->aiothreadflags &= ~AIOP_FREE;
916 		}
917 
918 		/*
919 		 * Check for jobs.
920 		 */
921 		while ((aiocbe = aio_selectjob(aiop)) != NULL) {
922 			mtx_unlock(&aio_job_mtx);
923 			userp = aiocbe->userproc;
924 
925 			/*
926 			 * Connect to process address space for user program.
927 			 */
928 			if (userp != curcp) {
929 				/*
930 				 * Save the current address space that we are
931 				 * connected to.
932 				 */
933 				tmpvm = mycp->p_vmspace;
934 
935 				/*
936 				 * Point to the new user address space, and
937 				 * refer to it.
938 				 */
939 				mycp->p_vmspace = userp->p_vmspace;
940 				atomic_add_int(&mycp->p_vmspace->vm_refcnt, 1);
941 
942 				/* Activate the new mapping. */
943 				pmap_activate(FIRST_THREAD_IN_PROC(mycp));
944 
945 				/*
946 				 * If the old address space wasn't the daemon's
947 				 * own address space, then we need to remove the
948 				 * daemon's reference from the other process
949 				 * that it was acting on behalf of.
950 				 */
951 				if (tmpvm != myvm) {
952 					vmspace_free(tmpvm);
953 				}
954 				curcp = userp;
955 			}
956 
957 			ki = userp->p_aioinfo;
958 
959 			/* Do the I/O function. */
960 			aio_process(aiocbe);
961 
962 			mtx_lock(&aio_job_mtx);
963 			/* Decrement the active job count. */
964 			ki->kaio_active_count--;
965 			mtx_unlock(&aio_job_mtx);
966 
967 			PROC_LOCK(userp);
968 			TAILQ_REMOVE(&ki->kaio_jobqueue, aiocbe, plist);
969 			aio_bio_done_notify(userp, aiocbe, DONE_QUEUE);
970 			PROC_UNLOCK(userp);
971 
972 			mtx_lock(&aio_job_mtx);
973 		}
974 
975 		/*
976 		 * Disconnect from user address space.
977 		 */
978 		if (curcp != mycp) {
979 
980 			mtx_unlock(&aio_job_mtx);
981 
982 			/* Get the user address space to disconnect from. */
983 			tmpvm = mycp->p_vmspace;
984 
985 			/* Get original address space for daemon. */
986 			mycp->p_vmspace = myvm;
987 
988 			/* Activate the daemon's address space. */
989 			pmap_activate(FIRST_THREAD_IN_PROC(mycp));
990 #ifdef DIAGNOSTIC
991 			if (tmpvm == myvm) {
992 				printf("AIOD: vmspace problem -- %d\n",
993 				    mycp->p_pid);
994 			}
995 #endif
996 			/* Remove our vmspace reference. */
997 			vmspace_free(tmpvm);
998 
999 			curcp = mycp;
1000 
1001 			mtx_lock(&aio_job_mtx);
1002 			/*
1003 			 * We have to restart to avoid a race; we only sleep
1004 			 * if no job can be selected, which should be the case
1005 			 * when curcp == mycp.
1006 			 */
1007 			continue;
1008 		}
1009 
1010 		mtx_assert(&aio_job_mtx, MA_OWNED);
1011 
1012 		TAILQ_INSERT_HEAD(&aio_freeproc, aiop, list);
1013 		aiop->aiothreadflags |= AIOP_FREE;
1014 
1015 		/*
1016 		 * If daemon is inactive for a long time, allow it to exit,
1017 		 * thereby freeing resources.
1018 		 */
1019 		if (msleep(aiop->aiothread, &aio_job_mtx, PRIBIO, "aiordy",
1020 		    aiod_lifetime)) {
1021 			if (TAILQ_EMPTY(&aio_jobs)) {
1022 				if ((aiop->aiothreadflags & AIOP_FREE) &&
1023 				    (num_aio_procs > target_aio_procs)) {
1024 					TAILQ_REMOVE(&aio_freeproc, aiop, list);
1025 					num_aio_procs--;
1026 					mtx_unlock(&aio_job_mtx);
1027 					uma_zfree(aiop_zone, aiop);
1028 					free_unr(aiod_unr, id);
1029 #ifdef DIAGNOSTIC
1030 					if (mycp->p_vmspace->vm_refcnt <= 1) {
1031 						printf("AIOD: bad vm refcnt for"
1032 						    " exiting daemon: %d\n",
1033 						    mycp->p_vmspace->vm_refcnt);
1034 					}
1035 #endif
1036 					kthread_exit(0);
1037 				}
1038 			}
1039 		}
1040 	}
1041 	mtx_unlock(&aio_job_mtx);
1042 	panic("shouldn't be here\n");
1043 }
1044 
1045 /*
1046  * Create a new AIO daemon. This is mostly a kernel-thread fork routine. The
1047  * AIO daemon modifies its environment itself.
1048  */
1049 static int
1050 aio_newproc(int *start)
1051 {
1052 	int error;
1053 	struct proc *p;
1054 	int id;
1055 
1056 	id = alloc_unr(aiod_unr);
1057 	error = kthread_create(aio_daemon, (void *)(intptr_t)id, &p,
1058 		RFNOWAIT, 0, "aiod%d", id);
1059 	if (error == 0) {
1060 		/*
1061 		 * Wait until daemon is started.
1062 		 */
1063 		sema_wait(&aio_newproc_sem);
1064 		mtx_lock(&aio_job_mtx);
1065 		num_aio_procs++;
1066 		if (start != NULL)
1067 			(*start)--;
1068 		mtx_unlock(&aio_job_mtx);
1069 	} else {
1070 		free_unr(aiod_unr, id);
1071 	}
1072 	return (error);
1073 }
1074 
1075 /*
1076  * Try the high-performance, low-overhead physio method for eligible
1077  * VCHR devices.  This method doesn't use an aio helper thread, and
1078  * thus has very low overhead.
1079  *
1080  * Assumes that the caller, aio_aqueue(), has incremented the file
1081  * structure's reference count, preventing its deallocation for the
1082  * duration of this call.
1083  */
1084 static int
1085 aio_qphysio(struct proc *p, struct aiocblist *aiocbe)
1086 {
1087 	struct aiocb *cb;
1088 	struct file *fp;
1089 	struct buf *bp;
1090 	struct vnode *vp;
1091 	struct kaioinfo *ki;
1092 	struct aioliojob *lj;
1093 	int error;
1094 
1095 	cb = &aiocbe->uaiocb;
1096 	fp = aiocbe->fd_file;
1097 
1098 	if (fp->f_type != DTYPE_VNODE)
1099 		return (-1);
1100 
1101 	vp = fp->f_vnode;
1102 
1103 	/*
1104 	 * If it's not a disk, we don't want to return a positive error.
1105 	 * A positive error causes the aio code not to fall through to the
1106 	 * thread-based method when dealing with a regular file.
1107 	 */
1108 	if (!vn_isdisk(vp, &error)) {
1109 		if (error == ENOTBLK)
1110 			return (-1);
1111 		else
1112 			return (error);
1113 	}
1114 
1115 	if (vp->v_bufobj.bo_bsize == 0)
1116 		return (-1);
1117 
1118  	if (cb->aio_nbytes % vp->v_bufobj.bo_bsize)
1119 		return (-1);
1120 
1121 	if (cb->aio_nbytes > vp->v_rdev->si_iosize_max)
1122 		return (-1);
1123 
1124 	if (cb->aio_nbytes >
1125 	    MAXPHYS - (((vm_offset_t) cb->aio_buf) & PAGE_MASK))
1126 		return (-1);
1127 
1128 	ki = p->p_aioinfo;
1129 	if (ki->kaio_buffer_count >= ki->kaio_ballowed_count)
1130 		return (-1);
1131 
1132 	/* Create and build a buffer header for a transfer. */
1133 	bp = (struct buf *)getpbuf(NULL);
1134 	BUF_KERNPROC(bp);
1135 
1136 	PROC_LOCK(p);
1137 	ki->kaio_count++;
1138 	ki->kaio_buffer_count++;
1139 	lj = aiocbe->lio;
1140 	if (lj)
1141 		lj->lioj_count++;
1142 	PROC_UNLOCK(p);
1143 
1144 	/*
1145 	 * Get a copy of the kva from the physical buffer.
1146 	 */
1147 	error = 0;
1148 
1149 	bp->b_bcount = cb->aio_nbytes;
1150 	bp->b_bufsize = cb->aio_nbytes;
1151 	bp->b_iodone = aio_physwakeup;
1152 	bp->b_saveaddr = bp->b_data;
1153 	bp->b_data = (void *)(uintptr_t)cb->aio_buf;
1154 	bp->b_offset = cb->aio_offset;
1155 	bp->b_iooffset = cb->aio_offset;
1156 	bp->b_blkno = btodb(cb->aio_offset);
1157 	bp->b_iocmd = cb->aio_lio_opcode == LIO_WRITE ? BIO_WRITE : BIO_READ;
1158 
1159 	/*
1160 	 * Bring buffer into kernel space.
1161 	 */
1162 	if (vmapbuf(bp) < 0) {
1163 		error = EFAULT;
1164 		goto doerror;
1165 	}
1166 
1167 	PROC_LOCK(p);
1168 	aiocbe->bp = bp;
1169 	bp->b_caller1 = (void *)aiocbe;
1170 	TAILQ_INSERT_TAIL(&ki->kaio_bufqueue, aiocbe, plist);
1171 	TAILQ_INSERT_TAIL(&ki->kaio_all, aiocbe, allist);
1172 	aiocbe->jobstate = JOBST_JOBQBUF;
1173 	cb->_aiocb_private.status = cb->aio_nbytes;
1174 	PROC_UNLOCK(p);
1175 
1176 	atomic_add_int(&num_queue_count, 1);
1177 	atomic_add_int(&num_buf_aio, 1);
1178 
1179 	bp->b_error = 0;
1180 
1181 	TASK_INIT(&aiocbe->biotask, 0, biohelper, aiocbe);
1182 
1183 	/* Perform transfer. */
1184 	dev_strategy(vp->v_rdev, bp);
1185 	return (0);
1186 
1187 doerror:
1188 	PROC_LOCK(p);
1189 	ki->kaio_count--;
1190 	ki->kaio_buffer_count--;
1191 	if (lj)
1192 		lj->lioj_count--;
1193 	aiocbe->bp = NULL;
1194 	PROC_UNLOCK(p);
1195 	relpbuf(bp, NULL);
1196 	return (error);
1197 }
1198 
1199 /*
1200  * Wake up aio requests that may be serviceable now.
1201  */
1202 static void
1203 aio_swake_cb(struct socket *so, struct sockbuf *sb)
1204 {
1205 	struct aiocblist *cb, *cbn;
1206 	int opcode, wakecount = 0;
1207 	struct aiothreadlist *aiop;
1208 
1209 	if (sb == &so->so_snd)
1210 		opcode = LIO_WRITE;
1211 	else
1212 		opcode = LIO_READ;
1213 
1214 	SOCKBUF_LOCK(sb);
1215 	sb->sb_flags &= ~SB_AIO;
1216 	mtx_lock(&aio_job_mtx);
1217 	TAILQ_FOREACH_SAFE(cb, &so->so_aiojobq, list, cbn) {
1218 		if (opcode == cb->uaiocb.aio_lio_opcode) {
1219 			if (cb->jobstate != JOBST_JOBQSOCK)
1220 				panic("invalid queue value");
1221 			/* XXX
1222 			 * We don't have an actual socket backend yet,
1223 			 * so we simply move the requests to the generic
1224 			 * file I/O backend.
1225 			 */
1226 			TAILQ_REMOVE(&so->so_aiojobq, cb, list);
1227 			TAILQ_INSERT_TAIL(&aio_jobs, cb, list);
1228 			wakecount++;
1229 		}
1230 	}
1231 	mtx_unlock(&aio_job_mtx);
1232 	SOCKBUF_UNLOCK(sb);
1233 
1234 	while (wakecount--) {
1235 		mtx_lock(&aio_job_mtx);
1236 		if ((aiop = TAILQ_FIRST(&aio_freeproc)) != NULL) {
1237 			TAILQ_REMOVE(&aio_freeproc, aiop, list);
1238 			aiop->aiothreadflags &= ~AIOP_FREE;
1239 			wakeup(aiop->aiothread);
1240 		}
1241 		mtx_unlock(&aio_job_mtx);
1242 	}
1243 }
1244 
1245 /*
1246  * Queue a new AIO request.  The choice between the threaded and the direct
1247  * physio VCHR technique is made in this code.
1248  */
1249 static int
1250 aio_aqueue(struct thread *td, struct aiocb *job, struct aioliojob *lj,
1251 	int type, int oldsigev)
1252 {
1253 	struct proc *p = td->td_proc;
1254 	struct file *fp;
1255 	struct socket *so;
1256 	struct aiocblist *aiocbe;
1257 	struct aiothreadlist *aiop;
1258 	struct kaioinfo *ki;
1259 	struct kevent kev;
1260 	struct kqueue *kq;
1261 	struct file *kq_fp;
1262 	struct sockbuf *sb;
1263 	int opcode;
1264 	int error;
1265 	int fd;
1266 	int jid;
1267 
1268 	if (p->p_aioinfo == NULL)
1269 		aio_init_aioinfo(p);
1270 
1271 	ki = p->p_aioinfo;
1272 
1273 	suword(&job->_aiocb_private.status, -1);
1274 	suword(&job->_aiocb_private.error, 0);
1275 	suword(&job->_aiocb_private.kernelinfo, -1);
1276 
1277 	if (num_queue_count >= max_queue_count ||
1278 	    ki->kaio_count >= ki->kaio_qallowed_count) {
1279 		suword(&job->_aiocb_private.error, EAGAIN);
1280 		return (EAGAIN);
1281 	}
1282 
1283 	aiocbe = uma_zalloc(aiocb_zone, M_WAITOK | M_ZERO);
1284 	aiocbe->inputcharge = 0;
1285 	aiocbe->outputcharge = 0;
1286 	knlist_init(&aiocbe->klist, &p->p_mtx, NULL, NULL, NULL);
1287 
1288 	if (oldsigev) {
1289 		bzero(&aiocbe->uaiocb, sizeof(struct aiocb));
1290 		error = copyin(job, &aiocbe->uaiocb, sizeof(struct oaiocb));
1291 		bcopy(&aiocbe->uaiocb.__spare__, &aiocbe->uaiocb.aio_sigevent,
1292 			sizeof(struct osigevent));
1293 	} else {
1294 		error = copyin(job, &aiocbe->uaiocb, sizeof(struct aiocb));
1295 	}
1296 	if (error) {
1297 		suword(&job->_aiocb_private.error, error);
1298 		uma_zfree(aiocb_zone, aiocbe);
1299 		return (error);
1300 	}
1301 
1302 	if (aiocbe->uaiocb.aio_sigevent.sigev_notify != SIGEV_KEVENT &&
1303 	    aiocbe->uaiocb.aio_sigevent.sigev_notify != SIGEV_SIGNAL &&
1304 	    aiocbe->uaiocb.aio_sigevent.sigev_notify != SIGEV_THREAD_ID &&
1305 	    aiocbe->uaiocb.aio_sigevent.sigev_notify != SIGEV_NONE) {
1306 		suword(&job->_aiocb_private.error, EINVAL);
1307 		uma_zfree(aiocb_zone, aiocbe);
1308 		return (EINVAL);
1309 	}
1310 
1311 	if ((aiocbe->uaiocb.aio_sigevent.sigev_notify == SIGEV_SIGNAL ||
1312 	     aiocbe->uaiocb.aio_sigevent.sigev_notify == SIGEV_THREAD_ID) &&
1313 		!_SIG_VALID(aiocbe->uaiocb.aio_sigevent.sigev_signo)) {
1314 		uma_zfree(aiocb_zone, aiocbe);
1315 		return (EINVAL);
1316 	}
1317 
1318 	ksiginfo_init(&aiocbe->ksi);
1319 
1320 	/* Save userspace address of the job info. */
1321 	aiocbe->uuaiocb = job;
1322 
1323 	/* Get the opcode. */
1324 	if (type != LIO_NOP)
1325 		aiocbe->uaiocb.aio_lio_opcode = type;
1326 	opcode = aiocbe->uaiocb.aio_lio_opcode;
1327 
1328 	/* Fetch the file object for the specified file descriptor. */
1329 	fd = aiocbe->uaiocb.aio_fildes;
1330 	switch (opcode) {
1331 	case LIO_WRITE:
1332 		error = fget_write(td, fd, &fp);
1333 		break;
1334 	case LIO_READ:
1335 		error = fget_read(td, fd, &fp);
1336 		break;
1337 	default:
1338 		error = fget(td, fd, &fp);
1339 	}
1340 	if (error) {
1341 		uma_zfree(aiocb_zone, aiocbe);
1342 		suword(&job->_aiocb_private.error, error);
1343 		return (error);
1344 	}
1345 	aiocbe->fd_file = fp;
1346 
1347 	if (aiocbe->uaiocb.aio_offset == -1LL) {
1348 		error = EINVAL;
1349 		goto aqueue_fail;
1350 	}
1351 
1352 	mtx_lock(&aio_job_mtx);
1353 	jid = jobrefid;
1354 	if (jobrefid == LONG_MAX)
1355 		jobrefid = 1;
1356 	else
1357 		jobrefid++;
1358 	mtx_unlock(&aio_job_mtx);
1359 
1360 	error = suword(&job->_aiocb_private.kernelinfo, jid);
1361 	if (error) {
1362 		error = EINVAL;
1363 		goto aqueue_fail;
1364 	}
1365 	aiocbe->uaiocb._aiocb_private.kernelinfo = (void *)(intptr_t)jid;
1366 
1367 	if (opcode == LIO_NOP) {
1368 		fdrop(fp, td);
1369 		uma_zfree(aiocb_zone, aiocbe);
1370 		return (0);
1371 	}
1372 	if ((opcode != LIO_READ) && (opcode != LIO_WRITE)) {
1373 		error = EINVAL;
1374 		goto aqueue_fail;
1375 	}
1376 
1377 	if (aiocbe->uaiocb.aio_sigevent.sigev_notify == SIGEV_KEVENT) {
1378 		kev.ident = aiocbe->uaiocb.aio_sigevent.sigev_notify_kqueue;
1379 	} else
1380 		goto no_kqueue;
1381 	error = fget(td, (u_int)kev.ident, &kq_fp);
1382 	if (error)
1383 		goto aqueue_fail;
1384 	if (kq_fp->f_type != DTYPE_KQUEUE) {
1385 		fdrop(kq_fp, td);
1386 		error = EBADF;
1387 		goto aqueue_fail;
1388 	}
1389 	kq = kq_fp->f_data;
1390 	kev.ident = (uintptr_t)aiocbe->uuaiocb;
1391 	kev.filter = EVFILT_AIO;
1392 	kev.flags = EV_ADD | EV_ENABLE | EV_FLAG1;
1393 	kev.data = (intptr_t)aiocbe;
1394 	kev.udata = aiocbe->uaiocb.aio_sigevent.sigev_value.sival_ptr;
1395 	error = kqueue_register(kq, &kev, td, 1);
1396 	fdrop(kq_fp, td);
1397 aqueue_fail:
1398 	if (error) {
1399 		fdrop(fp, td);
1400 		uma_zfree(aiocb_zone, aiocbe);
1401 		suword(&job->_aiocb_private.error, error);
1402 		goto done;
1403 	}
1404 no_kqueue:
1405 
1406 	suword(&job->_aiocb_private.error, EINPROGRESS);
1407 	aiocbe->uaiocb._aiocb_private.error = EINPROGRESS;
1408 	aiocbe->userproc = p;
1409 	aiocbe->cred = crhold(td->td_ucred);
1410 	aiocbe->jobflags = 0;
1411 	aiocbe->lio = lj;
1412 
1413 	if (fp->f_type == DTYPE_SOCKET) {
1414 		/*
1415 		 * Alternate queueing for socket ops: Reach down into the
1416 		 * descriptor to get the socket data.  Then check to see if the
1417 		 * socket is ready to be read or written (based on the requested
1418 		 * operation).
1419 		 *
1420 		 * If it is not ready for I/O, then queue the aiocbe on the
1421 		 * socket, and set the flags so we get a call when sbnotify()
1422 		 * happens.
1423 		 *
1424 		 * Note if opcode is neither LIO_WRITE nor LIO_READ we lock
1425 		 * and unlock the snd sockbuf for no reason.
1426 		 */
1427 		so = fp->f_data;
1428 		sb = (opcode == LIO_READ) ? &so->so_rcv : &so->so_snd;
1429 		SOCKBUF_LOCK(sb);
1430 		if (((opcode == LIO_READ) && (!soreadable(so))) || ((opcode ==
1431 		    LIO_WRITE) && (!sowriteable(so)))) {
1432 			sb->sb_flags |= SB_AIO;
1433 
1434 			mtx_lock(&aio_job_mtx);
1435 			TAILQ_INSERT_TAIL(&so->so_aiojobq, aiocbe, list);
1436 			mtx_unlock(&aio_job_mtx);
1437 
1438 			PROC_LOCK(p);
1439 			TAILQ_INSERT_TAIL(&ki->kaio_all, aiocbe, allist);
1440 			TAILQ_INSERT_TAIL(&ki->kaio_jobqueue, aiocbe, plist);
1441 			aiocbe->jobstate = JOBST_JOBQSOCK;
1442 			ki->kaio_count++;
1443 			if (lj)
1444 				lj->lioj_count++;
1445 			PROC_UNLOCK(p);
1446 			SOCKBUF_UNLOCK(sb);
1447 			atomic_add_int(&num_queue_count, 1);
1448 			error = 0;
1449 			goto done;
1450 		}
1451 		SOCKBUF_UNLOCK(sb);
1452 	}
1453 
1454 	if ((error = aio_qphysio(p, aiocbe)) == 0)
1455 		goto done;
1456 #if 0
1457 	if (error > 0) {
1458 		aiocbe->uaiocb._aiocb_private.error = error;
1459 		suword(&job->_aiocb_private.error, error);
1460 		goto done;
1461 	}
1462 #endif
1463 	/* No buffer for daemon I/O. */
1464 	aiocbe->bp = NULL;
1465 
1466 	PROC_LOCK(p);
1467 	ki->kaio_count++;
1468 	if (lj)
1469 		lj->lioj_count++;
1470 	TAILQ_INSERT_TAIL(&ki->kaio_jobqueue, aiocbe, plist);
1471 	TAILQ_INSERT_TAIL(&ki->kaio_all, aiocbe, allist);
1472 
1473 	mtx_lock(&aio_job_mtx);
1474 	TAILQ_INSERT_TAIL(&aio_jobs, aiocbe, list);
1475 	aiocbe->jobstate = JOBST_JOBQGLOBAL;
1476 	PROC_UNLOCK(p);
1477 
1478 	atomic_add_int(&num_queue_count, 1);
1479 
1480 	/*
1481 	 * If we don't have a free AIO process, and we are below our quota, then
1482 	 * start one.  Otherwise, depend on the subsequent I/O completions to
1483  * pick up this job.  If we don't successfully create the new process
1484 	 * (thread) due to resource issues, we return an error for now (EAGAIN),
1485 	 * which is likely not the correct thing to do.
1486 	 */
1487 retryproc:
1488 	error = 0;
1489 	if ((aiop = TAILQ_FIRST(&aio_freeproc)) != NULL) {
1490 		TAILQ_REMOVE(&aio_freeproc, aiop, list);
1491 		aiop->aiothreadflags &= ~AIOP_FREE;
1492 		wakeup(aiop->aiothread);
1493 	} else if (((num_aio_resv_start + num_aio_procs) < max_aio_procs) &&
1494 	    ((ki->kaio_active_count + num_aio_resv_start) <
1495 	    ki->kaio_maxactive_count)) {
1496 		num_aio_resv_start++;
1497 		mtx_unlock(&aio_job_mtx);
1498 		error = aio_newproc(&num_aio_resv_start);
1499 		mtx_lock(&aio_job_mtx);
1500 		if (error) {
1501 			num_aio_resv_start--;
1502 			goto retryproc;
1503 		}
1504 	}
1505 	mtx_unlock(&aio_job_mtx);
1506 
1507 done:
1508 	return (error);
1509 }
1510 
1511 /*
1512  * Support the aio_return system call, as a side-effect, kernel resources are
1513  * released.
1514  */
1515 int
1516 aio_return(struct thread *td, struct aio_return_args *uap)
1517 {
1518 	struct proc *p = td->td_proc;
1519 	struct aiocblist *cb;
1520 	struct aiocb *uaiocb;
1521 	struct kaioinfo *ki;
1522 	int status, error;
1523 
1524 	ki = p->p_aioinfo;
1525 	if (ki == NULL)
1526 		return (EINVAL);
1527 	uaiocb = uap->aiocbp;
1528 	PROC_LOCK(p);
1529 	TAILQ_FOREACH(cb, &ki->kaio_done, plist) {
1530 		if (cb->uuaiocb == uaiocb)
1531 			break;
1532 	}
1533 	if (cb != NULL) {
1534 		MPASS(cb->jobstate == JOBST_JOBFINISHED);
1535 		status = cb->uaiocb._aiocb_private.status;
1536 		error = cb->uaiocb._aiocb_private.error;
1537 		td->td_retval[0] = status;
1538 		if (cb->uaiocb.aio_lio_opcode == LIO_WRITE) {
1539 			p->p_stats->p_ru.ru_oublock +=
1540 			    cb->outputcharge;
1541 			cb->outputcharge = 0;
1542 		} else if (cb->uaiocb.aio_lio_opcode == LIO_READ) {
1543 			p->p_stats->p_ru.ru_inblock += cb->inputcharge;
1544 			cb->inputcharge = 0;
1545 		}
1546 		aio_free_entry(cb);
1547 		PROC_UNLOCK(p);
1548 		suword(&uaiocb->_aiocb_private.error, error);
1549 		suword(&uaiocb->_aiocb_private.status, status);
1550 	} else {
1551 		error = EINVAL;
1552 		PROC_UNLOCK(p);
1553 	}
1554 	return (error);
1555 }
1556 
1557 /*
1558  * Allow a process to wakeup when any of the I/O requests are completed.
1559  */
1560 int
1561 aio_suspend(struct thread *td, struct aio_suspend_args *uap)
1562 {
1563 	struct proc *p = td->td_proc;
1564 	struct timeval atv;
1565 	struct timespec ts;
1566 	struct aiocb *const *cbptr, *cbp;
1567 	struct kaioinfo *ki;
1568 	struct aiocblist *cb, *cbfirst;
1569 	struct aiocb **ujoblist;
1570 	int njoblist;
1571 	int error;
1572 	int timo;
1573 	int i;
1574 
1575 	if (uap->nent < 0 || uap->nent > AIO_LISTIO_MAX)
1576 		return (EINVAL);
1577 
1578 	timo = 0;
1579 	if (uap->timeout) {
1580 		/* Get timespec struct. */
1581 		if ((error = copyin(uap->timeout, &ts, sizeof(ts))) != 0)
1582 			return (error);
1583 
1584 		if (ts.tv_nsec < 0 || ts.tv_nsec >= 1000000000)
1585 			return (EINVAL);
1586 
1587 		TIMESPEC_TO_TIMEVAL(&atv, &ts);
1588 		if (itimerfix(&atv))
1589 			return (EINVAL);
1590 		timo = tvtohz(&atv);
1591 	}
1592 
1593 	ki = p->p_aioinfo;
1594 	if (ki == NULL)
1595 		return (EAGAIN);
1596 
1597 	njoblist = 0;
1598 	ujoblist = uma_zalloc(aiol_zone, M_WAITOK);
1599 	cbptr = uap->aiocbp;
1600 
1601 	for (i = 0; i < uap->nent; i++) {
1602 		cbp = (struct aiocb *)(intptr_t)fuword(&cbptr[i]);
1603 		if (cbp == 0)
1604 			continue;
1605 		ujoblist[njoblist] = cbp;
1606 		njoblist++;
1607 	}
1608 
1609 	if (njoblist == 0) {
1610 		uma_zfree(aiol_zone, ujoblist);
1611 		return (0);
1612 	}
1613 
1614 	PROC_LOCK(p);
1615 	for (;;) {
1616 		cbfirst = NULL;
1617 		error = 0;
1618 		TAILQ_FOREACH(cb, &ki->kaio_all, allist) {
1619 			for (i = 0; i < njoblist; i++) {
1620 				if (cb->uuaiocb == ujoblist[i]) {
1621 					if (cbfirst == NULL)
1622 						cbfirst = cb;
1623 					if (cb->jobstate == JOBST_JOBFINISHED)
1624 						goto RETURN;
1625 				}
1626 			}
1627 		}
1628 		/* All tasks were finished. */
1629 		if (cbfirst == NULL)
1630 			break;
1631 
1632 		ki->kaio_flags |= KAIO_WAKEUP;
1633 		error = msleep(&p->p_aioinfo, &p->p_mtx, PRIBIO | PCATCH,
1634 		    "aiospn", timo);
1635 		if (error == ERESTART)
1636 			error = EINTR;
1637 		if (error)
1638 			break;
1639 	}
1640 RETURN:
1641 	PROC_UNLOCK(p);
1642 	uma_zfree(aiol_zone, ujoblist);
1643 	return (error);
1644 }
1645 
1646 /*
1647  * aio_cancel cancels any non-physio aio operations not currently in
1648  * progress.
1649  */
1650 int
1651 aio_cancel(struct thread *td, struct aio_cancel_args *uap)
1652 {
1653 	struct proc *p = td->td_proc;
1654 	struct kaioinfo *ki;
1655 	struct aiocblist *cbe, *cbn;
1656 	struct file *fp;
1657 	struct socket *so;
1658 	int error;
1659 	int remove;
1660 	int cancelled = 0;
1661 	int notcancelled = 0;
1662 	struct vnode *vp;
1663 
1664 	/* Lookup file object. */
1665 	error = fget(td, uap->fd, &fp);
1666 	if (error)
1667 		return (error);
1668 
1669 	ki = p->p_aioinfo;
1670 	if (ki == NULL)
1671 		goto done;
1672 
1673 	if (fp->f_type == DTYPE_VNODE) {
1674 		vp = fp->f_vnode;
1675 		if (vn_isdisk(vp, &error)) {
1676 			fdrop(fp, td);
1677 			td->td_retval[0] = AIO_NOTCANCELED;
1678 			return (0);
1679 		}
1680 	}
1681 
1682 	PROC_LOCK(p);
1683 	TAILQ_FOREACH_SAFE(cbe, &ki->kaio_jobqueue, plist, cbn) {
1684 		if ((uap->fd == cbe->uaiocb.aio_fildes) &&
1685 		    ((uap->aiocbp == NULL) ||
1686 		     (uap->aiocbp == cbe->uuaiocb))) {
1687 			remove = 0;
1688 
1689 			mtx_lock(&aio_job_mtx);
1690 			if (cbe->jobstate == JOBST_JOBQGLOBAL) {
1691 				TAILQ_REMOVE(&aio_jobs, cbe, list);
1692 				remove = 1;
1693 			} else if (cbe->jobstate == JOBST_JOBQSOCK) {
1694 				MPASS(fp->f_type == DTYPE_SOCKET);
1695 				so = fp->f_data;
1696 				TAILQ_REMOVE(&so->so_aiojobq, cbe, list);
1697 				remove = 1;
1698 			}
1699 			mtx_unlock(&aio_job_mtx);
1700 
1701 			if (remove) {
1702 				TAILQ_REMOVE(&ki->kaio_jobqueue, cbe, plist);
1703 				cbe->uaiocb._aiocb_private.status = -1;
1704 				cbe->uaiocb._aiocb_private.error = ECANCELED;
1705 				aio_bio_done_notify(p, cbe, DONE_QUEUE);
1706 				cancelled++;
1707 			} else {
1708 				notcancelled++;
1709 			}
1710 			if (uap->aiocbp != NULL)
1711 				break;
1712 		}
1713 	}
1714 	PROC_UNLOCK(p);
1715 
1716 done:
1717 	fdrop(fp, td);
1718 
1719 	if (uap->aiocbp != NULL) {
1720 		if (cancelled) {
1721 			td->td_retval[0] = AIO_CANCELED;
1722 			return (0);
1723 		}
1724 	}
1725 
1726 	if (notcancelled) {
1727 		td->td_retval[0] = AIO_NOTCANCELED;
1728 		return (0);
1729 	}
1730 
1731 	if (cancelled) {
1732 		td->td_retval[0] = AIO_CANCELED;
1733 		return (0);
1734 	}
1735 
1736 	td->td_retval[0] = AIO_ALLDONE;
1737 
1738 	return (0);
1739 }
1740 
1741 /*
1742  * aio_error is implemented in the kernel level for compatibility purposes only.
1743  * For a user mode async implementation, it would be best to do it in a userland
1744  * subroutine.
1745  */
1746 int
1747 aio_error(struct thread *td, struct aio_error_args *uap)
1748 {
1749 	struct proc *p = td->td_proc;
1750 	struct aiocblist *cb;
1751 	struct kaioinfo *ki;
1752 	int status;
1753 
1754 	ki = p->p_aioinfo;
1755 	if (ki == NULL) {
1756 		td->td_retval[0] = EINVAL;
1757 		return (0);
1758 	}
1759 
1760 	PROC_LOCK(p);
1761 	TAILQ_FOREACH(cb, &ki->kaio_all, allist) {
1762 		if (cb->uuaiocb == uap->aiocbp) {
1763 			if (cb->jobstate == JOBST_JOBFINISHED)
1764 				td->td_retval[0] =
1765 					cb->uaiocb._aiocb_private.error;
1766 			else
1767 				td->td_retval[0] = EINPROGRESS;
1768 			PROC_UNLOCK(p);
1769 			return (0);
1770 		}
1771 	}
1772 	PROC_UNLOCK(p);
1773 
1774 	/*
1775 	 * Hack for failure of aio_aqueue.
1776 	 */
1777 	status = fuword(&uap->aiocbp->_aiocb_private.status);
1778 	if (status == -1) {
1779 		td->td_retval[0] = fuword(&uap->aiocbp->_aiocb_private.error);
1780 		return (0);
1781 	}
1782 
1783 	td->td_retval[0] = EINVAL;
1784 	return (0);
1785 }
1786 
1787 /* syscall - asynchronous read from a file (REALTIME) */
1788 int
1789 oaio_read(struct thread *td, struct oaio_read_args *uap)
1790 {
1791 
1792 	return aio_aqueue(td, (struct aiocb *)uap->aiocbp, NULL, LIO_READ, 1);
1793 }
1794 
1795 int
1796 aio_read(struct thread *td, struct aio_read_args *uap)
1797 {
1798 
1799 	return aio_aqueue(td, uap->aiocbp, NULL, LIO_READ, 0);
1800 }
1801 
1802 /* syscall - asynchronous write to a file (REALTIME) */
1803 int
1804 oaio_write(struct thread *td, struct oaio_write_args *uap)
1805 {
1806 
1807 	return aio_aqueue(td, (struct aiocb *)uap->aiocbp, NULL, LIO_WRITE, 1);
1808 }
1809 
1810 int
1811 aio_write(struct thread *td, struct aio_write_args *uap)
1812 {
1813 
1814 	return aio_aqueue(td, uap->aiocbp, NULL, LIO_WRITE, 0);
1815 }
1816 
1817 /* syscall - list directed I/O (REALTIME) */
1818 int
1819 olio_listio(struct thread *td, struct olio_listio_args *uap)
1820 {
1821 	return do_lio_listio(td, (struct lio_listio_args *)uap, 1);
1822 }
1823 
1824 /* syscall - list directed I/O (REALTIME) */
1825 int
1826 lio_listio(struct thread *td, struct lio_listio_args *uap)
1827 {
1828 	return do_lio_listio(td, uap, 0);
1829 }
1830 
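/*
 * Common implementation for lio_listio() and olio_listio(); oldsigev
 * selects the old osigevent signal layout.
 */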
1831 static int
1832 do_lio_listio(struct thread *td, struct lio_listio_args *uap, int oldsigev)
1833 {
1834 	struct proc *p = td->td_proc;
1835 	struct aiocb *iocb, * const *cbptr;
1836 	struct kaioinfo *ki;
1837 	struct aioliojob *lj;
1838 	struct kevent kev;
1839 	struct kqueue * kq;
1840 	struct file *kq_fp;
1841 	int nent;
1842 	int error;
1843 	int nerror;
1844 	int i;
1845 
1846 	if ((uap->mode != LIO_NOWAIT) && (uap->mode != LIO_WAIT))
1847 		return (EINVAL);
1848 
1849 	nent = uap->nent;
1850 	if (nent < 0 || nent > AIO_LISTIO_MAX)
1851 		return (EINVAL);
1852 
1853 	if (p->p_aioinfo == NULL)
1854 		aio_init_aioinfo(p);
1855 
1856 	ki = p->p_aioinfo;
1857 
1858 	lj = uma_zalloc(aiolio_zone, M_WAITOK);
1859 	lj->lioj_flags = 0;
1860 	lj->lioj_count = 0;
1861 	lj->lioj_finished_count = 0;
1862 	knlist_init(&lj->klist, &p->p_mtx, NULL, NULL, NULL);
1863 	ksiginfo_init(&lj->lioj_ksi);
1864 
1865 	/*
1866 	 * Setup signal.
1867 	 */
1868 	if (uap->sig && (uap->mode == LIO_NOWAIT)) {
1869 		bzero(&lj->lioj_signal, sizeof(lj->lioj_signal));
1870 		error = copyin(uap->sig, &lj->lioj_signal,
1871 				oldsigev ? sizeof(struct osigevent) :
1872 					   sizeof(struct sigevent));
1873 		if (error) {
1874 			uma_zfree(aiolio_zone, lj);
1875 			return (error);
1876 		}
1877 
1878 		if (lj->lioj_signal.sigev_notify == SIGEV_KEVENT) {
1879 			/* Assume only new style KEVENT */
1880 			error = fget(td, lj->lioj_signal.sigev_notify_kqueue,
1881 				&kq_fp);
1882 			if (error) {
1883 				uma_zfree(aiolio_zone, lj);
1884 				return (error);
1885 			}
1886 			if (kq_fp->f_type != DTYPE_KQUEUE) {
1887 				fdrop(kq_fp, td);
1888 				uma_zfree(aiolio_zone, lj);
1889 				return (EBADF);
1890 			}
1891 			kq = (struct kqueue *)kq_fp->f_data;
1892 			kev.filter = EVFILT_LIO;
1893 			kev.flags = EV_ADD | EV_ENABLE | EV_FLAG1;
1894 			kev.ident = (uintptr_t)lj; /* something unique */
1895 			kev.data = (intptr_t)lj;
1896 			/* pass user defined sigval data */
1897 			/* pass user-defined sigval data */
1898 			error = kqueue_register(kq, &kev, td, 1);
1899 			fdrop(kq_fp, td);
1900 			if (error) {
1901 				uma_zfree(aiolio_zone, lj);
1902 				return (error);
1903 			}
1904 		} else if (lj->lioj_signal.sigev_notify == SIGEV_NONE) {
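			/* Caller requested no notification; nothing to do. */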
1905 			;
1906 		} else if (lj->lioj_signal.sigev_notify == SIGEV_SIGNAL ||
1907 			   lj->lioj_signal.sigev_notify == SIGEV_THREAD_ID) {
1908 				if (!_SIG_VALID(lj->lioj_signal.sigev_signo)) {
1909 					uma_zfree(aiolio_zone, lj);
1910 					return EINVAL;
1911 				}
1912 				lj->lioj_flags |= LIOJ_SIGNAL;
1913 		} else {
1914 			uma_zfree(aiolio_zone, lj);
1915 			return EINVAL;
1916 		}
1917 	}
1918 
1919 	PROC_LOCK(p);
1920 	TAILQ_INSERT_TAIL(&ki->kaio_liojoblist, lj, lioj_list);
1921 	/*
1922 	 * Add an extra aiocb count to keep the lio from being freed
1923 	 * by other threads doing aio_waitcomplete() or aio_return(),
1924 	 * and to prevent the event from being sent until we have queued
1925 	 * all tasks.
1926 	 */
1927 	lj->lioj_count = 1;
1928 	PROC_UNLOCK(p);
1929 
1930 	/*
1931 	 * Get pointers to the list of I/O requests.
1932 	 */
1933 	nerror = 0;
1934 	cbptr = uap->acb_list;
1935 	for (i = 0; i < uap->nent; i++) {
1936 		iocb = (struct aiocb *)(intptr_t)fuword(&cbptr[i]);
1937 		if (((intptr_t)iocb != -1) && ((intptr_t)iocb != 0)) {
1938 			error = aio_aqueue(td, iocb, lj, LIO_NOP, oldsigev);
1939 			if (error != 0)
1940 				nerror++;
1941 		}
1942 	}
1943 
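	/*
	 * For LIO_WAIT, sleep until every queued request has finished.
	 * For LIO_NOWAIT, post the completion notification now if all
	 * requests have already finished.
	 */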
1944 	error = 0;
1945 	PROC_LOCK(p);
1946 	if (uap->mode == LIO_WAIT) {
1947 		while (lj->lioj_count - 1 != lj->lioj_finished_count) {
1948 			ki->kaio_flags |= KAIO_WAKEUP;
1949 			error = msleep(&p->p_aioinfo, &p->p_mtx,
1950 			    PRIBIO | PCATCH, "aiospn", 0);
1951 			if (error == ERESTART)
1952 				error = EINTR;
1953 			if (error)
1954 				break;
1955 		}
1956 	} else {
1957 		if (lj->lioj_count - 1 == lj->lioj_finished_count) {
1958 			if (lj->lioj_signal.sigev_notify == SIGEV_KEVENT) {
1959 				lj->lioj_flags |= LIOJ_KEVENT_POSTED;
1960 				KNOTE_LOCKED(&lj->klist, 1);
1961 			}
1962 			if ((lj->lioj_flags & (LIOJ_SIGNAL|LIOJ_SIGNAL_POSTED))
1963 			    == LIOJ_SIGNAL
1964 			    && (lj->lioj_signal.sigev_notify == SIGEV_SIGNAL ||
1965 			    lj->lioj_signal.sigev_notify == SIGEV_THREAD_ID)) {
1966 				aio_sendsig(p, &lj->lioj_signal,
1967 					    &lj->lioj_ksi);
1968 				lj->lioj_flags |= LIOJ_SIGNAL_POSTED;
1969 			}
1970 		}
1971 	}
1972 	lj->lioj_count--;
1973 	if (lj->lioj_count == 0) {
1974 		TAILQ_REMOVE(&ki->kaio_liojoblist, lj, lioj_list);
1975 		knlist_delete(&lj->klist, curthread, 1);
1976 		sigqueue_take(&lj->lioj_ksi);
1977 		PROC_UNLOCK(p);
1978 		uma_zfree(aiolio_zone, lj);
1979 	} else
1980 		PROC_UNLOCK(p);
1981 
1982 	if (nerror)
1983 		return (EIO);
1984 	return (error);
1985 }
1986 
1987 /*
1988  * Called from the interrupt thread for physio; we should return as
1989  * quickly as possible, so we just schedule a biohelper task.
1990  */
1991 static void
1992 aio_physwakeup(struct buf *bp)
1993 {
1994 	struct aiocblist *aiocbe;
1995 
1996 	aiocbe = (struct aiocblist *)bp->b_caller1;
1997 	taskqueue_enqueue(taskqueue_aiod_bio, &aiocbe->biotask);
1998 }
1999 
2000 /*
2001  * Task routine for the heavier completion work: accounting, process wakeup, and signals.
2002  */
2003 static void
2004 biohelper(void *context, int pending)
2005 {
2006 	struct aiocblist *aiocbe = context;
2007 	struct buf *bp;
2008 	struct proc *userp;
2009 	struct kaioinfo *ki;
2010 	int nblks;
2011 
2012 	bp = aiocbe->bp;
2013 	userp = aiocbe->userproc;
2014 	ki = userp->p_aioinfo;
2015 	PROC_LOCK(userp);
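	/* Subtract any untransferred residual from the byte count. */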
2016 	aiocbe->uaiocb._aiocb_private.status -= bp->b_resid;
2017 	aiocbe->uaiocb._aiocb_private.error = 0;
2018 	if (bp->b_ioflags & BIO_ERROR)
2019 		aiocbe->uaiocb._aiocb_private.error = bp->b_error;
2020 	nblks = btodb(aiocbe->uaiocb.aio_nbytes);
2021 	if (aiocbe->uaiocb.aio_lio_opcode == LIO_WRITE)
2022 		aiocbe->outputcharge += nblks;
2023 	else
2024 		aiocbe->inputcharge += nblks;
2025 	aiocbe->bp = NULL;
2026 	TAILQ_REMOVE(&userp->p_aioinfo->kaio_bufqueue, aiocbe, plist);
2027 	ki->kaio_buffer_count--;
2028 	aio_bio_done_notify(userp, aiocbe, DONE_BUF);
2029 	PROC_UNLOCK(userp);
2030 
2031 	/* Release mapping into kernel space. */
2032 	vunmapbuf(bp);
2033 	relpbuf(bp, NULL);
2034 	atomic_subtract_int(&num_buf_aio, 1);
2035 }
2036 
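/*
 * Illustrative userland sketch (not part of this file): the corresponding
 * libc interface is expected to look roughly like
 *
 *	struct aiocb *cbp;
 *	struct timespec ts = { 1, 0 };
 *	ssize_t n = aio_waitcomplete(&cbp, &ts);
 *
 * where a non-negative n is the completed request's return value and cbp
 * is set to the caller's aiocb for that request.
 */
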
2037 /* syscall - wait for the next completion of an aio request */
2038 int
2039 aio_waitcomplete(struct thread *td, struct aio_waitcomplete_args *uap)
2040 {
2041 	struct proc *p = td->td_proc;
2042 	struct timeval atv;
2043 	struct timespec ts;
2044 	struct kaioinfo *ki;
2045 	struct aiocblist *cb;
2046 	struct aiocb *uuaiocb;
2047 	int error, status, timo;
2048 
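	/* Store a NULL aiocb pointer in case we return without a completion. */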
2049 	suword(uap->aiocbp, (long)NULL);
2050 
2051 	timo = 0;
2052 	if (uap->timeout) {
2053 		/* Get timespec struct. */
2054 		error = copyin(uap->timeout, &ts, sizeof(ts));
2055 		if (error)
2056 			return (error);
2057 
2058 		if ((ts.tv_nsec < 0) || (ts.tv_nsec >= 1000000000))
2059 			return (EINVAL);
2060 
2061 		TIMESPEC_TO_TIMEVAL(&atv, &ts);
2062 		if (itimerfix(&atv))
2063 			return (EINVAL);
2064 		timo = tvtohz(&atv);
2065 	}
2066 
2067 	if (p->p_aioinfo == NULL)
2068 		aio_init_aioinfo(p);
2069 	ki = p->p_aioinfo;
2070 
2071 	error = 0;
2072 	cb = NULL;
2073 	PROC_LOCK(p);
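	/*
	 * Sleep until a finished request shows up on the done queue, or
	 * until the timeout expires.
	 */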
2074 	while ((cb = TAILQ_FIRST(&ki->kaio_done)) == NULL) {
2075 		ki->kaio_flags |= KAIO_WAKEUP;
2076 		error = msleep(&p->p_aioinfo, &p->p_mtx, PRIBIO | PCATCH,
2077 		    "aiowc", timo);
2078 		if (timo && error == ERESTART)
2079 			error = EINTR;
2080 		if (error)
2081 			break;
2082 	}
2083 
2084 	if (cb != NULL) {
2085 		MPASS(cb->jobstate == JOBST_JOBFINISHED);
2086 		uuaiocb = cb->uuaiocb;
2087 		status = cb->uaiocb._aiocb_private.status;
2088 		error = cb->uaiocb._aiocb_private.error;
2089 		td->td_retval[0] = status;
2090 		if (cb->uaiocb.aio_lio_opcode == LIO_WRITE) {
2091 			p->p_stats->p_ru.ru_oublock += cb->outputcharge;
2092 			cb->outputcharge = 0;
2093 		} else if (cb->uaiocb.aio_lio_opcode == LIO_READ) {
2094 			p->p_stats->p_ru.ru_inblock += cb->inputcharge;
2095 			cb->inputcharge = 0;
2096 		}
2097 		aio_free_entry(cb);
2098 		PROC_UNLOCK(p);
2099 		suword(uap->aiocbp, (long)uuaiocb);
2100 		suword(&uuaiocb->_aiocb_private.error, error);
2101 		suword(&uuaiocb->_aiocb_private.status, status);
2102 	} else
2103 		PROC_UNLOCK(p);
2104 
2105 	return (error);
2106 }
2107 
2108 /* kqueue attach function */
2109 static int
2110 filt_aioattach(struct knote *kn)
2111 {
2112 	struct aiocblist *aiocbe = (struct aiocblist *)kn->kn_sdata;
2113 
2114 	/*
2115 	 * The aiocbe pointer must be validated before using it, so
2116 	 * registration is restricted to the kernel; the user cannot
2117 	 * set EV_FLAG1.
2118 	 */
2119 	if ((kn->kn_flags & EV_FLAG1) == 0)
2120 		return (EPERM);
2121 	kn->kn_flags &= ~EV_FLAG1;
2122 
2123 	knlist_add(&aiocbe->klist, kn, 0);
2124 
2125 	return (0);
2126 }
2127 
2128 /* kqueue detach function */
2129 static void
2130 filt_aiodetach(struct knote *kn)
2131 {
2132 	struct aiocblist *aiocbe = (struct aiocblist *)kn->kn_sdata;
2133 
2134 	if (!knlist_empty(&aiocbe->klist))
2135 		knlist_remove(&aiocbe->klist, kn, 0);
2136 }
2137 
2138 /* kqueue filter function */
2139 /*ARGSUSED*/
2140 static int
2141 filt_aio(struct knote *kn, long hint)
2142 {
2143 	struct aiocblist *aiocbe = (struct aiocblist *)kn->kn_sdata;
2144 
2145 	kn->kn_data = aiocbe->uaiocb._aiocb_private.error;
2146 	if (aiocbe->jobstate != JOBST_JOBFINISHED)
2147 		return (0);
2148 	kn->kn_flags |= EV_EOF;
2149 	return (1);
2150 }
2151 
2152 /* kqueue attach function */
2153 static int
2154 filt_lioattach(struct knote *kn)
2155 {
2156 	struct aioliojob * lj = (struct aioliojob *)kn->kn_sdata;
2157 
2158 	/*
2159 	 * The aioliojob pointer must be validated before using it, so
2160 	 * registration is restricted to the kernel; the user cannot
2161 	 * set EV_FLAG1.
2162 	 */
2163 	if ((kn->kn_flags & EV_FLAG1) == 0)
2164 		return (EPERM);
2165 	kn->kn_flags &= ~EV_FLAG1;
2166 
2167 	knlist_add(&lj->klist, kn, 0);
2168 
2169 	return (0);
2170 }
2171 
2172 /* kqueue detach function */
2173 static void
2174 filt_liodetach(struct knote *kn)
2175 {
2176 	struct aioliojob * lj = (struct aioliojob *)kn->kn_sdata;
2177 
2178 	if (!knlist_empty(&lj->klist))
2179 		knlist_remove(&lj->klist, kn, 0);
2180 }
2181 
2182 /* kqueue filter function */
2183 /*ARGSUSED*/
2184 static int
2185 filt_lio(struct knote *kn, long hint)
2186 {
2187 	struct aioliojob * lj = (struct aioliojob *)kn->kn_sdata;
2188 
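	/* The knote is active once the lio completion kevent has been posted. */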
2189 	return (lj->lioj_flags & LIOJ_KEVENT_POSTED);
2190 }
2191