xref: /illumos-gate/usr/src/uts/common/os/aio_subr.c (revision df05b9eea34242cff39427aa1782e012cd696979)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 
22 /*
23  * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
24  * Use is subject to license terms.
25  */
26 
27 #include <sys/types.h>
28 #include <sys/proc.h>
29 #include <sys/file.h>
30 #include <sys/errno.h>
31 #include <sys/param.h>
32 #include <sys/sysmacros.h>
33 #include <sys/cmn_err.h>
34 #include <sys/systm.h>
35 #include <vm/as.h>
36 #include <vm/page.h>
37 #include <sys/uio.h>
38 #include <sys/kmem.h>
39 #include <sys/debug.h>
40 #include <sys/aio_impl.h>
41 #include <sys/epm.h>
42 #include <sys/fs/snode.h>
43 #include <sys/siginfo.h>
44 #include <sys/cpuvar.h>
45 #include <sys/tnf_probe.h>
46 #include <sys/conf.h>
47 #include <sys/sdt.h>
48 
49 int aphysio(int (*)(), int (*)(), dev_t, int, void (*)(), struct aio_req *);
50 void aio_done(struct buf *);
51 void aphysio_unlock(aio_req_t *);
52 void aio_cleanup(int);
53 void aio_cleanup_exit(void);
54 
55 /*
56  * private functions
57  */
58 static void aio_sigev_send(proc_t *, sigqueue_t *);
59 static void aio_hash_delete(aio_t *, aio_req_t *);
60 static void aio_lio_free(aio_t *, aio_lio_t *);
61 static int aio_cleanup_cleanupq(aio_t *, aio_req_t *, int);
62 static int aio_cleanup_notifyq(aio_t *, aio_req_t *, int);
63 static void aio_cleanup_pollq(aio_t *, aio_req_t *, int);
64 static void aio_cleanup_portq(aio_t *, aio_req_t *, int);
65 
66 /*
67  * async version of physio() that doesn't wait synchronously
68  * for the driver's strategy routine to complete.
69  */
70 
71 int
72 aphysio(
73 	int (*strategy)(struct buf *),
74 	int (*cancel)(struct buf *),
75 	dev_t dev,
76 	int rw,
77 	void (*mincnt)(struct buf *),
78 	struct aio_req *aio)
79 {
80 	struct uio *uio = aio->aio_uio;
81 	aio_req_t *reqp = (aio_req_t *)aio->aio_private;
82 	struct buf *bp = &reqp->aio_req_buf;
83 	struct iovec *iov;
84 	struct as *as;
85 	char *a;
86 	int	error;
87 	size_t	c;
88 	struct page **pplist;
89 	struct dev_ops *ops = devopsp[getmajor(dev)];
90 
91 	if (uio->uio_loffset < 0)
92 		return (EINVAL);
93 #ifdef	_ILP32
94 	/*
95 	 * For 32-bit kernels, check against SPEC_MAXOFFSET_T which represents
96 	 * the maximum size that can be supported by the IO subsystem.
97 	 * XXX this code assumes a D_64BIT driver.
98 	 */
99 	if (uio->uio_loffset > SPEC_MAXOFFSET_T)
100 		return (EINVAL);
101 #endif	/* _ILP32 */
102 
103 	TNF_PROBE_5(aphysio_start, "kaio", /* CSTYLED */,
104 	    tnf_opaque, bp, bp,
105 	    tnf_device, device, dev,
106 	    tnf_offset, blkno, btodt(uio->uio_loffset),
107 	    tnf_size, size, uio->uio_iov->iov_len,
108 	    tnf_bioflags, rw, rw);
109 
110 	if (rw == B_READ) {
111 		CPU_STATS_ADD_K(sys, phread, 1);
112 	} else {
113 		CPU_STATS_ADD_K(sys, phwrite, 1);
114 	}
115 
116 	iov = uio->uio_iov;
117 	sema_init(&bp->b_sem, 0, NULL, SEMA_DEFAULT, NULL);
118 	sema_init(&bp->b_io, 0, NULL, SEMA_DEFAULT, NULL);
119 
120 	bp->b_error = 0;
121 	bp->b_flags = B_BUSY | B_PHYS | B_ASYNC | rw;
122 	bp->b_edev = dev;
123 	bp->b_dev = cmpdev(dev);
124 	bp->b_lblkno = btodt(uio->uio_loffset);
125 	bp->b_offset = uio->uio_loffset;
126 	(void) ops->devo_getinfo(NULL, DDI_INFO_DEVT2DEVINFO,
127 	    (void *)bp->b_edev, (void **)&bp->b_dip);
128 
129 	/*
130 	 * Clustering: Clustering can set the b_iodone, b_forw and
131 	 * b_proc fields to cluster-specifc values.
132 	 */
133 	if (bp->b_iodone == NULL) {
134 		bp->b_iodone = (int (*)()) aio_done;
135 		/* b_forw points at an aio_req_t structure */
136 		bp->b_forw = (struct buf *)reqp;
137 		bp->b_proc = curproc;
138 	}
139 
140 	a = bp->b_un.b_addr = iov->iov_base;
141 	c = bp->b_bcount = iov->iov_len;
142 
143 	(*mincnt)(bp);
144 	if (bp->b_bcount != iov->iov_len)
145 		return (ENOTSUP);
146 
147 	as = bp->b_proc->p_as;
148 
149 	error = as_pagelock(as, &pplist, a,
150 	    c, rw == B_READ? S_WRITE : S_READ);
151 	if (error != 0) {
152 		bp->b_flags |= B_ERROR;
153 		bp->b_error = error;
154 		bp->b_flags &= ~(B_BUSY|B_WANTED|B_PHYS|B_SHADOW);
155 		return (error);
156 	}
157 	reqp->aio_req_flags |= AIO_PAGELOCKDONE;
158 	bp->b_shadow = pplist;
159 	if (pplist != NULL) {
160 		bp->b_flags |= B_SHADOW;
161 	}
162 
163 	if (cancel != anocancel)
164 		cmn_err(CE_PANIC,
165 		    "aphysio: cancellation not supported, use anocancel");
166 
167 	reqp->aio_req_cancel = cancel;
168 
169 	DTRACE_IO1(start, struct buf *, bp);
170 
171 	return ((*strategy)(bp));
172 }
173 
/*
 * Cancel routine that callers pass to aphysio(). It never cancels
 * anything and always reports ENXIO.
 */
