xref: /illumos-gate/usr/src/uts/common/io/scsi/targets/sd_xbuf.c (revision 67d74cc3e7c9d9461311136a0b2069813a3fd927)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 
26 #include <sys/scsi/scsi.h>
27 #include <sys/ddi.h>
28 #include <sys/sunddi.h>
29 #include <sys/thread.h>
30 #include <sys/var.h>
31 
32 #include "sd_xbuf.h"
33 
34 /*
 * sd_xbuf.c: buf(9S) extension facility.
36  *
37  * The buf(9S) extension facility is intended to allow block drivers to
38  * allocate additional memory that is associated with a particular buf(9S)
39  * struct.  It is further intended to help in addressing the usual set of
40  * problems associated with such allocations, in particular those involving
41  * recovery from allocation failures, especially in code paths that the
42  * system relies on to free memory.
43  *
44  * CAVEAT: Currently this code is completely private to the sd driver and in
45  * NO WAY constitutes a public or supported interface of any kind. It is
46  * envisioned that this may one day migrate into the Solaris DDI, but until
47  * that time this ought to be considered completely unstable and is subject
48  * to change without notice. This code may NOT in any way be utilized by
49  * ANY code outside the sd driver.
50  */
51 
52 
/*
 * Forward declarations for file-local routines that are referenced
 * before their definitions appear below.
 */
static int xbuf_iostart(ddi_xbuf_attr_t xap);
static void xbuf_dispatch(ddi_xbuf_attr_t xap);
static void xbuf_restart_callback(void *arg);
static int xbuf_brk_done(struct buf *bp);
57 
58 
/*
 * Note: Should these be exposed to the caller?  Do we want to give the
 * caller the flexibility of specifying the parameters for the thread pool?
 * Note: these values are just estimates at this time, based upon what
 * seems reasonable for the sd driver. It may be preferable to make these
 * parameters self-scaling in a real (future) implementation.
 */
#define	XBUF_TQ_MINALLOC	64
#define	XBUF_TQ_MAXALLOC	512
/* Delay before retrying a taskq dispatch from the timeout(9F) callback. */
#define	XBUF_DISPATCH_DELAY	(drv_usectohz(50000))	/* 50 msec */

/* Taskq shared by every ddi_xbuf_attr_t; created on first attr_create. */
static taskq_t *xbuf_tq = NULL;
/* minalloc/maxalloc arguments handed to taskq_create() for xbuf_tq. */
static int xbuf_attr_tq_minalloc = XBUF_TQ_MINALLOC;
static int xbuf_attr_tq_maxalloc = XBUF_TQ_MAXALLOC;

/* xbuf_mutex is held while xbuf_refcount and xbuf_tq are examined/changed. */
static kmutex_t	xbuf_mutex = { 0 };
/* Number of live ddi_xbuf_attr_t structs; xbuf_tq is destroyed at zero. */
static uint32_t	xbuf_refcount = 0;
76 
77 /*
78  * Private wrapper for buf cloned via ddi_xbuf_qstrategy()
79  */
80 struct xbuf_brk {
81 	kmutex_t mutex;
82 	struct buf *bp0;
83 	uint8_t nbufs;	/* number of buf allocated */
84 	uint8_t active; /* number of active xfer */
85 
86 	size_t brksize;	/* break size used for this buf */
87 	int brkblk;
88 
89 	/* xfer position */
90 	off_t off;
91 	off_t noff;
92 	daddr_t blkno;
93 };
94 
95 _NOTE(DATA_READABLE_WITHOUT_LOCK(xbuf_brk::off))
96 
/*
 * Hack needed in the prototype so buf breakup will work: the b_forw
 * field of each cloned buf is used to carry the pointer to the
 * struct xbuf_brk that the clone belongs to.
 * Here we can rely on the sd code not changing the value in
 * b_forw.
 */
