1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 
26 #include <sys/scsi/scsi.h>
27 #include <sys/ddi.h>
28 #include <sys/sunddi.h>
29 #include <sys/thread.h>
30 #include <sys/var.h>
31 
32 #include "sd_xbuf.h"
33 
34 /*
35  * xbuf.c: buf(9s) extension facility.
36  *
37  * The buf(9S) extension facility is intended to allow block drivers to
38  * allocate additional memory that is associated with a particular buf(9S)
39  * struct.  It is further intended to help in addressing the usual set of
40  * problems associated with such allocations, in particular those involving
41  * recovery from allocation failures, especially in code paths that the
42  * system relies on to free memory.
43  *
44  * CAVEAT: Currently this code is completely private to the sd driver and in
45  * NO WAY constitutes a public or supported interface of any kind. It is
46  * envisioned that this may one day migrate into the Solaris DDI, but until
47  * that time this ought to be considered completely unstable and is subject
48  * to change without notice. This code may NOT in any way be utilized by
49  * ANY code outside the sd driver.
50  */
51 
52 
53 static int xbuf_iostart(ddi_xbuf_attr_t xap);
54 static void xbuf_dispatch(ddi_xbuf_attr_t xap);
55 static void xbuf_restart_callback(void *arg);
56 static void xbuf_enqueue(struct buf *bp, ddi_xbuf_attr_t xap);
57 static int xbuf_brk_done(struct buf *bp);
58 
59 
60 /*
 * Note: Should this be exposed to the caller?  Do we want to give the
 * caller the flexibility of specifying the parameters for the thread pool?
63  * Note: these values are just estimates at this time, based upon what
64  * seems reasonable for the sd driver. It may be preferable to make these
65  * parameters self-scaling in a real (future) implementation.
66  */
67 #define	XBUF_TQ_MINALLOC	64
68 #define	XBUF_TQ_MAXALLOC	512
69 #define	XBUF_DISPATCH_DELAY	(drv_usectohz(50000))	/* 50 msec */
70 
71 static taskq_t *xbuf_tq = NULL;
72 static int xbuf_attr_tq_minalloc = XBUF_TQ_MINALLOC;
73 static int xbuf_attr_tq_maxalloc = XBUF_TQ_MAXALLOC;
74 
75 static kmutex_t	xbuf_mutex = { 0 };
76 static uint32_t	xbuf_refcount = 0;
77 
78 /*
79  * Private wrapper for buf cloned via ddi_xbuf_qstrategy()
80  */
81 struct xbuf_brk {
82 	kmutex_t mutex;
83 	struct buf *bp0;
84 	uint8_t nbufs;	/* number of buf allocated */
85 	uint8_t active; /* number of active xfer */
86 
87 	size_t brksize;	/* break size used for this buf */
88 	int brkblk;
89 
90 	/* xfer position */
91 	off_t off;
92 	off_t noff;
93 	daddr_t blkno;
94 };
95 
96 _NOTE(DATA_READABLE_WITHOUT_LOCK(xbuf_brk::off))
97 
98 /*
99  * Hack needed in the prototype so buf breakup will work.
100  * Here we can rely on the sd code not changing the value in
101  * b_forw.
102  */
103 #define	b_clone_private b_forw
104 
105 
106 /* ARGSUSED */
107 DDII ddi_xbuf_attr_t
108 ddi_xbuf_attr_create(size_t xsize,
109 	void (*xa_strategy)(struct buf *bp, ddi_xbuf_t xp, void *attr_arg),
110 	void *attr_arg, uint32_t active_limit, uint32_t reserve_limit,
111 	major_t major, int flags)
112 {
113 	ddi_xbuf_attr_t	xap;
114 
115 	xap = kmem_zalloc(sizeof (struct __ddi_xbuf_attr), KM_SLEEP);
116 
117 	mutex_init(&xap->xa_mutex, NULL, MUTEX_DRIVER, NULL);
118 	mutex_init(&xap->xa_reserve_mutex, NULL, MUTEX_DRIVER, NULL);
119 
120 	/* Future: Allow the caller to specify alignment requirements? */
121 	xap->xa_allocsize	= max(xsize, sizeof (void *));
122 	xap->xa_active_limit	= active_limit;
123 	xap->xa_active_lowater	= xap->xa_active_limit / 2;
124 	xap->xa_reserve_limit	= reserve_limit;
125 	xap->xa_strategy	= xa_strategy;
126 	xap->xa_attr_arg	= attr_arg;
127 
128 	mutex_enter(&xbuf_mutex);
129 	if (xbuf_refcount == 0) {
130 		ASSERT(xbuf_tq == NULL);
131 		/*
132 		 * Note: Would be nice if: (1) #threads in the taskq pool (set
133 		 * to the value of 'ncpus' at the time the taskq is created)
134 		 * could adjust automatically with DR; (2) the taskq
135 		 * minalloc/maxalloc counts could be grown/shrunk on the fly.
136 		 */
137 		xbuf_tq = taskq_create("xbuf_taskq", ncpus,
138 		    (v.v_maxsyspri - 2), xbuf_attr_tq_minalloc,
139 		    xbuf_attr_tq_maxalloc, TASKQ_PREPOPULATE);
140 	}
141 	xbuf_refcount++;
142 	mutex_exit(&xbuf_mutex);
143 
144 	/* In this prototype we just always use the global system pool. */
145 	xap->xa_tq = xbuf_tq;
146 
147 	return (xap);
148 }
149 
150 
151 DDII void
152 ddi_xbuf_attr_destroy(ddi_xbuf_attr_t xap)
153 {
154 	ddi_xbuf_t	xp;
155 
156 	mutex_destroy(&xap->xa_mutex);
157 	mutex_destroy(&xap->xa_reserve_mutex);
158 
159 	/* Free any xbufs on the reserve list */
160 	while (xap->xa_reserve_count != 0) {
161 		xp = xap->xa_reserve_headp;
162 		xap->xa_reserve_headp = *((void **)xp);
163 		xap->xa_reserve_count--;
164 		kmem_free(xp, xap->xa_allocsize);
165 	}
166 	ASSERT(xap->xa_reserve_headp == NULL);
167 
168 	mutex_enter(&xbuf_mutex);
169 	ASSERT((xbuf_refcount != 0) && (xbuf_tq != NULL));
170 	xbuf_refcount--;
171 	if (xbuf_refcount == 0) {
172 		taskq_destroy(xbuf_tq);
173 		xbuf_tq = NULL;
174 	}
175 	mutex_exit(&xbuf_mutex);
176 
177 	kmem_free(xap, sizeof (struct __ddi_xbuf_attr));
178 }
179 
180 
/*
 * Associate the given attr handle with a devinfo node.
 */