/*ARGSUSED*/
int
anocancel(struct buf *bp)
{
	const int unsupported = ENXIO;

	return (unsupported);
}
180 
181 /*
182  * Called from biodone().
183  * Notify process that a pending AIO has finished.
184  */
185 
186 /*
187  * Clustering: This function is made non-static as it is used
188  * by clustering s/w as contract private interface.
189  */
190 
191 void
192 aio_done(struct buf *bp)
193 {
194 	proc_t *p;
195 	struct as *as;
196 	aio_req_t *reqp;
197 	aio_lio_t *head = NULL;
198 	aio_t *aiop;
199 	sigqueue_t *sigev = NULL;
200 	sigqueue_t *lio_sigev = NULL;
201 	port_kevent_t *pkevp = NULL;
202 	port_kevent_t *lio_pkevp = NULL;
203 	int fd;
204 	int cleanupqflag;
205 	int pollqflag;
206 	int portevpend;
207 	void (*func)();
208 	int use_port = 0;
209 	int reqp_flags = 0;
210 	int send_signal = 0;
211 
212 	p = bp->b_proc;
213 	as = p->p_as;
214 	reqp = (aio_req_t *)bp->b_forw;
215 	fd = reqp->aio_req_fd;
216 
217 	TNF_PROBE_5(aphysio_end, "kaio", /* CSTYLED */,
218 	    tnf_opaque, bp, bp,
219 	    tnf_device, device, bp->b_edev,
220 	    tnf_offset, blkno, btodt(reqp->aio_req_uio.uio_loffset),
221 	    tnf_size, size, reqp->aio_req_uio.uio_iov->iov_len,
222 	    tnf_bioflags, rw, (bp->b_flags & (B_READ|B_WRITE)));
223 
224 	/*
225 	 * mapout earlier so that more kmem is available when aio is
226 	 * heavily used. bug #1262082
227 	 */
228 	if (bp->b_flags & B_REMAPPED)
229 		bp_mapout(bp);
230 
231 	/* decrement fd's ref count by one, now that aio request is done. */
232 	areleasef(fd, P_FINFO(p));
233 
234 	aiop = p->p_aio;
235 	ASSERT(aiop != NULL);
236 
237 	mutex_enter(&aiop->aio_portq_mutex);
238 	mutex_enter(&aiop->aio_mutex);
239 	ASSERT(aiop->aio_pending > 0);
240 	ASSERT(reqp->aio_req_flags & AIO_PENDING);
241 	aiop->aio_pending--;
242 	reqp->aio_req_flags &= ~AIO_PENDING;
243 	reqp_flags = reqp->aio_req_flags;
244 	if ((pkevp = reqp->aio_req_portkev) != NULL) {
245 		/* Event port notification is desired for this transaction */
246 		if (reqp->aio_req_flags & AIO_CLOSE_PORT) {
247 			/*
248 			 * The port is being closed and it is waiting for
249 			 * pending asynchronous I/O transactions to complete.
250 			 */
251 			portevpend = --aiop->aio_portpendcnt;
252 			aio_deq(&aiop->aio_portpending, reqp);
253 			aio_enq(&aiop->aio_portq, reqp, 0);
254 			mutex_exit(&aiop->aio_mutex);
255 			mutex_exit(&aiop->aio_portq_mutex);
256 			port_send_event(pkevp);
257 			if (portevpend == 0)
258 				cv_broadcast(&aiop->aio_portcv);
259 			return;
260 		}
261 
262 		if (aiop->aio_flags & AIO_CLEANUP) {
263 			/*
264 			 * aio_cleanup_thread() is waiting for completion of
265 			 * transactions.
266 			 */
267 			mutex_enter(&as->a_contents);
268 			aio_deq(&aiop->aio_portpending, reqp);
269 			aio_enq(&aiop->aio_portcleanupq, reqp, 0);
270 			cv_signal(&aiop->aio_cleanupcv);
271 			mutex_exit(&as->a_contents);
272 			mutex_exit(&aiop->aio_mutex);
273 			mutex_exit(&aiop->aio_portq_mutex);
274 			return;
275 		}
276 
277 		aio_deq(&aiop->aio_portpending, reqp);
278 		aio_enq(&aiop->aio_portq, reqp, 0);
279 
280 		use_port = 1;
281 	} else {
282 		/*
283 		 * when the AIO_CLEANUP flag is enabled for this
284 		 * process, or when the AIO_POLL bit is set for
285 		 * this request, special handling is required.
286 		 * otherwise the request is put onto the doneq.
287 		 */
288 		cleanupqflag = (aiop->aio_flags & AIO_CLEANUP);
289 		pollqflag = (reqp->aio_req_flags & AIO_POLL);
290 		if (cleanupqflag | pollqflag) {
291 
292 			if (cleanupqflag)
293 				mutex_enter(&as->a_contents);
294 
295 			/*
296 			 * requests with their AIO_POLL bit set are put
297 			 * on the pollq, requests with sigevent structures
298 			 * or with listio heads are put on the notifyq, and
299 			 * the remaining requests don't require any special
300 			 * cleanup handling, so they're put onto the default
301 			 * cleanupq.
302 			 */
303 			if (pollqflag)
304 				aio_enq(&aiop->aio_pollq, reqp, AIO_POLLQ);
305 			else if (reqp->aio_req_sigqp || reqp->aio_req_lio)
306 				aio_enq(&aiop->aio_notifyq, reqp, AIO_NOTIFYQ);
307 			else
308 				aio_enq(&aiop->aio_cleanupq, reqp,
309 				    AIO_CLEANUPQ);
310 
311 			if (cleanupqflag) {
312 				cv_signal(&aiop->aio_cleanupcv);
313 				mutex_exit(&as->a_contents);
314 				mutex_exit(&aiop->aio_mutex);
315 				mutex_exit(&aiop->aio_portq_mutex);
316 			} else {
317 				ASSERT(pollqflag);
318 				/* block aio_cleanup_exit until we're done */
319 				aiop->aio_flags |= AIO_DONE_ACTIVE;
320 				mutex_exit(&aiop->aio_mutex);
321 				mutex_exit(&aiop->aio_portq_mutex);
322 				/*
323 				 * let the cleanup processing happen from an AST
324 				 * set an AST on all threads in this process
325 				 */
326 				mutex_enter(&p->p_lock);
327 				set_proc_ast(p);
328 				mutex_exit(&p->p_lock);
329 				mutex_enter(&aiop->aio_mutex);
330 				/* wakeup anybody waiting in aiowait() */
331 				cv_broadcast(&aiop->aio_waitcv);
332 
333 				/* wakeup aio_cleanup_exit if needed */
334 				if (aiop->aio_flags & AIO_CLEANUP)
335 					cv_signal(&aiop->aio_cleanupcv);
336 				aiop->aio_flags &= ~AIO_DONE_ACTIVE;
337 				mutex_exit(&aiop->aio_mutex);
338 			}
339 			return;
340 		}
341 
342 		/*
343 		 * save req's sigevent pointer, and check its
344 		 * value after releasing aio_mutex lock.
345 		 */
346 		sigev = reqp->aio_req_sigqp;
347 		reqp->aio_req_sigqp = NULL;
348 
349 		/* put request on done queue. */
350 		aio_enq(&aiop->aio_doneq, reqp, AIO_DONEQ);
351 	} /* portkevent */
352 
353 	/*
354 	 * when list IO notification is enabled, a notification or
355 	 * signal is sent only when all entries in the list are done.
356 	 */
357 	if ((head = reqp->aio_req_lio) != NULL) {
358 		ASSERT(head->lio_refcnt > 0);
359 		if (--head->lio_refcnt == 0) {
360 			/*
361 			 * save lio's sigevent pointer, and check
362 			 * its value after releasing aio_mutex lock.
363 			 */
364 			lio_sigev = head->lio_sigqp;
365 			head->lio_sigqp = NULL;
366 			cv_signal(&head->lio_notify);
367 			if (head->lio_port >= 0 &&
368 			    (lio_pkevp = head->lio_portkev) != NULL)
369 				head->lio_port = -1;
370 		}
371 	}
372 
373 	/*
374 	 * if AIO_WAITN set then
375 	 * send signal only when we reached the
376 	 * required amount of IO's finished
377 	 * or when all IO's are done
378 	 */
379 	if (aiop->aio_flags & AIO_WAITN) {
380 		if (aiop->aio_waitncnt > 0)
381 			aiop->aio_waitncnt--;
382 		if (aiop->aio_pending == 0 ||
383 		    aiop->aio_waitncnt == 0)
384 			cv_broadcast(&aiop->aio_waitcv);
385 	} else {
386 		cv_broadcast(&aiop->aio_waitcv);
387 	}
388 
389 	/*
390 	 * No need to set this flag for pollq, portq, lio requests.
391 	 * Send a SIGIO signal when the process has a handler enabled.
392 	 */
393 	if (!sigev && !use_port && head == NULL &&
394 	    (func = PTOU(p)->u_signal[SIGIO - 1]) != SIG_DFL &&
395 	    (func != SIG_IGN)) {
396 		send_signal = 1;
397 		reqp->aio_req_flags |= AIO_SIGNALLED;
398 	}
399 
400 	mutex_exit(&aiop->aio_mutex);
401 	mutex_exit(&aiop->aio_portq_mutex);
402 
403 	/*
404 	 * Could the cleanup thread be waiting for AIO with locked
405 	 * resources to finish?
406 	 * Ideally in that case cleanup thread should block on cleanupcv,
407 	 * but there is a window, where it could miss to see a new aio
408 	 * request that sneaked in.
409 	 */
410 	mutex_enter(&as->a_contents);
411 	if ((reqp_flags & AIO_PAGELOCKDONE) && AS_ISUNMAPWAIT(as))
412 		cv_broadcast(&as->a_cv);
413 	mutex_exit(&as->a_contents);
414 
415 	if (sigev)
416 		aio_sigev_send(p, sigev);
417 	else if (send_signal)
418 		psignal(p, SIGIO);
419 
420 	if (pkevp)
421 		port_send_event(pkevp);
422 	if (lio_sigev)
423 		aio_sigev_send(p, lio_sigev);
424 	if (lio_pkevp)
425 		port_send_event(lio_pkevp);
426 }
427 
428 /*
429  * send a queued signal to the specified process when
430  * the event signal is non-NULL. A return value of 1
431  * will indicate that a signal is queued, and 0 means that
432  * no signal was specified, nor sent.
433  */
434 static void
435 aio_sigev_send(proc_t *p, sigqueue_t *sigev)
436 {
437 	ASSERT(sigev != NULL);
438 
439 	mutex_enter(&p->p_lock);
440 	sigaddqa(p, NULL, sigev);
441 	mutex_exit(&p->p_lock);
442 }
443 
444 /*
445  * special case handling for zero length requests. the aio request
446  * short circuits the normal completion path since all that's required
447  * to complete this request is to copyout a zero to the aio request's
448  * return value.
449  */
450 void
451 aio_zerolen(aio_req_t *reqp)
452 {
453 
454 	struct buf *bp = &reqp->aio_req_buf;
455 
456 	reqp->aio_req_flags |= AIO_ZEROLEN;
457 
458 	bp->b_forw = (struct buf *)reqp;
459 	bp->b_proc = curproc;
460 
461 	bp->b_resid = 0;
462 	bp->b_flags = 0;
463 
464 	aio_done(bp);
465 }
466 
467 /*
468  * unlock pages previously locked by as_pagelock
469  */
470 void
471 aphysio_unlock(aio_req_t *reqp)
472 {
473 	struct buf *bp;
474 	struct iovec *iov;
475 	int flags;
476 
477 	if (reqp->aio_req_flags & AIO_PHYSIODONE)
478 		return;
479 
480 	reqp->aio_req_flags |= AIO_PHYSIODONE;
481 
482 	if (reqp->aio_req_flags & AIO_ZEROLEN)
483 		return;
484 
485 	bp = &reqp->aio_req_buf;
486 	iov = reqp->aio_req_uio.uio_iov;
487 	flags = (((bp->b_flags & B_READ) == B_READ) ? S_WRITE : S_READ);
488 	if (reqp->aio_req_flags & AIO_PAGELOCKDONE) {
489 		as_pageunlock(bp->b_proc->p_as,
490 		    bp->b_flags & B_SHADOW ? bp->b_shadow : NULL,
491 		    iov->iov_base, iov->iov_len, flags);
492 		reqp->aio_req_flags &= ~AIO_PAGELOCKDONE;
493 	}
494 	bp->b_flags &= ~(B_BUSY|B_WANTED|B_PHYS|B_SHADOW);
495 	bp->b_flags |= B_DONE;
496 }
497 
498 /*
499  * deletes a requests id from the hash table of outstanding io.
500  */
501 static void
502 aio_hash_delete(aio_t *aiop, struct aio_req_t *reqp)
503 {
504 	long index;
505 	aio_result_t *resultp = reqp->aio_req_resultp;
506 	aio_req_t *current;
507 	aio_req_t **nextp;
508 
509 	index = AIO_HASH(resultp);
510 	nextp = (aiop->aio_hash + index);
511 	while ((current = *nextp) != NULL) {
512 		if (current->aio_req_resultp == resultp) {
513 			*nextp = current->aio_hash_next;
514 			return;
515 		}
516 		nextp = &current->aio_hash_next;
517 	}
518 }
519 
520 /*
521  * Put a list head struct onto its free list.
522  */
523 static void
524 aio_lio_free(aio_t *aiop, aio_lio_t *head)
525 {
526 	ASSERT(MUTEX_HELD(&aiop->aio_mutex));
527 
528 	if (head->lio_sigqp != NULL)
529 		kmem_free(head->lio_sigqp, sizeof (sigqueue_t));
530 	head->lio_next = aiop->aio_lio_free;
531 	aiop->aio_lio_free = head;
532 }
533 
534 /*
535  * Put a reqp onto the freelist.
536  */
537 void
538 aio_req_free(aio_t *aiop, aio_req_t *reqp)
539 {
540 	aio_lio_t *liop;
541 
542 	ASSERT(MUTEX_HELD(&aiop->aio_mutex));
543 
544 	if (reqp->aio_req_portkev) {
545 		port_free_event(reqp->aio_req_portkev);
546 		reqp->aio_req_portkev = NULL;
547 	}
548 
549 	if ((liop = reqp->aio_req_lio) != NULL) {
550 		if (--liop->lio_nent == 0)
551 			aio_lio_free(aiop, liop);
552 		reqp->aio_req_lio = NULL;
553 	}
554 	if (reqp->aio_req_sigqp != NULL) {
555 		kmem_free(reqp->aio_req_sigqp, sizeof (sigqueue_t));
556 		reqp->aio_req_sigqp = NULL;
557 	}
558 	reqp->aio_req_next = aiop->aio_free;
559 	reqp->aio_req_prev = NULL;
560 	aiop->aio_free = reqp;
561 	aiop->aio_outstanding--;
562 	if (aiop->aio_outstanding == 0)
563 		cv_broadcast(&aiop->aio_waitcv);
564 	aio_hash_delete(aiop, reqp);
565 }
566 
567 /*
568  * Put a reqp onto the freelist.
569  */
570 void
571 aio_req_free_port(aio_t *aiop, aio_req_t *reqp)
572 {
573 	ASSERT(MUTEX_HELD(&aiop->aio_mutex));
574 
575 	reqp->aio_req_next = aiop->aio_free;
576 	reqp->aio_req_prev = NULL;
577 	aiop->aio_free = reqp;
578 	aiop->aio_outstanding--;
579 	aio_hash_delete(aiop, reqp);
580 }
581 
582 
/*
 * Consistency check for a circular, doubly linked request queue.
 *
 * Walks the queue once and asserts that every element's neighbours point
 * back at it. If entry_present is non-NULL it must be found exactly once;
 * if entry_missing is non-NULL it must not be found at all.
 *
 * Only compiled into DEBUG kernels; otherwise the call expands to nothing.
 */