#define	b_clone_private b_forw
103 
104 
105 /* ARGSUSED */
106 DDII ddi_xbuf_attr_t
107 ddi_xbuf_attr_create(size_t xsize,
108     void (*xa_strategy)(struct buf *bp, ddi_xbuf_t xp, void *attr_arg),
109     void *attr_arg, uint32_t active_limit, uint32_t reserve_limit,
110     major_t major, int flags)
111 {
112 	ddi_xbuf_attr_t	xap;
113 
114 	xap = kmem_zalloc(sizeof (struct __ddi_xbuf_attr), KM_SLEEP);
115 
116 	mutex_init(&xap->xa_mutex, NULL, MUTEX_DRIVER, NULL);
117 	mutex_init(&xap->xa_reserve_mutex, NULL, MUTEX_DRIVER, NULL);
118 
119 	/* Future: Allow the caller to specify alignment requirements? */
120 	xap->xa_allocsize	= max(xsize, sizeof (void *));
121 	xap->xa_active_limit	= active_limit;
122 	xap->xa_active_lowater	= xap->xa_active_limit / 2;
123 	xap->xa_reserve_limit	= reserve_limit;
124 	xap->xa_strategy	= xa_strategy;
125 	xap->xa_attr_arg	= attr_arg;
126 
127 	mutex_enter(&xbuf_mutex);
128 	if (xbuf_refcount == 0) {
129 		ASSERT(xbuf_tq == NULL);
130 		/*
131 		 * Note: Would be nice if: (1) #threads in the taskq pool (set
132 		 * to the value of 'ncpus' at the time the taskq is created)
133 		 * could adjust automatically with DR; (2) the taskq
134 		 * minalloc/maxalloc counts could be grown/shrunk on the fly.
135 		 */
136 		xbuf_tq = taskq_create("xbuf_taskq", ncpus,
137 		    (v.v_maxsyspri - 2), xbuf_attr_tq_minalloc,
138 		    xbuf_attr_tq_maxalloc, TASKQ_PREPOPULATE);
139 	}
140 	xbuf_refcount++;
141 	mutex_exit(&xbuf_mutex);
142 
143 	/* In this prototype we just always use the global system pool. */
144 	xap->xa_tq = xbuf_tq;
145 
146 	return (xap);
147 }
148 
149 
150 DDII void
151 ddi_xbuf_attr_destroy(ddi_xbuf_attr_t xap)
152 {
153 	ddi_xbuf_t	xp;
154 
155 	mutex_destroy(&xap->xa_mutex);
156 	mutex_destroy(&xap->xa_reserve_mutex);
157 
158 	/* Free any xbufs on the reserve list */
159 	while (xap->xa_reserve_count != 0) {
160 		xp = xap->xa_reserve_headp;
161 		xap->xa_reserve_headp = *((void **)xp);
162 		xap->xa_reserve_count--;
163 		kmem_free(xp, xap->xa_allocsize);
164 	}
165 	ASSERT(xap->xa_reserve_headp == NULL);
166 
167 	mutex_enter(&xbuf_mutex);
168 	ASSERT((xbuf_refcount != 0) && (xbuf_tq != NULL));
169 	xbuf_refcount--;
170 	if (xbuf_refcount == 0) {
171 		taskq_destroy(xbuf_tq);
172 		xbuf_tq = NULL;
173 	}
174 	mutex_exit(&xbuf_mutex);
175 
176 	kmem_free(xap, sizeof (struct __ddi_xbuf_attr));
177 }
178 
179 
180 /* ARGSUSED */
181 DDII void
182 ddi_xbuf_attr_register_devinfo(ddi_xbuf_attr_t xbuf_attr, dev_info_t *dip)
183 {
184 	/* Currently a no-op in this prototype */
185 }
186 
187 
188 /* ARGSUSED */
189 DDII void
190 ddi_xbuf_attr_unregister_devinfo(ddi_xbuf_attr_t xbuf_attr, dev_info_t *dip)
191 {
192 	/* Currently a no-op in this prototype */
193 }
194 
195 DDII int
196 ddi_xbuf_attr_setup_brk(ddi_xbuf_attr_t xap, size_t size)
197 {
198 	if (size < DEV_BSIZE)
199 		return (0);
200 
201 	mutex_enter(&xap->xa_mutex);
202 	xap->xa_brksize = size & ~(DEV_BSIZE - 1);
203 	mutex_exit(&xap->xa_mutex);
204 	return (1);
205 }
206 
207 
208 
209 /*
210  * Enqueue the given buf and attempt to initiate IO.
211  * Called from the driver strategy(9E) routine.
212  */
213 
214 DDII int
215 ddi_xbuf_qstrategy(struct buf *bp, ddi_xbuf_attr_t xap)
216 {
217 	ASSERT(xap != NULL);
218 	ASSERT(!mutex_owned(&xap->xa_mutex));
219 	ASSERT(!mutex_owned(&xap->xa_reserve_mutex));
220 
221 	mutex_enter(&xap->xa_mutex);
222 
223 	ASSERT((bp->b_bcount & (DEV_BSIZE - 1)) == 0);
224 
225 	/*
226 	 * Breakup buf if necessary. bp->b_private is temporarily
227 	 * used to save xbuf_brk
228 	 */
229 	if (xap->xa_brksize && bp->b_bcount > xap->xa_brksize) {
230 		struct xbuf_brk *brkp;
231 
232 		brkp = kmem_zalloc(sizeof (struct xbuf_brk), KM_SLEEP);
233 		_NOTE(NOW_INVISIBLE_TO_OTHER_THREADS(*brkp))
234 		mutex_init(&brkp->mutex, NULL, MUTEX_DRIVER, NULL);
235 		brkp->bp0 = bp;
236 		brkp->brksize = xap->xa_brksize;
237 		brkp->brkblk = btodt(xap->xa_brksize);
238 		brkp->noff = xap->xa_brksize;
239 		brkp->blkno = bp->b_blkno;
240 		_NOTE(NOW_VISIBLE_TO_OTHER_THREADS(*brkp))
241 		bp->b_private = brkp;
242 	} else {
243 		bp->b_private = NULL;
244 	}
245 
246 	/* Enqueue buf */
247 	if (xap->xa_headp == NULL) {
248 		xap->xa_headp = xap->xa_tailp = bp;
249 	} else {
250 		xap->xa_tailp->av_forw = bp;
251 		xap->xa_tailp = bp;
252 	}
253 	bp->av_forw = NULL;
254 
255 	xap->xa_pending++;
256 	mutex_exit(&xap->xa_mutex);
257 	return (xbuf_iostart(xap));
258 }
259 
260 
261 /*
262  * Drivers call this immediately before calling biodone(9F), to notify the
263  * framework that the indicated xbuf is no longer being used by the driver.
264  * May be called under interrupt context.
265  */
266 
267 DDII int
268 ddi_xbuf_done(struct buf *bp, ddi_xbuf_attr_t xap)
269 {
270 	ddi_xbuf_t xp;
271 	int done;
272 
273 	ASSERT(bp != NULL);
274 	ASSERT(xap != NULL);
275 	ASSERT(!mutex_owned(&xap->xa_mutex));
276 	ASSERT(!mutex_owned(&xap->xa_reserve_mutex));
277 
278 	xp = ddi_xbuf_get(bp, xap);
279 
280 	mutex_enter(&xap->xa_mutex);
281 
282 #ifdef	SDDEBUG
283 	if (xap->xa_active_limit != 0) {
284 		ASSERT(xap->xa_active_count > 0);
285 	}
286 #endif
287 	xap->xa_active_count--;
288 
289 	if (xap->xa_reserve_limit != 0) {
290 		mutex_enter(&xap->xa_reserve_mutex);
291 		if (xap->xa_reserve_count < xap->xa_reserve_limit) {
292 			/* Put this xbuf onto the reserve list & exit */
293 			*((void **)xp) = xap->xa_reserve_headp;
294 			xap->xa_reserve_headp = xp;
295 			xap->xa_reserve_count++;
296 			mutex_exit(&xap->xa_reserve_mutex);
297 			goto done;
298 		}
299 		mutex_exit(&xap->xa_reserve_mutex);
300 	}
301 
302 	kmem_free(xp, xap->xa_allocsize);	/* return it to the system */
303 
304 done:
305 	if (bp->b_iodone == xbuf_brk_done) {
306 		struct xbuf_brk *brkp = (struct xbuf_brk *)bp->b_clone_private;
307 
308 		brkp->active--;
309 		if (brkp->active || xap->xa_headp == brkp->bp0) {
310 			done = 0;
311 		} else {
312 			brkp->off = -1;	/* mark bp0 as completed */
313 			done = 1;
314 		}
315 	} else {
316 		done = 1;
317 	}
318 
319 	if ((xap->xa_active_limit == 0) ||
320 	    (xap->xa_active_count <= xap->xa_active_lowater)) {
321 		xbuf_dispatch(xap);
322 	}
323 
324 	mutex_exit(&xap->xa_mutex);
325 	return (done);
326 }
327 
328 static int
329 xbuf_brk_done(struct buf *bp)
330 {
331 	struct xbuf_brk *brkp = (struct xbuf_brk *)bp->b_clone_private;
332 	struct buf *bp0 = brkp->bp0;
333 	int done;
334 
335 	mutex_enter(&brkp->mutex);
336 	if (bp->b_flags & B_ERROR && !(bp0->b_flags & B_ERROR)) {
337 		bp0->b_flags |= B_ERROR;
338 		bp0->b_error = bp->b_error;
339 	}
340 	if (bp->b_resid)
341 		bp0->b_resid = bp0->b_bcount;
342 
343 	freerbuf(bp);
344 	brkp->nbufs--;
345 
346 	done = (brkp->off == -1 && brkp->nbufs == 0);
347 	mutex_exit(&brkp->mutex);
348 
349 	/* All buf segments done */
350 	if (done) {
351 		mutex_destroy(&brkp->mutex);
352 		kmem_free(brkp, sizeof (struct xbuf_brk));
353 		biodone(bp0);
354 	}
355 	return (0);
356 }
357 
358 DDII void
359 ddi_xbuf_dispatch(ddi_xbuf_attr_t xap)
360 {
361 	mutex_enter(&xap->xa_mutex);
362 	if ((xap->xa_active_limit == 0) ||
363 	    (xap->xa_active_count <= xap->xa_active_lowater)) {
364 		xbuf_dispatch(xap);
365 	}
366 	mutex_exit(&xap->xa_mutex);
367 }
368 
369 
370 /*
371  * ISSUE: in this prototype we cannot really implement ddi_xbuf_get()
372  * unless we explicitly hide the xbuf pointer somewhere in the buf
373  * during allocation, and then rely on the driver never changing it.
374  * We can probably get away with using b_private for this for now,
375  * tho it really is kinda gnarly.....
376  */
377 
378 /* ARGSUSED */
379 DDII ddi_xbuf_t
380 ddi_xbuf_get(struct buf *bp, ddi_xbuf_attr_t xap)
381 {
382 	return (bp->b_private);
383 }
384 
385 
386 /*
387  * Initiate IOs for bufs on the queue.  Called from kernel thread or taskq
388  * thread context. May execute concurrently for the same ddi_xbuf_attr_t.
389  */
390 
391 static int
392 xbuf_iostart(ddi_xbuf_attr_t xap)
393 {
394 	struct buf *bp;
395 	ddi_xbuf_t xp;
396 
397 	ASSERT(xap != NULL);
398 	ASSERT(!mutex_owned(&xap->xa_mutex));
399 	ASSERT(!mutex_owned(&xap->xa_reserve_mutex));
400 
401 	/*
402 	 * For each request on the queue, attempt to allocate the specified
403 	 * xbuf extension area, and call the driver's iostart() routine.
404 	 * We process as many requests on the queue as we can, until either
405 	 * (1) we run out of requests; or
406 	 * (2) we run out of resources; or
407 	 * (3) we reach the maximum limit for the given ddi_xbuf_attr_t.
408 	 */
409 	for (;;) {
410 		mutex_enter(&xap->xa_mutex);
411 
412 		if ((bp = xap->xa_headp) == NULL) {
413 			break;	/* queue empty */
414 		}
415 
416 		if ((xap->xa_active_limit != 0) &&
417 		    (xap->xa_active_count >= xap->xa_active_limit)) {
418 			break;	/* allocation limit reached */
419 		}
420 
421 		/*
422 		 * If the reserve_limit is non-zero then work with the
423 		 * reserve else always allocate a new struct.
424 		 */
425 		if (xap->xa_reserve_limit != 0) {
426 			/*
427 			 * Don't penalize EVERY I/O by always allocating a new
428 			 * struct. for the sake of maintaining and not touching
429 			 * a reserve for a pathalogical condition that may never
430 			 * happen. Use the reserve entries first, this uses it
431 			 * like a local pool rather than a reserve that goes
432 			 * untouched. Make sure it's re-populated whenever it
433 			 * gets fully depleted just in case it really is needed.
434 			 * This is safe because under the pathalogical
435 			 * condition, when the system runs out of memory such
436 			 * that the below allocs fail, the reserve will still
437 			 * be available whether the entries are saved away on
438 			 * the queue unused or in-transport somewhere. Thus
439 			 * progress can still continue, however slowly.
440 			 */
441 			mutex_enter(&xap->xa_reserve_mutex);
442 			if (xap->xa_reserve_count != 0) {
443 				ASSERT(xap->xa_reserve_headp != NULL);
444 				/* Grab an xbuf from the reserve */
445 				xp = xap->xa_reserve_headp;
446 				xap->xa_reserve_headp = *((void **)xp);
447 				ASSERT(xap->xa_reserve_count > 0);
448 				xap->xa_reserve_count--;
449 			} else {
450 				/*
451 				 * Either this is the first time through,
452 				 * or the reserve has been totally depleted.
453 				 * Re-populate the reserve (pool). Excess
454 				 * structs. get released in the done path.
455 				 */
456 				while (xap->xa_reserve_count <
457 				    xap->xa_reserve_limit) {
458 					xp = kmem_alloc(xap->xa_allocsize,
459 					    KM_NOSLEEP);
460 					if (xp == NULL) {
461 						break;
462 					}
463 					*((void **)xp) = xap->xa_reserve_headp;
464 					xap->xa_reserve_headp = xp;
465 					xap->xa_reserve_count++;
466 				}
467 				/* And one more to use right now. */
468 				xp = kmem_alloc(xap->xa_allocsize, KM_NOSLEEP);
469 			}
470 			mutex_exit(&xap->xa_reserve_mutex);
471 		} else {
472 			/*
473 			 * Try to alloc a new xbuf struct. If this fails just
474 			 * exit for now. We'll get back here again either upon
475 			 * cmd completion or via the timer handler.
476 			 * Question: what if the allocation attempt for the very
477 			 * first cmd. fails? There are no outstanding cmds so
478 			 * how do we get back here?
479 			 * Should look at un_ncmds_in_transport, if it's zero
480 			 * then schedule xbuf_restart_callback via the timer.
481 			 * Athough that breaks the architecture by bringing
482 			 * softstate data into this code.
483 			 */
484 			xp = kmem_alloc(xap->xa_allocsize, KM_NOSLEEP);
485 		}
486 		if (xp == NULL) {
487 			break; /* Can't process a cmd. right now. */
488 		}
489 
490 		/*
491 		 * Always run the counter. It's used/needed when xa_active_limit
492 		 * is non-zero which is the typical (and right now only) case.
493 		 */
494 		xap->xa_active_count++;
495 
496 		if (bp->b_private) {
497 			struct xbuf_brk *brkp = bp->b_private;
498 			struct buf *bp0 = bp;
499 
500 			brkp->active++;
501 
502 			mutex_enter(&brkp->mutex);
503 			brkp->nbufs++;
504 			mutex_exit(&brkp->mutex);
505 
506 			if (brkp->noff < bp0->b_bcount) {
507 				bp = bioclone(bp0, brkp->off, brkp->brksize,
508 				    bp0->b_edev, brkp->blkno, xbuf_brk_done,
509 				    NULL, KM_SLEEP);
510 
511 				/* update xfer position */
512 				brkp->off = brkp->noff;
513 				brkp->noff += brkp->brksize;
514 				brkp->blkno += brkp->brkblk;
515 			} else {
516 				bp = bioclone(bp0, brkp->off,
517 				    bp0->b_bcount - brkp->off, bp0->b_edev,
518 				    brkp->blkno, xbuf_brk_done, NULL, KM_SLEEP);
519 
520 				/* unlink the buf from the list */
521 				xap->xa_headp = bp0->av_forw;
522 				bp0->av_forw = NULL;
523 			}
524 			bp->b_clone_private = (struct buf *)brkp;
525 		} else {
526 			/* unlink the buf from the list */
527 			xap->xa_headp = bp->av_forw;
528 			bp->av_forw = NULL;
529 		}
530 
531 		/*
532 		 * Hack needed in the prototype so ddi_xbuf_get() will work.
533 		 * Here we can rely on the sd code not changing the value in
534 		 * b_private (in fact it wants it there). See ddi_get_xbuf()
535 		 */
536 		bp->b_private = xp;
537 
538 		/* call the driver's iostart routine */
539 		mutex_exit(&xap->xa_mutex);
540 		(*(xap->xa_strategy))(bp, xp, xap->xa_attr_arg);
541 	}
542 
543 	ASSERT(xap->xa_pending > 0);
544 	xap->xa_pending--;
545 	mutex_exit(&xap->xa_mutex);
546 	return (0);
547 }
548 
549 static void
550 xbuf_taskq_cb(void *arg)
551 {
552 	(void) xbuf_iostart(arg);
553 }
554 
555 /*
556  * Re-start IO processing if there is anything on the queue, AND if the
557  * restart function is not already running/pending for this ddi_xbuf_attr_t
558  */
559 static void
560 xbuf_dispatch(ddi_xbuf_attr_t xap)
561 {
562 	ASSERT(xap != NULL);
563 	ASSERT(xap->xa_tq != NULL);
564 	ASSERT(mutex_owned(&xap->xa_mutex));
565 
566 	if ((xap->xa_headp != NULL) && (xap->xa_timeid == NULL) &&
567 	    (xap->xa_pending == 0)) {
568 		/*
569 		 * First try to see if we can dispatch the restart function
570 		 * immediately, in a taskq thread.  If this fails, then
571 		 * schedule a timeout(9F) callback to try again later.
572 		 */
573 		if (taskq_dispatch(xap->xa_tq,
574 		    xbuf_taskq_cb, xap, KM_NOSLEEP) == TASKQID_INVALID) {
575 			/*
576 			 * Unable to enqueue the request for the taskq thread,
577 			 * try again later.  Note that this will keep re-trying
578 			 * until taskq_dispatch() succeeds.
579 			 */
580 			xap->xa_timeid = timeout(xbuf_restart_callback, xap,
581 			    XBUF_DISPATCH_DELAY);
582 		} else {
583 			/*
584 			 * This indicates that xbuf_iostart() will soon be
585 			 * run for this ddi_xbuf_attr_t, and we do not need to
586 			 * schedule another invocation via timeout/taskq
587 			 */
588 			xap->xa_pending++;
589 		}
590 	}
591 }
592 
593 /* timeout(9F) callback routine for xbuf restart mechanism. */
594 static void
595 xbuf_restart_callback(void *arg)
596 {
597 	ddi_xbuf_attr_t	xap = arg;
598 
599 	ASSERT(xap != NULL);
600 	ASSERT(xap->xa_tq != NULL);
601 	ASSERT(!mutex_owned(&xap->xa_mutex));
602 
603 	mutex_enter(&xap->xa_mutex);
604 	xap->xa_timeid = NULL;
605 	xbuf_dispatch(xap);
606 	mutex_exit(&xap->xa_mutex);
607 }
608 
609 
610 DDII void
611 ddi_xbuf_flushq(ddi_xbuf_attr_t xap, int (*funcp)(struct buf *))
612 {
613 	struct buf *bp;
614 	struct buf *next_bp;
615 	struct buf *prev_bp = NULL;
616 
617 	ASSERT(xap != NULL);
618 	ASSERT(xap->xa_tq != NULL);
619 	ASSERT(!mutex_owned(&xap->xa_mutex));
620 
621 	mutex_enter(&xap->xa_mutex);
622 
623 	for (bp = xap->xa_headp; bp != NULL; bp = next_bp) {
624 
625 		next_bp = bp->av_forw;	/* Save for next iteration */
626 
627 		/*
628 		 * If the user-supplied function is non-NULL and returns
629 		 * FALSE, then just leave the current bp on the queue.
630 		 */
631 		if ((funcp != NULL) && (!(*funcp)(bp))) {
632 			prev_bp = bp;
633 			continue;
634 		}
635 
636 		/* de-queue the bp */
637 		if (bp == xap->xa_headp) {
638 			xap->xa_headp = next_bp;
639 			if (xap->xa_headp == NULL) {
640 				xap->xa_tailp = NULL;
641 			}
642 		} else {
643 			ASSERT(xap->xa_headp != NULL);
644 			ASSERT(prev_bp != NULL);
645 			if (bp == xap->xa_tailp) {
646 				ASSERT(next_bp == NULL);
647 				xap->xa_tailp = prev_bp;
648 			}
649 			prev_bp->av_forw = next_bp;
650 		}
651 		bp->av_forw = NULL;
652 
653 		/* Add the bp to the flush queue */
654 		if (xap->xa_flush_headp == NULL) {
655 			ASSERT(xap->xa_flush_tailp == NULL);
656 			xap->xa_flush_headp = xap->xa_flush_tailp = bp;
657 		} else {
658 			ASSERT(xap->xa_flush_tailp != NULL);
659 			xap->xa_flush_tailp->av_forw = bp;
660 			xap->xa_flush_tailp = bp;
661 		}
662 	}
663 
664 	while ((bp = xap->xa_flush_headp) != NULL) {
665 		xap->xa_flush_headp = bp->av_forw;
666 		if (xap->xa_flush_headp == NULL) {
667 			xap->xa_flush_tailp = NULL;
668 		}
669 		mutex_exit(&xap->xa_mutex);
670 		bioerror(bp, EIO);
671 		bp->b_resid = bp->b_bcount;
672 		biodone(bp);
673 		mutex_enter(&xap->xa_mutex);
674 	}
675 
676 	mutex_exit(&xap->xa_mutex);
677 }
678