/* ARGSUSED */
DDII void
ddi_xbuf_attr_register_devinfo(ddi_xbuf_attr_t xbuf_attr, dev_info_t *dip)
{
	/* Currently a no-op in this prototype */
}
187 
188 
/*
 * Dissociate the given attr handle from a devinfo node; counterpart of
 * ddi_xbuf_attr_register_devinfo().
 */
/* ARGSUSED */
DDII void
ddi_xbuf_attr_unregister_devinfo(ddi_xbuf_attr_t xbuf_attr, dev_info_t *dip)
{
	/* Currently a no-op in this prototype */
}
195 
196 DDII int
197 ddi_xbuf_attr_setup_brk(ddi_xbuf_attr_t xap, size_t size)
198 {
199 	if (size < DEV_BSIZE)
200 		return (0);
201 
202 	mutex_enter(&xap->xa_mutex);
203 	xap->xa_brksize = size & ~(DEV_BSIZE - 1);
204 	mutex_exit(&xap->xa_mutex);
205 	return (1);
206 }
207 
208 
209 
210 /*
211  * Enqueue the given buf and attempt to initiate IO.
212  * Called from the driver strategy(9E) routine.
213  */
214 
DDII int
ddi_xbuf_qstrategy(struct buf *bp, ddi_xbuf_attr_t xap)
{
	ASSERT(xap != NULL);
	ASSERT(!mutex_owned(&xap->xa_mutex));
	ASSERT(!mutex_owned(&xap->xa_reserve_mutex));

	mutex_enter(&xap->xa_mutex);

	/* Transfers must be a whole number of DEV_BSIZE blocks. */
	ASSERT((bp->b_bcount & (DEV_BSIZE - 1)) == 0);

	/*
	 * Breakup buf if necessary. bp->b_private is temporarily
	 * used to save xbuf_brk; xbuf_iostart() consumes it and then
	 * overwrites b_private with the xbuf pointer.
	 */
	if (xap->xa_brksize && bp->b_bcount > xap->xa_brksize) {
		struct xbuf_brk *brkp;

		/* off starts at 0 (zalloc); noff marks the 2nd segment. */
		brkp = kmem_zalloc(sizeof (struct xbuf_brk), KM_SLEEP);
		_NOTE(NOW_INVISIBLE_TO_OTHER_THREADS(*brkp))
		mutex_init(&brkp->mutex, NULL, MUTEX_DRIVER, NULL);
		brkp->bp0 = bp;
		brkp->brksize = xap->xa_brksize;
		brkp->brkblk = btodt(xap->xa_brksize);
		brkp->noff = xap->xa_brksize;
		brkp->blkno = bp->b_blkno;
		_NOTE(NOW_VISIBLE_TO_OTHER_THREADS(*brkp))
		bp->b_private = brkp;
	} else {
		bp->b_private = NULL;
	}

	/* Enqueue buf at the tail of the wait queue (FIFO). */
	if (xap->xa_headp == NULL) {
		xap->xa_headp = xap->xa_tailp = bp;
	} else {
		xap->xa_tailp->av_forw = bp;
		xap->xa_tailp = bp;
	}
	bp->av_forw = NULL;

	/*
	 * Mark an iostart pass as pending so xbuf_dispatch() will not
	 * schedule a duplicate one while we run xbuf_iostart() below.
	 */
	xap->xa_pending++;
	mutex_exit(&xap->xa_mutex);
	return (xbuf_iostart(xap));
}
260 
261 
262 /*
263  * Drivers call this immediately before calling biodone(9F), to notify the
264  * framework that the indicated xbuf is no longer being used by the driver.
265  * May be called under interrupt context.
266  */
267 
DDII int
ddi_xbuf_done(struct buf *bp, ddi_xbuf_attr_t xap)
{
	ddi_xbuf_t xp;
	int done;

	ASSERT(bp != NULL);
	ASSERT(xap != NULL);
	ASSERT(!mutex_owned(&xap->xa_mutex));
	ASSERT(!mutex_owned(&xap->xa_reserve_mutex));

	/* Recover the xbuf that xbuf_iostart() stashed in b_private. */
	xp = ddi_xbuf_get(bp, xap);

	mutex_enter(&xap->xa_mutex);

#ifdef	SDDEBUG
	if (xap->xa_active_limit != 0) {
		ASSERT(xap->xa_active_count > 0);
	}
#endif
	xap->xa_active_count--;

	/*
	 * Recycle the xbuf onto the reserve list if there is room;
	 * otherwise return its memory to the system below.
	 */
	if (xap->xa_reserve_limit != 0) {
		mutex_enter(&xap->xa_reserve_mutex);
		if (xap->xa_reserve_count < xap->xa_reserve_limit) {
			/* Put this xbuf onto the reserve list & exit */
			*((void **)xp) = xap->xa_reserve_headp;
			xap->xa_reserve_headp = xp;
			xap->xa_reserve_count++;
			mutex_exit(&xap->xa_reserve_mutex);
			goto done;
		}
		mutex_exit(&xap->xa_reserve_mutex);
	}

	kmem_free(xp, xap->xa_allocsize);	/* return it to the system */

done:
	/*
	 * For a clone from the breakup path, the overall transfer is done
	 * only when no other clone is active AND the parent buf is no
	 * longer at the head of the wait queue (i.e. all of its segments
	 * have been issued by xbuf_iostart()).
	 */
	if (bp->b_iodone == xbuf_brk_done) {
		struct xbuf_brk *brkp = (struct xbuf_brk *)bp->b_clone_private;

		brkp->active--;
		if (brkp->active || xap->xa_headp == brkp->bp0) {
			done = 0;
		} else {
			brkp->off = -1;	/* mark bp0 as completed */
			done = 1;
		}
	} else {
		done = 1;
	}

	/* Restart queued IO once we drain to the low-water mark. */
	if ((xap->xa_active_limit == 0) ||
	    (xap->xa_active_count <= xap->xa_active_lowater)) {
		xbuf_dispatch(xap);
	}

	mutex_exit(&xap->xa_mutex);
	return (done);
}
328 
/*
 * iodone callback for bufs cloned by the breakup path.  Propagates error
 * and residual state from the clone to the parent buf, frees the clone,
 * and calls biodone(9F) on the parent once the last clone is gone.
 */