#if defined(DEBUG)
static void
aio_verify_queue(aio_req_t *head,
	aio_req_t *entry_present, aio_req_t *entry_missing)
{
	aio_req_t *cur = head;
	int hits_present = 0;
	int hits_missing = 0;

	if (cur != NULL) {
		do {
			ASSERT(cur->aio_req_prev->aio_req_next == cur);
			ASSERT(cur->aio_req_next->aio_req_prev == cur);
			if (cur == entry_present)
				hits_present++;
			if (cur == entry_missing)
				hits_missing++;
			cur = cur->aio_req_next;
		} while (cur != head);
	}
	ASSERT(entry_present == NULL || hits_present == 1);
	ASSERT(entry_missing == NULL || hits_missing == 0);
}
#else
#define	aio_verify_queue(x, y, z)
#endif
611 
612 /*
613  * Put a request onto the tail of a queue.
614  */
615 void
616 aio_enq(aio_req_t **qhead, aio_req_t *reqp, int qflg_new)
617 {
618 	aio_req_t *head;
619 	aio_req_t *prev;
620 
621 	aio_verify_queue(*qhead, NULL, reqp);
622 
623 	if ((head = *qhead) == NULL) {
624 		reqp->aio_req_next = reqp;
625 		reqp->aio_req_prev = reqp;
626 		*qhead = reqp;
627 	} else {
628 		reqp->aio_req_next = head;
629 		reqp->aio_req_prev = prev = head->aio_req_prev;
630 		prev->aio_req_next = reqp;
631 		head->aio_req_prev = reqp;
632 	}
633 	reqp->aio_req_flags |= qflg_new;
634 }
635 
636 /*
637  * Remove a request from its queue.
638  */
639 void
640 aio_deq(aio_req_t **qhead, aio_req_t *reqp)
641 {
642 	aio_verify_queue(*qhead, reqp, NULL);
643 
644 	if (reqp->aio_req_next == reqp) {
645 		*qhead = NULL;
646 	} else {
647 		reqp->aio_req_prev->aio_req_next = reqp->aio_req_next;
648 		reqp->aio_req_next->aio_req_prev = reqp->aio_req_prev;
649 		if (*qhead == reqp)
650 			*qhead = reqp->aio_req_next;
651 	}
652 	reqp->aio_req_next = NULL;
653 	reqp->aio_req_prev = NULL;
654 }
655 
656 /*
657  * concatenate a specified queue with the cleanupq. the specified
658  * queue is put onto the tail of the cleanupq. all elements on the
659  * specified queue should have their aio_req_flags field cleared.
660  */
661 /*ARGSUSED*/
662 void
663 aio_cleanupq_concat(aio_t *aiop, aio_req_t *q2, int qflg)
664 {
665 	aio_req_t *cleanupqhead, *q2tail;
666 	aio_req_t *reqp = q2;
667 
668 	do {
669 		ASSERT(reqp->aio_req_flags & qflg);
670 		reqp->aio_req_flags &= ~qflg;
671 		reqp->aio_req_flags |= AIO_CLEANUPQ;
672 	} while ((reqp = reqp->aio_req_next) != q2);
673 
674 	cleanupqhead = aiop->aio_cleanupq;
675 	if (cleanupqhead == NULL)
676 		aiop->aio_cleanupq = q2;
677 	else {
678 		cleanupqhead->aio_req_prev->aio_req_next = q2;
679 		q2tail = q2->aio_req_prev;
680 		q2tail->aio_req_next = cleanupqhead;
681 		q2->aio_req_prev = cleanupqhead->aio_req_prev;
682 		cleanupqhead->aio_req_prev = q2tail;
683 	}
684 }
685 
686 /*
687  * cleanup aio requests that are on the per-process poll queue.
688  */
689 void
690 aio_cleanup(int flag)
691 {
692 	aio_t *aiop = curproc->p_aio;
693 	aio_req_t *pollqhead, *cleanupqhead, *notifyqhead;
694 	aio_req_t *cleanupport;
695 	aio_req_t *portq = NULL;
696 	void (*func)();
697 	int signalled = 0;
698 	int qflag = 0;
699 	int exitflg;
700 
701 	ASSERT(aiop != NULL);
702 
703 	if (flag == AIO_CLEANUP_EXIT)
704 		exitflg = AIO_CLEANUP_EXIT;
705 	else
706 		exitflg = 0;
707 
708 	/*
709 	 * We need to get the aio_cleanupq_mutex because we are calling
710 	 * aio_cleanup_cleanupq()
711 	 */
712 	mutex_enter(&aiop->aio_cleanupq_mutex);
713 	/*
714 	 * take all the requests off the cleanupq, the notifyq,
715 	 * and the pollq.
716 	 */
717 	mutex_enter(&aiop->aio_mutex);
718 	if ((cleanupqhead = aiop->aio_cleanupq) != NULL) {
719 		aiop->aio_cleanupq = NULL;
720 		qflag++;
721 	}
722 	if ((notifyqhead = aiop->aio_notifyq) != NULL) {
723 		aiop->aio_notifyq = NULL;
724 		qflag++;
725 	}
726 	if ((pollqhead = aiop->aio_pollq) != NULL) {
727 		aiop->aio_pollq = NULL;
728 		qflag++;
729 	}
730 	if (flag) {
731 		if ((portq = aiop->aio_portq) != NULL)
732 			qflag++;
733 
734 		if ((cleanupport = aiop->aio_portcleanupq) != NULL) {
735 			aiop->aio_portcleanupq = NULL;
736 			qflag++;
737 		}
738 	}
739 	mutex_exit(&aiop->aio_mutex);
740 
741 	/*
742 	 * return immediately if cleanupq, pollq, and
743 	 * notifyq are all empty. someone else must have
744 	 * emptied them.
745 	 */
746 	if (!qflag) {
747 		mutex_exit(&aiop->aio_cleanupq_mutex);
748 		return;
749 	}
750 
751 	/*
752 	 * do cleanup for the various queues.
753 	 */
754 	if (cleanupqhead)
755 		signalled = aio_cleanup_cleanupq(aiop, cleanupqhead, exitflg);
756 	mutex_exit(&aiop->aio_cleanupq_mutex);
757 	if (notifyqhead)
758 		signalled = aio_cleanup_notifyq(aiop, notifyqhead, exitflg);
759 	if (pollqhead)
760 		aio_cleanup_pollq(aiop, pollqhead, exitflg);
761 	if (flag && (cleanupport || portq))
762 		aio_cleanup_portq(aiop, cleanupport, exitflg);
763 
764 	if (exitflg)
765 		return;
766 
767 	/*
768 	 * If we have an active aio_cleanup_thread it's possible for
769 	 * this routine to push something on to the done queue after
770 	 * an aiowait/aiosuspend thread has already decided to block.
771 	 * This being the case, we need a cv_broadcast here to wake
772 	 * these threads up. It is simpler and cleaner to do this
773 	 * broadcast here than in the individual cleanup routines.
774 	 */
775 
776 	mutex_enter(&aiop->aio_mutex);
777 	cv_broadcast(&aiop->aio_waitcv);
778 	mutex_exit(&aiop->aio_mutex);
779 
780 	/*
781 	 * Only if the process wasn't already signalled,
782 	 * determine if a SIGIO signal should be delievered.
783 	 */
784 	if (!signalled &&
785 	    (func = PTOU(curproc)->u_signal[SIGIO - 1]) != SIG_DFL &&
786 	    func != SIG_IGN)
787 		psignal(curproc, SIGIO);
788 }
789 
790 
791 /*
792  * Do cleanup for every element of the port cleanup queue.
793  */
794 static void
795 aio_cleanup_portq(aio_t *aiop, aio_req_t *cleanupq, int exitflag)
796 {
797 	aio_req_t	*reqp;
798 	aio_req_t	*next;
799 	aio_req_t	*headp;
800 	aio_lio_t	*liop;
801 
802 	/* first check the portq */
803 	if (exitflag || ((aiop->aio_flags & AIO_CLEANUP_PORT) == 0)) {
804 		mutex_enter(&aiop->aio_mutex);
805 		if (aiop->aio_flags & AIO_CLEANUP)
806 			aiop->aio_flags |= AIO_CLEANUP_PORT;
807 		mutex_exit(&aiop->aio_mutex);
808 
809 		/*
810 		 * It is not allowed to hold locks during aphysio_unlock().
811 		 * The aio_done() interrupt function will try to acquire
812 		 * aio_mutex and aio_portq_mutex.  Therefore we disconnect
813 		 * the portq list from the aiop for the duration of the
814 		 * aphysio_unlock() loop below.
815 		 */
816 		mutex_enter(&aiop->aio_portq_mutex);
817 		headp = aiop->aio_portq;
818 		aiop->aio_portq = NULL;
819 		mutex_exit(&aiop->aio_portq_mutex);
820 		if ((reqp = headp) != NULL) {
821 			do {
822 				next = reqp->aio_req_next;
823 				aphysio_unlock(reqp);
824 				if (exitflag) {
825 					mutex_enter(&aiop->aio_mutex);
826 					aio_req_free(aiop, reqp);
827 					mutex_exit(&aiop->aio_mutex);
828 				}
829 			} while ((reqp = next) != headp);
830 		}
831 
832 		if (headp != NULL && exitflag == 0) {
833 			/* move unlocked requests back to the port queue */
834 			aio_req_t *newq;
835 
836 			mutex_enter(&aiop->aio_portq_mutex);
837 			if ((newq = aiop->aio_portq) != NULL) {
838 				aio_req_t *headprev = headp->aio_req_prev;
839 				aio_req_t *newqprev = newq->aio_req_prev;
840 
841 				headp->aio_req_prev = newqprev;
842 				newq->aio_req_prev = headprev;
843 				headprev->aio_req_next = newq;
844 				newqprev->aio_req_next = headp;
845 			}
846 			aiop->aio_portq = headp;
847 			cv_broadcast(&aiop->aio_portcv);
848 			mutex_exit(&aiop->aio_portq_mutex);
849 		}
850 	}
851 
852 	/* now check the port cleanup queue */
853 	if ((reqp = cleanupq) == NULL)
854 		return;
855 	do {
856 		next = reqp->aio_req_next;
857 		aphysio_unlock(reqp);
858 		if (exitflag) {
859 			mutex_enter(&aiop->aio_mutex);
860 			aio_req_free(aiop, reqp);
861 			mutex_exit(&aiop->aio_mutex);
862 		} else {
863 			mutex_enter(&aiop->aio_portq_mutex);
864 			aio_enq(&aiop->aio_portq, reqp, 0);
865 			mutex_exit(&aiop->aio_portq_mutex);
866 			port_send_event(reqp->aio_req_portkev);
867 			if ((liop = reqp->aio_req_lio) != NULL) {
868 				int send_event = 0;
869 
870 				mutex_enter(&aiop->aio_mutex);
871 				ASSERT(liop->lio_refcnt > 0);
872 				if (--liop->lio_refcnt == 0) {
873 					if (liop->lio_port >= 0 &&
874 					    liop->lio_portkev) {
875 						liop->lio_port = -1;
876 						send_event = 1;
877 					}
878 				}
879 				mutex_exit(&aiop->aio_mutex);
880 				if (send_event)
881 					port_send_event(liop->lio_portkev);
882 			}
883 		}
884 	} while ((reqp = next) != cleanupq);
885 }
886 
887 /*
888  * Do cleanup for every element of the cleanupq.
889  */
890 static int
891 aio_cleanup_cleanupq(aio_t *aiop, aio_req_t *qhead, int exitflg)
892 {
893 	aio_req_t *reqp, *next;
894 	int signalled = 0;
895 
896 	ASSERT(MUTEX_HELD(&aiop->aio_cleanupq_mutex));
897 
898 	/*
899 	 * Since aio_req_done() or aio_req_find() use the HASH list to find
900 	 * the required requests, they could potentially take away elements
901 	 * if they are already done (AIO_DONEQ is set).
902 	 * The aio_cleanupq_mutex protects the queue for the duration of the
903 	 * loop from aio_req_done() and aio_req_find().
904 	 */
905 	if ((reqp = qhead) == NULL)
906 		return (0);
907 	do {
908 		ASSERT(reqp->aio_req_flags & AIO_CLEANUPQ);
909 		ASSERT(reqp->aio_req_portkev == NULL);
910 		next = reqp->aio_req_next;
911 		aphysio_unlock(reqp);
912 		mutex_enter(&aiop->aio_mutex);
913 		if (exitflg)
914 			aio_req_free(aiop, reqp);
915 		else
916 			aio_enq(&aiop->aio_doneq, reqp, AIO_DONEQ);
917 		if (!exitflg) {
918 			if (reqp->aio_req_flags & AIO_SIGNALLED)
919 				signalled++;
920 			else
921 				reqp->aio_req_flags |= AIO_SIGNALLED;
922 		}
923 		mutex_exit(&aiop->aio_mutex);
924 	} while ((reqp = next) != qhead);
925 	return (signalled);
926 }
927 
928 /*
929  * do cleanup for every element of the notify queue.
930  */
931 static int
932 aio_cleanup_notifyq(aio_t *aiop, aio_req_t *qhead, int exitflg)
933 {
934 	aio_req_t *reqp, *next;
935 	aio_lio_t *liohead;
936 	sigqueue_t *sigev, *lio_sigev = NULL;
937 	int signalled = 0;
938 
939 	if ((reqp = qhead) == NULL)
940 		return (0);
941 	do {
942 		ASSERT(reqp->aio_req_flags & AIO_NOTIFYQ);
943 		next = reqp->aio_req_next;
944 		aphysio_unlock(reqp);
945 		if (exitflg) {
946 			mutex_enter(&aiop->aio_mutex);
947 			aio_req_free(aiop, reqp);
948 			mutex_exit(&aiop->aio_mutex);
949 		} else {
950 			mutex_enter(&aiop->aio_mutex);
951 			aio_enq(&aiop->aio_doneq, reqp, AIO_DONEQ);
952 			sigev = reqp->aio_req_sigqp;
953 			reqp->aio_req_sigqp = NULL;
954 			if ((liohead = reqp->aio_req_lio) != NULL) {
955 				ASSERT(liohead->lio_refcnt > 0);
956 				if (--liohead->lio_refcnt == 0) {
957 					cv_signal(&liohead->lio_notify);
958 					lio_sigev = liohead->lio_sigqp;
959 					liohead->lio_sigqp = NULL;
960 				}
961 			}
962 			mutex_exit(&aiop->aio_mutex);
963 			if (sigev) {
964 				signalled++;
965 				aio_sigev_send(reqp->aio_req_buf.b_proc,
966 				    sigev);
967 			}
968 			if (lio_sigev) {
969 				signalled++;
970 				aio_sigev_send(reqp->aio_req_buf.b_proc,
971 				    lio_sigev);
972 			}
973 		}
974 	} while ((reqp = next) != qhead);
975 
976 	return (signalled);
977 }
978 
979 /*
980  * Do cleanup for every element of the poll queue.
981  */
982 static void
983 aio_cleanup_pollq(aio_t *aiop, aio_req_t *qhead, int exitflg)
984 {
985 	aio_req_t *reqp, *next;
986 
987 	/*
988 	 * As no other threads should be accessing the queue at this point,
989 	 * it isn't necessary to hold aio_mutex while we traverse its elements.
990 	 */
991 	if ((reqp = qhead) == NULL)
992 		return;
993 	do {
994 		ASSERT(reqp->aio_req_flags & AIO_POLLQ);
995 		next = reqp->aio_req_next;
996 		aphysio_unlock(reqp);
997 		if (exitflg) {
998 			mutex_enter(&aiop->aio_mutex);
999 			aio_req_free(aiop, reqp);
1000 			mutex_exit(&aiop->aio_mutex);
1001 		} else {
1002 			aio_copyout_result(reqp);
1003 			mutex_enter(&aiop->aio_mutex);
1004 			aio_enq(&aiop->aio_doneq, reqp, AIO_DONEQ);
1005 			mutex_exit(&aiop->aio_mutex);
1006 		}
1007 	} while ((reqp = next) != qhead);
1008 }
1009 
1010 /*
1011  * called by exit(). waits for all outstanding kaio to finish
1012  * before the kaio resources are freed.
1013  */
1014 void
1015 aio_cleanup_exit(void)
1016 {
1017 	proc_t *p = curproc;
1018 	aio_t *aiop = p->p_aio;
1019 	aio_req_t *reqp, *next, *head;
1020 	aio_lio_t *nxtlio, *liop;
1021 
1022 	/*
1023 	 * wait for all outstanding kaio to complete. process
1024 	 * is now single-threaded; no other kaio requests can
1025 	 * happen once aio_pending is zero.
1026 	 */
1027 	mutex_enter(&aiop->aio_mutex);
1028 	aiop->aio_flags |= AIO_CLEANUP;
1029 	while ((aiop->aio_pending != 0) || (aiop->aio_flags & AIO_DONE_ACTIVE))
1030 		cv_wait(&aiop->aio_cleanupcv, &aiop->aio_mutex);
1031 	mutex_exit(&aiop->aio_mutex);
1032 
1033 	/* cleanup the cleanup-thread queues. */
1034 	aio_cleanup(AIO_CLEANUP_EXIT);
1035 
1036 	/*
1037 	 * Although this process is now single-threaded, we
1038 	 * still need to protect ourselves against a race with
1039 	 * aio_cleanup_dr_delete_memory().
1040 	 */
1041 	mutex_enter(&p->p_lock);
1042 
1043 	/*
1044 	 * free up the done queue's resources.
1045 	 */
1046 	if ((head = aiop->aio_doneq) != NULL) {
1047 		aiop->aio_doneq = NULL;
1048 		reqp = head;
1049 		do {
1050 			next = reqp->aio_req_next;
1051 			aphysio_unlock(reqp);
1052 			kmem_free(reqp, sizeof (struct aio_req_t));
1053 		} while ((reqp = next) != head);
1054 	}
1055 	/*
1056 	 * release aio request freelist.
1057 	 */
1058 	for (reqp = aiop->aio_free; reqp != NULL; reqp = next) {
1059 		next = reqp->aio_req_next;
1060 		kmem_free(reqp, sizeof (struct aio_req_t));
1061 	}
1062 
1063 	/*
1064 	 * release io list head freelist.
1065 	 */
1066 	for (liop = aiop->aio_lio_free; liop != NULL; liop = nxtlio) {
1067 		nxtlio = liop->lio_next;
1068 		kmem_free(liop, sizeof (aio_lio_t));
1069 	}
1070 
1071 	if (aiop->aio_iocb)
1072 		kmem_free(aiop->aio_iocb, aiop->aio_iocbsz);
1073 
1074 	mutex_destroy(&aiop->aio_mutex);
1075 	mutex_destroy(&aiop->aio_portq_mutex);
1076 	mutex_destroy(&aiop->aio_cleanupq_mutex);
1077 	p->p_aio = NULL;
1078 	mutex_exit(&p->p_lock);
1079 	kmem_free(aiop, sizeof (struct aio));
1080 }
1081 
1082 /*
1083  * copy out aio request's result to a user-level result_t buffer.
1084  */
1085 void
1086 aio_copyout_result(aio_req_t *reqp)
1087 {
1088 	struct buf	*bp;
1089 	struct iovec	*iov;
1090 	void		*resultp;
1091 	int		error;
1092 	size_t		retval;
1093 
1094 	if (reqp->aio_req_flags & AIO_COPYOUTDONE)
1095 		return;
1096 
1097 	reqp->aio_req_flags |= AIO_COPYOUTDONE;
1098 
1099 	iov = reqp->aio_req_uio.uio_iov;
1100 	bp = &reqp->aio_req_buf;
1101 	/* "resultp" points to user-level result_t buffer */
1102 	resultp = (void *)reqp->aio_req_resultp;
1103 	if (bp->b_flags & B_ERROR) {
1104 		if (bp->b_error)
1105 			error = bp->b_error;
1106 		else
1107 			error = EIO;
1108 		retval = (size_t)-1;
1109 	} else {
1110 		error = 0;
1111 		retval = iov->iov_len - bp->b_resid;
1112 	}
1113 #ifdef	_SYSCALL32_IMPL
1114 	if (get_udatamodel() == DATAMODEL_NATIVE) {
1115 		(void) sulword(&((aio_result_t *)resultp)->aio_return, retval);
1116 		(void) suword32(&((aio_result_t *)resultp)->aio_errno, error);
1117 	} else {
1118 		(void) suword32(&((aio_result32_t *)resultp)->aio_return,
1119 		    (int)retval);
1120 		(void) suword32(&((aio_result32_t *)resultp)->aio_errno, error);
1121 	}
1122 #else
1123 	(void) suword32(&((aio_result_t *)resultp)->aio_return, retval);
1124 	(void) suword32(&((aio_result_t *)resultp)->aio_errno, error);
1125 #endif
1126 }
1127 
1128 
1129 void
1130 aio_copyout_result_port(struct iovec *iov, struct buf *bp, void *resultp)
1131 {
1132 	int errno;
1133 	size_t retval;
1134 
1135 	if (bp->b_flags & B_ERROR) {
1136 		if (bp->b_error)
1137 			errno = bp->b_error;
1138 		else
1139 			errno = EIO;
1140 		retval = (size_t)-1;
1141 	} else {
1142 		errno = 0;
1143 		retval = iov->iov_len - bp->b_resid;
1144 	}
1145 #ifdef	_SYSCALL32_IMPL
1146 	if (get_udatamodel() == DATAMODEL_NATIVE) {
1147 		(void) sulword(&((aio_result_t *)resultp)->aio_return, retval);
1148 		(void) suword32(&((aio_result_t *)resultp)->aio_errno, errno);
1149 	} else {
1150 		(void) suword32(&((aio_result32_t *)resultp)->aio_return,
1151 		    (int)retval);
1152 		(void) suword32(&((aio_result32_t *)resultp)->aio_errno, errno);
1153 	}
1154 #else
1155 	(void) suword32(&((aio_result_t *)resultp)->aio_return, retval);
1156 	(void) suword32(&((aio_result_t *)resultp)->aio_errno, errno);
1157 #endif
1158 }
1159 
1160 /*
1161  * This function is used to remove a request from the done queue.
1162  */
1163 
1164 void
1165 aio_req_remove_portq(aio_t *aiop, aio_req_t *reqp)
1166 {
1167 	ASSERT(MUTEX_HELD(&aiop->aio_portq_mutex));
1168 	while (aiop->aio_portq == NULL) {
1169 		/*
1170 		 * aio_portq is set to NULL when aio_cleanup_portq()
1171 		 * is working with the event queue.
1172 		 * The aio_cleanup_thread() uses aio_cleanup_portq()
1173 		 * to unlock all AIO buffers with completed transactions.
1174 		 * Wait here until aio_cleanup_portq() restores the
1175 		 * list of completed transactions in aio_portq.
1176 		 */
1177 		cv_wait(&aiop->aio_portcv, &aiop->aio_portq_mutex);
1178 	}
1179 	aio_deq(&aiop->aio_portq, reqp);
1180 }
1181 
1182 /* ARGSUSED */
1183 void
1184 aio_close_port(void *arg, int port, pid_t pid, int lastclose)
1185 {
1186 	aio_t		*aiop;
1187 	aio_req_t 	*reqp;
1188 	aio_req_t 	*next;
1189 	aio_req_t	*headp;
1190 	int		counter;
1191 
1192 	if (arg == NULL)
1193 		aiop = curproc->p_aio;
1194 	else
1195 		aiop = (aio_t *)arg;
1196 
1197 	/*
1198 	 * The PORT_SOURCE_AIO source is always associated with every new
1199 	 * created port by default.
1200 	 * If no asynchronous I/O transactions were associated with the port
1201 	 * then the aiop pointer will still be set to NULL.
1202 	 */
1203 	if (aiop == NULL)
1204 		return;
1205 
1206 	/*
1207 	 * Within a process event ports can be used to collect events other
1208 	 * than PORT_SOURCE_AIO events. At the same time the process can submit
1209 	 * asynchronous I/Os transactions which are not associated with the
1210 	 * current port.
1211 	 * The current process oriented model of AIO uses a sigle queue for
1212 	 * pending events. On close the pending queue (queue of asynchronous
1213 	 * I/O transactions using event port notification) must be scanned
1214 	 * to detect and handle pending I/Os using the current port.
1215 	 */
1216 	mutex_enter(&aiop->aio_portq_mutex);
1217 	mutex_enter(&aiop->aio_mutex);
1218 	counter = 0;
1219 	if ((headp = aiop->aio_portpending) != NULL) {
1220 		reqp = headp;
1221 		do {
1222 			if (reqp->aio_req_portkev &&
1223 			    reqp->aio_req_port == port) {
1224 				reqp->aio_req_flags |= AIO_CLOSE_PORT;
1225 				counter++;
1226 			}
1227 		} while ((reqp = reqp->aio_req_next) != headp);
1228 	}
1229 	if (counter == 0) {
1230 		/* no AIOs pending */
1231 		mutex_exit(&aiop->aio_mutex);
1232 		mutex_exit(&aiop->aio_portq_mutex);
1233 		return;
1234 	}
1235 	aiop->aio_portpendcnt += counter;
1236 	mutex_exit(&aiop->aio_mutex);
1237 	while (aiop->aio_portpendcnt)
1238 		cv_wait(&aiop->aio_portcv, &aiop->aio_portq_mutex);
1239 
1240 	/*
1241 	 * all pending AIOs are completed.
1242 	 * check port doneq
1243 	 */
1244 	headp = NULL;
1245 	if ((reqp = aiop->aio_portq) != NULL) {
1246 		do {
1247 			next = reqp->aio_req_next;
1248 			if (reqp->aio_req_port == port) {
1249 				/* dequeue request and discard event */
1250 				aio_req_remove_portq(aiop, reqp);
1251 				port_free_event(reqp->aio_req_portkev);
1252 				/* put request in temporary queue */
1253 				reqp->aio_req_next = headp;
1254 				headp = reqp;
1255 			}
1256 		} while ((reqp = next) != aiop->aio_portq);
1257 	}
1258 	mutex_exit(&aiop->aio_portq_mutex);
1259 
1260 	/* headp points to the list of requests to be discarded */
1261 	for (reqp = headp; reqp != NULL; reqp = next) {
1262 		next = reqp->aio_req_next;
1263 		aphysio_unlock(reqp);
1264 		mutex_enter(&aiop->aio_mutex);
1265 		aio_req_free_port(aiop, reqp);
1266 		mutex_exit(&aiop->aio_mutex);
1267 	}
1268 
1269 	if (aiop->aio_flags & AIO_CLEANUP)
1270 		cv_broadcast(&aiop->aio_waitcv);
1271 }
1272 
1273 /*
1274  * aio_cleanup_dr_delete_memory is used by dr's delete_memory_thread
1275  * to kick start the aio_cleanup_thread for the give process to do the
1276  * necessary cleanup.
1277  * This is needed so that delete_memory_thread can obtain writer locks
1278  * on pages that need to be relocated during a dr memory delete operation,
1279  * otherwise a deadly embrace may occur.
1280  */
1281 int
1282 aio_cleanup_dr_delete_memory(proc_t *procp)
1283 {
1284 	struct aio *aiop = procp->p_aio;
1285 	struct as *as = procp->p_as;
1286 	int ret = 0;
1287 
1288 	ASSERT(MUTEX_HELD(&procp->p_lock));
1289 
1290 	mutex_enter(&as->a_contents);
1291 
1292 	if (aiop != NULL) {
1293 		aiop->aio_rqclnup = 1;
1294 		cv_broadcast(&as->a_cv);
1295 		ret = 1;
1296 	}
1297 	mutex_exit(&as->a_contents);
1298 	return (ret);
1299 }
1300