xref: /titanic_52/usr/src/uts/common/fs/zfs/zio.c (revision a83cadce5d3331b64803bfc641036cec23602c74)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2007 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 
26 #pragma ident	"%Z%%M%	%I%	%E% SMI"
27 
28 #include <sys/zfs_context.h>
29 #include <sys/fm/fs/zfs.h>
30 #include <sys/spa.h>
31 #include <sys/txg.h>
32 #include <sys/spa_impl.h>
33 #include <sys/vdev_impl.h>
34 #include <sys/zio_impl.h>
35 #include <sys/zio_compress.h>
36 #include <sys/zio_checksum.h>
37 
38 /*
39  * ==========================================================================
40  * I/O priority table
41  * ==========================================================================
42  */
43 uint8_t zio_priority_table[ZIO_PRIORITY_TABLE_SIZE] = {
44 	0,	/* ZIO_PRIORITY_NOW		*/
45 	0,	/* ZIO_PRIORITY_SYNC_READ	*/
46 	0,	/* ZIO_PRIORITY_SYNC_WRITE	*/
47 	6,	/* ZIO_PRIORITY_ASYNC_READ	*/
48 	4,	/* ZIO_PRIORITY_ASYNC_WRITE	*/
49 	4,	/* ZIO_PRIORITY_FREE		*/
50 	0,	/* ZIO_PRIORITY_CACHE_FILL	*/
51 	0,	/* ZIO_PRIORITY_LOG_WRITE	*/
52 	10,	/* ZIO_PRIORITY_RESILVER	*/
53 	20,	/* ZIO_PRIORITY_SCRUB		*/
54 };
55 
56 /*
57  * ==========================================================================
58  * I/O type descriptions
59  * ==========================================================================
60  */
61 char *zio_type_name[ZIO_TYPES] = {
62 	"null", "read", "write", "free", "claim", "ioctl" };
63 
64 /* At or above this size, force gang blocking - for testing */
65 uint64_t zio_gang_bang = SPA_MAXBLOCKSIZE + 1;
66 
67 /* Force an allocation failure when non-zero */
68 uint16_t zio_zil_fail_shift = 0;
69 
70 typedef struct zio_sync_pass {
71 	int	zp_defer_free;		/* defer frees after this pass */
72 	int	zp_dontcompress;	/* don't compress after this pass */
73 	int	zp_rewrite;		/* rewrite new bps after this pass */
74 } zio_sync_pass_t;
75 
76 zio_sync_pass_t zio_sync_pass = {
77 	1,	/* zp_defer_free */
78 	4,	/* zp_dontcompress */
79 	1,	/* zp_rewrite */
80 };
81 
82 /*
83  * ==========================================================================
84  * I/O kmem caches
85  * ==========================================================================
86  */
87 kmem_cache_t *zio_cache;
88 kmem_cache_t *zio_buf_cache[SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT];
89 kmem_cache_t *zio_data_buf_cache[SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT];
90 
91 #ifdef _KERNEL
92 extern vmem_t *zio_alloc_arena;
93 #endif
94 
95 void
96 zio_init(void)
97 {
98 	size_t c;
99 	vmem_t *data_alloc_arena = NULL;
100 
101 #ifdef _KERNEL
102 	data_alloc_arena = zio_alloc_arena;
103 #endif
104 
105 	zio_cache = kmem_cache_create("zio_cache", sizeof (zio_t), 0,
106 	    NULL, NULL, NULL, NULL, NULL, 0);
107 
108 	/*
109 	 * For small buffers, we want a cache for each multiple of
110 	 * SPA_MINBLOCKSIZE.  For medium-size buffers, we want a cache
111 	 * for each quarter-power of 2.  For large buffers, we want
112 	 * a cache for each multiple of PAGESIZE.
113 	 */
114 	for (c = 0; c < SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT; c++) {
115 		size_t size = (c + 1) << SPA_MINBLOCKSHIFT;
116 		size_t p2 = size;
117 		size_t align = 0;
118 
119 		while (p2 & (p2 - 1))
120 			p2 &= p2 - 1;
121 
122 		if (size <= 4 * SPA_MINBLOCKSIZE) {
123 			align = SPA_MINBLOCKSIZE;
124 		} else if (P2PHASE(size, PAGESIZE) == 0) {
125 			align = PAGESIZE;
126 		} else if (P2PHASE(size, p2 >> 2) == 0) {
127 			align = p2 >> 2;
128 		}
129 
130 		if (align != 0) {
131 			char name[36];
132 			(void) sprintf(name, "zio_buf_%lu", (ulong_t)size);
133 			zio_buf_cache[c] = kmem_cache_create(name, size,
134 			    align, NULL, NULL, NULL, NULL, NULL, KMC_NODEBUG);
135 
136 			(void) sprintf(name, "zio_data_buf_%lu", (ulong_t)size);
137 			zio_data_buf_cache[c] = kmem_cache_create(name, size,
138 			    align, NULL, NULL, NULL, NULL, data_alloc_arena,
139 			    KMC_NODEBUG);
140 
141 			dprintf("creating cache for size %5lx align %5lx\n",
142 			    size, align);
143 		}
144 	}
145 
146 	while (--c != 0) {
147 		ASSERT(zio_buf_cache[c] != NULL);
148 		if (zio_buf_cache[c - 1] == NULL)
149 			zio_buf_cache[c - 1] = zio_buf_cache[c];
150 
151 		ASSERT(zio_data_buf_cache[c] != NULL);
152 		if (zio_data_buf_cache[c - 1] == NULL)
153 			zio_data_buf_cache[c - 1] = zio_data_buf_cache[c];
154 	}
155 
156 	zio_inject_init();
157 }
158 
159 void
160 zio_fini(void)
161 {
162 	size_t c;
163 	kmem_cache_t *last_cache = NULL;
164 	kmem_cache_t *last_data_cache = NULL;
165 
166 	for (c = 0; c < SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT; c++) {
167 		if (zio_buf_cache[c] != last_cache) {
168 			last_cache = zio_buf_cache[c];
169 			kmem_cache_destroy(zio_buf_cache[c]);
170 		}
171 		zio_buf_cache[c] = NULL;
172 
173 		if (zio_data_buf_cache[c] != last_data_cache) {
174 			last_data_cache = zio_data_buf_cache[c];
175 			kmem_cache_destroy(zio_data_buf_cache[c]);
176 		}
177 		zio_data_buf_cache[c] = NULL;
178 	}
179 
180 	kmem_cache_destroy(zio_cache);
181 
182 	zio_inject_fini();
183 }
184 
185 /*
186  * ==========================================================================
187  * Allocate and free I/O buffers
188  * ==========================================================================
189  */
190 
191 /*
192  * Use zio_buf_alloc to allocate ZFS metadata.  This data will appear in a
193  * crashdump if the kernel panics, so use it judiciously.  Obviously, it's
194  * useful to inspect ZFS metadata, but if possible, we should avoid keeping
195  * excess / transient data in-core during a crashdump.
196  */
197 void *
198 zio_buf_alloc(size_t size)
199 {
200 	size_t c = (size - 1) >> SPA_MINBLOCKSHIFT;
201 
202 	ASSERT(c < SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT);
203 
204 	return (kmem_cache_alloc(zio_buf_cache[c], KM_SLEEP));
205 }
206 
207 /*
208  * Use zio_data_buf_alloc to allocate data.  The data will not appear in a
209  * crashdump if the kernel panics.  This exists so that we will limit the amount
210  * of ZFS data that shows up in a kernel crashdump.  (Thus reducing the amount
211  * of kernel heap dumped to disk when the kernel panics)
212  */
213 void *
214 zio_data_buf_alloc(size_t size)
215 {
216 	size_t c = (size - 1) >> SPA_MINBLOCKSHIFT;
217 
218 	ASSERT(c < SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT);
219 
220 	return (kmem_cache_alloc(zio_data_buf_cache[c], KM_SLEEP));
221 }
222 
223 void
224 zio_buf_free(void *buf, size_t size)
225 {
226 	size_t c = (size - 1) >> SPA_MINBLOCKSHIFT;
227 
228 	ASSERT(c < SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT);
229 
230 	kmem_cache_free(zio_buf_cache[c], buf);
231 }
232 
233 void
234 zio_data_buf_free(void *buf, size_t size)
235 {
236 	size_t c = (size - 1) >> SPA_MINBLOCKSHIFT;
237 
238 	ASSERT(c < SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT);
239 
240 	kmem_cache_free(zio_data_buf_cache[c], buf);
241 }
242 
243 /*
244  * ==========================================================================
245  * Push and pop I/O transform buffers
246  * ==========================================================================
247  */
248 static void
249 zio_push_transform(zio_t *zio, void *data, uint64_t size, uint64_t bufsize)
250 {
251 	zio_transform_t *zt = kmem_alloc(sizeof (zio_transform_t), KM_SLEEP);
252 
253 	zt->zt_data = data;
254 	zt->zt_size = size;
255 	zt->zt_bufsize = bufsize;
256 
257 	zt->zt_next = zio->io_transform_stack;
258 	zio->io_transform_stack = zt;
259 
260 	zio->io_data = data;
261 	zio->io_size = size;
262 }
263 
264 static void
265 zio_pop_transform(zio_t *zio, void **data, uint64_t *size, uint64_t *bufsize)
266 {
267 	zio_transform_t *zt = zio->io_transform_stack;
268 
269 	*data = zt->zt_data;
270 	*size = zt->zt_size;
271 	*bufsize = zt->zt_bufsize;
272 
273 	zio->io_transform_stack = zt->zt_next;
274 	kmem_free(zt, sizeof (zio_transform_t));
275 
276 	if ((zt = zio->io_transform_stack) != NULL) {
277 		zio->io_data = zt->zt_data;
278 		zio->io_size = zt->zt_size;
279 	}
280 }
281 
282 static void
283 zio_clear_transform_stack(zio_t *zio)
284 {
285 	void *data;
286 	uint64_t size, bufsize;
287 
288 	ASSERT(zio->io_transform_stack != NULL);
289 
290 	zio_pop_transform(zio, &data, &size, &bufsize);
291 	while (zio->io_transform_stack != NULL) {
292 		zio_buf_free(data, bufsize);
293 		zio_pop_transform(zio, &data, &size, &bufsize);
294 	}
295 }
296 
297 /*
298  * ==========================================================================
299  * Create the various types of I/O (read, write, free)
300  * ==========================================================================
301  */
302 static zio_t *
303 zio_create(zio_t *pio, spa_t *spa, uint64_t txg, blkptr_t *bp,
304     void *data, uint64_t size, zio_done_func_t *done, void *private,
305     zio_type_t type, int priority, int flags, uint8_t stage, uint32_t pipeline)
306 {
307 	zio_t *zio;
308 
309 	ASSERT3U(size, <=, SPA_MAXBLOCKSIZE);
310 	ASSERT(P2PHASE(size, SPA_MINBLOCKSIZE) == 0);
311 
312 	zio = kmem_cache_alloc(zio_cache, KM_SLEEP);
313 	bzero(zio, sizeof (zio_t));
314 	zio->io_parent = pio;
315 	zio->io_spa = spa;
316 	zio->io_txg = txg;
317 	zio->io_flags = flags;
318 	if (bp != NULL) {
319 		zio->io_bp = bp;
320 		zio->io_bp_copy = *bp;
321 		zio->io_bp_orig = *bp;
322 		if (dmu_ot[BP_GET_TYPE(bp)].ot_metadata ||
323 		    BP_GET_LEVEL(bp) != 0)
324 			zio->io_flags |= ZIO_FLAG_METADATA;
325 	}
326 	zio->io_done = done;
327 	zio->io_private = private;
328 	zio->io_type = type;
329 	zio->io_priority = priority;
330 	zio->io_stage = stage;
331 	zio->io_pipeline = pipeline;
332 	zio->io_async_stages = ZIO_ASYNC_PIPELINE_STAGES;
333 	zio->io_timestamp = lbolt64;
334 	if (pio != NULL)
335 		zio->io_flags |= (pio->io_flags & ZIO_FLAG_METADATA);
336 	mutex_init(&zio->io_lock, NULL, MUTEX_DEFAULT, NULL);
337 	zio_push_transform(zio, data, size, size);
338 
339 	/*
340 	 * Note on config lock:
341 	 *
342 	 * If CONFIG_HELD is set, then the caller already has the config
343 	 * lock, so we don't need it for this io.
344 	 *
345 	 * We set CONFIG_GRABBED to indicate that we have grabbed the
346 	 * config lock on behalf of this io, so it should be released
347 	 * in zio_done.
348 	 *
349 	 * Unless CONFIG_HELD is set, we will grab the config lock for
350 	 * any top-level (parent-less) io, *except* NULL top-level ios.
351 	 * The NULL top-level ios rarely have any children, so we delay
352 	 * grabbing the lock until the first child is added (but it is
353 	 * still grabbed on behalf of the top-level i/o, so additional
354 	 * children don't need to also grab it).  This greatly reduces
355 	 * contention on the config lock.
356 	 */
357 	if (pio == NULL) {
358 		if (type != ZIO_TYPE_NULL &&
359 		    !(flags & ZIO_FLAG_CONFIG_HELD)) {
360 			spa_config_enter(zio->io_spa, RW_READER, zio);
361 			zio->io_flags |= ZIO_FLAG_CONFIG_GRABBED;
362 		}
363 		zio->io_root = zio;
364 	} else {
365 		zio->io_root = pio->io_root;
366 		if (!(flags & ZIO_FLAG_NOBOOKMARK))
367 			zio->io_logical = pio->io_logical;
368 		mutex_enter(&pio->io_lock);
369 		if (pio->io_parent == NULL &&
370 		    pio->io_type == ZIO_TYPE_NULL &&
371 		    !(pio->io_flags & ZIO_FLAG_CONFIG_GRABBED) &&
372 		    !(pio->io_flags & ZIO_FLAG_CONFIG_HELD)) {
373 			pio->io_flags |= ZIO_FLAG_CONFIG_GRABBED;
374 			spa_config_enter(zio->io_spa, RW_READER, pio);
375 		}
376 		if (stage < ZIO_STAGE_READY)
377 			pio->io_children_notready++;
378 		pio->io_children_notdone++;
379 		zio->io_sibling_next = pio->io_child;
380 		zio->io_sibling_prev = NULL;
381 		if (pio->io_child != NULL)
382 			pio->io_child->io_sibling_prev = zio;
383 		pio->io_child = zio;
384 		zio->io_ndvas = pio->io_ndvas;
385 		mutex_exit(&pio->io_lock);
386 	}
387 
388 	return (zio);
389 }
390 
391 zio_t *
392 zio_null(zio_t *pio, spa_t *spa, zio_done_func_t *done, void *private,
393 	int flags)
394 {
395 	zio_t *zio;
396 
397 	zio = zio_create(pio, spa, 0, NULL, NULL, 0, done, private,
398 	    ZIO_TYPE_NULL, ZIO_PRIORITY_NOW, flags, ZIO_STAGE_OPEN,
399 	    ZIO_WAIT_FOR_CHILDREN_PIPELINE);
400 
401 	return (zio);
402 }
403 
404 zio_t *
405 zio_root(spa_t *spa, zio_done_func_t *done, void *private, int flags)
406 {
407 	return (zio_null(NULL, spa, done, private, flags));
408 }
409 
410 zio_t *
411 zio_read(zio_t *pio, spa_t *spa, blkptr_t *bp, void *data,
412     uint64_t size, zio_done_func_t *done, void *private,
413     int priority, int flags, zbookmark_t *zb)
414 {
415 	zio_t *zio;
416 
417 	ASSERT3U(size, ==, BP_GET_LSIZE(bp));
418 
419 	zio = zio_create(pio, spa, bp->blk_birth, bp, data, size, done, private,
420 	    ZIO_TYPE_READ, priority, flags | ZIO_FLAG_USER,
421 	    ZIO_STAGE_OPEN, ZIO_READ_PIPELINE);
422 	zio->io_bookmark = *zb;
423 
424 	zio->io_logical = zio;
425 
426 	/*
427 	 * Work off our copy of the bp so the caller can free it.
428 	 */
429 	zio->io_bp = &zio->io_bp_copy;
430 
431 	if (BP_GET_COMPRESS(bp) != ZIO_COMPRESS_OFF) {
432 		uint64_t csize = BP_GET_PSIZE(bp);
433 		void *cbuf = zio_buf_alloc(csize);
434 
435 		zio_push_transform(zio, cbuf, csize, csize);
436 		zio->io_pipeline |= 1U << ZIO_STAGE_READ_DECOMPRESS;
437 	}
438 
439 	if (BP_IS_GANG(bp)) {
440 		uint64_t gsize = SPA_GANGBLOCKSIZE;
441 		void *gbuf = zio_buf_alloc(gsize);
442 
443 		zio_push_transform(zio, gbuf, gsize, gsize);
444 		zio->io_pipeline |= 1U << ZIO_STAGE_READ_GANG_MEMBERS;
445 	}
446 
447 	return (zio);
448 }
449 
450 zio_t *
451 zio_write(zio_t *pio, spa_t *spa, int checksum, int compress, int ncopies,
452     uint64_t txg, blkptr_t *bp, void *data, uint64_t size,
453     zio_done_func_t *ready, zio_done_func_t *done, void *private, int priority,
454     int flags, zbookmark_t *zb)
455 {
456 	zio_t *zio;
457 
458 	ASSERT(checksum >= ZIO_CHECKSUM_OFF &&
459 	    checksum < ZIO_CHECKSUM_FUNCTIONS);
460 
461 	ASSERT(compress >= ZIO_COMPRESS_OFF &&
462 	    compress < ZIO_COMPRESS_FUNCTIONS);
463 
464 	zio = zio_create(pio, spa, txg, bp, data, size, done, private,
465 	    ZIO_TYPE_WRITE, priority, flags | ZIO_FLAG_USER,
466 	    ZIO_STAGE_OPEN, ZIO_WRITE_PIPELINE);
467 
468 	zio->io_ready = ready;
469 
470 	zio->io_bookmark = *zb;
471 
472 	zio->io_logical = zio;
473 
474 	zio->io_checksum = checksum;
475 	zio->io_compress = compress;
476 	zio->io_ndvas = ncopies;
477 
478 	if (compress != ZIO_COMPRESS_OFF)
479 		zio->io_async_stages |= 1U << ZIO_STAGE_WRITE_COMPRESS;
480 
481 	if (bp->blk_birth != txg) {
482 		/* XXX the bp usually (always?) gets re-zeroed later */
483 		BP_ZERO(bp);
484 		BP_SET_LSIZE(bp, size);
485 		BP_SET_PSIZE(bp, size);
486 	} else {
487 		/* Make sure someone doesn't change their mind on overwrites */
488 		ASSERT(MIN(zio->io_ndvas + BP_IS_GANG(bp),
489 		    spa_max_replication(spa)) == BP_GET_NDVAS(bp));
490 	}
491 
492 	return (zio);
493 }
494 
495 zio_t *
496 zio_rewrite(zio_t *pio, spa_t *spa, int checksum,
497     uint64_t txg, blkptr_t *bp, void *data, uint64_t size,
498     zio_done_func_t *done, void *private, int priority, int flags,
499     zbookmark_t *zb)
500 {
501 	zio_t *zio;
502 
503 	zio = zio_create(pio, spa, txg, bp, data, size, done, private,
504 	    ZIO_TYPE_WRITE, priority, flags | ZIO_FLAG_USER,
505 	    ZIO_STAGE_OPEN, ZIO_REWRITE_PIPELINE);
506 
507 	zio->io_bookmark = *zb;
508 	zio->io_checksum = checksum;
509 	zio->io_compress = ZIO_COMPRESS_OFF;
510 
511 	if (pio != NULL)
512 		ASSERT3U(zio->io_ndvas, <=, BP_GET_NDVAS(bp));
513 
514 	return (zio);
515 }
516 
517 static zio_t *
518 zio_write_allocate(zio_t *pio, spa_t *spa, int checksum,
519     uint64_t txg, blkptr_t *bp, void *data, uint64_t size,
520     zio_done_func_t *done, void *private, int priority, int flags)
521 {
522 	zio_t *zio;
523 
524 	BP_ZERO(bp);
525 	BP_SET_LSIZE(bp, size);
526 	BP_SET_PSIZE(bp, size);
527 	BP_SET_COMPRESS(bp, ZIO_COMPRESS_OFF);
528 
529 	zio = zio_create(pio, spa, txg, bp, data, size, done, private,
530 	    ZIO_TYPE_WRITE, priority, flags,
531 	    ZIO_STAGE_OPEN, ZIO_WRITE_ALLOCATE_PIPELINE);
532 
533 	zio->io_checksum = checksum;
534 	zio->io_compress = ZIO_COMPRESS_OFF;
535 
536 	return (zio);
537 }
538 
539 zio_t *
540 zio_free(zio_t *pio, spa_t *spa, uint64_t txg, blkptr_t *bp,
541     zio_done_func_t *done, void *private)
542 {
543 	zio_t *zio;
544 
545 	ASSERT(!BP_IS_HOLE(bp));
546 
547 	if (txg == spa->spa_syncing_txg &&
548 	    spa->spa_sync_pass > zio_sync_pass.zp_defer_free) {
549 		bplist_enqueue_deferred(&spa->spa_sync_bplist, bp);
550 		return (zio_null(pio, spa, NULL, NULL, 0));
551 	}
552 
553 	zio = zio_create(pio, spa, txg, bp, NULL, 0, done, private,
554 	    ZIO_TYPE_FREE, ZIO_PRIORITY_FREE, ZIO_FLAG_USER,
555 	    ZIO_STAGE_OPEN, ZIO_FREE_PIPELINE);
556 
557 	zio->io_bp = &zio->io_bp_copy;
558 
559 	return (zio);
560 }
561 
562 zio_t *
563 zio_claim(zio_t *pio, spa_t *spa, uint64_t txg, blkptr_t *bp,
564     zio_done_func_t *done, void *private)
565 {
566 	zio_t *zio;
567 
568 	/*
569 	 * A claim is an allocation of a specific block.  Claims are needed
570 	 * to support immediate writes in the intent log.  The issue is that
571 	 * immediate writes contain committed data, but in a txg that was
572 	 * *not* committed.  Upon opening the pool after an unclean shutdown,
573 	 * the intent log claims all blocks that contain immediate write data
574 	 * so that the SPA knows they're in use.
575 	 *
576 	 * All claims *must* be resolved in the first txg -- before the SPA
577 	 * starts allocating blocks -- so that nothing is allocated twice.
578 	 */
579 	ASSERT3U(spa->spa_uberblock.ub_rootbp.blk_birth, <, spa_first_txg(spa));
580 	ASSERT3U(spa_first_txg(spa), <=, txg);
581 
582 	zio = zio_create(pio, spa, txg, bp, NULL, 0, done, private,
583 	    ZIO_TYPE_CLAIM, ZIO_PRIORITY_NOW, 0,
584 	    ZIO_STAGE_OPEN, ZIO_CLAIM_PIPELINE);
585 
586 	zio->io_bp = &zio->io_bp_copy;
587 
588 	return (zio);
589 }
590 
591 zio_t *
592 zio_ioctl(zio_t *pio, spa_t *spa, vdev_t *vd, int cmd,
593     zio_done_func_t *done, void *private, int priority, int flags)
594 {
595 	zio_t *zio;
596 	int c;
597 
598 	if (vd->vdev_children == 0) {
599 		zio = zio_create(pio, spa, 0, NULL, NULL, 0, done, private,
600 		    ZIO_TYPE_IOCTL, priority, flags,
601 		    ZIO_STAGE_OPEN, ZIO_IOCTL_PIPELINE);
602 
603 		zio->io_vd = vd;
604 		zio->io_cmd = cmd;
605 	} else {
606 		zio = zio_null(pio, spa, NULL, NULL, flags);
607 
608 		for (c = 0; c < vd->vdev_children; c++)
609 			zio_nowait(zio_ioctl(zio, spa, vd->vdev_child[c], cmd,
610 			    done, private, priority, flags));
611 	}
612 
613 	return (zio);
614 }
615 
616 static void
617 zio_phys_bp_init(vdev_t *vd, blkptr_t *bp, uint64_t offset, uint64_t size,
618     int checksum)
619 {
620 	ASSERT(vd->vdev_children == 0);
621 
622 	ASSERT(size <= SPA_MAXBLOCKSIZE);
623 	ASSERT(P2PHASE(size, SPA_MINBLOCKSIZE) == 0);
624 	ASSERT(P2PHASE(offset, SPA_MINBLOCKSIZE) == 0);
625 
626 	ASSERT(offset + size <= VDEV_LABEL_START_SIZE ||
627 	    offset >= vd->vdev_psize - VDEV_LABEL_END_SIZE);
628 	ASSERT3U(offset + size, <=, vd->vdev_psize);
629 
630 	BP_ZERO(bp);
631 
632 	BP_SET_LSIZE(bp, size);
633 	BP_SET_PSIZE(bp, size);
634 
635 	BP_SET_CHECKSUM(bp, checksum);
636 	BP_SET_COMPRESS(bp, ZIO_COMPRESS_OFF);
637 	BP_SET_BYTEORDER(bp, ZFS_HOST_BYTEORDER);
638 
639 	if (checksum != ZIO_CHECKSUM_OFF)
640 		ZIO_SET_CHECKSUM(&bp->blk_cksum, offset, 0, 0, 0);
641 }
642 
643 zio_t *
644 zio_read_phys(zio_t *pio, vdev_t *vd, uint64_t offset, uint64_t size,
645     void *data, int checksum, zio_done_func_t *done, void *private,
646     int priority, int flags)
647 {
648 	zio_t *zio;
649 	blkptr_t blk;
650 
651 	zio_phys_bp_init(vd, &blk, offset, size, checksum);
652 
653 	zio = zio_create(pio, vd->vdev_spa, 0, &blk, data, size, done, private,
654 	    ZIO_TYPE_READ, priority, flags | ZIO_FLAG_PHYSICAL,
655 	    ZIO_STAGE_OPEN, ZIO_READ_PHYS_PIPELINE);
656 
657 	zio->io_vd = vd;
658 	zio->io_offset = offset;
659 
660 	/*
661 	 * Work off our copy of the bp so the caller can free it.
662 	 */
663 	zio->io_bp = &zio->io_bp_copy;
664 
665 	return (zio);
666 }
667 
668 zio_t *
669 zio_write_phys(zio_t *pio, vdev_t *vd, uint64_t offset, uint64_t size,
670     void *data, int checksum, zio_done_func_t *done, void *private,
671     int priority, int flags)
672 {
673 	zio_block_tail_t *zbt;
674 	void *wbuf;
675 	zio_t *zio;
676 	blkptr_t blk;
677 
678 	zio_phys_bp_init(vd, &blk, offset, size, checksum);
679 
680 	zio = zio_create(pio, vd->vdev_spa, 0, &blk, data, size, done, private,
681 	    ZIO_TYPE_WRITE, priority, flags | ZIO_FLAG_PHYSICAL,
682 	    ZIO_STAGE_OPEN, ZIO_WRITE_PHYS_PIPELINE);
683 
684 	zio->io_vd = vd;
685 	zio->io_offset = offset;
686 
687 	zio->io_bp = &zio->io_bp_copy;
688 	zio->io_checksum = checksum;
689 
690 	if (zio_checksum_table[checksum].ci_zbt) {
691 		/*
692 		 * zbt checksums are necessarily destructive -- they modify
693 		 * one word of the write buffer to hold the verifier/checksum.
694 		 * Therefore, we must make a local copy in case the data is
695 		 * being written to multiple places.
696 		 */
697 		wbuf = zio_buf_alloc(size);
698 		bcopy(data, wbuf, size);
699 		zio_push_transform(zio, wbuf, size, size);
700 
701 		zbt = (zio_block_tail_t *)((char *)wbuf + size) - 1;
702 		zbt->zbt_cksum = blk.blk_cksum;
703 	}
704 
705 	return (zio);
706 }
707 
708 /*
709  * Create a child I/O to do some work for us.  It has no associated bp.
710  */
711 zio_t *
712 zio_vdev_child_io(zio_t *zio, blkptr_t *bp, vdev_t *vd, uint64_t offset,
713 	void *data, uint64_t size, int type, int priority, int flags,
714 	zio_done_func_t *done, void *private)
715 {
716 	uint32_t pipeline = ZIO_VDEV_CHILD_PIPELINE;
717 	zio_t *cio;
718 
719 	if (type == ZIO_TYPE_READ && bp != NULL) {
720 		/*
721 		 * If we have the bp, then the child should perform the
722 		 * checksum and the parent need not.  This pushes error
723 		 * detection as close to the leaves as possible and
724 		 * eliminates redundant checksums in the interior nodes.
725 		 */
726 		pipeline |= 1U << ZIO_STAGE_CHECKSUM_VERIFY;
727 		zio->io_pipeline &= ~(1U << ZIO_STAGE_CHECKSUM_VERIFY);
728 	}
729 
730 	cio = zio_create(zio, zio->io_spa, zio->io_txg, bp, data, size,
731 	    done, private, type, priority,
732 	    (zio->io_flags & ZIO_FLAG_VDEV_INHERIT) | ZIO_FLAG_CANFAIL | flags,
733 	    ZIO_STAGE_VDEV_IO_START - 1, pipeline);
734 
735 	cio->io_vd = vd;
736 	cio->io_offset = offset;
737 
738 	return (cio);
739 }
740 
741 /*
742  * ==========================================================================
743  * Initiate I/O, either sync or async
744  * ==========================================================================
745  */
746 int
747 zio_wait(zio_t *zio)
748 {
749 	int error;
750 
751 	ASSERT(zio->io_stage == ZIO_STAGE_OPEN);
752 
753 	zio->io_waiter = curthread;
754 
755 	zio_next_stage_async(zio);
756 
757 	mutex_enter(&zio->io_lock);
758 	while (zio->io_stalled != ZIO_STAGE_DONE)
759 		cv_wait(&zio->io_cv, &zio->io_lock);
760 	mutex_exit(&zio->io_lock);
761 
762 	error = zio->io_error;
763 	mutex_destroy(&zio->io_lock);
764 	kmem_cache_free(zio_cache, zio);
765 
766 	return (error);
767 }
768 
769 void
770 zio_nowait(zio_t *zio)
771 {
772 	zio_next_stage_async(zio);
773 }
774 
775 /*
776  * ==========================================================================
777  * I/O pipeline interlocks: parent/child dependency scoreboarding
778  * ==========================================================================
779  */
780 static void
781 zio_wait_for_children(zio_t *zio, uint32_t stage, uint64_t *countp)
782 {
783 	mutex_enter(&zio->io_lock);
784 	if (*countp == 0) {
785 		ASSERT(zio->io_stalled == 0);
786 		mutex_exit(&zio->io_lock);
787 		zio_next_stage(zio);
788 	} else {
789 		zio->io_stalled = stage;
790 		mutex_exit(&zio->io_lock);
791 	}
792 }
793 
794 static void
795 zio_notify_parent(zio_t *zio, uint32_t stage, uint64_t *countp)
796 {
797 	zio_t *pio = zio->io_parent;
798 
799 	mutex_enter(&pio->io_lock);
800 	if (pio->io_error == 0 && !(zio->io_flags & ZIO_FLAG_DONT_PROPAGATE))
801 		pio->io_error = zio->io_error;
802 	if (--*countp == 0 && pio->io_stalled == stage) {
803 		pio->io_stalled = 0;
804 		mutex_exit(&pio->io_lock);
805 		zio_next_stage_async(pio);
806 	} else {
807 		mutex_exit(&pio->io_lock);
808 	}
809 }
810 
811 static void
812 zio_wait_children_ready(zio_t *zio)
813 {
814 	zio_wait_for_children(zio, ZIO_STAGE_WAIT_CHILDREN_READY,
815 	    &zio->io_children_notready);
816 }
817 
818 void
819 zio_wait_children_done(zio_t *zio)
820 {
821 	zio_wait_for_children(zio, ZIO_STAGE_WAIT_CHILDREN_DONE,
822 	    &zio->io_children_notdone);
823 }
824 
825 static void
826 zio_ready(zio_t *zio)
827 {
828 	zio_t *pio = zio->io_parent;
829 
830 	if (zio->io_ready)
831 		zio->io_ready(zio);
832 
833 	if (pio != NULL)
834 		zio_notify_parent(zio, ZIO_STAGE_WAIT_CHILDREN_READY,
835 		    &pio->io_children_notready);
836 
837 	if (zio->io_bp)
838 		zio->io_bp_copy = *zio->io_bp;
839 
840 	zio_next_stage(zio);
841 }
842 
843 static void
844 zio_done(zio_t *zio)
845 {
846 	zio_t *pio = zio->io_parent;
847 	spa_t *spa = zio->io_spa;
848 	blkptr_t *bp = zio->io_bp;
849 	vdev_t *vd = zio->io_vd;
850 
851 	ASSERT(zio->io_children_notready == 0);
852 	ASSERT(zio->io_children_notdone == 0);
853 
854 	if (bp != NULL) {
855 		ASSERT(bp->blk_pad[0] == 0);
856 		ASSERT(bp->blk_pad[1] == 0);
857 		ASSERT(bp->blk_pad[2] == 0);
858 		ASSERT(bcmp(bp, &zio->io_bp_copy, sizeof (blkptr_t)) == 0);
859 		if (zio->io_type == ZIO_TYPE_WRITE && !BP_IS_HOLE(bp) &&
860 		    !(zio->io_flags & ZIO_FLAG_IO_REPAIR)) {
861 			ASSERT(!BP_SHOULD_BYTESWAP(bp));
862 			if (zio->io_ndvas != 0)
863 				ASSERT3U(zio->io_ndvas, <=, BP_GET_NDVAS(bp));
864 			ASSERT(BP_COUNT_GANG(bp) == 0 ||
865 			    (BP_COUNT_GANG(bp) == BP_GET_NDVAS(bp)));
866 		}
867 	}
868 
869 	if (vd != NULL)
870 		vdev_stat_update(zio);
871 
872 	if (zio->io_error) {
873 		/*
874 		 * If this I/O is attached to a particular vdev,
875 		 * generate an error message describing the I/O failure
876 		 * at the block level.  We ignore these errors if the
877 		 * device is currently unavailable.
878 		 */
879 		if (zio->io_error != ECKSUM && vd != NULL && !vdev_is_dead(vd))
880 			zfs_ereport_post(FM_EREPORT_ZFS_IO,
881 			    zio->io_spa, vd, zio, 0, 0);
882 
883 		if ((zio->io_error == EIO ||
884 		    !(zio->io_flags & ZIO_FLAG_SPECULATIVE)) &&
885 		    zio->io_logical == zio) {
886 			/*
887 			 * For root I/O requests, tell the SPA to log the error
888 			 * appropriately.  Also, generate a logical data
889 			 * ereport.
890 			 */
891 			spa_log_error(zio->io_spa, zio);
892 
893 			zfs_ereport_post(FM_EREPORT_ZFS_DATA,
894 			    zio->io_spa, NULL, zio, 0, 0);
895 		}
896 
897 		/*
898 		 * For I/O requests that cannot fail, panic appropriately.
899 		 */
900 		if (!(zio->io_flags & ZIO_FLAG_CANFAIL)) {
901 			char *blkbuf;
902 
903 			blkbuf = kmem_alloc(BP_SPRINTF_LEN, KM_NOSLEEP);
904 			if (blkbuf) {
905 				sprintf_blkptr(blkbuf, BP_SPRINTF_LEN,
906 				    bp ? bp : &zio->io_bp_copy);
907 			}
908 			panic("ZFS: %s (%s on %s off %llx: zio %p %s): error "
909 			    "%d", zio->io_error == ECKSUM ?
910 			    "bad checksum" : "I/O failure",
911 			    zio_type_name[zio->io_type],
912 			    vdev_description(vd),
913 			    (u_longlong_t)zio->io_offset,
914 			    zio, blkbuf ? blkbuf : "", zio->io_error);
915 		}
916 	}
917 	zio_clear_transform_stack(zio);
918 
919 	if (zio->io_done)
920 		zio->io_done(zio);
921 
922 	ASSERT(zio->io_delegate_list == NULL);
923 	ASSERT(zio->io_delegate_next == NULL);
924 
925 	if (pio != NULL) {
926 		zio_t *next, *prev;
927 
928 		mutex_enter(&pio->io_lock);
929 		next = zio->io_sibling_next;
930 		prev = zio->io_sibling_prev;
931 		if (next != NULL)
932 			next->io_sibling_prev = prev;
933 		if (prev != NULL)
934 			prev->io_sibling_next = next;
935 		if (pio->io_child == zio)
936 			pio->io_child = next;
937 		mutex_exit(&pio->io_lock);
938 
939 		zio_notify_parent(zio, ZIO_STAGE_WAIT_CHILDREN_DONE,
940 		    &pio->io_children_notdone);
941 	}
942 
943 	/*
944 	 * Note: this I/O is now done, and will shortly be freed, so there is no
945 	 * need to clear this (or any other) flag.
946 	 */
947 	if (zio->io_flags & ZIO_FLAG_CONFIG_GRABBED)
948 		spa_config_exit(spa, zio);
949 
950 	if (zio->io_waiter != NULL) {
951 		mutex_enter(&zio->io_lock);
952 		ASSERT(zio->io_stage == ZIO_STAGE_DONE);
953 		zio->io_stalled = zio->io_stage;
954 		cv_broadcast(&zio->io_cv);
955 		mutex_exit(&zio->io_lock);
956 	} else {
957 		kmem_cache_free(zio_cache, zio);
958 	}
959 }
960 
961 /*
962  * ==========================================================================
963  * Compression support
964  * ==========================================================================
965  */
966 static void
967 zio_write_compress(zio_t *zio)
968 {
969 	int compress = zio->io_compress;
970 	blkptr_t *bp = zio->io_bp;
971 	void *cbuf;
972 	uint64_t lsize = zio->io_size;
973 	uint64_t csize = lsize;
974 	uint64_t cbufsize = 0;
975 	int pass;
976 
977 	if (bp->blk_birth == zio->io_txg) {
978 		/*
979 		 * We're rewriting an existing block, which means we're
980 		 * working on behalf of spa_sync().  For spa_sync() to
981 		 * converge, it must eventually be the case that we don't
982 		 * have to allocate new blocks.  But compression changes
983 		 * the blocksize, which forces a reallocate, and makes
984 		 * convergence take longer.  Therefore, after the first
985 		 * few passes, stop compressing to ensure convergence.
986 		 */
987 		pass = spa_sync_pass(zio->io_spa);
988 		if (pass > zio_sync_pass.zp_dontcompress)
989 			compress = ZIO_COMPRESS_OFF;
990 	} else {
991 		ASSERT(BP_IS_HOLE(bp));
992 		pass = 1;
993 	}
994 
995 	if (compress != ZIO_COMPRESS_OFF)
996 		if (!zio_compress_data(compress, zio->io_data, zio->io_size,
997 		    &cbuf, &csize, &cbufsize))
998 			compress = ZIO_COMPRESS_OFF;
999 
1000 	if (compress != ZIO_COMPRESS_OFF && csize != 0)
1001 		zio_push_transform(zio, cbuf, csize, cbufsize);
1002 
1003 	/*
1004 	 * The final pass of spa_sync() must be all rewrites, but the first
1005 	 * few passes offer a trade-off: allocating blocks defers convergence,
1006 	 * but newly allocated blocks are sequential, so they can be written
1007 	 * to disk faster.  Therefore, we allow the first few passes of
1008 	 * spa_sync() to reallocate new blocks, but force rewrites after that.
1009 	 * There should only be a handful of blocks after pass 1 in any case.
1010 	 */
1011 	if (bp->blk_birth == zio->io_txg && BP_GET_PSIZE(bp) == csize &&
1012 	    pass > zio_sync_pass.zp_rewrite) {
1013 		ASSERT(csize != 0);
1014 		BP_SET_LSIZE(bp, lsize);
1015 		BP_SET_COMPRESS(bp, compress);
1016 		zio->io_pipeline = ZIO_REWRITE_PIPELINE;
1017 	} else {
1018 		if (bp->blk_birth == zio->io_txg)
1019 			BP_ZERO(bp);
1020 		if (csize == 0) {
1021 			BP_ZERO(bp);
1022 			zio->io_pipeline = ZIO_WAIT_FOR_CHILDREN_PIPELINE;
1023 		} else {
1024 			ASSERT3U(BP_GET_NDVAS(bp), ==, 0);
1025 			BP_SET_LSIZE(bp, lsize);
1026 			BP_SET_PSIZE(bp, csize);
1027 			BP_SET_COMPRESS(bp, compress);
1028 			zio->io_pipeline = ZIO_WRITE_ALLOCATE_PIPELINE;
1029 		}
1030 	}
1031 
1032 	zio_next_stage(zio);
1033 }
1034 
1035 static void
1036 zio_read_decompress(zio_t *zio)
1037 {
1038 	blkptr_t *bp = zio->io_bp;
1039 	void *data;
1040 	uint64_t size;
1041 	uint64_t bufsize;
1042 	int compress = BP_GET_COMPRESS(bp);
1043 
1044 	ASSERT(compress != ZIO_COMPRESS_OFF);
1045 
1046 	zio_pop_transform(zio, &data, &size, &bufsize);
1047 
1048 	if (zio_decompress_data(compress, data, size,
1049 	    zio->io_data, zio->io_size))
1050 		zio->io_error = EIO;
1051 
1052 	zio_buf_free(data, bufsize);
1053 
1054 	zio_next_stage(zio);
1055 }
1056 
1057 /*
1058  * ==========================================================================
1059  * Gang block support
1060  * ==========================================================================
1061  */
1062 static void
1063 zio_gang_pipeline(zio_t *zio)
1064 {
1065 	/*
1066 	 * By default, the pipeline assumes that we're dealing with a gang
1067 	 * block.  If we're not, strip out any gang-specific stages.
1068 	 */
1069 	if (!BP_IS_GANG(zio->io_bp))
1070 		zio->io_pipeline &= ~ZIO_GANG_STAGES;
1071 
1072 	zio_next_stage(zio);
1073 }
1074 
1075 static void
1076 zio_gang_byteswap(zio_t *zio)
1077 {
1078 	ASSERT(zio->io_size == SPA_GANGBLOCKSIZE);
1079 
1080 	if (BP_SHOULD_BYTESWAP(zio->io_bp))
1081 		byteswap_uint64_array(zio->io_data, zio->io_size);
1082 }
1083 
1084 static void
1085 zio_get_gang_header(zio_t *zio)
1086 {
1087 	blkptr_t *bp = zio->io_bp;
1088 	uint64_t gsize = SPA_GANGBLOCKSIZE;
1089 	void *gbuf = zio_buf_alloc(gsize);
1090 
1091 	ASSERT(BP_IS_GANG(bp));
1092 
1093 	zio_push_transform(zio, gbuf, gsize, gsize);
1094 
1095 	zio_nowait(zio_create(zio, zio->io_spa, bp->blk_birth, bp, gbuf, gsize,
1096 	    NULL, NULL, ZIO_TYPE_READ, zio->io_priority,
1097 	    zio->io_flags & ZIO_FLAG_GANG_INHERIT,
1098 	    ZIO_STAGE_OPEN, ZIO_READ_PIPELINE));
1099 
1100 	zio_wait_children_done(zio);
1101 }
1102 
1103 static void
1104 zio_read_gang_members(zio_t *zio)
1105 {
1106 	zio_gbh_phys_t *gbh;
1107 	uint64_t gsize, gbufsize, loff, lsize;
1108 	int i;
1109 
1110 	ASSERT(BP_IS_GANG(zio->io_bp));
1111 
1112 	zio_gang_byteswap(zio);
1113 	zio_pop_transform(zio, (void **)&gbh, &gsize, &gbufsize);
1114 
1115 	for (loff = 0, i = 0; loff != zio->io_size; loff += lsize, i++) {
1116 		blkptr_t *gbp = &gbh->zg_blkptr[i];
1117 		lsize = BP_GET_PSIZE(gbp);
1118 
1119 		ASSERT(BP_GET_COMPRESS(gbp) == ZIO_COMPRESS_OFF);
1120 		ASSERT3U(lsize, ==, BP_GET_LSIZE(gbp));
1121 		ASSERT3U(loff + lsize, <=, zio->io_size);
1122 		ASSERT(i < SPA_GBH_NBLKPTRS);
1123 		ASSERT(!BP_IS_HOLE(gbp));
1124 
1125 		zio_nowait(zio_read(zio, zio->io_spa, gbp,
1126 		    (char *)zio->io_data + loff, lsize, NULL, NULL,
1127 		    zio->io_priority, zio->io_flags & ZIO_FLAG_GANG_INHERIT,
1128 		    &zio->io_bookmark));
1129 	}
1130 
1131 	zio_buf_free(gbh, gbufsize);
1132 	zio_wait_children_done(zio);
1133 }
1134 
1135 static void
1136 zio_rewrite_gang_members(zio_t *zio)
1137 {
1138 	zio_gbh_phys_t *gbh;
1139 	uint64_t gsize, gbufsize, loff, lsize;
1140 	int i;
1141 
1142 	ASSERT(BP_IS_GANG(zio->io_bp));
1143 	ASSERT3U(zio->io_size, ==, SPA_GANGBLOCKSIZE);
1144 
1145 	zio_gang_byteswap(zio);
1146 	zio_pop_transform(zio, (void **)&gbh, &gsize, &gbufsize);
1147 
1148 	ASSERT(gsize == gbufsize);
1149 
1150 	for (loff = 0, i = 0; loff != zio->io_size; loff += lsize, i++) {
1151 		blkptr_t *gbp = &gbh->zg_blkptr[i];
1152 		lsize = BP_GET_PSIZE(gbp);
1153 
1154 		ASSERT(BP_GET_COMPRESS(gbp) == ZIO_COMPRESS_OFF);
1155 		ASSERT3U(lsize, ==, BP_GET_LSIZE(gbp));
1156 		ASSERT3U(loff + lsize, <=, zio->io_size);
1157 		ASSERT(i < SPA_GBH_NBLKPTRS);
1158 		ASSERT(!BP_IS_HOLE(gbp));
1159 
1160 		zio_nowait(zio_rewrite(zio, zio->io_spa, zio->io_checksum,
1161 		    zio->io_txg, gbp, (char *)zio->io_data + loff, lsize,
1162 		    NULL, NULL, zio->io_priority, zio->io_flags,
1163 		    &zio->io_bookmark));
1164 	}
1165 
1166 	zio_push_transform(zio, gbh, gsize, gbufsize);
1167 	zio_wait_children_ready(zio);
1168 }
1169 
1170 static void
1171 zio_free_gang_members(zio_t *zio)
1172 {
1173 	zio_gbh_phys_t *gbh;
1174 	uint64_t gsize, gbufsize;
1175 	int i;
1176 
1177 	ASSERT(BP_IS_GANG(zio->io_bp));
1178 
1179 	zio_gang_byteswap(zio);
1180 	zio_pop_transform(zio, (void **)&gbh, &gsize, &gbufsize);
1181 
1182 	for (i = 0; i < SPA_GBH_NBLKPTRS; i++) {
1183 		blkptr_t *gbp = &gbh->zg_blkptr[i];
1184 
1185 		if (BP_IS_HOLE(gbp))
1186 			continue;
1187 		zio_nowait(zio_free(zio, zio->io_spa, zio->io_txg,
1188 		    gbp, NULL, NULL));
1189 	}
1190 
1191 	zio_buf_free(gbh, gbufsize);
1192 	zio_next_stage(zio);
1193 }
1194 
1195 static void
1196 zio_claim_gang_members(zio_t *zio)
1197 {
1198 	zio_gbh_phys_t *gbh;
1199 	uint64_t gsize, gbufsize;
1200 	int i;
1201 
1202 	ASSERT(BP_IS_GANG(zio->io_bp));
1203 
1204 	zio_gang_byteswap(zio);
1205 	zio_pop_transform(zio, (void **)&gbh, &gsize, &gbufsize);
1206 
1207 	for (i = 0; i < SPA_GBH_NBLKPTRS; i++) {
1208 		blkptr_t *gbp = &gbh->zg_blkptr[i];
1209 		if (BP_IS_HOLE(gbp))
1210 			continue;
1211 		zio_nowait(zio_claim(zio, zio->io_spa, zio->io_txg,
1212 		    gbp, NULL, NULL));
1213 	}
1214 
1215 	zio_buf_free(gbh, gbufsize);
1216 	zio_next_stage(zio);
1217 }
1218 
1219 static void
1220 zio_write_allocate_gang_member_done(zio_t *zio)
1221 {
1222 	zio_t *pio = zio->io_parent;
1223 	dva_t *cdva = zio->io_bp->blk_dva;
1224 	dva_t *pdva = pio->io_bp->blk_dva;
1225 	uint64_t asize;
1226 	int d;
1227 
1228 	ASSERT3U(pio->io_ndvas, ==, zio->io_ndvas);
1229 	ASSERT3U(BP_GET_NDVAS(zio->io_bp), <=, BP_GET_NDVAS(pio->io_bp));
1230 	ASSERT3U(zio->io_ndvas, <=, BP_GET_NDVAS(zio->io_bp));
1231 	ASSERT3U(pio->io_ndvas, <=, BP_GET_NDVAS(pio->io_bp));
1232 
1233 	mutex_enter(&pio->io_lock);
1234 	for (d = 0; d < BP_GET_NDVAS(pio->io_bp); d++) {
1235 		ASSERT(DVA_GET_GANG(&pdva[d]));
1236 		asize = DVA_GET_ASIZE(&pdva[d]);
1237 		asize += DVA_GET_ASIZE(&cdva[d]);
1238 		DVA_SET_ASIZE(&pdva[d], asize);
1239 	}
1240 	mutex_exit(&pio->io_lock);
1241 }
1242 
1243 static void
1244 zio_write_allocate_gang_members(zio_t *zio, metaslab_class_t *mc)
1245 {
1246 	blkptr_t *bp = zio->io_bp;
1247 	dva_t *dva = bp->blk_dva;
1248 	spa_t *spa = zio->io_spa;
1249 	zio_gbh_phys_t *gbh;
1250 	uint64_t txg = zio->io_txg;
1251 	uint64_t resid = zio->io_size;
1252 	uint64_t maxalloc = P2ROUNDUP(zio->io_size >> 1, SPA_MINBLOCKSIZE);
1253 	uint64_t gsize, loff, lsize;
1254 	uint32_t gbps_left;
1255 	int ndvas = zio->io_ndvas;
1256 	int gbh_ndvas = MIN(ndvas + 1, spa_max_replication(spa));
1257 	int error;
1258 	int i, d;
1259 
1260 	gsize = SPA_GANGBLOCKSIZE;
1261 	gbps_left = SPA_GBH_NBLKPTRS;
1262 
1263 	error = metaslab_alloc(spa, mc, gsize, bp, gbh_ndvas, txg, NULL,
1264 	    B_FALSE);
1265 	if (error == ENOSPC)
1266 		panic("can't allocate gang block header");
1267 	ASSERT(error == 0);
1268 
1269 	for (d = 0; d < gbh_ndvas; d++)
1270 		DVA_SET_GANG(&dva[d], 1);
1271 
1272 	bp->blk_birth = txg;
1273 
1274 	gbh = zio_buf_alloc(gsize);
1275 	bzero(gbh, gsize);
1276 
1277 	/* We need to test multi-level gang blocks */
1278 	if (maxalloc >= zio_gang_bang && (lbolt & 0x1) == 0)
1279 		maxalloc = MAX(maxalloc >> 2, SPA_MINBLOCKSIZE);
1280 
1281 	for (loff = 0, i = 0; loff != zio->io_size;
1282 	    loff += lsize, resid -= lsize, gbps_left--, i++) {
1283 		blkptr_t *gbp = &gbh->zg_blkptr[i];
1284 		dva = gbp->blk_dva;
1285 
1286 		ASSERT(gbps_left != 0);
1287 		maxalloc = MIN(maxalloc, resid);
1288 
1289 		while (resid <= maxalloc * gbps_left) {
1290 			error = metaslab_alloc(spa, mc, maxalloc, gbp, ndvas,
1291 			    txg, bp, B_FALSE);
1292 			if (error == 0)
1293 				break;
1294 			ASSERT3U(error, ==, ENOSPC);
1295 			if (maxalloc == SPA_MINBLOCKSIZE)
1296 				panic("really out of space");
1297 			maxalloc = P2ROUNDUP(maxalloc >> 1, SPA_MINBLOCKSIZE);
1298 		}
1299 
1300 		if (resid <= maxalloc * gbps_left) {
1301 			lsize = maxalloc;
1302 			BP_SET_LSIZE(gbp, lsize);
1303 			BP_SET_PSIZE(gbp, lsize);
1304 			BP_SET_COMPRESS(gbp, ZIO_COMPRESS_OFF);
1305 			gbp->blk_birth = txg;
1306 			zio_nowait(zio_rewrite(zio, spa,
1307 			    zio->io_checksum, txg, gbp,
1308 			    (char *)zio->io_data + loff, lsize,
1309 			    zio_write_allocate_gang_member_done, NULL,
1310 			    zio->io_priority, zio->io_flags,
1311 			    &zio->io_bookmark));
1312 		} else {
1313 			lsize = P2ROUNDUP(resid / gbps_left, SPA_MINBLOCKSIZE);
1314 			ASSERT(lsize != SPA_MINBLOCKSIZE);
1315 			zio_nowait(zio_write_allocate(zio, spa,
1316 			    zio->io_checksum, txg, gbp,
1317 			    (char *)zio->io_data + loff, lsize,
1318 			    zio_write_allocate_gang_member_done, NULL,
1319 			    zio->io_priority, zio->io_flags));
1320 		}
1321 	}
1322 
1323 	ASSERT(resid == 0 && loff == zio->io_size);
1324 
1325 	zio->io_pipeline |= 1U << ZIO_STAGE_GANG_CHECKSUM_GENERATE;
1326 
1327 	zio_push_transform(zio, gbh, gsize, gsize);
1328 	/*
1329 	 * As much as we'd like this to be zio_wait_children_ready(),
1330 	 * updating our ASIZE doesn't happen until the io_done callback,
1331 	 * so we have to wait for that to finish in order for our BP
1332 	 * to be stable.
1333 	 */
1334 	zio_wait_children_done(zio);
1335 }
1336 
1337 /*
1338  * ==========================================================================
1339  * Allocate and free blocks
1340  * ==========================================================================
1341  */
1342 static void
1343 zio_dva_allocate(zio_t *zio)
1344 {
1345 	spa_t *spa = zio->io_spa;
1346 	metaslab_class_t *mc = spa->spa_normal_class;
1347 	blkptr_t *bp = zio->io_bp;
1348 	int error;
1349 
1350 	ASSERT(BP_IS_HOLE(bp));
1351 	ASSERT3U(BP_GET_NDVAS(bp), ==, 0);
1352 	ASSERT3U(zio->io_ndvas, >, 0);
1353 	ASSERT3U(zio->io_ndvas, <=, spa_max_replication(spa));
1354 
1355 	/* For testing, make some blocks above a certain size be gang blocks */
1356 	if (zio->io_size >= zio_gang_bang && (lbolt & 0x3) == 0) {
1357 		zio_write_allocate_gang_members(zio, mc);
1358 		return;
1359 	}
1360 
1361 	ASSERT3U(zio->io_size, ==, BP_GET_PSIZE(bp));
1362 
1363 	error = metaslab_alloc(spa, mc, zio->io_size, bp, zio->io_ndvas,
1364 	    zio->io_txg, NULL, B_FALSE);
1365 
1366 	if (error == 0) {
1367 		bp->blk_birth = zio->io_txg;
1368 	} else if (error == ENOSPC) {
1369 		if (zio->io_size == SPA_MINBLOCKSIZE)
1370 			panic("really, truly out of space");
1371 		zio_write_allocate_gang_members(zio, mc);
1372 		return;
1373 	} else {
1374 		zio->io_error = error;
1375 	}
1376 	zio_next_stage(zio);
1377 }
1378 
1379 static void
1380 zio_dva_free(zio_t *zio)
1381 {
1382 	blkptr_t *bp = zio->io_bp;
1383 
1384 	metaslab_free(zio->io_spa, bp, zio->io_txg, B_FALSE);
1385 
1386 	BP_ZERO(bp);
1387 
1388 	zio_next_stage(zio);
1389 }
1390 
1391 static void
1392 zio_dva_claim(zio_t *zio)
1393 {
1394 	zio->io_error = metaslab_claim(zio->io_spa, zio->io_bp, zio->io_txg);
1395 
1396 	zio_next_stage(zio);
1397 }
1398 
1399 /*
1400  * ==========================================================================
1401  * Read and write to physical devices
1402  * ==========================================================================
1403  */
1404 
1405 static void
1406 zio_vdev_io_start(zio_t *zio)
1407 {
1408 	vdev_t *vd = zio->io_vd;
1409 	vdev_t *tvd = vd ? vd->vdev_top : NULL;
1410 	blkptr_t *bp = zio->io_bp;
1411 	uint64_t align;
1412 
1413 	if (vd == NULL) {
1414 		/* The mirror_ops handle multiple DVAs in a single BP */
1415 		vdev_mirror_ops.vdev_op_io_start(zio);
1416 		return;
1417 	}
1418 
1419 	align = 1ULL << tvd->vdev_ashift;
1420 
1421 	if (zio->io_retries == 0 && vd == tvd)
1422 		zio->io_flags |= ZIO_FLAG_FAILFAST;
1423 
1424 	if (!(zio->io_flags & ZIO_FLAG_PHYSICAL) &&
1425 	    vd->vdev_children == 0) {
1426 		zio->io_flags |= ZIO_FLAG_PHYSICAL;
1427 		zio->io_offset += VDEV_LABEL_START_SIZE;
1428 	}
1429 
1430 	if (P2PHASE(zio->io_size, align) != 0) {
1431 		uint64_t asize = P2ROUNDUP(zio->io_size, align);
1432 		char *abuf = zio_buf_alloc(asize);
1433 		ASSERT(vd == tvd);
1434 		if (zio->io_type == ZIO_TYPE_WRITE) {
1435 			bcopy(zio->io_data, abuf, zio->io_size);
1436 			bzero(abuf + zio->io_size, asize - zio->io_size);
1437 		}
1438 		zio_push_transform(zio, abuf, asize, asize);
1439 		ASSERT(!(zio->io_flags & ZIO_FLAG_SUBBLOCK));
1440 		zio->io_flags |= ZIO_FLAG_SUBBLOCK;
1441 	}
1442 
1443 	ASSERT(P2PHASE(zio->io_offset, align) == 0);
1444 	ASSERT(P2PHASE(zio->io_size, align) == 0);
1445 	ASSERT(bp == NULL ||
1446 	    P2ROUNDUP(ZIO_GET_IOSIZE(zio), align) == zio->io_size);
1447 	ASSERT(zio->io_type != ZIO_TYPE_WRITE || (spa_mode & FWRITE));
1448 
1449 	vdev_io_start(zio);
1450 
1451 	/* zio_next_stage_async() gets called from io completion interrupt */
1452 }
1453 
1454 static void
1455 zio_vdev_io_done(zio_t *zio)
1456 {
1457 	if (zio->io_vd == NULL)
1458 		/* The mirror_ops handle multiple DVAs in a single BP */
1459 		vdev_mirror_ops.vdev_op_io_done(zio);
1460 	else
1461 		vdev_io_done(zio);
1462 }
1463 
1464 /* XXPOLICY */
1465 boolean_t
1466 zio_should_retry(zio_t *zio)
1467 {
1468 	vdev_t *vd = zio->io_vd;
1469 
1470 	if (zio->io_error == 0)
1471 		return (B_FALSE);
1472 	if (zio->io_delegate_list != NULL)
1473 		return (B_FALSE);
1474 	if (vd && vd != vd->vdev_top)
1475 		return (B_FALSE);
1476 	if (zio->io_flags & ZIO_FLAG_DONT_RETRY)
1477 		return (B_FALSE);
1478 	if (zio->io_retries > 0)
1479 		return (B_FALSE);
1480 
1481 	return (B_TRUE);
1482 }
1483 
1484 static void
1485 zio_vdev_io_assess(zio_t *zio)
1486 {
1487 	vdev_t *vd = zio->io_vd;
1488 	vdev_t *tvd = vd ? vd->vdev_top : NULL;
1489 
1490 	ASSERT(zio->io_vsd == NULL);
1491 
1492 	if (zio->io_flags & ZIO_FLAG_SUBBLOCK) {
1493 		void *abuf;
1494 		uint64_t asize;
1495 		ASSERT(vd == tvd);
1496 		zio_pop_transform(zio, &abuf, &asize, &asize);
1497 		if (zio->io_type == ZIO_TYPE_READ)
1498 			bcopy(abuf, zio->io_data, zio->io_size);
1499 		zio_buf_free(abuf, asize);
1500 		zio->io_flags &= ~ZIO_FLAG_SUBBLOCK;
1501 	}
1502 
1503 	if (zio_injection_enabled && !zio->io_error)
1504 		zio->io_error = zio_handle_fault_injection(zio, EIO);
1505 
1506 	/*
1507 	 * If the I/O failed, determine whether we should attempt to retry it.
1508 	 */
1509 	/* XXPOLICY */
1510 	if (zio_should_retry(zio)) {
1511 		ASSERT(tvd == vd);
1512 
1513 		zio->io_retries++;
1514 		zio->io_error = 0;
1515 		zio->io_flags &= ZIO_FLAG_VDEV_INHERIT |
1516 		    ZIO_FLAG_CONFIG_GRABBED;
1517 		/* XXPOLICY */
1518 		zio->io_flags &= ~ZIO_FLAG_FAILFAST;
1519 		zio->io_flags |= ZIO_FLAG_DONT_CACHE;
1520 		zio->io_stage = ZIO_STAGE_VDEV_IO_START - 1;
1521 
1522 		dprintf("retry #%d for %s to %s offset %llx\n",
1523 		    zio->io_retries, zio_type_name[zio->io_type],
1524 		    vdev_description(vd), zio->io_offset);
1525 
1526 		zio_next_stage_async(zio);
1527 		return;
1528 	}
1529 
1530 	zio_next_stage(zio);
1531 }
1532 
1533 void
1534 zio_vdev_io_reissue(zio_t *zio)
1535 {
1536 	ASSERT(zio->io_stage == ZIO_STAGE_VDEV_IO_START);
1537 	ASSERT(zio->io_error == 0);
1538 
1539 	zio->io_stage--;
1540 }
1541 
1542 void
1543 zio_vdev_io_redone(zio_t *zio)
1544 {
1545 	ASSERT(zio->io_stage == ZIO_STAGE_VDEV_IO_DONE);
1546 
1547 	zio->io_stage--;
1548 }
1549 
1550 void
1551 zio_vdev_io_bypass(zio_t *zio)
1552 {
1553 	ASSERT(zio->io_stage == ZIO_STAGE_VDEV_IO_START);
1554 	ASSERT(zio->io_error == 0);
1555 
1556 	zio->io_flags |= ZIO_FLAG_IO_BYPASS;
1557 	zio->io_stage = ZIO_STAGE_VDEV_IO_ASSESS - 1;
1558 }
1559 
1560 /*
1561  * ==========================================================================
1562  * Generate and verify checksums
1563  * ==========================================================================
1564  */
1565 static void
1566 zio_checksum_generate(zio_t *zio)
1567 {
1568 	int checksum = zio->io_checksum;
1569 	blkptr_t *bp = zio->io_bp;
1570 
1571 	ASSERT3U(zio->io_size, ==, BP_GET_PSIZE(bp));
1572 
1573 	BP_SET_CHECKSUM(bp, checksum);
1574 	BP_SET_BYTEORDER(bp, ZFS_HOST_BYTEORDER);
1575 
1576 	zio_checksum(checksum, &bp->blk_cksum, zio->io_data, zio->io_size);
1577 
1578 	zio_next_stage(zio);
1579 }
1580 
1581 static void
1582 zio_gang_checksum_generate(zio_t *zio)
1583 {
1584 	zio_cksum_t zc;
1585 	zio_gbh_phys_t *gbh = zio->io_data;
1586 
1587 	ASSERT(BP_IS_GANG(zio->io_bp));
1588 	ASSERT3U(zio->io_size, ==, SPA_GANGBLOCKSIZE);
1589 
1590 	zio_set_gang_verifier(zio, &gbh->zg_tail.zbt_cksum);
1591 
1592 	zio_checksum(ZIO_CHECKSUM_GANG_HEADER, &zc, zio->io_data, zio->io_size);
1593 
1594 	zio_next_stage(zio);
1595 }
1596 
1597 static void
1598 zio_checksum_verify(zio_t *zio)
1599 {
1600 	if (zio->io_bp != NULL) {
1601 		zio->io_error = zio_checksum_error(zio);
1602 		if (zio->io_error && !(zio->io_flags & ZIO_FLAG_SPECULATIVE))
1603 			zfs_ereport_post(FM_EREPORT_ZFS_CHECKSUM,
1604 			    zio->io_spa, zio->io_vd, zio, 0, 0);
1605 	}
1606 
1607 	zio_next_stage(zio);
1608 }
1609 
1610 /*
1611  * Called by RAID-Z to ensure we don't compute the checksum twice.
1612  */
1613 void
1614 zio_checksum_verified(zio_t *zio)
1615 {
1616 	zio->io_pipeline &= ~(1U << ZIO_STAGE_CHECKSUM_VERIFY);
1617 }
1618 
1619 /*
1620  * Set the external verifier for a gang block based on stuff in the bp
1621  */
1622 void
1623 zio_set_gang_verifier(zio_t *zio, zio_cksum_t *zcp)
1624 {
1625 	blkptr_t *bp = zio->io_bp;
1626 
1627 	zcp->zc_word[0] = DVA_GET_VDEV(BP_IDENTITY(bp));
1628 	zcp->zc_word[1] = DVA_GET_OFFSET(BP_IDENTITY(bp));
1629 	zcp->zc_word[2] = bp->blk_birth;
1630 	zcp->zc_word[3] = 0;
1631 }
1632 
1633 /*
1634  * ==========================================================================
1635  * Define the pipeline
1636  * ==========================================================================
1637  */
1638 typedef void zio_pipe_stage_t(zio_t *zio);
1639 
1640 static void
1641 zio_badop(zio_t *zio)
1642 {
1643 	panic("Invalid I/O pipeline stage %u for zio %p", zio->io_stage, zio);
1644 }
1645 
1646 zio_pipe_stage_t *zio_pipeline[ZIO_STAGE_DONE + 2] = {
1647 	zio_badop,
1648 	zio_wait_children_ready,
1649 	zio_write_compress,
1650 	zio_checksum_generate,
1651 	zio_gang_pipeline,
1652 	zio_get_gang_header,
1653 	zio_rewrite_gang_members,
1654 	zio_free_gang_members,
1655 	zio_claim_gang_members,
1656 	zio_dva_allocate,
1657 	zio_dva_free,
1658 	zio_dva_claim,
1659 	zio_gang_checksum_generate,
1660 	zio_ready,
1661 	zio_vdev_io_start,
1662 	zio_vdev_io_done,
1663 	zio_vdev_io_assess,
1664 	zio_wait_children_done,
1665 	zio_checksum_verify,
1666 	zio_read_gang_members,
1667 	zio_read_decompress,
1668 	zio_done,
1669 	zio_badop
1670 };
1671 
1672 /*
1673  * Move an I/O to the next stage of the pipeline and execute that stage.
1674  * There's no locking on io_stage because there's no legitimate way for
1675  * multiple threads to be attempting to process the same I/O.
1676  */
1677 void
1678 zio_next_stage(zio_t *zio)
1679 {
1680 	uint32_t pipeline = zio->io_pipeline;
1681 
1682 	ASSERT(!MUTEX_HELD(&zio->io_lock));
1683 
1684 	if (zio->io_error) {
1685 		dprintf("zio %p vdev %s offset %llx stage %d error %d\n",
1686 		    zio, vdev_description(zio->io_vd),
1687 		    zio->io_offset, zio->io_stage, zio->io_error);
1688 		if (((1U << zio->io_stage) & ZIO_VDEV_IO_PIPELINE) == 0)
1689 			pipeline &= ZIO_ERROR_PIPELINE_MASK;
1690 	}
1691 
1692 	while (((1U << ++zio->io_stage) & pipeline) == 0)
1693 		continue;
1694 
1695 	ASSERT(zio->io_stage <= ZIO_STAGE_DONE);
1696 	ASSERT(zio->io_stalled == 0);
1697 
1698 	/*
1699 	 * See the comment in zio_next_stage_async() about per-CPU taskqs.
1700 	 */
1701 	if (((1U << zio->io_stage) & zio->io_async_stages) &&
1702 	    (zio->io_stage == ZIO_STAGE_WRITE_COMPRESS) &&
1703 	    !(zio->io_flags & ZIO_FLAG_METADATA)) {
1704 		taskq_t *tq = zio->io_spa->spa_zio_issue_taskq[zio->io_type];
1705 		(void) taskq_dispatch(tq,
1706 		    (task_func_t *)zio_pipeline[zio->io_stage], zio, TQ_SLEEP);
1707 	} else {
1708 		zio_pipeline[zio->io_stage](zio);
1709 	}
1710 }
1711 
1712 void
1713 zio_next_stage_async(zio_t *zio)
1714 {
1715 	taskq_t *tq;
1716 	uint32_t pipeline = zio->io_pipeline;
1717 
1718 	ASSERT(!MUTEX_HELD(&zio->io_lock));
1719 
1720 	if (zio->io_error) {
1721 		dprintf("zio %p vdev %s offset %llx stage %d error %d\n",
1722 		    zio, vdev_description(zio->io_vd),
1723 		    zio->io_offset, zio->io_stage, zio->io_error);
1724 		if (((1U << zio->io_stage) & ZIO_VDEV_IO_PIPELINE) == 0)
1725 			pipeline &= ZIO_ERROR_PIPELINE_MASK;
1726 	}
1727 
1728 	while (((1U << ++zio->io_stage) & pipeline) == 0)
1729 		continue;
1730 
1731 	ASSERT(zio->io_stage <= ZIO_STAGE_DONE);
1732 	ASSERT(zio->io_stalled == 0);
1733 
1734 	/*
1735 	 * For performance, we'll probably want two sets of task queues:
1736 	 * per-CPU issue taskqs and per-CPU completion taskqs.  The per-CPU
1737 	 * part is for read performance: since we have to make a pass over
1738 	 * the data to checksum it anyway, we want to do this on the same CPU
1739 	 * that issued the read, because (assuming CPU scheduling affinity)
1740 	 * that thread is probably still there.  Getting this optimization
1741 	 * right avoids performance-hostile cache-to-cache transfers.
1742 	 *
1743 	 * Note that having two sets of task queues is also necessary for
1744 	 * correctness: if all of the issue threads get bogged down waiting
1745 	 * for dependent reads (e.g. metaslab freelist) to complete, then
1746 	 * there won't be any threads available to service I/O completion
1747 	 * interrupts.
1748 	 */
1749 	if ((1U << zio->io_stage) & zio->io_async_stages) {
1750 		if (zio->io_stage < ZIO_STAGE_VDEV_IO_DONE)
1751 			tq = zio->io_spa->spa_zio_issue_taskq[zio->io_type];
1752 		else
1753 			tq = zio->io_spa->spa_zio_intr_taskq[zio->io_type];
1754 		(void) taskq_dispatch(tq,
1755 		    (task_func_t *)zio_pipeline[zio->io_stage], zio, TQ_SLEEP);
1756 	} else {
1757 		zio_pipeline[zio->io_stage](zio);
1758 	}
1759 }
1760 
1761 static boolean_t
1762 zio_alloc_should_fail(void)
1763 {
1764 	static uint16_t	allocs = 0;
1765 
1766 	return (P2PHASE(allocs++, 1U<<zio_zil_fail_shift) == 0);
1767 }
1768 
1769 /*
1770  * Try to allocate an intent log block.  Return 0 on success, errno on failure.
1771  */
1772 int
1773 zio_alloc_blk(spa_t *spa, uint64_t size, blkptr_t *new_bp, blkptr_t *old_bp,
1774     uint64_t txg)
1775 {
1776 	int error;
1777 
1778 	spa_config_enter(spa, RW_READER, FTAG);
1779 
1780 	if (zio_zil_fail_shift && zio_alloc_should_fail()) {
1781 		spa_config_exit(spa, FTAG);
1782 		return (ENOSPC);
1783 	}
1784 
1785 	/*
1786 	 * We were passed the previous log block's DVA in bp->blk_dva[0].
1787 	 * We use that as a hint for which vdev to allocate from next.
1788 	 */
1789 	error = metaslab_alloc(spa, spa->spa_log_class, size,
1790 	    new_bp, 1, txg, old_bp, B_TRUE);
1791 
1792 	if (error)
1793 		error = metaslab_alloc(spa, spa->spa_normal_class, size,
1794 		    new_bp, 1, txg, old_bp, B_TRUE);
1795 
1796 	if (error == 0) {
1797 		BP_SET_LSIZE(new_bp, size);
1798 		BP_SET_PSIZE(new_bp, size);
1799 		BP_SET_COMPRESS(new_bp, ZIO_COMPRESS_OFF);
1800 		BP_SET_CHECKSUM(new_bp, ZIO_CHECKSUM_ZILOG);
1801 		BP_SET_TYPE(new_bp, DMU_OT_INTENT_LOG);
1802 		BP_SET_LEVEL(new_bp, 0);
1803 		BP_SET_BYTEORDER(new_bp, ZFS_HOST_BYTEORDER);
1804 		new_bp->blk_birth = txg;
1805 	}
1806 
1807 	spa_config_exit(spa, FTAG);
1808 
1809 	return (error);
1810 }
1811 
1812 /*
1813  * Free an intent log block.  We know it can't be a gang block, so there's
1814  * nothing to do except metaslab_free() it.
1815  */
1816 void
1817 zio_free_blk(spa_t *spa, blkptr_t *bp, uint64_t txg)
1818 {
1819 	ASSERT(!BP_IS_GANG(bp));
1820 
1821 	spa_config_enter(spa, RW_READER, FTAG);
1822 
1823 	metaslab_free(spa, bp, txg, B_FALSE);
1824 
1825 	spa_config_exit(spa, FTAG);
1826 }
1827 
1828 /*
1829  * start an async flush of the write cache for this vdev
1830  */
1831 void
1832 zio_flush_vdev(spa_t *spa, uint64_t vdev, zio_t **zio)
1833 {
1834 	vdev_t *vd;
1835 
1836 	/*
1837 	 * Lock out configuration changes.
1838 	 */
1839 	spa_config_enter(spa, RW_READER, FTAG);
1840 
1841 	if (*zio == NULL)
1842 		*zio = zio_root(spa, NULL, NULL, ZIO_FLAG_CANFAIL);
1843 
1844 	vd = vdev_lookup_top(spa, vdev);
1845 	ASSERT(vd);
1846 
1847 	(void) zio_nowait(zio_ioctl(*zio, spa, vd, DKIOCFLUSHWRITECACHE,
1848 	    NULL, NULL, ZIO_PRIORITY_NOW,
1849 	    ZIO_FLAG_CANFAIL | ZIO_FLAG_DONT_RETRY));
1850 
1851 	spa_config_exit(spa, FTAG);
1852 }
1853