static int
xbuf_brk_done(struct buf *bp)
{
	struct xbuf_brk *brkp = (struct xbuf_brk *)bp->b_clone_private;
	struct buf *bp0 = brkp->bp0;
	int done;

	mutex_enter(&brkp->mutex);
	/* First failing clone wins; do not overwrite an earlier error. */
	if (bp->b_flags & B_ERROR && !(bp0->b_flags & B_ERROR)) {
		bp0->b_flags |= B_ERROR;
		bp0->b_error = bp->b_error;
	}
	/* Any short segment marks the entire parent transfer residual. */
	if (bp->b_resid)
		bp0->b_resid = bp0->b_bcount;

	freerbuf(bp);
	brkp->nbufs--;

	/* off == -1 means ddi_xbuf_done() flagged the final segment. */
	done = (brkp->off == -1 && brkp->nbufs == 0);
	mutex_exit(&brkp->mutex);

	/* All buf segments done */
	if (done) {
		mutex_destroy(&brkp->mutex);
		kmem_free(brkp, sizeof (struct xbuf_brk));
		biodone(bp0);
	}
	return (0);
}
358 
359 DDII void
360 ddi_xbuf_dispatch(ddi_xbuf_attr_t xap)
361 {
362 	mutex_enter(&xap->xa_mutex);
363 	if ((xap->xa_active_limit == 0) ||
364 	    (xap->xa_active_count <= xap->xa_active_lowater)) {
365 		xbuf_dispatch(xap);
366 	}
367 	mutex_exit(&xap->xa_mutex);
368 }
369 
370 
371 /*
372  * ISSUE: in this prototype we cannot really implement ddi_xbuf_get()
373  * unless we explicitly hide the xbuf pointer somewhere in the buf
374  * during allocation, and then rely on the driver never changing it.
375  * We can probably get away with using b_private for this for now,
376  * tho it really is kinda gnarly.....
377  */
378 
/* ARGSUSED */
DDII ddi_xbuf_t
ddi_xbuf_get(struct buf *bp, ddi_xbuf_attr_t xap)
{
	/*
	 * In this prototype the xbuf pointer is simply the value that
	 * xbuf_iostart() stored in b_private; the driver must leave it
	 * untouched for this to work.
	 */
	return (bp->b_private);
}
385 
386 
387 /*
388  * Initiate IOs for bufs on the queue.  Called from kernel thread or taskq
389  * thread context. May execute concurrently for the same ddi_xbuf_attr_t.
390  */
391 
static int
xbuf_iostart(ddi_xbuf_attr_t xap)
{
	struct buf *bp;
	ddi_xbuf_t xp;

	ASSERT(xap != NULL);
	ASSERT(!mutex_owned(&xap->xa_mutex));
	ASSERT(!mutex_owned(&xap->xa_reserve_mutex));

	/*
	 * For each request on the queue, attempt to allocate the specified
	 * xbuf extension area, and call the driver's iostart() routine.
	 * We process as many requests on the queue as we can, until either
	 * (1) we run out of requests; or
	 * (2) we run out of resources; or
	 * (3) we reach the maximum limit for the given ddi_xbuf_attr_t.
	 *
	 * Note: each 'break' below leaves the loop with xa_mutex held;
	 * the common exit path after the loop releases it.
	 */
	for (;;) {
		mutex_enter(&xap->xa_mutex);

		if ((bp = xap->xa_headp) == NULL) {
			break;	/* queue empty */
		}

		if ((xap->xa_active_limit != 0) &&
		    (xap->xa_active_count >= xap->xa_active_limit)) {
			break;	/* allocation limit reached */
		}

		/*
		 * If the reserve_limit is non-zero then work with the
		 * reserve else always allocate a new struct.
		 */
		if (xap->xa_reserve_limit != 0) {
			/*
			 * Don't penalize EVERY I/O by always allocating a new
			 * struct. for the sake of maintaining and not touching
			 * a reserve for a pathological condition that may
			 * never happen. Use the reserve entries first, this
			 * uses it like a local pool rather than a reserve that
			 * goes untouched. Make sure it's re-populated whenever
			 * it gets fully depleted just in case it really is
			 * needed. This is safe because under the pathological
			 * condition, when the system runs out of memory such
			 * that the below allocs fail, the reserve will still
			 * be available whether the entries are saved away on
			 * the queue unused or in-transport somewhere. Thus
			 * progress can still continue, however slowly.
			 */
			mutex_enter(&xap->xa_reserve_mutex);
			if (xap->xa_reserve_count != 0) {
				ASSERT(xap->xa_reserve_headp != NULL);
				/* Grab an xbuf from the reserve */
				xp = xap->xa_reserve_headp;
				xap->xa_reserve_headp = *((void **)xp);
				ASSERT(xap->xa_reserve_count > 0);
				xap->xa_reserve_count--;
			} else {
				/*
				 * Either this is the first time through,
				 * or the reserve has been totally depleted.
				 * Re-populate the reserve (pool). Excess
				 * structs. get released in the done path.
				 */
				while (xap->xa_reserve_count <
				    xap->xa_reserve_limit) {
					xp = kmem_alloc(xap->xa_allocsize,
					    KM_NOSLEEP);
					if (xp == NULL) {
						break;
					}
					*((void **)xp) = xap->xa_reserve_headp;
					xap->xa_reserve_headp = xp;
					xap->xa_reserve_count++;
				}
				/* And one more to use right now. */
				xp = kmem_alloc(xap->xa_allocsize, KM_NOSLEEP);
			}
			mutex_exit(&xap->xa_reserve_mutex);
		} else {
			/*
			 * Try to alloc a new xbuf struct. If this fails just
			 * exit for now. We'll get back here again either upon
			 * cmd completion or via the timer handler.
			 * Question: what if the allocation attempt for the very
			 * first cmd. fails? There are no outstanding cmds so
			 * how do we get back here?
			 * Should look at un_ncmds_in_transport, if it's zero
			 * then schedule xbuf_restart_callback via the timer.
			 * Although that breaks the architecture by bringing
			 * softstate data into this code.
			 */
			xp = kmem_alloc(xap->xa_allocsize, KM_NOSLEEP);
		}
		if (xp == NULL) {
			break; /* Can't process a cmd. right now. */
		}

		/*
		 * Always run the counter. It's used/needed when xa_active_limit
		 * is non-zero which is the typical (and right now only) case.
		 */
		xap->xa_active_count++;

		if (bp->b_private) {
			/*
			 * Breakup case (set up by ddi_xbuf_qstrategy()):
			 * clone the next xa_brksize segment of the parent
			 * buf; only unlink the parent from the wait queue
			 * once its final segment has been issued.
			 */
			struct xbuf_brk *brkp = bp->b_private;
			struct buf *bp0 = bp;

			brkp->active++;

			mutex_enter(&brkp->mutex);
			brkp->nbufs++;
			mutex_exit(&brkp->mutex);

			if (brkp->noff < bp0->b_bcount) {
				bp = bioclone(bp0, brkp->off, brkp->brksize,
				    bp0->b_edev, brkp->blkno, xbuf_brk_done,
				    NULL, KM_SLEEP);

				/* update xfer position */
				brkp->off = brkp->noff;
				brkp->noff += brkp->brksize;
				brkp->blkno += brkp->brkblk;
			} else {
				/* Last (possibly short) segment. */
				bp = bioclone(bp0, brkp->off,
				    bp0->b_bcount - brkp->off, bp0->b_edev,
				    brkp->blkno, xbuf_brk_done, NULL, KM_SLEEP);

				/* unlink the buf from the list */
				xap->xa_headp = bp0->av_forw;
				bp0->av_forw = NULL;
			}
			bp->b_clone_private = (struct buf *)brkp;
		} else {
			/* unlink the buf from the list */
			xap->xa_headp = bp->av_forw;
			bp->av_forw = NULL;
		}

		/*
		 * Hack needed in the prototype so ddi_xbuf_get() will work.
		 * Here we can rely on the sd code not changing the value in
		 * b_private (in fact it wants it there). See ddi_get_xbuf()
		 */
		bp->b_private = xp;

		/* call the driver's iostart routine */
		mutex_exit(&xap->xa_mutex);
		(*(xap->xa_strategy))(bp, xp, xap->xa_attr_arg);
	}

	/* This iostart pass is complete; clear our pending slot. */
	ASSERT(xap->xa_pending > 0);
	xap->xa_pending--;
	mutex_exit(&xap->xa_mutex);
	return (0);
}
549 
550 
551 /*
552  * Re-start IO processing if there is anything on the queue, AND if the
553  * restart function is not already running/pending for this ddi_xbuf_attr_t
554  */
555 static void
556 xbuf_dispatch(ddi_xbuf_attr_t xap)
557 {
558 	ASSERT(xap != NULL);
559 	ASSERT(xap->xa_tq != NULL);
560 	ASSERT(mutex_owned(&xap->xa_mutex));
561 
562 	if ((xap->xa_headp != NULL) && (xap->xa_timeid == NULL) &&
563 	    (xap->xa_pending == 0)) {
564 		/*
565 		 * First try to see if we can dispatch the restart function
566 		 * immediately, in a taskq thread.  If this fails, then
567 		 * schedule a timeout(9F) callback to try again later.
568 		 */
569 		if (taskq_dispatch(xap->xa_tq,
570 		    (void (*)(void *)) xbuf_iostart, xap, KM_NOSLEEP) == 0) {
571 			/*
572 			 * Unable to enqueue the request for the taskq thread,
573 			 * try again later.  Note that this will keep re-trying
574 			 * until taskq_dispatch() succeeds.
575 			 */
576 			xap->xa_timeid = timeout(xbuf_restart_callback, xap,
577 			    XBUF_DISPATCH_DELAY);
578 		} else {
579 			/*
580 			 * This indicates that xbuf_iostart() will soon be
581 			 * run for this ddi_xbuf_attr_t, and we do not need to
582 			 * schedule another invocation via timeout/taskq
583 			 */
584 			xap->xa_pending++;
585 		}
586 	}
587 }
588 
589 /* timeout(9F) callback routine for xbuf restart mechanism. */
590 static void
591 xbuf_restart_callback(void *arg)
592 {
593 	ddi_xbuf_attr_t	xap = arg;
594 
595 	ASSERT(xap != NULL);
596 	ASSERT(xap->xa_tq != NULL);
597 	ASSERT(!mutex_owned(&xap->xa_mutex));
598 
599 	mutex_enter(&xap->xa_mutex);
600 	xap->xa_timeid = NULL;
601 	xbuf_dispatch(xap);
602 	mutex_exit(&xap->xa_mutex);
603 }
604 
605 
/*
 * Flush bufs from the wait queue: each buf for which 'funcp' returns
 * TRUE (or every buf, when funcp is NULL) is de-queued and failed with
 * EIO and a full residual.
 */
DDII void
ddi_xbuf_flushq(ddi_xbuf_attr_t xap, int (*funcp)(struct buf *))
{
	struct buf *bp;
	struct buf *next_bp;
	struct buf *prev_bp = NULL;

	ASSERT(xap != NULL);
	ASSERT(xap->xa_tq != NULL);
	ASSERT(!mutex_owned(&xap->xa_mutex));

	mutex_enter(&xap->xa_mutex);

	/* Pass 1: move the selected bufs onto the flush queue. */
	for (bp = xap->xa_headp; bp != NULL; bp = next_bp) {

		next_bp = bp->av_forw;	/* Save for next iteration */

		/*
		 * If the user-supplied function is non-NULL and returns
		 * FALSE, then just leave the current bp on the queue.
		 */
		if ((funcp != NULL) && (!(*funcp)(bp))) {
			prev_bp = bp;
			continue;
		}

		/* de-queue the bp */
		if (bp == xap->xa_headp) {
			xap->xa_headp = next_bp;
			if (xap->xa_headp == NULL) {
				xap->xa_tailp = NULL;
			}
		} else {
			ASSERT(xap->xa_headp != NULL);
			ASSERT(prev_bp != NULL);
			if (bp == xap->xa_tailp) {
				ASSERT(next_bp == NULL);
				xap->xa_tailp = prev_bp;
			}
			prev_bp->av_forw = next_bp;
		}
		bp->av_forw = NULL;

		/* Add the bp to the flush queue */
		if (xap->xa_flush_headp == NULL) {
			ASSERT(xap->xa_flush_tailp == NULL);
			xap->xa_flush_headp = xap->xa_flush_tailp = bp;
		} else {
			ASSERT(xap->xa_flush_tailp != NULL);
			xap->xa_flush_tailp->av_forw = bp;
			xap->xa_flush_tailp = bp;
		}
	}

	/*
	 * Pass 2: fail each flushed buf with EIO.  xa_mutex is dropped
	 * around biodone(9F) since completion processing may need to
	 * re-acquire it (e.g. via ddi_xbuf_done()).
	 */
	while ((bp = xap->xa_flush_headp) != NULL) {
		xap->xa_flush_headp = bp->av_forw;
		if (xap->xa_flush_headp == NULL) {
			xap->xa_flush_tailp = NULL;
		}
		mutex_exit(&xap->xa_mutex);
		bioerror(bp, EIO);
		bp->b_resid = bp->b_bcount;
		biodone(bp);
		mutex_enter(&xap->xa_mutex);
	}

	mutex_exit(&xap->xa_mutex);
}
674