xref: /illumos-gate/usr/src/uts/common/fs/zfs/zio.c (revision 5b6e8d437b064342671e0a40b3146d7f98802a64)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
23  * Copyright (c) 2011, 2018 by Delphix. All rights reserved.
24  * Copyright (c) 2011 Nexenta Systems, Inc. All rights reserved.
25  * Copyright (c) 2014 Integros [integros.com]
26  * Copyright (c) 2017, Intel Corporation.
27  */
28 
29 #include <sys/sysmacros.h>
30 #include <sys/zfs_context.h>
31 #include <sys/fm/fs/zfs.h>
32 #include <sys/spa.h>
33 #include <sys/txg.h>
34 #include <sys/spa_impl.h>
35 #include <sys/vdev_impl.h>
36 #include <sys/zio_impl.h>
37 #include <sys/zio_compress.h>
38 #include <sys/zio_checksum.h>
39 #include <sys/dmu_objset.h>
40 #include <sys/arc.h>
41 #include <sys/ddt.h>
42 #include <sys/blkptr.h>
43 #include <sys/zfeature.h>
44 #include <sys/dsl_scan.h>
45 #include <sys/metaslab_impl.h>
46 #include <sys/abd.h>
47 #include <sys/cityhash.h>
48 #include <sys/dsl_crypt.h>
49 
50 /*
51  * ==========================================================================
52  * I/O type descriptions
53  * ==========================================================================
54  */
/*
 * Human-readable names for the I/O types.  The array holds ZIO_TYPES
 * entries; presumably the order matches the zio_type_t enumeration
 * (verify against its definition).
 */
const char *zio_type_name[ZIO_TYPES] = {
	"zio_null", "zio_read", "zio_write", "zio_free", "zio_claim",
	"zio_ioctl"
};

/*
 * NOTE(review): not referenced in the visible part of this file; by its
 * name it presumably enables throttling of DVA allocations -- confirm
 * against its users.
 */
boolean_t zio_dva_throttle_enabled = B_TRUE;

/*
 * ==========================================================================
 * I/O kmem caches
 * ==========================================================================
 */
/* Cache of zio_t structures (created in zio_init()). */
kmem_cache_t *zio_cache;
/* Cache of zio_link_t parent/child link structures. */
kmem_cache_t *zio_link_cache;
/*
 * Per-size buffer caches.  Both arrays are indexed by
 * (size - 1) >> SPA_MINBLOCKSHIFT; see zio_buf_alloc() and
 * zio_data_buf_alloc().
 */
kmem_cache_t *zio_buf_cache[SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT];
kmem_cache_t *zio_data_buf_cache[SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT];

#ifdef _KERNEL
/* Arena handed to the zio_data_buf caches by zio_init() (kernel only). */
extern vmem_t *zio_alloc_arena;
#endif

/*
 * NOTE(review): presumably the values returned by pipeline stage functions;
 * their users are outside the visible part of this file.
 */
#define	ZIO_PIPELINE_CONTINUE		0x100
#define	ZIO_PIPELINE_STOP		0x101

/* 1 << (level * (indblkshift - SPA_BLKPTRSHIFT)), computed in 64 bits. */
#define	BP_SPANB(indblkshift, level) \
	(((uint64_t)1) << ((level) * ((indblkshift) - SPA_BLKPTRSHIFT)))
#define	COMPARE_META_LEVEL	0x80000000ul
/*
 * The following actions directly affect the spa's sync-to-convergence logic.
 * The values below define the sync pass when we start performing the action.
 * Care should be taken when changing these values as they directly impact
 * spa_sync() performance. Tuning these values may introduce subtle performance
 * pathologies and should only be done in the context of performance analysis.
 * These tunables will eventually be removed and replaced with #defines once
 * enough analysis has been done to determine optimal values.
 *
 * The 'zfs_sync_pass_deferred_free' pass must be greater than 1 to ensure that
 * regular blocks are not deferred.
 */
int zfs_sync_pass_deferred_free = 2; /* defer frees starting in this pass */
int zfs_sync_pass_dont_compress = 5; /* don't compress starting in this pass */
int zfs_sync_pass_rewrite = 2; /* rewrite new bps starting in this pass */

/*
 * An allocating zio is one that either currently has the DVA allocate
 * stage set or will have it later in its lifetime.
 */
#define	IO_IS_ALLOCATING(zio) ((zio)->io_orig_pipeline & ZIO_STAGE_DVA_ALLOCATE)

boolean_t	zio_requeue_io_start_cut_in_line = B_TRUE;

/*
 * Size threshold used by zio_init(): buffer caches for sizes larger than
 * this are created with KMC_NODEBUG.
 */
#ifdef ZFS_DEBUG
int zio_buf_debug_limit = 16384;
#else
int zio_buf_debug_limit = 0;
#endif

/* Forward declaration; used by zio_notify_parent() before the definition. */
static void zio_taskq_dispatch(zio_t *, zio_taskq_type_t, boolean_t);
113 
114 void
115 zio_init(void)
116 {
117 	size_t c;
118 	vmem_t *data_alloc_arena = NULL;
119 
120 #ifdef _KERNEL
121 	data_alloc_arena = zio_alloc_arena;
122 #endif
123 	zio_cache = kmem_cache_create("zio_cache",
124 	    sizeof (zio_t), 0, NULL, NULL, NULL, NULL, NULL, 0);
125 	zio_link_cache = kmem_cache_create("zio_link_cache",
126 	    sizeof (zio_link_t), 0, NULL, NULL, NULL, NULL, NULL, 0);
127 
128 	/*
129 	 * For small buffers, we want a cache for each multiple of
130 	 * SPA_MINBLOCKSIZE.  For larger buffers, we want a cache
131 	 * for each quarter-power of 2.
132 	 */
133 	for (c = 0; c < SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT; c++) {
134 		size_t size = (c + 1) << SPA_MINBLOCKSHIFT;
135 		size_t p2 = size;
136 		size_t align = 0;
137 		size_t cflags = (size > zio_buf_debug_limit) ? KMC_NODEBUG : 0;
138 
139 		while (!ISP2(p2))
140 			p2 &= p2 - 1;
141 
142 #ifndef _KERNEL
143 		/*
144 		 * If we are using watchpoints, put each buffer on its own page,
145 		 * to eliminate the performance overhead of trapping to the
146 		 * kernel when modifying a non-watched buffer that shares the
147 		 * page with a watched buffer.
148 		 */
149 		if (arc_watch && !IS_P2ALIGNED(size, PAGESIZE))
150 			continue;
151 #endif
152 		if (size <= 4 * SPA_MINBLOCKSIZE) {
153 			align = SPA_MINBLOCKSIZE;
154 		} else if (IS_P2ALIGNED(size, p2 >> 2)) {
155 			align = MIN(p2 >> 2, PAGESIZE);
156 		}
157 
158 		if (align != 0) {
159 			char name[36];
160 			(void) sprintf(name, "zio_buf_%lu", (ulong_t)size);
161 			zio_buf_cache[c] = kmem_cache_create(name, size,
162 			    align, NULL, NULL, NULL, NULL, NULL, cflags);
163 
164 			/*
165 			 * Since zio_data bufs do not appear in crash dumps, we
166 			 * pass KMC_NOTOUCH so that no allocator metadata is
167 			 * stored with the buffers.
168 			 */
169 			(void) sprintf(name, "zio_data_buf_%lu", (ulong_t)size);
170 			zio_data_buf_cache[c] = kmem_cache_create(name, size,
171 			    align, NULL, NULL, NULL, NULL, data_alloc_arena,
172 			    cflags | KMC_NOTOUCH);
173 		}
174 	}
175 
176 	while (--c != 0) {
177 		ASSERT(zio_buf_cache[c] != NULL);
178 		if (zio_buf_cache[c - 1] == NULL)
179 			zio_buf_cache[c - 1] = zio_buf_cache[c];
180 
181 		ASSERT(zio_data_buf_cache[c] != NULL);
182 		if (zio_data_buf_cache[c - 1] == NULL)
183 			zio_data_buf_cache[c - 1] = zio_data_buf_cache[c];
184 	}
185 
186 	zio_inject_init();
187 }
188 
189 void
190 zio_fini(void)
191 {
192 	size_t c;
193 	kmem_cache_t *last_cache = NULL;
194 	kmem_cache_t *last_data_cache = NULL;
195 
196 	for (c = 0; c < SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT; c++) {
197 		if (zio_buf_cache[c] != last_cache) {
198 			last_cache = zio_buf_cache[c];
199 			kmem_cache_destroy(zio_buf_cache[c]);
200 		}
201 		zio_buf_cache[c] = NULL;
202 
203 		if (zio_data_buf_cache[c] != last_data_cache) {
204 			last_data_cache = zio_data_buf_cache[c];
205 			kmem_cache_destroy(zio_data_buf_cache[c]);
206 		}
207 		zio_data_buf_cache[c] = NULL;
208 	}
209 
210 	kmem_cache_destroy(zio_link_cache);
211 	kmem_cache_destroy(zio_cache);
212 
213 	zio_inject_fini();
214 }
215 
216 /*
217  * ==========================================================================
218  * Allocate and free I/O buffers
219  * ==========================================================================
220  */
221 
222 /*
223  * Use zio_buf_alloc to allocate ZFS metadata.  This data will appear in a
224  * crashdump if the kernel panics, so use it judiciously.  Obviously, it's
225  * useful to inspect ZFS metadata, but if possible, we should avoid keeping
226  * excess / transient data in-core during a crashdump.
227  */
228 void *
229 zio_buf_alloc(size_t size)
230 {
231 	size_t c = (size - 1) >> SPA_MINBLOCKSHIFT;
232 
233 	VERIFY3U(c, <, SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT);
234 
235 	return (kmem_cache_alloc(zio_buf_cache[c], KM_PUSHPAGE));
236 }
237 
238 /*
239  * Use zio_data_buf_alloc to allocate data.  The data will not appear in a
240  * crashdump if the kernel panics.  This exists so that we will limit the amount
241  * of ZFS data that shows up in a kernel crashdump.  (Thus reducing the amount
242  * of kernel heap dumped to disk when the kernel panics)
243  */
244 void *
245 zio_data_buf_alloc(size_t size)
246 {
247 	size_t c = (size - 1) >> SPA_MINBLOCKSHIFT;
248 
249 	VERIFY3U(c, <, SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT);
250 
251 	return (kmem_cache_alloc(zio_data_buf_cache[c], KM_PUSHPAGE));
252 }
253 
254 void
255 zio_buf_free(void *buf, size_t size)
256 {
257 	size_t c = (size - 1) >> SPA_MINBLOCKSHIFT;
258 
259 	VERIFY3U(c, <, SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT);
260 
261 	kmem_cache_free(zio_buf_cache[c], buf);
262 }
263 
264 void
265 zio_data_buf_free(void *buf, size_t size)
266 {
267 	size_t c = (size - 1) >> SPA_MINBLOCKSHIFT;
268 
269 	VERIFY3U(c, <, SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT);
270 
271 	kmem_cache_free(zio_data_buf_cache[c], buf);
272 }
273 
274 /* ARGSUSED */
275 static void
276 zio_abd_free(void *abd, size_t size)
277 {
278 	abd_free((abd_t *)abd);
279 }
280 
281 /*
282  * ==========================================================================
283  * Push and pop I/O transform buffers
284  * ==========================================================================
285  */
286 void
287 zio_push_transform(zio_t *zio, abd_t *data, uint64_t size, uint64_t bufsize,
288     zio_transform_func_t *transform)
289 {
290 	zio_transform_t *zt = kmem_alloc(sizeof (zio_transform_t), KM_SLEEP);
291 
292 	/*
293 	 * Ensure that anyone expecting this zio to contain a linear ABD isn't
294 	 * going to get a nasty surprise when they try to access the data.
295 	 */
296 	IMPLY(abd_is_linear(zio->io_abd), abd_is_linear(data));
297 
298 	zt->zt_orig_abd = zio->io_abd;
299 	zt->zt_orig_size = zio->io_size;
300 	zt->zt_bufsize = bufsize;
301 	zt->zt_transform = transform;
302 
303 	zt->zt_next = zio->io_transform_stack;
304 	zio->io_transform_stack = zt;
305 
306 	zio->io_abd = data;
307 	zio->io_size = size;
308 }
309 
310 void
311 zio_pop_transforms(zio_t *zio)
312 {
313 	zio_transform_t *zt;
314 
315 	while ((zt = zio->io_transform_stack) != NULL) {
316 		if (zt->zt_transform != NULL)
317 			zt->zt_transform(zio,
318 			    zt->zt_orig_abd, zt->zt_orig_size);
319 
320 		if (zt->zt_bufsize != 0)
321 			abd_free(zio->io_abd);
322 
323 		zio->io_abd = zt->zt_orig_abd;
324 		zio->io_size = zt->zt_orig_size;
325 		zio->io_transform_stack = zt->zt_next;
326 
327 		kmem_free(zt, sizeof (zio_transform_t));
328 	}
329 }
330 
331 /*
332  * ==========================================================================
333  * I/O transform callbacks for subblocks, decompression, and decryption
334  * ==========================================================================
335  */
336 static void
337 zio_subblock(zio_t *zio, abd_t *data, uint64_t size)
338 {
339 	ASSERT(zio->io_size > size);
340 
341 	if (zio->io_type == ZIO_TYPE_READ)
342 		abd_copy(data, zio->io_abd, size);
343 }
344 
345 static void
346 zio_decompress(zio_t *zio, abd_t *data, uint64_t size)
347 {
348 	if (zio->io_error == 0) {
349 		void *tmp = abd_borrow_buf(data, size);
350 		int ret = zio_decompress_data(BP_GET_COMPRESS(zio->io_bp),
351 		    zio->io_abd, tmp, zio->io_size, size);
352 		abd_return_buf_copy(data, tmp, size);
353 
354 		if (ret != 0)
355 			zio->io_error = SET_ERROR(EIO);
356 	}
357 }
358 
359 static void
360 zio_decrypt(zio_t *zio, abd_t *data, uint64_t size)
361 {
362 	int ret;
363 	void *tmp;
364 	blkptr_t *bp = zio->io_bp;
365 	spa_t *spa = zio->io_spa;
366 	uint64_t dsobj = zio->io_bookmark.zb_objset;
367 	uint64_t lsize = BP_GET_LSIZE(bp);
368 	dmu_object_type_t ot = BP_GET_TYPE(bp);
369 	uint8_t salt[ZIO_DATA_SALT_LEN];
370 	uint8_t iv[ZIO_DATA_IV_LEN];
371 	uint8_t mac[ZIO_DATA_MAC_LEN];
372 	boolean_t no_crypt = B_FALSE;
373 
374 	ASSERT(BP_USES_CRYPT(bp));
375 	ASSERT3U(size, !=, 0);
376 
377 	if (zio->io_error != 0)
378 		return;
379 
380 	/*
381 	 * Verify the cksum of MACs stored in an indirect bp. It will always
382 	 * be possible to verify this since it does not require an encryption
383 	 * key.
384 	 */
385 	if (BP_HAS_INDIRECT_MAC_CKSUM(bp)) {
386 		zio_crypt_decode_mac_bp(bp, mac);
387 
388 		if (BP_GET_COMPRESS(bp) != ZIO_COMPRESS_OFF) {
389 			/*
390 			 * We haven't decompressed the data yet, but
391 			 * zio_crypt_do_indirect_mac_checksum() requires
392 			 * decompressed data to be able to parse out the MACs
393 			 * from the indirect block. We decompress it now and
394 			 * throw away the result after we are finished.
395 			 */
396 			tmp = zio_buf_alloc(lsize);
397 			ret = zio_decompress_data(BP_GET_COMPRESS(bp),
398 			    zio->io_abd, tmp, zio->io_size, lsize);
399 			if (ret != 0) {
400 				ret = SET_ERROR(EIO);
401 				goto error;
402 			}
403 			ret = zio_crypt_do_indirect_mac_checksum(B_FALSE,
404 			    tmp, lsize, BP_SHOULD_BYTESWAP(bp), mac);
405 			zio_buf_free(tmp, lsize);
406 		} else {
407 			ret = zio_crypt_do_indirect_mac_checksum_abd(B_FALSE,
408 			    zio->io_abd, size, BP_SHOULD_BYTESWAP(bp), mac);
409 		}
410 		abd_copy(data, zio->io_abd, size);
411 
412 		if (ret != 0)
413 			goto error;
414 
415 		return;
416 	}
417 
418 	/*
419 	 * If this is an authenticated block, just check the MAC. It would be
420 	 * nice to separate this out into its own flag, but for the moment
421 	 * enum zio_flag is out of bits.
422 	 */
423 	if (BP_IS_AUTHENTICATED(bp)) {
424 		if (ot == DMU_OT_OBJSET) {
425 			ret = spa_do_crypt_objset_mac_abd(B_FALSE, spa,
426 			    dsobj, zio->io_abd, size, BP_SHOULD_BYTESWAP(bp));
427 		} else {
428 			zio_crypt_decode_mac_bp(bp, mac);
429 			ret = spa_do_crypt_mac_abd(B_FALSE, spa, dsobj,
430 			    zio->io_abd, size, mac);
431 		}
432 		abd_copy(data, zio->io_abd, size);
433 
434 		if (zio_injection_enabled && ot != DMU_OT_DNODE && ret == 0) {
435 			ret = zio_handle_decrypt_injection(spa,
436 			    &zio->io_bookmark, ot, ECKSUM);
437 		}
438 		if (ret != 0)
439 			goto error;
440 
441 		return;
442 	}
443 
444 	zio_crypt_decode_params_bp(bp, salt, iv);
445 
446 	if (ot == DMU_OT_INTENT_LOG) {
447 		tmp = abd_borrow_buf_copy(zio->io_abd, sizeof (zil_chain_t));
448 		zio_crypt_decode_mac_zil(tmp, mac);
449 		abd_return_buf(zio->io_abd, tmp, sizeof (zil_chain_t));
450 	} else {
451 		zio_crypt_decode_mac_bp(bp, mac);
452 	}
453 
454 	ret = spa_do_crypt_abd(B_FALSE, spa, &zio->io_bookmark, BP_GET_TYPE(bp),
455 	    BP_GET_DEDUP(bp), BP_SHOULD_BYTESWAP(bp), salt, iv, mac, size, data,
456 	    zio->io_abd, &no_crypt);
457 	if (no_crypt)
458 		abd_copy(data, zio->io_abd, size);
459 
460 	if (ret != 0)
461 		goto error;
462 
463 	return;
464 
465 error:
466 	/* assert that the key was found unless this was speculative */
467 	ASSERT(ret != EACCES || (zio->io_flags & ZIO_FLAG_SPECULATIVE));
468 
469 	/*
470 	 * If there was a decryption / authentication error return EIO as
471 	 * the io_error. If this was not a speculative zio, create an ereport.
472 	 */
473 	if (ret == ECKSUM) {
474 		zio->io_error = SET_ERROR(EIO);
475 		if ((zio->io_flags & ZIO_FLAG_SPECULATIVE) == 0) {
476 			spa_log_error(spa, &zio->io_bookmark);
477 			zfs_ereport_post(FM_EREPORT_ZFS_AUTHENTICATION,
478 			    spa, NULL, &zio->io_bookmark, zio, 0, 0);
479 		}
480 	} else {
481 		zio->io_error = ret;
482 	}
483 }
484 
485 /*
486  * ==========================================================================
487  * I/O parent/child relationships and pipeline interlocks
488  * ==========================================================================
489  */
490 zio_t *
491 zio_walk_parents(zio_t *cio, zio_link_t **zl)
492 {
493 	list_t *pl = &cio->io_parent_list;
494 
495 	*zl = (*zl == NULL) ? list_head(pl) : list_next(pl, *zl);
496 	if (*zl == NULL)
497 		return (NULL);
498 
499 	ASSERT((*zl)->zl_child == cio);
500 	return ((*zl)->zl_parent);
501 }
502 
503 zio_t *
504 zio_walk_children(zio_t *pio, zio_link_t **zl)
505 {
506 	list_t *cl = &pio->io_child_list;
507 
508 	ASSERT(MUTEX_HELD(&pio->io_lock));
509 
510 	*zl = (*zl == NULL) ? list_head(cl) : list_next(cl, *zl);
511 	if (*zl == NULL)
512 		return (NULL);
513 
514 	ASSERT((*zl)->zl_parent == pio);
515 	return ((*zl)->zl_child);
516 }
517 
518 zio_t *
519 zio_unique_parent(zio_t *cio)
520 {
521 	zio_link_t *zl = NULL;
522 	zio_t *pio = zio_walk_parents(cio, &zl);
523 
524 	VERIFY3P(zio_walk_parents(cio, &zl), ==, NULL);
525 	return (pio);
526 }
527 
528 void
529 zio_add_child(zio_t *pio, zio_t *cio)
530 {
531 	zio_link_t *zl = kmem_cache_alloc(zio_link_cache, KM_SLEEP);
532 
533 	/*
534 	 * Logical I/Os can have logical, gang, or vdev children.
535 	 * Gang I/Os can have gang or vdev children.
536 	 * Vdev I/Os can only have vdev children.
537 	 * The following ASSERT captures all of these constraints.
538 	 */
539 	ASSERT3S(cio->io_child_type, <=, pio->io_child_type);
540 
541 	zl->zl_parent = pio;
542 	zl->zl_child = cio;
543 
544 	mutex_enter(&pio->io_lock);
545 	mutex_enter(&cio->io_lock);
546 
547 	ASSERT(pio->io_state[ZIO_WAIT_DONE] == 0);
548 
549 	for (int w = 0; w < ZIO_WAIT_TYPES; w++)
550 		pio->io_children[cio->io_child_type][w] += !cio->io_state[w];
551 
552 	list_insert_head(&pio->io_child_list, zl);
553 	list_insert_head(&cio->io_parent_list, zl);
554 
555 	pio->io_child_count++;
556 	cio->io_parent_count++;
557 
558 	mutex_exit(&cio->io_lock);
559 	mutex_exit(&pio->io_lock);
560 }
561 
562 static void
563 zio_remove_child(zio_t *pio, zio_t *cio, zio_link_t *zl)
564 {
565 	ASSERT(zl->zl_parent == pio);
566 	ASSERT(zl->zl_child == cio);
567 
568 	mutex_enter(&pio->io_lock);
569 	mutex_enter(&cio->io_lock);
570 
571 	list_remove(&pio->io_child_list, zl);
572 	list_remove(&cio->io_parent_list, zl);
573 
574 	pio->io_child_count--;
575 	cio->io_parent_count--;
576 
577 	mutex_exit(&cio->io_lock);
578 	mutex_exit(&pio->io_lock);
579 
580 	kmem_cache_free(zio_link_cache, zl);
581 }
582 
583 static boolean_t
584 zio_wait_for_children(zio_t *zio, uint8_t childbits, enum zio_wait_type wait)
585 {
586 	boolean_t waiting = B_FALSE;
587 
588 	mutex_enter(&zio->io_lock);
589 	ASSERT(zio->io_stall == NULL);
590 	for (int c = 0; c < ZIO_CHILD_TYPES; c++) {
591 		if (!(ZIO_CHILD_BIT_IS_SET(childbits, c)))
592 			continue;
593 
594 		uint64_t *countp = &zio->io_children[c][wait];
595 		if (*countp != 0) {
596 			zio->io_stage >>= 1;
597 			ASSERT3U(zio->io_stage, !=, ZIO_STAGE_OPEN);
598 			zio->io_stall = countp;
599 			waiting = B_TRUE;
600 			break;
601 		}
602 	}
603 	mutex_exit(&zio->io_lock);
604 	return (waiting);
605 }
606 
607 static void
608 zio_notify_parent(zio_t *pio, zio_t *zio, enum zio_wait_type wait)
609 {
610 	uint64_t *countp = &pio->io_children[zio->io_child_type][wait];
611 	int *errorp = &pio->io_child_error[zio->io_child_type];
612 
613 	mutex_enter(&pio->io_lock);
614 	if (zio->io_error && !(zio->io_flags & ZIO_FLAG_DONT_PROPAGATE))
615 		*errorp = zio_worst_error(*errorp, zio->io_error);
616 	pio->io_reexecute |= zio->io_reexecute;
617 	ASSERT3U(*countp, >, 0);
618 
619 	(*countp)--;
620 
621 	if (*countp == 0 && pio->io_stall == countp) {
622 		zio_taskq_type_t type =
623 		    pio->io_stage < ZIO_STAGE_VDEV_IO_START ? ZIO_TASKQ_ISSUE :
624 		    ZIO_TASKQ_INTERRUPT;
625 		pio->io_stall = NULL;
626 		mutex_exit(&pio->io_lock);
627 		/*
628 		 * Dispatch the parent zio in its own taskq so that
629 		 * the child can continue to make progress. This also
630 		 * prevents overflowing the stack when we have deeply nested
631 		 * parent-child relationships.
632 		 */
633 		zio_taskq_dispatch(pio, type, B_FALSE);
634 	} else {
635 		mutex_exit(&pio->io_lock);
636 	}
637 }
638 
639 static void
640 zio_inherit_child_errors(zio_t *zio, enum zio_child c)
641 {
642 	if (zio->io_child_error[c] != 0 && zio->io_error == 0)
643 		zio->io_error = zio->io_child_error[c];
644 }
645 
646 int
647 zio_bookmark_compare(const void *x1, const void *x2)
648 {
649 	const zio_t *z1 = x1;
650 	const zio_t *z2 = x2;
651 
652 	if (z1->io_bookmark.zb_objset < z2->io_bookmark.zb_objset)
653 		return (-1);
654 	if (z1->io_bookmark.zb_objset > z2->io_bookmark.zb_objset)
655 		return (1);
656 
657 	if (z1->io_bookmark.zb_object < z2->io_bookmark.zb_object)
658 		return (-1);
659 	if (z1->io_bookmark.zb_object > z2->io_bookmark.zb_object)
660 		return (1);
661 
662 	if (z1->io_bookmark.zb_level < z2->io_bookmark.zb_level)
663 		return (-1);
664 	if (z1->io_bookmark.zb_level > z2->io_bookmark.zb_level)
665 		return (1);
666 
667 	if (z1->io_bookmark.zb_blkid < z2->io_bookmark.zb_blkid)
668 		return (-1);
669 	if (z1->io_bookmark.zb_blkid > z2->io_bookmark.zb_blkid)
670 		return (1);
671 
672 	if (z1 < z2)
673 		return (-1);
674 	if (z1 > z2)
675 		return (1);
676 
677 	return (0);
678 }
679 
680 /*
681  * ==========================================================================
682  * Create the various types of I/O (read, write, free, etc)
683  * ==========================================================================
684  */
685 static zio_t *
686 zio_create(zio_t *pio, spa_t *spa, uint64_t txg, const blkptr_t *bp,
687     abd_t *data, uint64_t lsize, uint64_t psize, zio_done_func_t *done,
688     void *private, zio_type_t type, zio_priority_t priority,
689     enum zio_flag flags, vdev_t *vd, uint64_t offset,
690     const zbookmark_phys_t *zb, enum zio_stage stage, enum zio_stage pipeline)
691 {
692 	zio_t *zio;
693 
694 	ASSERT3U(psize, <=, SPA_MAXBLOCKSIZE);
695 	ASSERT(P2PHASE(psize, SPA_MINBLOCKSIZE) == 0);
696 	ASSERT(P2PHASE(offset, SPA_MINBLOCKSIZE) == 0);
697 
698 	ASSERT(!vd || spa_config_held(spa, SCL_STATE_ALL, RW_READER));
699 	ASSERT(!bp || !(flags & ZIO_FLAG_CONFIG_WRITER));
700 	ASSERT(vd || stage == ZIO_STAGE_OPEN);
701 
702 	IMPLY(lsize != psize, (flags & ZIO_FLAG_RAW_COMPRESS) != 0);
703 
704 	zio = kmem_cache_alloc(zio_cache, KM_SLEEP);
705 	bzero(zio, sizeof (zio_t));
706 
707 	mutex_init(&zio->io_lock, NULL, MUTEX_DEFAULT, NULL);
708 	cv_init(&zio->io_cv, NULL, CV_DEFAULT, NULL);
709 
710 	list_create(&zio->io_parent_list, sizeof (zio_link_t),
711 	    offsetof(zio_link_t, zl_parent_node));
712 	list_create(&zio->io_child_list, sizeof (zio_link_t),
713 	    offsetof(zio_link_t, zl_child_node));
714 	metaslab_trace_init(&zio->io_alloc_list);
715 
716 	if (vd != NULL)
717 		zio->io_child_type = ZIO_CHILD_VDEV;
718 	else if (flags & ZIO_FLAG_GANG_CHILD)
719 		zio->io_child_type = ZIO_CHILD_GANG;
720 	else if (flags & ZIO_FLAG_DDT_CHILD)
721 		zio->io_child_type = ZIO_CHILD_DDT;
722 	else
723 		zio->io_child_type = ZIO_CHILD_LOGICAL;
724 
725 	if (bp != NULL) {
726 		zio->io_bp = (blkptr_t *)bp;
727 		zio->io_bp_copy = *bp;
728 		zio->io_bp_orig = *bp;
729 		if (type != ZIO_TYPE_WRITE ||
730 		    zio->io_child_type == ZIO_CHILD_DDT)
731 			zio->io_bp = &zio->io_bp_copy;	/* so caller can free */
732 		if (zio->io_child_type == ZIO_CHILD_LOGICAL)
733 			zio->io_logical = zio;
734 		if (zio->io_child_type > ZIO_CHILD_GANG && BP_IS_GANG(bp))
735 			pipeline |= ZIO_GANG_STAGES;
736 	}
737 
738 	zio->io_spa = spa;
739 	zio->io_txg = txg;
740 	zio->io_done = done;
741 	zio->io_private = private;
742 	zio->io_type = type;
743 	zio->io_priority = priority;
744 	zio->io_vd = vd;
745 	zio->io_offset = offset;
746 	zio->io_orig_abd = zio->io_abd = data;
747 	zio->io_orig_size = zio->io_size = psize;
748 	zio->io_lsize = lsize;
749 	zio->io_orig_flags = zio->io_flags = flags;
750 	zio->io_orig_stage = zio->io_stage = stage;
751 	zio->io_orig_pipeline = zio->io_pipeline = pipeline;
752 	zio->io_pipeline_trace = ZIO_STAGE_OPEN;
753 
754 	zio->io_state[ZIO_WAIT_READY] = (stage >= ZIO_STAGE_READY);
755 	zio->io_state[ZIO_WAIT_DONE] = (stage >= ZIO_STAGE_DONE);
756 
757 	if (zb != NULL)
758 		zio->io_bookmark = *zb;
759 
760 	if (pio != NULL) {
761 		if (zio->io_metaslab_class == NULL)
762 			zio->io_metaslab_class = pio->io_metaslab_class;
763 		if (zio->io_logical == NULL)
764 			zio->io_logical = pio->io_logical;
765 		if (zio->io_child_type == ZIO_CHILD_GANG)
766 			zio->io_gang_leader = pio->io_gang_leader;
767 		zio_add_child(pio, zio);
768 	}
769 
770 	return (zio);
771 }
772 
773 static void
774 zio_destroy(zio_t *zio)
775 {
776 	metaslab_trace_fini(&zio->io_alloc_list);
777 	list_destroy(&zio->io_parent_list);
778 	list_destroy(&zio->io_child_list);
779 	mutex_destroy(&zio->io_lock);
780 	cv_destroy(&zio->io_cv);
781 	kmem_cache_free(zio_cache, zio);
782 }
783 
784 zio_t *
785 zio_null(zio_t *pio, spa_t *spa, vdev_t *vd, zio_done_func_t *done,
786     void *private, enum zio_flag flags)
787 {
788 	zio_t *zio;
789 
790 	zio = zio_create(pio, spa, 0, NULL, NULL, 0, 0, done, private,
791 	    ZIO_TYPE_NULL, ZIO_PRIORITY_NOW, flags, vd, 0, NULL,
792 	    ZIO_STAGE_OPEN, ZIO_INTERLOCK_PIPELINE);
793 
794 	return (zio);
795 }
796 
797 zio_t *
798 zio_root(spa_t *spa, zio_done_func_t *done, void *private, enum zio_flag flags)
799 {
800 	return (zio_null(NULL, spa, NULL, done, private, flags));
801 }
802 
803 void
804 zfs_blkptr_verify(spa_t *spa, const blkptr_t *bp)
805 {
806 	if (!DMU_OT_IS_VALID(BP_GET_TYPE(bp))) {
807 		zfs_panic_recover("blkptr at %p has invalid TYPE %llu",
808 		    bp, (longlong_t)BP_GET_TYPE(bp));
809 	}
810 	if (BP_GET_CHECKSUM(bp) >= ZIO_CHECKSUM_FUNCTIONS ||
811 	    BP_GET_CHECKSUM(bp) <= ZIO_CHECKSUM_ON) {
812 		zfs_panic_recover("blkptr at %p has invalid CHECKSUM %llu",
813 		    bp, (longlong_t)BP_GET_CHECKSUM(bp));
814 	}
815 	if (BP_GET_COMPRESS(bp) >= ZIO_COMPRESS_FUNCTIONS ||
816 	    BP_GET_COMPRESS(bp) <= ZIO_COMPRESS_ON) {
817 		zfs_panic_recover("blkptr at %p has invalid COMPRESS %llu",
818 		    bp, (longlong_t)BP_GET_COMPRESS(bp));
819 	}
820 	if (BP_GET_LSIZE(bp) > SPA_MAXBLOCKSIZE) {
821 		zfs_panic_recover("blkptr at %p has invalid LSIZE %llu",
822 		    bp, (longlong_t)BP_GET_LSIZE(bp));
823 	}
824 	if (BP_GET_PSIZE(bp) > SPA_MAXBLOCKSIZE) {
825 		zfs_panic_recover("blkptr at %p has invalid PSIZE %llu",
826 		    bp, (longlong_t)BP_GET_PSIZE(bp));
827 	}
828 
829 	if (BP_IS_EMBEDDED(bp)) {
830 		if (BPE_GET_ETYPE(bp) > NUM_BP_EMBEDDED_TYPES) {
831 			zfs_panic_recover("blkptr at %p has invalid ETYPE %llu",
832 			    bp, (longlong_t)BPE_GET_ETYPE(bp));
833 		}
834 	}
835 
836 	/*
837 	 * Do not verify individual DVAs if the config is not trusted. This
838 	 * will be done once the zio is executed in vdev_mirror_map_alloc.
839 	 */
840 	if (!spa->spa_trust_config)
841 		return;
842 
843 	/*
844 	 * Pool-specific checks.
845 	 *
846 	 * Note: it would be nice to verify that the blk_birth and
847 	 * BP_PHYSICAL_BIRTH() are not too large.  However, spa_freeze()
848 	 * allows the birth time of log blocks (and dmu_sync()-ed blocks
849 	 * that are in the log) to be arbitrarily large.
850 	 */
851 	for (int i = 0; i < BP_GET_NDVAS(bp); i++) {
852 		uint64_t vdevid = DVA_GET_VDEV(&bp->blk_dva[i]);
853 		if (vdevid >= spa->spa_root_vdev->vdev_children) {
854 			zfs_panic_recover("blkptr at %p DVA %u has invalid "
855 			    "VDEV %llu",
856 			    bp, i, (longlong_t)vdevid);
857 			continue;
858 		}
859 		vdev_t *vd = spa->spa_root_vdev->vdev_child[vdevid];
860 		if (vd == NULL) {
861 			zfs_panic_recover("blkptr at %p DVA %u has invalid "
862 			    "VDEV %llu",
863 			    bp, i, (longlong_t)vdevid);
864 			continue;
865 		}
866 		if (vd->vdev_ops == &vdev_hole_ops) {
867 			zfs_panic_recover("blkptr at %p DVA %u has hole "
868 			    "VDEV %llu",
869 			    bp, i, (longlong_t)vdevid);
870 			continue;
871 		}
872 		if (vd->vdev_ops == &vdev_missing_ops) {
873 			/*
874 			 * "missing" vdevs are valid during import, but we
875 			 * don't have their detailed info (e.g. asize), so
876 			 * we can't perform any more checks on them.
877 			 */
878 			continue;
879 		}
880 		uint64_t offset = DVA_GET_OFFSET(&bp->blk_dva[i]);
881 		uint64_t asize = DVA_GET_ASIZE(&bp->blk_dva[i]);
882 		if (BP_IS_GANG(bp))
883 			asize = vdev_psize_to_asize(vd, SPA_GANGBLOCKSIZE);
884 		if (offset + asize > vd->vdev_asize) {
885 			zfs_panic_recover("blkptr at %p DVA %u has invalid "
886 			    "OFFSET %llu",
887 			    bp, i, (longlong_t)offset);
888 		}
889 	}
890 }
891 
892 boolean_t
893 zfs_dva_valid(spa_t *spa, const dva_t *dva, const blkptr_t *bp)
894 {
895 	uint64_t vdevid = DVA_GET_VDEV(dva);
896 
897 	if (vdevid >= spa->spa_root_vdev->vdev_children)
898 		return (B_FALSE);
899 
900 	vdev_t *vd = spa->spa_root_vdev->vdev_child[vdevid];
901 	if (vd == NULL)
902 		return (B_FALSE);
903 
904 	if (vd->vdev_ops == &vdev_hole_ops)
905 		return (B_FALSE);
906 
907 	if (vd->vdev_ops == &vdev_missing_ops) {
908 		return (B_FALSE);
909 	}
910 
911 	uint64_t offset = DVA_GET_OFFSET(dva);
912 	uint64_t asize = DVA_GET_ASIZE(dva);
913 
914 	if (BP_IS_GANG(bp))
915 		asize = vdev_psize_to_asize(vd, SPA_GANGBLOCKSIZE);
916 	if (offset + asize > vd->vdev_asize)
917 		return (B_FALSE);
918 
919 	return (B_TRUE);
920 }
921 
922 zio_t *
923 zio_read(zio_t *pio, spa_t *spa, const blkptr_t *bp,
924     abd_t *data, uint64_t size, zio_done_func_t *done, void *private,
925     zio_priority_t priority, enum zio_flag flags, const zbookmark_phys_t *zb)
926 {
927 	zio_t *zio;
928 
929 	zfs_blkptr_verify(spa, bp);
930 
931 	zio = zio_create(pio, spa, BP_PHYSICAL_BIRTH(bp), bp,
932 	    data, size, size, done, private,
933 	    ZIO_TYPE_READ, priority, flags, NULL, 0, zb,
934 	    ZIO_STAGE_OPEN, (flags & ZIO_FLAG_DDT_CHILD) ?
935 	    ZIO_DDT_CHILD_READ_PIPELINE : ZIO_READ_PIPELINE);
936 
937 	return (zio);
938 }
939 
940 zio_t *
941 zio_write(zio_t *pio, spa_t *spa, uint64_t txg, blkptr_t *bp,
942     abd_t *data, uint64_t lsize, uint64_t psize, const zio_prop_t *zp,
943     zio_done_func_t *ready, zio_done_func_t *children_ready,
944     zio_done_func_t *physdone, zio_done_func_t *done,
945     void *private, zio_priority_t priority, enum zio_flag flags,
946     const zbookmark_phys_t *zb)
947 {
948 	zio_t *zio;
949 
950 	ASSERT(zp->zp_checksum >= ZIO_CHECKSUM_OFF &&
951 	    zp->zp_checksum < ZIO_CHECKSUM_FUNCTIONS &&
952 	    zp->zp_compress >= ZIO_COMPRESS_OFF &&
953 	    zp->zp_compress < ZIO_COMPRESS_FUNCTIONS &&
954 	    DMU_OT_IS_VALID(zp->zp_type) &&
955 	    zp->zp_level < 32 &&
956 	    zp->zp_copies > 0 &&
957 	    zp->zp_copies <= spa_max_replication(spa));
958 
959 	zio = zio_create(pio, spa, txg, bp, data, lsize, psize, done, private,
960 	    ZIO_TYPE_WRITE, priority, flags, NULL, 0, zb,
961 	    ZIO_STAGE_OPEN, (flags & ZIO_FLAG_DDT_CHILD) ?
962 	    ZIO_DDT_CHILD_WRITE_PIPELINE : ZIO_WRITE_PIPELINE);
963 
964 	zio->io_ready = ready;
965 	zio->io_children_ready = children_ready;
966 	zio->io_physdone = physdone;
967 	zio->io_prop = *zp;
968 
969 	/*
970 	 * Data can be NULL if we are going to call zio_write_override() to
971 	 * provide the already-allocated BP.  But we may need the data to
972 	 * verify a dedup hit (if requested).  In this case, don't try to
973 	 * dedup (just take the already-allocated BP verbatim). Encrypted
974 	 * dedup blocks need data as well so we also disable dedup in this
975 	 * case.
976 	 */
977 	if (data == NULL &&
978 	    (zio->io_prop.zp_dedup_verify || zio->io_prop.zp_encrypt)) {
979 		zio->io_prop.zp_dedup = zio->io_prop.zp_dedup_verify = B_FALSE;
980 	}
981 
982 	return (zio);
983 }
984 
985 zio_t *
986 zio_rewrite(zio_t *pio, spa_t *spa, uint64_t txg, blkptr_t *bp, abd_t *data,
987     uint64_t size, zio_done_func_t *done, void *private,
988     zio_priority_t priority, enum zio_flag flags, zbookmark_phys_t *zb)
989 {
990 	zio_t *zio;
991 
992 	zio = zio_create(pio, spa, txg, bp, data, size, size, done, private,
993 	    ZIO_TYPE_WRITE, priority, flags | ZIO_FLAG_IO_REWRITE, NULL, 0, zb,
994 	    ZIO_STAGE_OPEN, ZIO_REWRITE_PIPELINE);
995 
996 	return (zio);
997 }
998 
999 void
1000 zio_write_override(zio_t *zio, blkptr_t *bp, int copies, boolean_t nopwrite)
1001 {
1002 	ASSERT(zio->io_type == ZIO_TYPE_WRITE);
1003 	ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
1004 	ASSERT(zio->io_stage == ZIO_STAGE_OPEN);
1005 	ASSERT(zio->io_txg == spa_syncing_txg(zio->io_spa));
1006 
1007 	/*
1008 	 * We must reset the io_prop to match the values that existed
1009 	 * when the bp was first written by dmu_sync() keeping in mind
1010 	 * that nopwrite and dedup are mutually exclusive.
1011 	 */
1012 	zio->io_prop.zp_dedup = nopwrite ? B_FALSE : zio->io_prop.zp_dedup;
1013 	zio->io_prop.zp_nopwrite = nopwrite;
1014 	zio->io_prop.zp_copies = copies;
1015 	zio->io_bp_override = bp;
1016 }
1017 
1018 void
1019 zio_free(spa_t *spa, uint64_t txg, const blkptr_t *bp)
1020 {
1021 
1022 	zfs_blkptr_verify(spa, bp);
1023 
1024 	/*
1025 	 * The check for EMBEDDED is a performance optimization.  We
1026 	 * process the free here (by ignoring it) rather than
1027 	 * putting it on the list and then processing it in zio_free_sync().
1028 	 */
1029 	if (BP_IS_EMBEDDED(bp))
1030 		return;
1031 	metaslab_check_free(spa, bp);
1032 
1033 	/*
1034 	 * Frees that are for the currently-syncing txg, are not going to be
1035 	 * deferred, and which will not need to do a read (i.e. not GANG or
1036 	 * DEDUP), can be processed immediately.  Otherwise, put them on the
1037 	 * in-memory list for later processing.
1038 	 */
1039 	if (BP_IS_GANG(bp) || BP_GET_DEDUP(bp) ||
1040 	    txg != spa->spa_syncing_txg ||
1041 	    spa_sync_pass(spa) >= zfs_sync_pass_deferred_free) {
1042 		bplist_append(&spa->spa_free_bplist[txg & TXG_MASK], bp);
1043 	} else {
1044 		VERIFY0(zio_wait(zio_free_sync(NULL, spa, txg, bp, 0)));
1045 	}
1046 }
1047 
1048 zio_t *
1049 zio_free_sync(zio_t *pio, spa_t *spa, uint64_t txg, const blkptr_t *bp,
1050     enum zio_flag flags)
1051 {
1052 	zio_t *zio;
1053 	enum zio_stage stage = ZIO_FREE_PIPELINE;
1054 
1055 	ASSERT(!BP_IS_HOLE(bp));
1056 	ASSERT(spa_syncing_txg(spa) == txg);
1057 	ASSERT(spa_sync_pass(spa) < zfs_sync_pass_deferred_free);
1058 
1059 	if (BP_IS_EMBEDDED(bp))
1060 		return (zio_null(pio, spa, NULL, NULL, NULL, 0));
1061 
1062 	metaslab_check_free(spa, bp);
1063 	arc_freed(spa, bp);
1064 	dsl_scan_freed(spa, bp);
1065 
1066 	/*
1067 	 * GANG and DEDUP blocks can induce a read (for the gang block header,
1068 	 * or the DDT), so issue them asynchronously so that this thread is
1069 	 * not tied up.
1070 	 */
1071 	if (BP_IS_GANG(bp) || BP_GET_DEDUP(bp))
1072 		stage |= ZIO_STAGE_ISSUE_ASYNC;
1073 
1074 	zio = zio_create(pio, spa, txg, bp, NULL, BP_GET_PSIZE(bp),
1075 	    BP_GET_PSIZE(bp), NULL, NULL, ZIO_TYPE_FREE, ZIO_PRIORITY_NOW,
1076 	    flags, NULL, 0, NULL, ZIO_STAGE_OPEN, stage);
1077 
1078 	return (zio);
1079 }
1080 
1081 zio_t *
1082 zio_claim(zio_t *pio, spa_t *spa, uint64_t txg, const blkptr_t *bp,
1083     zio_done_func_t *done, void *private, enum zio_flag flags)
1084 {
1085 	zio_t *zio;
1086 
1087 	zfs_blkptr_verify(spa, bp);
1088 
1089 	if (BP_IS_EMBEDDED(bp))
1090 		return (zio_null(pio, spa, NULL, NULL, NULL, 0));
1091 
1092 	/*
1093 	 * A claim is an allocation of a specific block.  Claims are needed
1094 	 * to support immediate writes in the intent log.  The issue is that
1095 	 * immediate writes contain committed data, but in a txg that was
1096 	 * *not* committed.  Upon opening the pool after an unclean shutdown,
1097 	 * the intent log claims all blocks that contain immediate write data
1098 	 * so that the SPA knows they're in use.
1099 	 *
1100 	 * All claims *must* be resolved in the first txg -- before the SPA
1101 	 * starts allocating blocks -- so that nothing is allocated twice.
1102 	 * If txg == 0 we just verify that the block is claimable.
1103 	 */
1104 	ASSERT3U(spa->spa_uberblock.ub_rootbp.blk_birth, <,
1105 	    spa_min_claim_txg(spa));
1106 	ASSERT(txg == spa_min_claim_txg(spa) || txg == 0);
1107 	ASSERT(!BP_GET_DEDUP(bp) || !spa_writeable(spa));	/* zdb(1M) */
1108 
1109 	zio = zio_create(pio, spa, txg, bp, NULL, BP_GET_PSIZE(bp),
1110 	    BP_GET_PSIZE(bp), done, private, ZIO_TYPE_CLAIM, ZIO_PRIORITY_NOW,
1111 	    flags, NULL, 0, NULL, ZIO_STAGE_OPEN, ZIO_CLAIM_PIPELINE);
1112 	ASSERT0(zio->io_queued_timestamp);
1113 
1114 	return (zio);
1115 }
1116 
1117 zio_t *
1118 zio_ioctl(zio_t *pio, spa_t *spa, vdev_t *vd, int cmd,
1119     zio_done_func_t *done, void *private, enum zio_flag flags)
1120 {
1121 	zio_t *zio;
1122 	int c;
1123 
1124 	if (vd->vdev_children == 0) {
1125 		zio = zio_create(pio, spa, 0, NULL, NULL, 0, 0, done, private,
1126 		    ZIO_TYPE_IOCTL, ZIO_PRIORITY_NOW, flags, vd, 0, NULL,
1127 		    ZIO_STAGE_OPEN, ZIO_IOCTL_PIPELINE);
1128 
1129 		zio->io_cmd = cmd;
1130 	} else {
1131 		zio = zio_null(pio, spa, NULL, NULL, NULL, flags);
1132 
1133 		for (c = 0; c < vd->vdev_children; c++)
1134 			zio_nowait(zio_ioctl(zio, spa, vd->vdev_child[c], cmd,
1135 			    done, private, flags));
1136 	}
1137 
1138 	return (zio);
1139 }
1140 
1141 zio_t *
1142 zio_read_phys(zio_t *pio, vdev_t *vd, uint64_t offset, uint64_t size,
1143     abd_t *data, int checksum, zio_done_func_t *done, void *private,
1144     zio_priority_t priority, enum zio_flag flags, boolean_t labels)
1145 {
1146 	zio_t *zio;
1147 
1148 	ASSERT(vd->vdev_children == 0);
1149 	ASSERT(!labels || offset + size <= VDEV_LABEL_START_SIZE ||
1150 	    offset >= vd->vdev_psize - VDEV_LABEL_END_SIZE);
1151 	ASSERT3U(offset + size, <=, vd->vdev_psize);
1152 
1153 	zio = zio_create(pio, vd->vdev_spa, 0, NULL, data, size, size, done,
1154 	    private, ZIO_TYPE_READ, priority, flags | ZIO_FLAG_PHYSICAL, vd,
1155 	    offset, NULL, ZIO_STAGE_OPEN, ZIO_READ_PHYS_PIPELINE);
1156 
1157 	zio->io_prop.zp_checksum = checksum;
1158 
1159 	return (zio);
1160 }
1161 
1162 zio_t *
1163 zio_write_phys(zio_t *pio, vdev_t *vd, uint64_t offset, uint64_t size,
1164     abd_t *data, int checksum, zio_done_func_t *done, void *private,
1165     zio_priority_t priority, enum zio_flag flags, boolean_t labels)
1166 {
1167 	zio_t *zio;
1168 
1169 	ASSERT(vd->vdev_children == 0);
1170 	ASSERT(!labels || offset + size <= VDEV_LABEL_START_SIZE ||
1171 	    offset >= vd->vdev_psize - VDEV_LABEL_END_SIZE);
1172 	ASSERT3U(offset + size, <=, vd->vdev_psize);
1173 
1174 	zio = zio_create(pio, vd->vdev_spa, 0, NULL, data, size, size, done,
1175 	    private, ZIO_TYPE_WRITE, priority, flags | ZIO_FLAG_PHYSICAL, vd,
1176 	    offset, NULL, ZIO_STAGE_OPEN, ZIO_WRITE_PHYS_PIPELINE);
1177 
1178 	zio->io_prop.zp_checksum = checksum;
1179 
1180 	if (zio_checksum_table[checksum].ci_flags & ZCHECKSUM_FLAG_EMBEDDED) {
1181 		/*
1182 		 * zec checksums are necessarily destructive -- they modify
1183 		 * the end of the write buffer to hold the verifier/checksum.
1184 		 * Therefore, we must make a local copy in case the data is
1185 		 * being written to multiple places in parallel.
1186 		 */
1187 		abd_t *wbuf = abd_alloc_sametype(data, size);
1188 		abd_copy(wbuf, data, size);
1189 
1190 		zio_push_transform(zio, wbuf, size, size, NULL);
1191 	}
1192 
1193 	return (zio);
1194 }
1195 
/*
 * Create a child I/O to do some work for us.
 *
 * The child is created against vdev 'vd' as a child of 'pio' and inherits
 * pio's spa, txg, bookmark and io_physdone callback.  Its initial stage is
 * the one just before ZIO_STAGE_VDEV_IO_START, and it runs
 * ZIO_VDEV_CHILD_PIPELINE (plus checksum verification for reads that have
 * a bp).  'size' is passed to zio_create() as both logical and physical
 * size.  The returned zio has not been issued.
 */
zio_t *
zio_vdev_child_io(zio_t *pio, blkptr_t *bp, vdev_t *vd, uint64_t offset,
    abd_t *data, uint64_t size, int type, zio_priority_t priority,
    enum zio_flag flags, zio_done_func_t *done, void *private)
{
	enum zio_stage pipeline = ZIO_VDEV_CHILD_PIPELINE;
	zio_t *zio;

	/*
	 * vdev child I/Os do not propagate their error to the parent.
	 * Therefore, for correct operation the caller *must* check for
	 * and handle the error in the child i/o's done callback.
	 * The only exceptions are i/os that we don't care about
	 * (OPTIONAL or REPAIR).
	 */
	ASSERT((flags & ZIO_FLAG_OPTIONAL) || (flags & ZIO_FLAG_IO_REPAIR) ||
	    done != NULL);

	if (type == ZIO_TYPE_READ && bp != NULL) {
		/*
		 * If we have the bp, then the child should perform the
		 * checksum and the parent need not.  This pushes error
		 * detection as close to the leaves as possible and
		 * eliminates redundant checksums in the interior nodes.
		 */
		pipeline |= ZIO_STAGE_CHECKSUM_VERIFY;
		pio->io_pipeline &= ~ZIO_STAGE_CHECKSUM_VERIFY;
	}

	/*
	 * For leaf vdevs the caller's offset is shifted by
	 * VDEV_LABEL_START_SIZE (presumably to skip the label region at
	 * the front of the device -- confirm against the vdev label code).
	 */
	if (vd->vdev_ops->vdev_op_leaf) {
		ASSERT0(vd->vdev_children);
		offset += VDEV_LABEL_START_SIZE;
	}

	/* Add the flags that the parent passes on to its vdev children. */
	flags |= ZIO_VDEV_CHILD_FLAGS(pio);

	/*
	 * If we've decided to do a repair, the write is not speculative --
	 * even if the original read was.
	 */
	if (flags & ZIO_FLAG_IO_REPAIR)
		flags &= ~ZIO_FLAG_SPECULATIVE;

	/*
	 * If we're creating a child I/O that is not associated with a
	 * top-level vdev, then the child zio is not an allocating I/O.
	 * If this is a retried I/O then we ignore it since we will
	 * have already processed the original allocating I/O.
	 */
	if (flags & ZIO_FLAG_IO_ALLOCATING &&
	    (vd != vd->vdev_top || (flags & ZIO_FLAG_IO_RETRY))) {
		ASSERT(pio->io_metaslab_class != NULL);
		ASSERT(pio->io_metaslab_class->mc_alloc_throttle_enabled);
		ASSERT(type == ZIO_TYPE_WRITE);
		ASSERT(priority == ZIO_PRIORITY_ASYNC_WRITE);
		ASSERT(!(flags & ZIO_FLAG_IO_REPAIR));
		ASSERT(!(pio->io_flags & ZIO_FLAG_IO_REWRITE) ||
		    pio->io_child_type == ZIO_CHILD_GANG);

		flags &= ~ZIO_FLAG_IO_ALLOCATING;
	}

	/*
	 * The initial stage is the bit just below VDEV_IO_START, so the
	 * first stage zio_execute() advances to is VDEV_IO_START or later.
	 */
	zio = zio_create(pio, pio->io_spa, pio->io_txg, bp, data, size, size,
	    done, private, type, priority, flags, vd, offset, &pio->io_bookmark,
	    ZIO_STAGE_VDEV_IO_START >> 1, pipeline);
	ASSERT3U(zio->io_child_type, ==, ZIO_CHILD_VDEV);

	zio->io_physdone = pio->io_physdone;
	/* Count leaf-vdev children on the logical zio, when there is one. */
	if (vd->vdev_ops->vdev_op_leaf && zio->io_logical != NULL)
		zio->io_logical->io_phys_children++;

	return (zio);
}
1272 
1273 zio_t *
1274 zio_vdev_delegated_io(vdev_t *vd, uint64_t offset, abd_t *data, uint64_t size,
1275     zio_type_t type, zio_priority_t priority, enum zio_flag flags,
1276     zio_done_func_t *done, void *private)
1277 {
1278 	zio_t *zio;
1279 
1280 	ASSERT(vd->vdev_ops->vdev_op_leaf);
1281 
1282 	zio = zio_create(NULL, vd->vdev_spa, 0, NULL,
1283 	    data, size, size, done, private, type, priority,
1284 	    flags | ZIO_FLAG_CANFAIL | ZIO_FLAG_DONT_RETRY | ZIO_FLAG_DELEGATED,
1285 	    vd, offset, NULL,
1286 	    ZIO_STAGE_VDEV_IO_START >> 1, ZIO_VDEV_CHILD_PIPELINE);
1287 
1288 	return (zio);
1289 }
1290 
1291 void
1292 zio_flush(zio_t *zio, vdev_t *vd)
1293 {
1294 	zio_nowait(zio_ioctl(zio, zio->io_spa, vd, DKIOCFLUSHWRITECACHE,
1295 	    NULL, NULL,
1296 	    ZIO_FLAG_CANFAIL | ZIO_FLAG_DONT_PROPAGATE | ZIO_FLAG_DONT_RETRY));
1297 }
1298 
1299 void
1300 zio_shrink(zio_t *zio, uint64_t size)
1301 {
1302 	ASSERT3P(zio->io_executor, ==, NULL);
1303 	ASSERT3P(zio->io_orig_size, ==, zio->io_size);
1304 	ASSERT3U(size, <=, zio->io_size);
1305 
1306 	/*
1307 	 * We don't shrink for raidz because of problems with the
1308 	 * reconstruction when reading back less than the block size.
1309 	 * Note, BP_IS_RAIDZ() assumes no compression.
1310 	 */
1311 	ASSERT(BP_GET_COMPRESS(zio->io_bp) == ZIO_COMPRESS_OFF);
1312 	if (!BP_IS_RAIDZ(zio->io_bp)) {
1313 		/* we are not doing a raw write */
1314 		ASSERT3U(zio->io_size, ==, zio->io_lsize);
1315 		zio->io_orig_size = zio->io_size = zio->io_lsize = size;
1316 	}
1317 }
1318 
1319 /*
1320  * ==========================================================================
1321  * Prepare to read and write logical blocks
1322  * ==========================================================================
1323  */
1324 
/*
 * Pipeline stage: prepare a read zio based on its block pointer.
 *
 * For logical zios this pushes the decompress and/or decrypt transforms
 * the bp calls for.  Embedded data bps are decoded directly into the
 * zio's buffer and the pipeline is cut down to ZIO_INTERLOCK_PIPELINE.
 * Level-0 non-metadata blocks and DDT ZAP blocks are flagged DONT_CACHE,
 * and logical reads of dedup bps switch to ZIO_DDT_READ_PIPELINE.
 * Always returns ZIO_PIPELINE_CONTINUE.
 */
static int
zio_read_bp_init(zio_t *zio)
{
	blkptr_t *bp = zio->io_bp;
	/* Embedded bps encode their physical size differently. */
	uint64_t psize =
	    BP_IS_EMBEDDED(bp) ? BPE_GET_PSIZE(bp) : BP_GET_PSIZE(bp);

	/* Read zios are expected to operate on their private bp copy. */
	ASSERT3P(zio->io_bp, ==, &zio->io_bp_copy);

	/*
	 * Compressed block, and the caller did not ask for the raw
	 * compressed data: read into a psize-sized buffer and decompress
	 * through the zio_decompress transform.
	 */
	if (BP_GET_COMPRESS(bp) != ZIO_COMPRESS_OFF &&
	    zio->io_child_type == ZIO_CHILD_LOGICAL &&
	    !(zio->io_flags & ZIO_FLAG_RAW_COMPRESS)) {
		zio_push_transform(zio, abd_alloc_sametype(zio->io_abd, psize),
		    psize, psize, zio_decompress);
	}

	/*
	 * Protected block that is not being read raw, or a block with an
	 * indirect MAC checksum: push the zio_decrypt transform.  This is
	 * pushed after the decompress transform above.
	 */
	if (((BP_IS_PROTECTED(bp) && !(zio->io_flags & ZIO_FLAG_RAW_ENCRYPT)) ||
	    BP_HAS_INDIRECT_MAC_CKSUM(bp)) &&
	    zio->io_child_type == ZIO_CHILD_LOGICAL) {
		zio_push_transform(zio, abd_alloc_sametype(zio->io_abd, psize),
		    psize, psize, zio_decrypt);
	}

	if (BP_IS_EMBEDDED(bp) && BPE_GET_ETYPE(bp) == BP_EMBEDDED_TYPE_DATA) {
		/* NB: this local shadows the function-scope psize. */
		int psize = BPE_GET_PSIZE(bp);
		void *data = abd_borrow_buf(zio->io_abd, psize);

		/* The payload lives in the bp itself; no device I/O needed. */
		zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
		decode_embedded_bp_compressed(bp, data);
		abd_return_buf_copy(zio->io_abd, data, psize);
	} else {
		ASSERT(!BP_IS_EMBEDDED(bp));
		ASSERT3P(zio->io_bp, ==, &zio->io_bp_copy);
	}

	if (!DMU_OT_IS_METADATA(BP_GET_TYPE(bp)) && BP_GET_LEVEL(bp) == 0)
		zio->io_flags |= ZIO_FLAG_DONT_CACHE;

	if (BP_GET_TYPE(bp) == DMU_OT_DDT_ZAP)
		zio->io_flags |= ZIO_FLAG_DONT_CACHE;

	if (BP_GET_DEDUP(bp) && zio->io_child_type == ZIO_CHILD_LOGICAL)
		zio->io_pipeline = ZIO_DDT_READ_PIPELINE;

	return (ZIO_PIPELINE_CONTINUE);
}
1371 
/*
 * Pipeline stage: handle an override bp on an allocating write.
 *
 * Does nothing for non-allocating zios or when no override bp was
 * supplied.  Otherwise the override bp is copied into the zio's bp and
 * the pipeline is reduced to ZIO_INTERLOCK_PIPELINE, with these
 * refinements:
 *   - embedded bp: done;
 *   - non-hole bp with nopwrite set: mark the zio ZIO_FLAG_NOPWRITE;
 *   - hole, or dedup not requested: done;
 *   - dedup requested, checksum matches and not encrypted: mark the bp
 *     dedup and add the DDT_WRITE stage;
 *   - otherwise the override is abandoned and the zio's original bp and
 *     pipeline are restored so it proceeds as a regular write.
 * Always returns ZIO_PIPELINE_CONTINUE.
 */
static int
zio_write_bp_init(zio_t *zio)
{
	if (!IO_IS_ALLOCATING(zio))
		return (ZIO_PIPELINE_CONTINUE);

	ASSERT(zio->io_child_type != ZIO_CHILD_DDT);

	if (zio->io_bp_override) {
		blkptr_t *bp = zio->io_bp;
		zio_prop_t *zp = &zio->io_prop;

		ASSERT(bp->blk_birth != zio->io_txg);
		ASSERT(BP_GET_DEDUP(zio->io_bp_override) == 0);

		/* Take the override bp verbatim; nothing left to write. */
		*bp = *zio->io_bp_override;
		zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;

		if (BP_IS_EMBEDDED(bp))
			return (ZIO_PIPELINE_CONTINUE);

		/*
		 * If we've been overridden and nopwrite is set then
		 * set the flag accordingly to indicate that a nopwrite
		 * has already occurred.
		 */
		if (!BP_IS_HOLE(bp) && zp->zp_nopwrite) {
			ASSERT(!zp->zp_dedup);
			ASSERT3U(BP_GET_CHECKSUM(bp), ==, zp->zp_checksum);
			zio->io_flags |= ZIO_FLAG_NOPWRITE;
			return (ZIO_PIPELINE_CONTINUE);
		}

		ASSERT(!zp->zp_nopwrite);

		if (BP_IS_HOLE(bp) || !zp->zp_dedup)
			return (ZIO_PIPELINE_CONTINUE);

		ASSERT((zio_checksum_table[zp->zp_checksum].ci_flags &
		    ZCHECKSUM_FLAG_DEDUP) || zp->zp_dedup_verify);

		/*
		 * The override bp can be entered into the DDT as-is only
		 * if it was written with the checksum dedup expects and
		 * the block is not encrypted.
		 */
		if (BP_GET_CHECKSUM(bp) == zp->zp_checksum &&
		    !zp->zp_encrypt) {
			BP_SET_DEDUP(bp, 1);
			zio->io_pipeline |= ZIO_STAGE_DDT_WRITE;
			return (ZIO_PIPELINE_CONTINUE);
		}

		/*
		 * We were unable to handle this as an override bp, treat
		 * it as a regular write I/O.
		 */
		zio->io_bp_override = NULL;
		*bp = zio->io_bp_orig;
		zio->io_pipeline = zio->io_orig_pipeline;
	}

	return (ZIO_PIPELINE_CONTINUE);
}
1431 
/*
 * Pipeline stage: compress the data of an allocating write and fill in
 * the block pointer.
 *
 * Waits for logical and gang children to become ready first (returning
 * ZIO_PIPELINE_STOP if any are outstanding), runs the io_children_ready
 * callback if there is one, and then:
 *   - compresses the buffer unless compression is off, the write is raw
 *     compressed, or a late sync pass disables compression;
 *   - writes small enough results as an embedded bp when permitted;
 *   - otherwise pads the compressed size up to the pool's minimum ashift
 *     and pushes the compressed buffer as a transform;
 *   - picks the rewrite or write pipeline, and fills in the bp's size,
 *     type, level, compression, checksum and dedup fields.
 * A physical size of zero turns the block into a hole and reduces the
 * pipeline to ZIO_INTERLOCK_PIPELINE.
 */
static int
zio_write_compress(zio_t *zio)
{
	spa_t *spa = zio->io_spa;
	zio_prop_t *zp = &zio->io_prop;
	enum zio_compress compress = zp->zp_compress;
	blkptr_t *bp = zio->io_bp;
	uint64_t lsize = zio->io_lsize;
	uint64_t psize = zio->io_size;
	/* Sync pass; stays at 1 unless this is a same-txg overwrite. */
	int pass = 1;

	/*
	 * If our children haven't all reached the ready stage,
	 * wait for them and then repeat this pipeline stage.
	 */
	if (zio_wait_for_children(zio, ZIO_CHILD_LOGICAL_BIT |
	    ZIO_CHILD_GANG_BIT, ZIO_WAIT_READY)) {
		return (ZIO_PIPELINE_STOP);
	}

	if (!IO_IS_ALLOCATING(zio))
		return (ZIO_PIPELINE_CONTINUE);

	if (zio->io_children_ready != NULL) {
		/*
		 * Now that all our children are ready, run the callback
		 * associated with this zio in case it wants to modify the
		 * data to be written.
		 */
		ASSERT3U(zp->zp_level, >, 0);
		zio->io_children_ready(zio);
	}

	ASSERT(zio->io_child_type != ZIO_CHILD_DDT);
	ASSERT(zio->io_bp_override == NULL);

	if (!BP_IS_HOLE(bp) && bp->blk_birth == zio->io_txg) {
		/*
		 * We're rewriting an existing block, which means we're
		 * working on behalf of spa_sync().  For spa_sync() to
		 * converge, it must eventually be the case that we don't
		 * have to allocate new blocks.  But compression changes
		 * the blocksize, which forces a reallocate, and makes
		 * convergence take longer.  Therefore, after the first
		 * few passes, stop compressing to ensure convergence.
		 */
		pass = spa_sync_pass(spa);

		ASSERT(zio->io_txg == spa_syncing_txg(spa));
		ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
		ASSERT(!BP_GET_DEDUP(bp));

		if (pass >= zfs_sync_pass_dont_compress)
			compress = ZIO_COMPRESS_OFF;

		/* Make sure someone doesn't change their mind on overwrites */
		ASSERT(BP_IS_EMBEDDED(bp) || MIN(zp->zp_copies + BP_IS_GANG(bp),
		    spa_max_replication(spa)) == BP_GET_NDVAS(bp));
	}

	/* If it's a compressed write that is not raw, compress the buffer. */
	if (compress != ZIO_COMPRESS_OFF &&
	    !(zio->io_flags & ZIO_FLAG_RAW_COMPRESS)) {
		void *cbuf = zio_buf_alloc(lsize);
		psize = zio_compress_data(compress, zio->io_abd, cbuf, lsize);
		if (psize == 0 || psize == lsize) {
			/*
			 * Either the result is empty (psize == 0, handled
			 * as a hole below) or nothing was saved; in both
			 * cases the compressed buffer is not needed.
			 */
			compress = ZIO_COMPRESS_OFF;
			zio_buf_free(cbuf, lsize);
		} else if (!zp->zp_dedup && !zp->zp_encrypt &&
		    psize <= BPE_PAYLOAD_SIZE &&
		    zp->zp_level == 0 && !DMU_OT_HAS_FILL(zp->zp_type) &&
		    spa_feature_is_enabled(spa, SPA_FEATURE_EMBEDDED_DATA)) {
			/*
			 * The compressed payload fits inside the block
			 * pointer itself: store it there and skip the
			 * rest of the write pipeline.
			 */
			encode_embedded_bp_compressed(bp,
			    cbuf, compress, lsize, psize);
			BPE_SET_ETYPE(bp, BP_EMBEDDED_TYPE_DATA);
			BP_SET_TYPE(bp, zio->io_prop.zp_type);
			BP_SET_LEVEL(bp, zio->io_prop.zp_level);
			zio_buf_free(cbuf, lsize);
			bp->blk_birth = zio->io_txg;
			zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
			ASSERT(spa_feature_is_active(spa,
			    SPA_FEATURE_EMBEDDED_DATA));
			return (ZIO_PIPELINE_CONTINUE);
		} else {
			/*
			 * Round up compressed size up to the ashift
			 * of the smallest-ashift device, and zero the tail.
			 * This ensures that the compressed size of the BP
			 * (and thus compressratio property) are correct,
			 * in that we charge for the padding used to fill out
			 * the last sector.
			 */
			ASSERT3U(spa->spa_min_ashift, >=, SPA_MINBLOCKSHIFT);
			size_t rounded = (size_t)P2ROUNDUP(psize,
			    1ULL << spa->spa_min_ashift);
			if (rounded >= lsize) {
				/* Padding ate the savings; write as-is. */
				compress = ZIO_COMPRESS_OFF;
				zio_buf_free(cbuf, lsize);
				psize = lsize;
			} else {
				/*
				 * Wrap cbuf in an abd that owns it, zero
				 * the padding, and write that instead of
				 * the original data.
				 */
				abd_t *cdata = abd_get_from_buf(cbuf, lsize);
				abd_take_ownership_of_buf(cdata, B_TRUE);
				abd_zero_off(cdata, psize, rounded - psize);
				psize = rounded;
				zio_push_transform(zio, cdata,
				    psize, lsize, NULL);
			}
		}

		/*
		 * We were unable to handle this as an override bp, treat
		 * it as a regular write I/O.
		 *
		 * NOTE(review): io_bp_override was already asserted to be
		 * NULL earlier in this function, so clearing it here has
		 * no effect, and the pipeline assigned here is overwritten
		 * again further down.  This sequence mirrors the tail of
		 * zio_write_bp_init(); confirm whether resetting *bp from
		 * io_bp_orig at this point is still intended.
		 */
		zio->io_bp_override = NULL;
		*bp = zio->io_bp_orig;
		zio->io_pipeline = zio->io_orig_pipeline;

	} else if ((zio->io_flags & ZIO_FLAG_RAW_ENCRYPT) != 0 &&
	    zp->zp_type == DMU_OT_DNODE) {
		/*
		 * The DMU actually relies on the zio layer's compression
		 * to free metadnode blocks that have had all contained
		 * dnodes freed. As a result, even when doing a raw
		 * receive, we must check whether the block can be compressed
		 * to a hole.
		 */
		psize = zio_compress_data(ZIO_COMPRESS_EMPTY,
		    zio->io_abd, NULL, lsize);
		if (psize == 0)
			compress = ZIO_COMPRESS_OFF;
	} else {
		ASSERT3U(psize, !=, 0);
	}

	/*
	 * The final pass of spa_sync() must be all rewrites, but the first
	 * few passes offer a trade-off: allocating blocks defers convergence,
	 * but newly allocated blocks are sequential, so they can be written
	 * to disk faster.  Therefore, we allow the first few passes of
	 * spa_sync() to allocate new blocks, but force rewrites after that.
	 * There should only be a handful of blocks after pass 1 in any case.
	 */
	if (!BP_IS_HOLE(bp) && bp->blk_birth == zio->io_txg &&
	    BP_GET_PSIZE(bp) == psize &&
	    pass >= zfs_sync_pass_rewrite) {
		VERIFY3U(psize, !=, 0);
		/* Keep whatever gang stages the current pipeline has. */
		enum zio_stage gang_stages = zio->io_pipeline & ZIO_GANG_STAGES;
		zio->io_pipeline = ZIO_REWRITE_PIPELINE | gang_stages;
		zio->io_flags |= ZIO_FLAG_IO_REWRITE;
	} else {
		BP_ZERO(bp);
		zio->io_pipeline = ZIO_WRITE_PIPELINE;
	}

	if (psize == 0) {
		/*
		 * The block is a hole.  If it replaces a block that had a
		 * birth txg and the hole_birth feature is active, record
		 * the hole's size, type, level and birth txg in the bp.
		 */
		if (zio->io_bp_orig.blk_birth != 0 &&
		    spa_feature_is_active(spa, SPA_FEATURE_HOLE_BIRTH)) {
			BP_SET_LSIZE(bp, lsize);
			BP_SET_TYPE(bp, zp->zp_type);
			BP_SET_LEVEL(bp, zp->zp_level);
			BP_SET_BIRTH(bp, zio->io_txg, 0);
		}
		zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
	} else {
		ASSERT(zp->zp_checksum != ZIO_CHECKSUM_GANG_HEADER);
		BP_SET_LSIZE(bp, lsize);
		BP_SET_TYPE(bp, zp->zp_type);
		BP_SET_LEVEL(bp, zp->zp_level);
		BP_SET_PSIZE(bp, psize);
		BP_SET_COMPRESS(bp, compress);
		BP_SET_CHECKSUM(bp, zp->zp_checksum);
		BP_SET_DEDUP(bp, zp->zp_dedup);
		BP_SET_BYTEORDER(bp, ZFS_HOST_BYTEORDER);
		if (zp->zp_dedup) {
			ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
			ASSERT(!(zio->io_flags & ZIO_FLAG_IO_REWRITE));
			ASSERT(!zp->zp_encrypt ||
			    DMU_OT_IS_ENCRYPTED(zp->zp_type));
			zio->io_pipeline = ZIO_DDT_WRITE_PIPELINE;
		}
		if (zp->zp_nopwrite) {
			ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
			ASSERT(!(zio->io_flags & ZIO_FLAG_IO_REWRITE));
			zio->io_pipeline |= ZIO_STAGE_NOP_WRITE;
		}
	}
	return (ZIO_PIPELINE_CONTINUE);
}
1620 
1621 static int
1622 zio_free_bp_init(zio_t *zio)
1623 {
1624 	blkptr_t *bp = zio->io_bp;
1625 
1626 	if (zio->io_child_type == ZIO_CHILD_LOGICAL) {
1627 		if (BP_GET_DEDUP(bp))
1628 			zio->io_pipeline = ZIO_DDT_FREE_PIPELINE;
1629 	}
1630 
1631 	ASSERT3P(zio->io_bp, ==, &zio->io_bp_copy);
1632 
1633 	return (ZIO_PIPELINE_CONTINUE);
1634 }
1635 
1636 /*
1637  * ==========================================================================
1638  * Execute the I/O pipeline
1639  * ==========================================================================
1640  */
1641 
1642 static void
1643 zio_taskq_dispatch(zio_t *zio, zio_taskq_type_t q, boolean_t cutinline)
1644 {
1645 	spa_t *spa = zio->io_spa;
1646 	zio_type_t t = zio->io_type;
1647 	int flags = (cutinline ? TQ_FRONT : 0);
1648 
1649 	/*
1650 	 * If we're a config writer or a probe, the normal issue and
1651 	 * interrupt threads may all be blocked waiting for the config lock.
1652 	 * In this case, select the otherwise-unused taskq for ZIO_TYPE_NULL.
1653 	 */
1654 	if (zio->io_flags & (ZIO_FLAG_CONFIG_WRITER | ZIO_FLAG_PROBE))
1655 		t = ZIO_TYPE_NULL;
1656 
1657 	/*
1658 	 * A similar issue exists for the L2ARC write thread until L2ARC 2.0.
1659 	 */
1660 	if (t == ZIO_TYPE_WRITE && zio->io_vd && zio->io_vd->vdev_aux)
1661 		t = ZIO_TYPE_NULL;
1662 
1663 	/*
1664 	 * If this is a high priority I/O, then use the high priority taskq if
1665 	 * available.
1666 	 */
1667 	if ((zio->io_priority == ZIO_PRIORITY_NOW ||
1668 	    zio->io_priority == ZIO_PRIORITY_SYNC_WRITE) &&
1669 	    spa->spa_zio_taskq[t][q + 1].stqs_count != 0)
1670 		q++;
1671 
1672 	ASSERT3U(q, <, ZIO_TASKQ_TYPES);
1673 
1674 	/*
1675 	 * NB: We are assuming that the zio can only be dispatched
1676 	 * to a single taskq at a time.  It would be a grievous error
1677 	 * to dispatch the zio to another taskq at the same time.
1678 	 */
1679 	ASSERT(zio->io_tqent.tqent_next == NULL);
1680 	spa_taskq_dispatch_ent(spa, t, q, (task_func_t *)zio_execute, zio,
1681 	    flags, &zio->io_tqent);
1682 }
1683 
1684 static boolean_t
1685 zio_taskq_member(zio_t *zio, zio_taskq_type_t q)
1686 {
1687 	kthread_t *executor = zio->io_executor;
1688 	spa_t *spa = zio->io_spa;
1689 
1690 	for (zio_type_t t = 0; t < ZIO_TYPES; t++) {
1691 		spa_taskqs_t *tqs = &spa->spa_zio_taskq[t][q];
1692 		uint_t i;
1693 		for (i = 0; i < tqs->stqs_count; i++) {
1694 			if (taskq_member(tqs->stqs_taskq[i], executor))
1695 				return (B_TRUE);
1696 		}
1697 	}
1698 
1699 	return (B_FALSE);
1700 }
1701 
/*
 * Pipeline stage: hand the zio to an issue taskq and stop executing it
 * on the current thread.  zio_taskq_dispatch() arranges for zio_execute()
 * to be called on the zio from the taskq, so ZIO_PIPELINE_STOP is always
 * returned here.
 */
static int
zio_issue_async(zio_t *zio)
{
	zio_taskq_dispatch(zio, ZIO_TASKQ_ISSUE, B_FALSE);

	return (ZIO_PIPELINE_STOP);
}
1709 
/*
 * Dispatch the zio to an interrupt taskq (without cutting in line), where
 * zio_execute() will be run on it.
 */
void
zio_interrupt(zio_t *zio)
{
	zio_taskq_dispatch(zio, ZIO_TASKQ_INTERRUPT, B_FALSE);
}
1715 
1716 void
1717 zio_delay_interrupt(zio_t *zio)
1718 {
1719 	/*
1720 	 * The timeout_generic() function isn't defined in userspace, so
1721 	 * rather than trying to implement the function, the zio delay
1722 	 * functionality has been disabled for userspace builds.
1723 	 */
1724 
1725 #ifdef _KERNEL
1726 	/*
1727 	 * If io_target_timestamp is zero, then no delay has been registered
1728 	 * for this IO, thus jump to the end of this function and "skip" the
1729 	 * delay; issuing it directly to the zio layer.
1730 	 */
1731 	if (zio->io_target_timestamp != 0) {
1732 		hrtime_t now = gethrtime();
1733 
1734 		if (now >= zio->io_target_timestamp) {
1735 			/*
1736 			 * This IO has already taken longer than the target
1737 			 * delay to complete, so we don't want to delay it
1738 			 * any longer; we "miss" the delay and issue it
1739 			 * directly to the zio layer. This is likely due to
1740 			 * the target latency being set to a value less than
1741 			 * the underlying hardware can satisfy (e.g. delay
1742 			 * set to 1ms, but the disks take 10ms to complete an
1743 			 * IO request).
1744 			 */
1745 
1746 			DTRACE_PROBE2(zio__delay__miss, zio_t *, zio,
1747 			    hrtime_t, now);
1748 
1749 			zio_interrupt(zio);
1750 		} else {
1751 			hrtime_t diff = zio->io_target_timestamp - now;
1752 
1753 			DTRACE_PROBE3(zio__delay__hit, zio_t *, zio,
1754 			    hrtime_t, now, hrtime_t, diff);
1755 
1756 			(void) timeout_generic(CALLOUT_NORMAL,
1757 			    (void (*)(void *))zio_interrupt, zio, diff, 1, 0);
1758 		}
1759 
1760 		return;
1761 	}
1762 #endif
1763 
1764 	DTRACE_PROBE1(zio__delay__skip, zio_t *, zio);
1765 	zio_interrupt(zio);
1766 }
1767 
/*
 * Execute the I/O pipeline until one of the following occurs:
 *
 *	(1) the I/O completes
 *	(2) the pipeline stalls waiting for dependent child I/Os
 *	(3) the I/O issues, so we're waiting for an I/O completion interrupt
 *	(4) the I/O is delegated by vdev-level caching or aggregation
 *	(5) the I/O is deferred due to vdev-level queueing
 *	(6) the I/O is handed off to another thread.
 *
 * In all cases, the pipeline stops whenever there's no CPU work; it never
 * burns a thread in cv_wait().
 *
 * There's no locking on io_stage because there's no legitimate way
 * for multiple threads to be attempting to process the same I/O.
 *
 * Each stage is a single bit in io_stage/io_pipeline; the handler for a
 * stage is looked up in zio_pipeline[] by the bit's position.  A handler
 * returns ZIO_PIPELINE_CONTINUE to move on to the next stage or
 * ZIO_PIPELINE_STOP to end execution on this thread.
 */
static zio_pipe_stage_t *zio_pipeline[];

void
zio_execute(zio_t *zio)
{
	/* Record which thread is driving this zio. */
	zio->io_executor = curthread;

	ASSERT3U(zio->io_queued_timestamp, >, 0);

	while (zio->io_stage < ZIO_STAGE_DONE) {
		enum zio_stage pipeline = zio->io_pipeline;
		enum zio_stage stage = zio->io_stage;
		int rv;

		ASSERT(!MUTEX_HELD(&zio->io_lock));
		ASSERT(ISP2(stage));
		ASSERT(zio->io_stall == NULL);

		/* Advance to the next stage that is in this zio's pipeline. */
		do {
			stage <<= 1;
		} while ((stage & pipeline) == 0);

		ASSERT(stage <= ZIO_STAGE_DONE);

		/*
		 * If we are in interrupt context and this pipeline stage
		 * will grab a config lock that is held across I/O,
		 * or may wait for an I/O that needs an interrupt thread
		 * to complete, issue async to avoid deadlock.
		 *
		 * For VDEV_IO_START, we cut in line so that the io will
		 * be sent to disk promptly.
		 */
		if ((stage & ZIO_BLOCKING_STAGES) && zio->io_vd == NULL &&
		    zio_taskq_member(zio, ZIO_TASKQ_INTERRUPT)) {
			boolean_t cut = (stage == ZIO_STAGE_VDEV_IO_START) ?
			    zio_requeue_io_start_cut_in_line : B_FALSE;
			/*
			 * io_stage has not been advanced yet, so the issue
			 * thread will pick this same stage again.
			 */
			zio_taskq_dispatch(zio, ZIO_TASKQ_ISSUE, cut);
			return;
		}

		zio->io_stage = stage;
		zio->io_pipeline_trace |= zio->io_stage;
		/* highbit64() is 1-based, the handler table is 0-based. */
		rv = zio_pipeline[highbit64(stage) - 1](zio);

		if (rv == ZIO_PIPELINE_STOP)
			return;

		ASSERT(rv == ZIO_PIPELINE_CONTINUE);
	}
}
1835 
1836 /*
1837  * ==========================================================================
1838  * Initiate I/O, either sync or async
1839  * ==========================================================================
1840  */
1841 int
1842 zio_wait(zio_t *zio)
1843 {
1844 	int error;
1845 
1846 	ASSERT3P(zio->io_stage, ==, ZIO_STAGE_OPEN);
1847 	ASSERT3P(zio->io_executor, ==, NULL);
1848 
1849 	zio->io_waiter = curthread;
1850 	ASSERT0(zio->io_queued_timestamp);
1851 	zio->io_queued_timestamp = gethrtime();
1852 
1853 	zio_execute(zio);
1854 
1855 	mutex_enter(&zio->io_lock);
1856 	while (zio->io_executor != NULL)
1857 		cv_wait(&zio->io_cv, &zio->io_lock);
1858 	mutex_exit(&zio->io_lock);
1859 
1860 	error = zio->io_error;
1861 	zio_destroy(zio);
1862 
1863 	return (error);
1864 }
1865 
1866 void
1867 zio_nowait(zio_t *zio)
1868 {
1869 	ASSERT3P(zio->io_executor, ==, NULL);
1870 
1871 	if (zio->io_child_type == ZIO_CHILD_LOGICAL &&
1872 	    zio_unique_parent(zio) == NULL) {
1873 		/*
1874 		 * This is a logical async I/O with no parent to wait for it.
1875 		 * We add it to the spa_async_root_zio "Godfather" I/O which
1876 		 * will ensure they complete prior to unloading the pool.
1877 		 */
1878 		spa_t *spa = zio->io_spa;
1879 
1880 		zio_add_child(spa->spa_async_zio_root[CPU_SEQID], zio);
1881 	}
1882 
1883 	ASSERT0(zio->io_queued_timestamp);
1884 	zio->io_queued_timestamp = gethrtime();
1885 	zio_execute(zio);
1886 }
1887 
1888 /*
1889  * ==========================================================================
1890  * Reexecute, cancel, or suspend/resume failed I/O
1891  * ==========================================================================
1892  */
1893 
/*
 * Reset a logical zio to its original state and run it again, after
 * recursively doing the same for each of its existing children.
 *
 * The zio's flags, stage and pipeline are restored from the io_orig_*
 * copies, its error state and wait state are cleared, and
 * ZIO_FLAG_REEXECUTED is set.  "Godfather" zios are reset but not
 * executed.
 */
static void
zio_reexecute(zio_t *pio)
{
	zio_t *cio, *cio_next;

	ASSERT(pio->io_child_type == ZIO_CHILD_LOGICAL);
	ASSERT(pio->io_orig_stage == ZIO_STAGE_OPEN);
	ASSERT(pio->io_gang_leader == NULL);
	ASSERT(pio->io_gang_tree == NULL);

	pio->io_flags = pio->io_orig_flags;
	pio->io_stage = pio->io_orig_stage;
	pio->io_pipeline = pio->io_orig_pipeline;
	pio->io_reexecute = 0;
	pio->io_flags |= ZIO_FLAG_REEXECUTED;
	pio->io_pipeline_trace = 0;
	pio->io_error = 0;
	for (int w = 0; w < ZIO_WAIT_TYPES; w++)
		pio->io_state[w] = 0;
	for (int c = 0; c < ZIO_CHILD_TYPES; c++)
		pio->io_child_error[c] = 0;

	/* An allocating zio starts over with an empty block pointer. */
	if (IO_IS_ALLOCATING(pio))
		BP_ZERO(pio->io_bp);

	/*
	 * As we reexecute pio's children, new children could be created.
	 * New children go to the head of pio's io_child_list, however,
	 * so we will (correctly) not reexecute them.  The key is that
	 * the remainder of pio's io_child_list, from 'cio_next' onward,
	 * cannot be affected by any side effects of reexecuting 'cio'.
	 */
	zio_link_t *zl = NULL;
	mutex_enter(&pio->io_lock);
	for (cio = zio_walk_children(pio, &zl); cio != NULL; cio = cio_next) {
		/* Fetch the successor before io_lock is dropped below. */
		cio_next = zio_walk_children(pio, &zl);
		/* Count this child as outstanding again, per wait type. */
		for (int w = 0; w < ZIO_WAIT_TYPES; w++)
			pio->io_children[cio->io_child_type][w]++;
		/* The lock is not held across the recursive call. */
		mutex_exit(&pio->io_lock);
		zio_reexecute(cio);
		mutex_enter(&pio->io_lock);
	}
	mutex_exit(&pio->io_lock);

	/*
	 * Now that all children have been reexecuted, execute the parent.
	 * We don't reexecute "The Godfather" I/O here as it's the
	 * responsibility of the caller to wait on it.
	 */
	if (!(pio->io_flags & ZIO_FLAG_GODFATHER)) {
		pio->io_queued_timestamp = gethrtime();
		zio_execute(pio);
	}
}
1948 
1949 void
1950 zio_suspend(spa_t *spa, zio_t *zio, zio_suspend_reason_t reason)
1951 {
1952 	if (spa_get_failmode(spa) == ZIO_FAILURE_MODE_PANIC)
1953 		fm_panic("Pool '%s' has encountered an uncorrectable I/O "
1954 		    "failure and the failure mode property for this pool "
1955 		    "is set to panic.", spa_name(spa));
1956 
1957 	zfs_ereport_post(FM_EREPORT_ZFS_IO_FAILURE, spa, NULL,
1958 	    NULL, NULL, 0, 0);
1959 
1960 	mutex_enter(&spa->spa_suspend_lock);
1961 
1962 	if (spa->spa_suspend_zio_root == NULL)
1963 		spa->spa_suspend_zio_root = zio_root(spa, NULL, NULL,
1964 		    ZIO_FLAG_CANFAIL | ZIO_FLAG_SPECULATIVE |
1965 		    ZIO_FLAG_GODFATHER);
1966 
1967 	spa->spa_suspended = reason;
1968 
1969 	if (zio != NULL) {
1970 		ASSERT(!(zio->io_flags & ZIO_FLAG_GODFATHER));
1971 		ASSERT(zio != spa->spa_suspend_zio_root);
1972 		ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
1973 		ASSERT(zio_unique_parent(zio) == NULL);
1974 		ASSERT(zio->io_stage == ZIO_STAGE_DONE);
1975 		zio_add_child(spa->spa_suspend_zio_root, zio);
1976 	}
1977 
1978 	mutex_exit(&spa->spa_suspend_lock);
1979 }
1980 
1981 int
1982 zio_resume(spa_t *spa)
1983 {
1984 	zio_t *pio;
1985 
1986 	/*
1987 	 * Reexecute all previously suspended i/o.
1988 	 */
1989 	mutex_enter(&spa->spa_suspend_lock);
1990 	spa->spa_suspended = ZIO_SUSPEND_NONE;
1991 	cv_broadcast(&spa->spa_suspend_cv);
1992 	pio = spa->spa_suspend_zio_root;
1993 	spa->spa_suspend_zio_root = NULL;
1994 	mutex_exit(&spa->spa_suspend_lock);
1995 
1996 	if (pio == NULL)
1997 		return (0);
1998 
1999 	zio_reexecute(pio);
2000 	return (zio_wait(pio));
2001 }
2002 
2003 void
2004 zio_resume_wait(spa_t *spa)
2005 {
2006 	mutex_enter(&spa->spa_suspend_lock);
2007 	while (spa_suspended(spa))
2008 		cv_wait(&spa->spa_suspend_cv, &spa->spa_suspend_lock);
2009 	mutex_exit(&spa->spa_suspend_lock);
2010 }
2011 
2012 /*
2013  * ==========================================================================
2014  * Gang blocks.
2015  *
2016  * A gang block is a collection of small blocks that looks to the DMU
2017  * like one large block.  When zio_dva_allocate() cannot find a block
2018  * of the requested size, due to either severe fragmentation or the pool
2019  * being nearly full, it calls zio_write_gang_block() to construct the
2020  * block from smaller fragments.
2021  *
2022  * A gang block consists of a gang header (zio_gbh_phys_t) and up to
2023  * three (SPA_GBH_NBLKPTRS) gang members.  The gang header is just like
2024  * an indirect block: it's an array of block pointers.  It consumes
2025  * only one sector and hence is allocatable regardless of fragmentation.
2026  * The gang header's bps point to its gang members, which hold the data.
2027  *
2028  * Gang blocks are self-checksumming, using the bp's <vdev, offset, txg>
2029  * as the verifier to ensure uniqueness of the SHA256 checksum.
2030  * Critically, the gang block bp's blk_cksum is the checksum of the data,
2031  * not the gang header.  This ensures that data block signatures (needed for
2032  * deduplication) are independent of how the block is physically stored.
2033  *
2034  * Gang blocks can be nested: a gang member may itself be a gang block.
2035  * Thus every gang block is a tree in which root and all interior nodes are
2036  * gang headers, and the leaves are normal blocks that contain user data.
2037  * The root of the gang tree is called the gang leader.
2038  *
2039  * To perform any operation (read, rewrite, free, claim) on a gang block,
2040  * zio_gang_assemble() first assembles the gang tree (minus data leaves)
2041  * in the io_gang_tree field of the original logical i/o by recursively
2042  * reading the gang leader and all gang headers below it.  This yields
2043  * an in-core tree containing the contents of every gang header and the
2044  * bps for every constituent of the gang block.
2045  *
2046  * With the gang tree now assembled, zio_gang_issue() just walks the gang tree
2047  * and invokes a callback on each bp.  To free a gang block, zio_gang_issue()
2048  * calls zio_free_gang() -- a trivial wrapper around zio_free() -- for each bp.
2049  * zio_claim_gang() provides a similarly trivial wrapper for zio_claim().
2050  * zio_read_gang() is a wrapper around zio_read() that omits reading gang
2051  * headers, since we already have those in io_gang_tree.  zio_rewrite_gang()
2052  * performs a zio_rewrite() of the data or, for gang headers, a zio_rewrite()
2053  * of the gang header plus zio_checksum_compute() of the data to update the
2054  * gang header's blk_cksum as described above.
2055  *
2056  * The two-phase assemble/issue model solves the problem of partial failure --
2057  * what if you'd freed part of a gang block but then couldn't read the
2058  * gang header for another part?  Assembling the entire gang tree first
2059  * ensures that all the necessary gang header I/O has succeeded before
2060  * starting the actual work of free, claim, or write.  Once the gang tree
2061  * is assembled, free and claim are in-memory operations that cannot fail.
2062  *
2063  * In the event that a gang write fails, zio_dva_unallocate() walks the
2064  * gang tree to immediately free (i.e. insert back into the space map)
2065  * everything we've allocated.  This ensures that we don't get ENOSPC
2066  * errors during repeated suspend/resume cycles due to a flaky device.
2067  *
2068  * Gang rewrites only happen during sync-to-convergence.  If we can't assemble
2069  * the gang tree, we won't modify the block, so we can safely defer the free
2070  * (knowing that the block is still intact).  If we *can* assemble the gang
2071  * tree, then even if some of the rewrites fail, zio_dva_unallocate() will free
2072  * each constituent bp and we can allocate a new block on the next sync pass.
2073  *
2074  * In all cases, the gang tree allows complete recovery from partial failure.
2075  * ==========================================================================
2076  */
2077 
2078 static void
2079 zio_gang_issue_func_done(zio_t *zio)
2080 {
2081 	abd_put(zio->io_abd);
2082 }
2083 
2084 static zio_t *
2085 zio_read_gang(zio_t *pio, blkptr_t *bp, zio_gang_node_t *gn, abd_t *data,
2086     uint64_t offset)
2087 {
2088 	if (gn != NULL)
2089 		return (pio);
2090 
2091 	return (zio_read(pio, pio->io_spa, bp, abd_get_offset(data, offset),
2092 	    BP_GET_PSIZE(bp), zio_gang_issue_func_done,
2093 	    NULL, pio->io_priority, ZIO_GANG_CHILD_FLAGS(pio),
2094 	    &pio->io_bookmark));
2095 }
2096 
2097 static zio_t *
2098 zio_rewrite_gang(zio_t *pio, blkptr_t *bp, zio_gang_node_t *gn, abd_t *data,
2099     uint64_t offset)
2100 {
2101 	zio_t *zio;
2102 
2103 	if (gn != NULL) {
2104 		abd_t *gbh_abd =
2105 		    abd_get_from_buf(gn->gn_gbh, SPA_GANGBLOCKSIZE);
2106 		zio = zio_rewrite(pio, pio->io_spa, pio->io_txg, bp,
2107 		    gbh_abd, SPA_GANGBLOCKSIZE, zio_gang_issue_func_done, NULL,
2108 		    pio->io_priority, ZIO_GANG_CHILD_FLAGS(pio),
2109 		    &pio->io_bookmark);
2110 		/*
2111 		 * As we rewrite each gang header, the pipeline will compute
2112 		 * a new gang block header checksum for it; but no one will
2113 		 * compute a new data checksum, so we do that here.  The one
2114 		 * exception is the gang leader: the pipeline already computed
2115 		 * its data checksum because that stage precedes gang assembly.
2116 		 * (Presently, nothing actually uses interior data checksums;
2117 		 * this is just good hygiene.)
2118 		 */
2119 		if (gn != pio->io_gang_leader->io_gang_tree) {
2120 			abd_t *buf = abd_get_offset(data, offset);
2121 
2122 			zio_checksum_compute(zio, BP_GET_CHECKSUM(bp),
2123 			    buf, BP_GET_PSIZE(bp));
2124 
2125 			abd_put(buf);
2126 		}
2127 		/*
2128 		 * If we are here to damage data for testing purposes,
2129 		 * leave the GBH alone so that we can detect the damage.
2130 		 */
2131 		if (pio->io_gang_leader->io_flags & ZIO_FLAG_INDUCE_DAMAGE)
2132 			zio->io_pipeline &= ~ZIO_VDEV_IO_STAGES;
2133 	} else {
2134 		zio = zio_rewrite(pio, pio->io_spa, pio->io_txg, bp,
2135 		    abd_get_offset(data, offset), BP_GET_PSIZE(bp),
2136 		    zio_gang_issue_func_done, NULL, pio->io_priority,
2137 		    ZIO_GANG_CHILD_FLAGS(pio), &pio->io_bookmark);
2138 	}
2139 
2140 	return (zio);
2141 }
2142 
2143 /* ARGSUSED */
2144 static zio_t *
2145 zio_free_gang(zio_t *pio, blkptr_t *bp, zio_gang_node_t *gn, abd_t *data,
2146     uint64_t offset)
2147 {
2148 	return (zio_free_sync(pio, pio->io_spa, pio->io_txg, bp,
2149 	    ZIO_GANG_CHILD_FLAGS(pio)));
2150 }
2151 
2152 /* ARGSUSED */
2153 static zio_t *
2154 zio_claim_gang(zio_t *pio, blkptr_t *bp, zio_gang_node_t *gn, abd_t *data,
2155     uint64_t offset)
2156 {
2157 	return (zio_claim(pio, pio->io_spa, pio->io_txg, bp,
2158 	    NULL, NULL, ZIO_GANG_CHILD_FLAGS(pio)));
2159 }
2160 
2161 static zio_gang_issue_func_t *zio_gang_issue_func[ZIO_TYPES] = {
2162 	NULL,
2163 	zio_read_gang,
2164 	zio_rewrite_gang,
2165 	zio_free_gang,
2166 	zio_claim_gang,
2167 	NULL
2168 };
2169 
2170 static void zio_gang_tree_assemble_done(zio_t *zio);
2171 
2172 static zio_gang_node_t *
2173 zio_gang_node_alloc(zio_gang_node_t **gnpp)
2174 {
2175 	zio_gang_node_t *gn;
2176 
2177 	ASSERT(*gnpp == NULL);
2178 
2179 	gn = kmem_zalloc(sizeof (*gn), KM_SLEEP);
2180 	gn->gn_gbh = zio_buf_alloc(SPA_GANGBLOCKSIZE);
2181 	*gnpp = gn;
2182 
2183 	return (gn);
2184 }
2185 
2186 static void
2187 zio_gang_node_free(zio_gang_node_t **gnpp)
2188 {
2189 	zio_gang_node_t *gn = *gnpp;
2190 
2191 	for (int g = 0; g < SPA_GBH_NBLKPTRS; g++)
2192 		ASSERT(gn->gn_child[g] == NULL);
2193 
2194 	zio_buf_free(gn->gn_gbh, SPA_GANGBLOCKSIZE);
2195 	kmem_free(gn, sizeof (*gn));
2196 	*gnpp = NULL;
2197 }
2198 
2199 static void
2200 zio_gang_tree_free(zio_gang_node_t **gnpp)
2201 {
2202 	zio_gang_node_t *gn = *gnpp;
2203 
2204 	if (gn == NULL)
2205 		return;
2206 
2207 	for (int g = 0; g < SPA_GBH_NBLKPTRS; g++)
2208 		zio_gang_tree_free(&gn->gn_child[g]);
2209 
2210 	zio_gang_node_free(gnpp);
2211 }
2212 
2213 static void
2214 zio_gang_tree_assemble(zio_t *gio, blkptr_t *bp, zio_gang_node_t **gnpp)
2215 {
2216 	zio_gang_node_t *gn = zio_gang_node_alloc(gnpp);
2217 	abd_t *gbh_abd = abd_get_from_buf(gn->gn_gbh, SPA_GANGBLOCKSIZE);
2218 
2219 	ASSERT(gio->io_gang_leader == gio);
2220 	ASSERT(BP_IS_GANG(bp));
2221 
2222 	zio_nowait(zio_read(gio, gio->io_spa, bp, gbh_abd, SPA_GANGBLOCKSIZE,
2223 	    zio_gang_tree_assemble_done, gn, gio->io_priority,
2224 	    ZIO_GANG_CHILD_FLAGS(gio), &gio->io_bookmark));
2225 }
2226 
2227 static void
2228 zio_gang_tree_assemble_done(zio_t *zio)
2229 {
2230 	zio_t *gio = zio->io_gang_leader;
2231 	zio_gang_node_t *gn = zio->io_private;
2232 	blkptr_t *bp = zio->io_bp;
2233 
2234 	ASSERT(gio == zio_unique_parent(zio));
2235 	ASSERT(zio->io_child_count == 0);
2236 
2237 	if (zio->io_error)
2238 		return;
2239 
2240 	/* this ABD was created from a linear buf in zio_gang_tree_assemble */
2241 	if (BP_SHOULD_BYTESWAP(bp))
2242 		byteswap_uint64_array(abd_to_buf(zio->io_abd), zio->io_size);
2243 
2244 	ASSERT3P(abd_to_buf(zio->io_abd), ==, gn->gn_gbh);
2245 	ASSERT(zio->io_size == SPA_GANGBLOCKSIZE);
2246 	ASSERT(gn->gn_gbh->zg_tail.zec_magic == ZEC_MAGIC);
2247 
2248 	abd_put(zio->io_abd);
2249 
2250 	for (int g = 0; g < SPA_GBH_NBLKPTRS; g++) {
2251 		blkptr_t *gbp = &gn->gn_gbh->zg_blkptr[g];
2252 		if (!BP_IS_GANG(gbp))
2253 			continue;
2254 		zio_gang_tree_assemble(gio, gbp, &gn->gn_child[g]);
2255 	}
2256 }
2257 
2258 static void
2259 zio_gang_tree_issue(zio_t *pio, zio_gang_node_t *gn, blkptr_t *bp, abd_t *data,
2260     uint64_t offset)
2261 {
2262 	zio_t *gio = pio->io_gang_leader;
2263 	zio_t *zio;
2264 
2265 	ASSERT(BP_IS_GANG(bp) == !!gn);
2266 	ASSERT(BP_GET_CHECKSUM(bp) == BP_GET_CHECKSUM(gio->io_bp));
2267 	ASSERT(BP_GET_LSIZE(bp) == BP_GET_PSIZE(bp) || gn == gio->io_gang_tree);
2268 
2269 	/*
2270 	 * If you're a gang header, your data is in gn->gn_gbh.
2271 	 * If you're a gang member, your data is in 'data' and gn == NULL.
2272 	 */
2273 	zio = zio_gang_issue_func[gio->io_type](pio, bp, gn, data, offset);
2274 
2275 	if (gn != NULL) {
2276 		ASSERT(gn->gn_gbh->zg_tail.zec_magic == ZEC_MAGIC);
2277 
2278 		for (int g = 0; g < SPA_GBH_NBLKPTRS; g++) {
2279 			blkptr_t *gbp = &gn->gn_gbh->zg_blkptr[g];
2280 			if (BP_IS_HOLE(gbp))
2281 				continue;
2282 			zio_gang_tree_issue(zio, gn->gn_child[g], gbp, data,
2283 			    offset);
2284 			offset += BP_GET_PSIZE(gbp);
2285 		}
2286 	}
2287 
2288 	if (gn == gio->io_gang_tree)
2289 		ASSERT3U(gio->io_size, ==, offset);
2290 
2291 	if (zio != pio)
2292 		zio_nowait(zio);
2293 }
2294 
2295 static int
2296 zio_gang_assemble(zio_t *zio)
2297 {
2298 	blkptr_t *bp = zio->io_bp;
2299 
2300 	ASSERT(BP_IS_GANG(bp) && zio->io_gang_leader == NULL);
2301 	ASSERT(zio->io_child_type > ZIO_CHILD_GANG);
2302 
2303 	zio->io_gang_leader = zio;
2304 
2305 	zio_gang_tree_assemble(zio, bp, &zio->io_gang_tree);
2306 
2307 	return (ZIO_PIPELINE_CONTINUE);
2308 }
2309 
2310 static int
2311 zio_gang_issue(zio_t *zio)
2312 {
2313 	blkptr_t *bp = zio->io_bp;
2314 
2315 	if (zio_wait_for_children(zio, ZIO_CHILD_GANG_BIT, ZIO_WAIT_DONE)) {
2316 		return (ZIO_PIPELINE_STOP);
2317 	}
2318 
2319 	ASSERT(BP_IS_GANG(bp) && zio->io_gang_leader == zio);
2320 	ASSERT(zio->io_child_type > ZIO_CHILD_GANG);
2321 
2322 	if (zio->io_child_error[ZIO_CHILD_GANG] == 0)
2323 		zio_gang_tree_issue(zio, zio->io_gang_tree, bp, zio->io_abd,
2324 		    0);
2325 	else
2326 		zio_gang_tree_free(&zio->io_gang_tree);
2327 
2328 	zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
2329 
2330 	return (ZIO_PIPELINE_CONTINUE);
2331 }
2332 
2333 static void
2334 zio_write_gang_member_ready(zio_t *zio)
2335 {
2336 	zio_t *pio = zio_unique_parent(zio);
2337 	zio_t *gio = zio->io_gang_leader;
2338 	dva_t *cdva = zio->io_bp->blk_dva;
2339 	dva_t *pdva = pio->io_bp->blk_dva;
2340 	uint64_t asize;
2341 
2342 	if (BP_IS_HOLE(zio->io_bp))
2343 		return;
2344 
2345 	ASSERT(BP_IS_HOLE(&zio->io_bp_orig));
2346 
2347 	ASSERT(zio->io_child_type == ZIO_CHILD_GANG);
2348 	ASSERT3U(zio->io_prop.zp_copies, ==, gio->io_prop.zp_copies);
2349 	ASSERT3U(zio->io_prop.zp_copies, <=, BP_GET_NDVAS(zio->io_bp));
2350 	ASSERT3U(pio->io_prop.zp_copies, <=, BP_GET_NDVAS(pio->io_bp));
2351 	ASSERT3U(BP_GET_NDVAS(zio->io_bp), <=, BP_GET_NDVAS(pio->io_bp));
2352 
2353 	mutex_enter(&pio->io_lock);
2354 	for (int d = 0; d < BP_GET_NDVAS(zio->io_bp); d++) {
2355 		ASSERT(DVA_GET_GANG(&pdva[d]));
2356 		asize = DVA_GET_ASIZE(&pdva[d]);
2357 		asize += DVA_GET_ASIZE(&cdva[d]);
2358 		DVA_SET_ASIZE(&pdva[d], asize);
2359 	}
2360 	mutex_exit(&pio->io_lock);
2361 }
2362 
2363 static void
2364 zio_write_gang_done(zio_t *zio)
2365 {
2366 	/*
2367 	 * The io_abd field will be NULL for a zio with no data.  The io_flags
2368 	 * will initially have the ZIO_FLAG_NODATA bit flag set, but we can't
2369 	 * check for it here as it is cleared in zio_ready.
2370 	 */
2371 	if (zio->io_abd != NULL)
2372 		abd_put(zio->io_abd);
2373 }
2374 
2375 static int
2376 zio_write_gang_block(zio_t *pio)
2377 {
2378 	spa_t *spa = pio->io_spa;
2379 	metaslab_class_t *mc = spa_normal_class(spa);
2380 	blkptr_t *bp = pio->io_bp;
2381 	zio_t *gio = pio->io_gang_leader;
2382 	zio_t *zio;
2383 	zio_gang_node_t *gn, **gnpp;
2384 	zio_gbh_phys_t *gbh;
2385 	abd_t *gbh_abd;
2386 	uint64_t txg = pio->io_txg;
2387 	uint64_t resid = pio->io_size;
2388 	uint64_t lsize;
2389 	int copies = gio->io_prop.zp_copies;
2390 	int gbh_copies = MIN(copies + 1, spa_max_replication(spa));
2391 	zio_prop_t zp;
2392 	int error;
2393 	boolean_t has_data = !(pio->io_flags & ZIO_FLAG_NODATA);
2394 
2395 	/*
2396 	 * encrypted blocks need DVA[2] free so encrypted gang headers can't
2397 	 * have a third copy.
2398 	 */
2399 	if (gio->io_prop.zp_encrypt && gbh_copies >= SPA_DVAS_PER_BP)
2400 		gbh_copies = SPA_DVAS_PER_BP - 1;
2401 
2402 	int flags = METASLAB_HINTBP_FAVOR | METASLAB_GANG_HEADER;
2403 	if (pio->io_flags & ZIO_FLAG_IO_ALLOCATING) {
2404 		ASSERT(pio->io_priority == ZIO_PRIORITY_ASYNC_WRITE);
2405 		ASSERT(has_data);
2406 
2407 		flags |= METASLAB_ASYNC_ALLOC;
2408 		VERIFY(zfs_refcount_held(&mc->mc_alloc_slots[pio->io_allocator],
2409 		    pio));
2410 
2411 		/*
2412 		 * The logical zio has already placed a reservation for
2413 		 * 'copies' allocation slots but gang blocks may require
2414 		 * additional copies. These additional copies
2415 		 * (i.e. gbh_copies - copies) are guaranteed to succeed
2416 		 * since metaslab_class_throttle_reserve() always allows
2417 		 * additional reservations for gang blocks.
2418 		 */
2419 		VERIFY(metaslab_class_throttle_reserve(mc, gbh_copies - copies,
2420 		    pio->io_allocator, pio, flags));
2421 	}
2422 
2423 	error = metaslab_alloc(spa, mc, SPA_GANGBLOCKSIZE,
2424 	    bp, gbh_copies, txg, pio == gio ? NULL : gio->io_bp, flags,
2425 	    &pio->io_alloc_list, pio, pio->io_allocator);
2426 	if (error) {
2427 		if (pio->io_flags & ZIO_FLAG_IO_ALLOCATING) {
2428 			ASSERT(pio->io_priority == ZIO_PRIORITY_ASYNC_WRITE);
2429 			ASSERT(has_data);
2430 
2431 			/*
2432 			 * If we failed to allocate the gang block header then
2433 			 * we remove any additional allocation reservations that
2434 			 * we placed here. The original reservation will
2435 			 * be removed when the logical I/O goes to the ready
2436 			 * stage.
2437 			 */
2438 			metaslab_class_throttle_unreserve(mc,
2439 			    gbh_copies - copies, pio->io_allocator, pio);
2440 		}
2441 		pio->io_error = error;
2442 		return (ZIO_PIPELINE_CONTINUE);
2443 	}
2444 
2445 	if (pio == gio) {
2446 		gnpp = &gio->io_gang_tree;
2447 	} else {
2448 		gnpp = pio->io_private;
2449 		ASSERT(pio->io_ready == zio_write_gang_member_ready);
2450 	}
2451 
2452 	gn = zio_gang_node_alloc(gnpp);
2453 	gbh = gn->gn_gbh;
2454 	bzero(gbh, SPA_GANGBLOCKSIZE);
2455 	gbh_abd = abd_get_from_buf(gbh, SPA_GANGBLOCKSIZE);
2456 
2457 	/*
2458 	 * Create the gang header.
2459 	 */
2460 	zio = zio_rewrite(pio, spa, txg, bp, gbh_abd, SPA_GANGBLOCKSIZE,
2461 	    zio_write_gang_done, NULL, pio->io_priority,
2462 	    ZIO_GANG_CHILD_FLAGS(pio), &pio->io_bookmark);
2463 
2464 	/*
2465 	 * Create and nowait the gang children.
2466 	 */
2467 	for (int g = 0; resid != 0; resid -= lsize, g++) {
2468 		lsize = P2ROUNDUP(resid / (SPA_GBH_NBLKPTRS - g),
2469 		    SPA_MINBLOCKSIZE);
2470 		ASSERT(lsize >= SPA_MINBLOCKSIZE && lsize <= resid);
2471 
2472 		zp.zp_checksum = gio->io_prop.zp_checksum;
2473 		zp.zp_compress = ZIO_COMPRESS_OFF;
2474 		zp.zp_type = DMU_OT_NONE;
2475 		zp.zp_level = 0;
2476 		zp.zp_copies = gio->io_prop.zp_copies;
2477 		zp.zp_dedup = B_FALSE;
2478 		zp.zp_dedup_verify = B_FALSE;
2479 		zp.zp_nopwrite = B_FALSE;
2480 		zp.zp_encrypt = gio->io_prop.zp_encrypt;
2481 		zp.zp_byteorder = gio->io_prop.zp_byteorder;
2482 		bzero(zp.zp_salt, ZIO_DATA_SALT_LEN);
2483 		bzero(zp.zp_iv, ZIO_DATA_IV_LEN);
2484 		bzero(zp.zp_mac, ZIO_DATA_MAC_LEN);
2485 
2486 		zio_t *cio = zio_write(zio, spa, txg, &gbh->zg_blkptr[g],
2487 		    has_data ? abd_get_offset(pio->io_abd, pio->io_size -
2488 		    resid) : NULL, lsize, lsize, &zp,
2489 		    zio_write_gang_member_ready, NULL, NULL,
2490 		    zio_write_gang_done, &gn->gn_child[g], pio->io_priority,
2491 		    ZIO_GANG_CHILD_FLAGS(pio), &pio->io_bookmark);
2492 
2493 		if (pio->io_flags & ZIO_FLAG_IO_ALLOCATING) {
2494 			ASSERT(pio->io_priority == ZIO_PRIORITY_ASYNC_WRITE);
2495 			ASSERT(has_data);
2496 
2497 			/*
2498 			 * Gang children won't throttle but we should
2499 			 * account for their work, so reserve an allocation
2500 			 * slot for them here.
2501 			 */
2502 			VERIFY(metaslab_class_throttle_reserve(mc,
2503 			    zp.zp_copies, cio->io_allocator, cio, flags));
2504 		}
2505 		zio_nowait(cio);
2506 	}
2507 
2508 	/*
2509 	 * Set pio's pipeline to just wait for zio to finish.
2510 	 */
2511 	pio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
2512 
2513 	zio_nowait(zio);
2514 
2515 	return (ZIO_PIPELINE_CONTINUE);
2516 }
2517 
2518 /*
2519  * The zio_nop_write stage in the pipeline determines if allocating a
2520  * new bp is necessary.  The nopwrite feature can handle writes in
2521  * either syncing or open context (i.e. zil writes) and as a result is
2522  * mutually exclusive with dedup.
2523  *
2524  * By leveraging a cryptographically secure checksum, such as SHA256, we
2525  * can compare the checksums of the new data and the old to determine if
2526  * allocating a new block is required.  Note that our requirements for
2527  * cryptographic strength are fairly weak: there can't be any accidental
2528  * hash collisions, but we don't need to be secure against intentional
2529  * (malicious) collisions.  To trigger a nopwrite, you have to be able
2530  * to write the file to begin with, and triggering an incorrect (hash
2531  * collision) nopwrite is no worse than simply writing to the file.
2532  * That said, there are no known attacks against the checksum algorithms
2533  * used for nopwrite, assuming that the salt and the checksums
2534  * themselves remain secret.
2535  */
2536 static int
2537 zio_nop_write(zio_t *zio)
2538 {
2539 	blkptr_t *bp = zio->io_bp;
2540 	blkptr_t *bp_orig = &zio->io_bp_orig;
2541 	zio_prop_t *zp = &zio->io_prop;
2542 
2543 	ASSERT(BP_GET_LEVEL(bp) == 0);
2544 	ASSERT(!(zio->io_flags & ZIO_FLAG_IO_REWRITE));
2545 	ASSERT(zp->zp_nopwrite);
2546 	ASSERT(!zp->zp_dedup);
2547 	ASSERT(zio->io_bp_override == NULL);
2548 	ASSERT(IO_IS_ALLOCATING(zio));
2549 
2550 	/*
2551 	 * Check to see if the original bp and the new bp have matching
2552 	 * characteristics (i.e. same checksum, compression algorithms, etc).
2553 	 * If they don't then just continue with the pipeline which will
2554 	 * allocate a new bp.
2555 	 */
2556 	if (BP_IS_HOLE(bp_orig) ||
2557 	    !(zio_checksum_table[BP_GET_CHECKSUM(bp)].ci_flags &
2558 	    ZCHECKSUM_FLAG_NOPWRITE) ||
2559 	    BP_IS_ENCRYPTED(bp) || BP_IS_ENCRYPTED(bp_orig) ||
2560 	    BP_GET_CHECKSUM(bp) != BP_GET_CHECKSUM(bp_orig) ||
2561 	    BP_GET_COMPRESS(bp) != BP_GET_COMPRESS(bp_orig) ||
2562 	    BP_GET_DEDUP(bp) != BP_GET_DEDUP(bp_orig) ||
2563 	    zp->zp_copies != BP_GET_NDVAS(bp_orig))
2564 		return (ZIO_PIPELINE_CONTINUE);
2565 
2566 	/*
2567 	 * If the checksums match then reset the pipeline so that we
2568 	 * avoid allocating a new bp and issuing any I/O.
2569 	 */
2570 	if (ZIO_CHECKSUM_EQUAL(bp->blk_cksum, bp_orig->blk_cksum)) {
2571 		ASSERT(zio_checksum_table[zp->zp_checksum].ci_flags &
2572 		    ZCHECKSUM_FLAG_NOPWRITE);
2573 		ASSERT3U(BP_GET_PSIZE(bp), ==, BP_GET_PSIZE(bp_orig));
2574 		ASSERT3U(BP_GET_LSIZE(bp), ==, BP_GET_LSIZE(bp_orig));
2575 		ASSERT(zp->zp_compress != ZIO_COMPRESS_OFF);
2576 		ASSERT(bcmp(&bp->blk_prop, &bp_orig->blk_prop,
2577 		    sizeof (uint64_t)) == 0);
2578 
2579 		*bp = *bp_orig;
2580 		zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
2581 		zio->io_flags |= ZIO_FLAG_NOPWRITE;
2582 	}
2583 
2584 	return (ZIO_PIPELINE_CONTINUE);
2585 }
2586 
2587 /*
2588  * ==========================================================================
2589  * Dedup
2590  * ==========================================================================
2591  */
2592 static void
2593 zio_ddt_child_read_done(zio_t *zio)
2594 {
2595 	blkptr_t *bp = zio->io_bp;
2596 	ddt_entry_t *dde = zio->io_private;
2597 	ddt_phys_t *ddp;
2598 	zio_t *pio = zio_unique_parent(zio);
2599 
2600 	mutex_enter(&pio->io_lock);
2601 	ddp = ddt_phys_select(dde, bp);
2602 	if (zio->io_error == 0)
2603 		ddt_phys_clear(ddp);	/* this ddp doesn't need repair */
2604 
2605 	if (zio->io_error == 0 && dde->dde_repair_abd == NULL)
2606 		dde->dde_repair_abd = zio->io_abd;
2607 	else
2608 		abd_free(zio->io_abd);
2609 	mutex_exit(&pio->io_lock);
2610 }
2611 
2612 static int
2613 zio_ddt_read_start(zio_t *zio)
2614 {
2615 	blkptr_t *bp = zio->io_bp;
2616 
2617 	ASSERT(BP_GET_DEDUP(bp));
2618 	ASSERT(BP_GET_PSIZE(bp) == zio->io_size);
2619 	ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
2620 
2621 	if (zio->io_child_error[ZIO_CHILD_DDT]) {
2622 		ddt_t *ddt = ddt_select(zio->io_spa, bp);
2623 		ddt_entry_t *dde = ddt_repair_start(ddt, bp);
2624 		ddt_phys_t *ddp = dde->dde_phys;
2625 		ddt_phys_t *ddp_self = ddt_phys_select(dde, bp);
2626 		blkptr_t blk;
2627 
2628 		ASSERT(zio->io_vsd == NULL);
2629 		zio->io_vsd = dde;
2630 
2631 		if (ddp_self == NULL)
2632 			return (ZIO_PIPELINE_CONTINUE);
2633 
2634 		for (int p = 0; p < DDT_PHYS_TYPES; p++, ddp++) {
2635 			if (ddp->ddp_phys_birth == 0 || ddp == ddp_self)
2636 				continue;
2637 			ddt_bp_create(ddt->ddt_checksum, &dde->dde_key, ddp,
2638 			    &blk);
2639 			zio_nowait(zio_read(zio, zio->io_spa, &blk,
2640 			    abd_alloc_for_io(zio->io_size, B_TRUE),
2641 			    zio->io_size, zio_ddt_child_read_done, dde,
2642 			    zio->io_priority, ZIO_DDT_CHILD_FLAGS(zio) |
2643 			    ZIO_FLAG_DONT_PROPAGATE, &zio->io_bookmark));
2644 		}
2645 		return (ZIO_PIPELINE_CONTINUE);
2646 	}
2647 
2648 	zio_nowait(zio_read(zio, zio->io_spa, bp,
2649 	    zio->io_abd, zio->io_size, NULL, NULL, zio->io_priority,
2650 	    ZIO_DDT_CHILD_FLAGS(zio), &zio->io_bookmark));
2651 
2652 	return (ZIO_PIPELINE_CONTINUE);
2653 }
2654 
2655 static int
2656 zio_ddt_read_done(zio_t *zio)
2657 {
2658 	blkptr_t *bp = zio->io_bp;
2659 
2660 	if (zio_wait_for_children(zio, ZIO_CHILD_DDT_BIT, ZIO_WAIT_DONE)) {
2661 		return (ZIO_PIPELINE_STOP);
2662 	}
2663 
2664 	ASSERT(BP_GET_DEDUP(bp));
2665 	ASSERT(BP_GET_PSIZE(bp) == zio->io_size);
2666 	ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
2667 
2668 	if (zio->io_child_error[ZIO_CHILD_DDT]) {
2669 		ddt_t *ddt = ddt_select(zio->io_spa, bp);
2670 		ddt_entry_t *dde = zio->io_vsd;
2671 		if (ddt == NULL) {
2672 			ASSERT(spa_load_state(zio->io_spa) != SPA_LOAD_NONE);
2673 			return (ZIO_PIPELINE_CONTINUE);
2674 		}
2675 		if (dde == NULL) {
2676 			zio->io_stage = ZIO_STAGE_DDT_READ_START >> 1;
2677 			zio_taskq_dispatch(zio, ZIO_TASKQ_ISSUE, B_FALSE);
2678 			return (ZIO_PIPELINE_STOP);
2679 		}
2680 		if (dde->dde_repair_abd != NULL) {
2681 			abd_copy(zio->io_abd, dde->dde_repair_abd,
2682 			    zio->io_size);
2683 			zio->io_child_error[ZIO_CHILD_DDT] = 0;
2684 		}
2685 		ddt_repair_done(ddt, dde);
2686 		zio->io_vsd = NULL;
2687 	}
2688 
2689 	ASSERT(zio->io_vsd == NULL);
2690 
2691 	return (ZIO_PIPELINE_CONTINUE);
2692 }
2693 
2694 static boolean_t
2695 zio_ddt_collision(zio_t *zio, ddt_t *ddt, ddt_entry_t *dde)
2696 {
2697 	spa_t *spa = zio->io_spa;
2698 	boolean_t do_raw = !!(zio->io_flags & ZIO_FLAG_RAW);
2699 
2700 	/* We should never get a raw, override zio */
2701 	ASSERT(!(zio->io_bp_override && do_raw));
2702 
2703 	/*
2704 	 * Note: we compare the original data, not the transformed data,
2705 	 * because when zio->io_bp is an override bp, we will not have
2706 	 * pushed the I/O transforms.  That's an important optimization
2707 	 * because otherwise we'd compress/encrypt all dmu_sync() data twice.
2708 	 * However, we should never get a raw, override zio so in these
2709 	 * cases we can compare the io_data directly. This is useful because
2710 	 * it allows us to do dedup verification even if we don't have access
2711 	 * to the original data (for instance, if the encryption keys aren't
2712 	 * loaded).
2713 	 */
2714 
2715 	for (int p = DDT_PHYS_SINGLE; p <= DDT_PHYS_TRIPLE; p++) {
2716 		zio_t *lio = dde->dde_lead_zio[p];
2717 
2718 		if (lio != NULL && do_raw) {
2719 			return (lio->io_size != zio->io_size ||
2720 			    abd_cmp(zio->io_abd, lio->io_abd,
2721 			    zio->io_size) != 0);
2722 		} else if (lio != NULL) {
2723 			return (lio->io_orig_size != zio->io_orig_size ||
2724 			    abd_cmp(zio->io_orig_abd, lio->io_orig_abd,
2725 			    zio->io_orig_size) != 0);
2726 		}
2727 	}
2728 
2729 	for (int p = DDT_PHYS_SINGLE; p <= DDT_PHYS_TRIPLE; p++) {
2730 		ddt_phys_t *ddp = &dde->dde_phys[p];
2731 
2732 		if (ddp->ddp_phys_birth != 0 && do_raw) {
2733 			blkptr_t blk = *zio->io_bp;
2734 			uint64_t psize;
2735 			abd_t *tmpabd;
2736 			int error;
2737 
2738 			ddt_bp_fill(ddp, &blk, ddp->ddp_phys_birth);
2739 			psize = BP_GET_PSIZE(&blk);
2740 
2741 			if (psize != zio->io_size)
2742 				return (B_TRUE);
2743 
2744 			ddt_exit(ddt);
2745 
2746 			tmpabd = abd_alloc_for_io(psize, B_TRUE);
2747 
2748 			error = zio_wait(zio_read(NULL, spa, &blk, tmpabd,
2749 			    psize, NULL, NULL, ZIO_PRIORITY_SYNC_READ,
2750 			    ZIO_FLAG_CANFAIL | ZIO_FLAG_SPECULATIVE |
2751 			    ZIO_FLAG_RAW, &zio->io_bookmark));
2752 
2753 			if (error == 0) {
2754 				if (abd_cmp(tmpabd, zio->io_abd, psize) != 0)
2755 					error = SET_ERROR(ENOENT);
2756 			}
2757 
2758 			abd_free(tmpabd);
2759 			ddt_enter(ddt);
2760 			return (error != 0);
2761 		} else if (ddp->ddp_phys_birth != 0) {
2762 			arc_buf_t *abuf = NULL;
2763 			arc_flags_t aflags = ARC_FLAG_WAIT;
2764 			int zio_flags = ZIO_FLAG_CANFAIL | ZIO_FLAG_SPECULATIVE;
2765 			blkptr_t blk = *zio->io_bp;
2766 			int error;
2767 
2768 			ddt_bp_fill(ddp, &blk, ddp->ddp_phys_birth);
2769 
2770 			if (BP_GET_LSIZE(&blk) != zio->io_orig_size)
2771 				return (B_TRUE);
2772 
2773 			ddt_exit(ddt);
2774 
2775 			/*
2776 			 * Intuitively, it would make more sense to compare
2777 			 * io_abd than io_orig_abd in the raw case since you
2778 			 * don't want to look at any transformations that have
2779 			 * happened to the data. However, for raw I/Os the
2780 			 * data will actually be the same in io_abd and
2781 			 * io_orig_abd, so all we have to do is issue this as
2782 			 * a raw ARC read.
2783 			 */
2784 			if (do_raw) {
2785 				zio_flags |= ZIO_FLAG_RAW;
2786 				ASSERT3U(zio->io_size, ==, zio->io_orig_size);
2787 				ASSERT0(abd_cmp(zio->io_abd, zio->io_orig_abd,
2788 				    zio->io_size));
2789 				ASSERT3P(zio->io_transform_stack, ==, NULL);
2790 			}
2791 
2792 			error = arc_read(NULL, spa, &blk,
2793 			    arc_getbuf_func, &abuf, ZIO_PRIORITY_SYNC_READ,
2794 			    zio_flags, &aflags, &zio->io_bookmark);
2795 
2796 			if (error == 0) {
2797 				if (abd_cmp_buf(zio->io_orig_abd, abuf->b_data,
2798 				    zio->io_orig_size) != 0)
2799 					error = SET_ERROR(ENOENT);
2800 				arc_buf_destroy(abuf, &abuf);
2801 			}
2802 
2803 			ddt_enter(ddt);
2804 			return (error != 0);
2805 		}
2806 	}
2807 
2808 	return (B_FALSE);
2809 }
2810 
2811 static void
2812 zio_ddt_child_write_ready(zio_t *zio)
2813 {
2814 	int p = zio->io_prop.zp_copies;
2815 	ddt_t *ddt = ddt_select(zio->io_spa, zio->io_bp);
2816 	ddt_entry_t *dde = zio->io_private;
2817 	ddt_phys_t *ddp = &dde->dde_phys[p];
2818 	zio_t *pio;
2819 
2820 	if (zio->io_error)
2821 		return;
2822 
2823 	ddt_enter(ddt);
2824 
2825 	ASSERT(dde->dde_lead_zio[p] == zio);
2826 
2827 	ddt_phys_fill(ddp, zio->io_bp);
2828 
2829 	zio_link_t *zl = NULL;
2830 	while ((pio = zio_walk_parents(zio, &zl)) != NULL)
2831 		ddt_bp_fill(ddp, pio->io_bp, zio->io_txg);
2832 
2833 	ddt_exit(ddt);
2834 }
2835 
2836 static void
2837 zio_ddt_child_write_done(zio_t *zio)
2838 {
2839 	int p = zio->io_prop.zp_copies;
2840 	ddt_t *ddt = ddt_select(zio->io_spa, zio->io_bp);
2841 	ddt_entry_t *dde = zio->io_private;
2842 	ddt_phys_t *ddp = &dde->dde_phys[p];
2843 
2844 	ddt_enter(ddt);
2845 
2846 	ASSERT(ddp->ddp_refcnt == 0);
2847 	ASSERT(dde->dde_lead_zio[p] == zio);
2848 	dde->dde_lead_zio[p] = NULL;
2849 
2850 	if (zio->io_error == 0) {
2851 		zio_link_t *zl = NULL;
2852 		while (zio_walk_parents(zio, &zl) != NULL)
2853 			ddt_phys_addref(ddp);
2854 	} else {
2855 		ddt_phys_clear(ddp);
2856 	}
2857 
2858 	ddt_exit(ddt);
2859 }
2860 
2861 static void
2862 zio_ddt_ditto_write_done(zio_t *zio)
2863 {
2864 	int p = DDT_PHYS_DITTO;
2865 	zio_prop_t *zp = &zio->io_prop;
2866 	blkptr_t *bp = zio->io_bp;
2867 	ddt_t *ddt = ddt_select(zio->io_spa, bp);
2868 	ddt_entry_t *dde = zio->io_private;
2869 	ddt_phys_t *ddp = &dde->dde_phys[p];
2870 	ddt_key_t *ddk = &dde->dde_key;
2871 
2872 	ddt_enter(ddt);
2873 
2874 	ASSERT(ddp->ddp_refcnt == 0);
2875 	ASSERT(dde->dde_lead_zio[p] == zio);
2876 	dde->dde_lead_zio[p] = NULL;
2877 
2878 	if (zio->io_error == 0) {
2879 		ASSERT(ZIO_CHECKSUM_EQUAL(bp->blk_cksum, ddk->ddk_cksum));
2880 		ASSERT(zp->zp_copies < SPA_DVAS_PER_BP);
2881 		ASSERT(zp->zp_copies == BP_GET_NDVAS(bp) - BP_IS_GANG(bp));
2882 		if (ddp->ddp_phys_birth != 0)
2883 			ddt_phys_free(ddt, ddk, ddp, zio->io_txg);
2884 		ddt_phys_fill(ddp, bp);
2885 	}
2886 
2887 	ddt_exit(ddt);
2888 }
2889 
2890 static int
2891 zio_ddt_write(zio_t *zio)
2892 {
2893 	spa_t *spa = zio->io_spa;
2894 	blkptr_t *bp = zio->io_bp;
2895 	uint64_t txg = zio->io_txg;
2896 	zio_prop_t *zp = &zio->io_prop;
2897 	int p = zp->zp_copies;
2898 	int ditto_copies;
2899 	zio_t *cio = NULL;
2900 	zio_t *dio = NULL;
2901 	ddt_t *ddt = ddt_select(spa, bp);
2902 	ddt_entry_t *dde;
2903 	ddt_phys_t *ddp;
2904 
2905 	ASSERT(BP_GET_DEDUP(bp));
2906 	ASSERT(BP_GET_CHECKSUM(bp) == zp->zp_checksum);
2907 	ASSERT(BP_IS_HOLE(bp) || zio->io_bp_override);
2908 	ASSERT(!(zio->io_bp_override && (zio->io_flags & ZIO_FLAG_RAW)));
2909 
2910 	ddt_enter(ddt);
2911 	dde = ddt_lookup(ddt, bp, B_TRUE);
2912 	ddp = &dde->dde_phys[p];
2913 
2914 	if (zp->zp_dedup_verify && zio_ddt_collision(zio, ddt, dde)) {
2915 		/*
2916 		 * If we're using a weak checksum, upgrade to a strong checksum
2917 		 * and try again.  If we're already using a strong checksum,
2918 		 * we can't resolve it, so just convert to an ordinary write.
2919 		 * (And automatically e-mail a paper to Nature?)
2920 		 */
2921 		if (!(zio_checksum_table[zp->zp_checksum].ci_flags &
2922 		    ZCHECKSUM_FLAG_DEDUP)) {
2923 			zp->zp_checksum = spa_dedup_checksum(spa);
2924 			zio_pop_transforms(zio);
2925 			zio->io_stage = ZIO_STAGE_OPEN;
2926 			BP_ZERO(bp);
2927 		} else {
2928 			zp->zp_dedup = B_FALSE;
2929 			BP_SET_DEDUP(bp, B_FALSE);
2930 		}
2931 		ASSERT(!BP_GET_DEDUP(bp));
2932 		zio->io_pipeline = ZIO_WRITE_PIPELINE;
2933 		ddt_exit(ddt);
2934 		return (ZIO_PIPELINE_CONTINUE);
2935 	}
2936 
2937 	ditto_copies = ddt_ditto_copies_needed(ddt, dde, ddp);
2938 	ASSERT(ditto_copies < SPA_DVAS_PER_BP);
2939 
2940 	if (ditto_copies > ddt_ditto_copies_present(dde) &&
2941 	    dde->dde_lead_zio[DDT_PHYS_DITTO] == NULL) {
2942 		zio_prop_t czp = *zp;
2943 
2944 		czp.zp_copies = ditto_copies;
2945 
2946 		/*
2947 		 * If we arrived here with an override bp, we won't have run
2948 		 * the transform stack, so we won't have the data we need to
2949 		 * generate a child i/o.  So, toss the override bp and restart.
2950 		 * This is safe, because using the override bp is just an
2951 		 * optimization; and it's rare, so the cost doesn't matter.
2952 		 */
2953 		if (zio->io_bp_override) {
2954 			zio_pop_transforms(zio);
2955 			zio->io_stage = ZIO_STAGE_OPEN;
2956 			zio->io_pipeline = ZIO_WRITE_PIPELINE;
2957 			zio->io_bp_override = NULL;
2958 			BP_ZERO(bp);
2959 			ddt_exit(ddt);
2960 			return (ZIO_PIPELINE_CONTINUE);
2961 		}
2962 
2963 		dio = zio_write(zio, spa, txg, bp, zio->io_orig_abd,
2964 		    zio->io_orig_size, zio->io_orig_size, &czp, NULL, NULL,
2965 		    NULL, zio_ddt_ditto_write_done, dde, zio->io_priority,
2966 		    ZIO_DDT_CHILD_FLAGS(zio), &zio->io_bookmark);
2967 
2968 		zio_push_transform(dio, zio->io_abd, zio->io_size, 0, NULL);
2969 		dde->dde_lead_zio[DDT_PHYS_DITTO] = dio;
2970 	}
2971 
2972 	if (ddp->ddp_phys_birth != 0 || dde->dde_lead_zio[p] != NULL) {
2973 		if (ddp->ddp_phys_birth != 0)
2974 			ddt_bp_fill(ddp, bp, txg);
2975 		if (dde->dde_lead_zio[p] != NULL)
2976 			zio_add_child(zio, dde->dde_lead_zio[p]);
2977 		else
2978 			ddt_phys_addref(ddp);
2979 	} else if (zio->io_bp_override) {
2980 		ASSERT(bp->blk_birth == txg);
2981 		ASSERT(BP_EQUAL(bp, zio->io_bp_override));
2982 		ddt_phys_fill(ddp, bp);
2983 		ddt_phys_addref(ddp);
2984 	} else {
2985 		cio = zio_write(zio, spa, txg, bp, zio->io_orig_abd,
2986 		    zio->io_orig_size, zio->io_orig_size, zp,
2987 		    zio_ddt_child_write_ready, NULL, NULL,
2988 		    zio_ddt_child_write_done, dde, zio->io_priority,
2989 		    ZIO_DDT_CHILD_FLAGS(zio), &zio->io_bookmark);
2990 
2991 		zio_push_transform(cio, zio->io_abd, zio->io_size, 0, NULL);
2992 		dde->dde_lead_zio[p] = cio;
2993 	}
2994 
2995 	ddt_exit(ddt);
2996 
2997 	if (cio)
2998 		zio_nowait(cio);
2999 	if (dio)
3000 		zio_nowait(dio);
3001 
3002 	return (ZIO_PIPELINE_CONTINUE);
3003 }
3004 
3005 ddt_entry_t *freedde; /* for debugging */
3006 
3007 static int
3008 zio_ddt_free(zio_t *zio)
3009 {
3010 	spa_t *spa = zio->io_spa;
3011 	blkptr_t *bp = zio->io_bp;
3012 	ddt_t *ddt = ddt_select(spa, bp);
3013 	ddt_entry_t *dde;
3014 	ddt_phys_t *ddp;
3015 
3016 	ASSERT(BP_GET_DEDUP(bp));
3017 	ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
3018 
3019 	ddt_enter(ddt);
3020 	freedde = dde = ddt_lookup(ddt, bp, B_TRUE);
3021 	ddp = ddt_phys_select(dde, bp);
3022 	ddt_phys_decref(ddp);
3023 	ddt_exit(ddt);
3024 
3025 	return (ZIO_PIPELINE_CONTINUE);
3026 }
3027 
3028 /*
3029  * ==========================================================================
3030  * Allocate and free blocks
3031  * ==========================================================================
3032  */
3033 
3034 static zio_t *
3035 zio_io_to_allocate(spa_t *spa, int allocator)
3036 {
3037 	zio_t *zio;
3038 
3039 	ASSERT(MUTEX_HELD(&spa->spa_alloc_locks[allocator]));
3040 
3041 	zio = avl_first(&spa->spa_alloc_trees[allocator]);
3042 	if (zio == NULL)
3043 		return (NULL);
3044 
3045 	ASSERT(IO_IS_ALLOCATING(zio));
3046 
3047 	/*
3048 	 * Try to place a reservation for this zio. If we're unable to
3049 	 * reserve then we throttle.
3050 	 */
3051 	ASSERT3U(zio->io_allocator, ==, allocator);
3052 	if (!metaslab_class_throttle_reserve(zio->io_metaslab_class,
3053 	    zio->io_prop.zp_copies, zio->io_allocator, zio, 0)) {
3054 		return (NULL);
3055 	}
3056 
3057 	avl_remove(&spa->spa_alloc_trees[allocator], zio);
3058 	ASSERT3U(zio->io_stage, <, ZIO_STAGE_DVA_ALLOCATE);
3059 
3060 	return (zio);
3061 }
3062 
3063 static int
3064 zio_dva_throttle(zio_t *zio)
3065 {
3066 	spa_t *spa = zio->io_spa;
3067 	zio_t *nio;
3068 	metaslab_class_t *mc;
3069 
3070 	/* locate an appropriate allocation class */
3071 	mc = spa_preferred_class(spa, zio->io_size, zio->io_prop.zp_type,
3072 	    zio->io_prop.zp_level, zio->io_prop.zp_zpl_smallblk);
3073 
3074 	if (zio->io_priority == ZIO_PRIORITY_SYNC_WRITE ||
3075 	    !mc->mc_alloc_throttle_enabled ||
3076 	    zio->io_child_type == ZIO_CHILD_GANG ||
3077 	    zio->io_flags & ZIO_FLAG_NODATA) {
3078 		return (ZIO_PIPELINE_CONTINUE);
3079 	}
3080 
3081 	ASSERT(zio->io_child_type > ZIO_CHILD_GANG);
3082 
3083 	ASSERT3U(zio->io_queued_timestamp, >, 0);
3084 	ASSERT(zio->io_stage == ZIO_STAGE_DVA_THROTTLE);
3085 
3086 	zbookmark_phys_t *bm = &zio->io_bookmark;
3087 	/*
3088 	 * We want to try to use as many allocators as possible to help improve
3089 	 * performance, but we also want logically adjacent IOs to be physically
3090 	 * adjacent to improve sequential read performance. We chunk each object
3091 	 * into 2^20 block regions, and then hash based on the objset, object,
3092 	 * level, and region to accomplish both of these goals.
3093 	 */
3094 	zio->io_allocator = cityhash4(bm->zb_objset, bm->zb_object,
3095 	    bm->zb_level, bm->zb_blkid >> 20) % spa->spa_alloc_count;
3096 	mutex_enter(&spa->spa_alloc_locks[zio->io_allocator]);
3097 	ASSERT(zio->io_type == ZIO_TYPE_WRITE);
3098 	zio->io_metaslab_class = mc;
3099 	avl_add(&spa->spa_alloc_trees[zio->io_allocator], zio);
3100 	nio = zio_io_to_allocate(spa, zio->io_allocator);
3101 	mutex_exit(&spa->spa_alloc_locks[zio->io_allocator]);
3102 
3103 	if (nio == zio)
3104 		return (ZIO_PIPELINE_CONTINUE);
3105 
3106 	if (nio != NULL) {
3107 		ASSERT(nio->io_stage == ZIO_STAGE_DVA_THROTTLE);
3108 		/*
3109 		 * We are passing control to a new zio so make sure that
3110 		 * it is processed by a different thread. We do this to
3111 		 * avoid stack overflows that can occur when parents are
3112 		 * throttled and children are making progress. We allow
3113 		 * it to go to the head of the taskq since it's already
3114 		 * been waiting.
3115 		 */
3116 		zio_taskq_dispatch(nio, ZIO_TASKQ_ISSUE, B_TRUE);
3117 	}
3118 	return (ZIO_PIPELINE_STOP);
3119 }
3120 
3121 static void
3122 zio_allocate_dispatch(spa_t *spa, int allocator)
3123 {
3124 	zio_t *zio;
3125 
3126 	mutex_enter(&spa->spa_alloc_locks[allocator]);
3127 	zio = zio_io_to_allocate(spa, allocator);
3128 	mutex_exit(&spa->spa_alloc_locks[allocator]);
3129 	if (zio == NULL)
3130 		return;
3131 
3132 	ASSERT3U(zio->io_stage, ==, ZIO_STAGE_DVA_THROTTLE);
3133 	ASSERT0(zio->io_error);
3134 	zio_taskq_dispatch(zio, ZIO_TASKQ_ISSUE, B_TRUE);
3135 }
3136 
3137 static int
3138 zio_dva_allocate(zio_t *zio)
3139 {
3140 	spa_t *spa = zio->io_spa;
3141 	metaslab_class_t *mc;
3142 	blkptr_t *bp = zio->io_bp;
3143 	int error;
3144 	int flags = 0;
3145 
3146 	if (zio->io_gang_leader == NULL) {
3147 		ASSERT(zio->io_child_type > ZIO_CHILD_GANG);
3148 		zio->io_gang_leader = zio;
3149 	}
3150 
3151 	ASSERT(BP_IS_HOLE(bp));
3152 	ASSERT0(BP_GET_NDVAS(bp));
3153 	ASSERT3U(zio->io_prop.zp_copies, >, 0);
3154 	ASSERT3U(zio->io_prop.zp_copies, <=, spa_max_replication(spa));
3155 	ASSERT3U(zio->io_size, ==, BP_GET_PSIZE(bp));
3156 
3157 	if (zio->io_flags & ZIO_FLAG_NODATA)
3158 		flags |= METASLAB_DONT_THROTTLE;
3159 	if (zio->io_flags & ZIO_FLAG_GANG_CHILD)
3160 		flags |= METASLAB_GANG_CHILD;
3161 	if (zio->io_priority == ZIO_PRIORITY_ASYNC_WRITE)
3162 		flags |= METASLAB_ASYNC_ALLOC;
3163 
3164 	/*
3165 	 * if not already chosen, locate an appropriate allocation class
3166 	 */
3167 	mc = zio->io_metaslab_class;
3168 	if (mc == NULL) {
3169 		mc = spa_preferred_class(spa, zio->io_size,
3170 		    zio->io_prop.zp_type, zio->io_prop.zp_level,
3171 		    zio->io_prop.zp_zpl_smallblk);
3172 		zio->io_metaslab_class = mc;
3173 	}
3174 
3175 	error = metaslab_alloc(spa, mc, zio->io_size, bp,
3176 	    zio->io_prop.zp_copies, zio->io_txg, NULL, flags,
3177 	    &zio->io_alloc_list, zio, zio->io_allocator);
3178 
3179 	/*
3180 	 * Fallback to normal class when an alloc class is full
3181 	 */
3182 	if (error == ENOSPC && mc != spa_normal_class(spa)) {
3183 		/*
3184 		 * If throttling, transfer reservation over to normal class.
3185 		 * The io_allocator slot can remain the same even though we
3186 		 * are switching classes.
3187 		 */
3188 		if (mc->mc_alloc_throttle_enabled &&
3189 		    (zio->io_flags & ZIO_FLAG_IO_ALLOCATING)) {
3190 			metaslab_class_throttle_unreserve(mc,
3191 			    zio->io_prop.zp_copies, zio->io_allocator, zio);
3192 			zio->io_flags &= ~ZIO_FLAG_IO_ALLOCATING;
3193 
3194 			mc = spa_normal_class(spa);
3195 			VERIFY(metaslab_class_throttle_reserve(mc,
3196 			    zio->io_prop.zp_copies, zio->io_allocator, zio,
3197 			    flags | METASLAB_MUST_RESERVE));
3198 		} else {
3199 			mc = spa_normal_class(spa);
3200 		}
3201 		zio->io_metaslab_class = mc;
3202 
3203 		error = metaslab_alloc(spa, mc, zio->io_size, bp,
3204 		    zio->io_prop.zp_copies, zio->io_txg, NULL, flags,
3205 		    &zio->io_alloc_list, zio, zio->io_allocator);
3206 	}
3207 
3208 	if (error != 0) {
3209 		zfs_dbgmsg("%s: metaslab allocation failure: zio %p, "
3210 		    "size %llu, error %d", spa_name(spa), zio, zio->io_size,
3211 		    error);
3212 		if (error == ENOSPC && zio->io_size > SPA_MINBLOCKSIZE)
3213 			return (zio_write_gang_block(zio));
3214 		zio->io_error = error;
3215 	}
3216 
3217 	return (ZIO_PIPELINE_CONTINUE);
3218 }
3219 
3220 static int
3221 zio_dva_free(zio_t *zio)
3222 {
3223 	metaslab_free(zio->io_spa, zio->io_bp, zio->io_txg, B_FALSE);
3224 
3225 	return (ZIO_PIPELINE_CONTINUE);
3226 }
3227 
3228 static int
3229 zio_dva_claim(zio_t *zio)
3230 {
3231 	int error;
3232 
3233 	error = metaslab_claim(zio->io_spa, zio->io_bp, zio->io_txg);
3234 	if (error)
3235 		zio->io_error = error;
3236 
3237 	return (ZIO_PIPELINE_CONTINUE);
3238 }
3239 
3240 /*
3241  * Undo an allocation.  This is used by zio_done() when an I/O fails
3242  * and we want to give back the block we just allocated.
3243  * This handles both normal blocks and gang blocks.
3244  */
3245 static void
3246 zio_dva_unallocate(zio_t *zio, zio_gang_node_t *gn, blkptr_t *bp)
3247 {
3248 	ASSERT(bp->blk_birth == zio->io_txg || BP_IS_HOLE(bp));
3249 	ASSERT(zio->io_bp_override == NULL);
3250 
3251 	if (!BP_IS_HOLE(bp))
3252 		metaslab_free(zio->io_spa, bp, bp->blk_birth, B_TRUE);
3253 
3254 	if (gn != NULL) {
3255 		for (int g = 0; g < SPA_GBH_NBLKPTRS; g++) {
3256 			zio_dva_unallocate(zio, gn->gn_child[g],
3257 			    &gn->gn_gbh->zg_blkptr[g]);
3258 		}
3259 	}
3260 }
3261 
3262 /*
3263  * Try to allocate an intent log block.  Return 0 on success, errno on failure.
3264  */
3265 int
3266 zio_alloc_zil(spa_t *spa, objset_t *os, uint64_t txg, blkptr_t *new_bp,
3267     blkptr_t *old_bp, uint64_t size, boolean_t *slog)
3268 {
3269 	int error = 1;
3270 	zio_alloc_list_t io_alloc_list;
3271 
3272 	ASSERT(txg > spa_syncing_txg(spa));
3273 
3274 	metaslab_trace_init(&io_alloc_list);
3275 
3276 	/*
3277 	 * Block pointer fields are useful to metaslabs for stats and debugging.
3278 	 * Fill in the obvious ones before calling into metaslab_alloc().
3279 	 */
3280 	BP_SET_TYPE(new_bp, DMU_OT_INTENT_LOG);
3281 	BP_SET_PSIZE(new_bp, size);
3282 	BP_SET_LEVEL(new_bp, 0);
3283 
3284 	/*
3285 	 * When allocating a zil block, we don't have information about
3286 	 * the final destination of the block except the objset it's part
3287 	 * of, so we just hash the objset ID to pick the allocator to get
3288 	 * some parallelism.
3289 	 */
3290 	error = metaslab_alloc(spa, spa_log_class(spa), size, new_bp, 1,
3291 	    txg, old_bp, METASLAB_HINTBP_AVOID, &io_alloc_list, NULL,
3292 	    cityhash4(0, 0, 0,
3293 	    os->os_dsl_dataset->ds_object) % spa->spa_alloc_count);
3294 	if (error == 0) {
3295 		*slog = TRUE;
3296 	} else {
3297 		error = metaslab_alloc(spa, spa_normal_class(spa), size,
3298 		    new_bp, 1, txg, old_bp, METASLAB_HINTBP_AVOID,
3299 		    &io_alloc_list, NULL, cityhash4(0, 0, 0,
3300 		    os->os_dsl_dataset->ds_object) % spa->spa_alloc_count);
3301 		if (error == 0)
3302 			*slog = FALSE;
3303 	}
3304 	metaslab_trace_fini(&io_alloc_list);
3305 
3306 	if (error == 0) {
3307 		BP_SET_LSIZE(new_bp, size);
3308 		BP_SET_PSIZE(new_bp, size);
3309 		BP_SET_COMPRESS(new_bp, ZIO_COMPRESS_OFF);
3310 		BP_SET_CHECKSUM(new_bp,
3311 		    spa_version(spa) >= SPA_VERSION_SLIM_ZIL
3312 		    ? ZIO_CHECKSUM_ZILOG2 : ZIO_CHECKSUM_ZILOG);
3313 		BP_SET_TYPE(new_bp, DMU_OT_INTENT_LOG);
3314 		BP_SET_LEVEL(new_bp, 0);
3315 		BP_SET_DEDUP(new_bp, 0);
3316 		BP_SET_BYTEORDER(new_bp, ZFS_HOST_BYTEORDER);
3317 
3318 		/*
3319 		 * encrypted blocks will require an IV and salt. We generate
3320 		 * these now since we will not be rewriting the bp at
3321 		 * rewrite time.
3322 		 */
3323 		if (os->os_encrypted) {
3324 			uint8_t iv[ZIO_DATA_IV_LEN];
3325 			uint8_t salt[ZIO_DATA_SALT_LEN];
3326 
3327 			BP_SET_CRYPT(new_bp, B_TRUE);
3328 			VERIFY0(spa_crypt_get_salt(spa,
3329 			    dmu_objset_id(os), salt));
3330 			VERIFY0(zio_crypt_generate_iv(iv));
3331 
3332 			zio_crypt_encode_params_bp(new_bp, salt, iv);
3333 		}
3334 	} else {
3335 		zfs_dbgmsg("%s: zil block allocation failure: "
3336 		    "size %llu, error %d", spa_name(spa), size, error);
3337 	}
3338 
3339 	return (error);
3340 }
3341 
3342 /*
3343  * ==========================================================================
3344  * Read and write to physical devices
3345  * ==========================================================================
3346  */
3347 
3348 
3349 /*
3350  * Issue an I/O to the underlying vdev. Typically the issue pipeline
3351  * stops after this stage and will resume upon I/O completion.
3352  * However, there are instances where the vdev layer may need to
3353  * continue the pipeline when an I/O was not issued. Since the I/O
3354  * that was sent to the vdev layer might be different than the one
3355  * currently active in the pipeline (see vdev_queue_io()), we explicitly
3356  * force the underlying vdev layers to call either zio_execute() or
3357  * zio_interrupt() to ensure that the pipeline continues with the correct I/O.
3358  */
3359 static int
3360 zio_vdev_io_start(zio_t *zio)
3361 {
3362 	vdev_t *vd = zio->io_vd;
3363 	uint64_t align;
3364 	spa_t *spa = zio->io_spa;
3365 
3366 	ASSERT(zio->io_error == 0);
3367 	ASSERT(zio->io_child_error[ZIO_CHILD_VDEV] == 0);
3368 
3369 	if (vd == NULL) {
3370 		if (!(zio->io_flags & ZIO_FLAG_CONFIG_WRITER))
3371 			spa_config_enter(spa, SCL_ZIO, zio, RW_READER);
3372 
3373 		/*
3374 		 * The mirror_ops handle multiple DVAs in a single BP.
3375 		 */
3376 		vdev_mirror_ops.vdev_op_io_start(zio);
3377 		return (ZIO_PIPELINE_STOP);
3378 	}
3379 
3380 	ASSERT3P(zio->io_logical, !=, zio);
3381 	if (zio->io_type == ZIO_TYPE_WRITE) {
3382 		ASSERT(spa->spa_trust_config);
3383 
3384 		/*
3385 		 * Note: the code can handle other kinds of writes,
3386 		 * but we don't expect them.
3387 		 */
3388 		if (zio->io_vd->vdev_removing) {
3389 			ASSERT(zio->io_flags &
3390 			    (ZIO_FLAG_PHYSICAL | ZIO_FLAG_SELF_HEAL |
3391 			    ZIO_FLAG_RESILVER | ZIO_FLAG_INDUCE_DAMAGE));
3392 		}
3393 	}
3394 
3395 	align = 1ULL << vd->vdev_top->vdev_ashift;
3396 
3397 	if (!(zio->io_flags & ZIO_FLAG_PHYSICAL) &&
3398 	    P2PHASE(zio->io_size, align) != 0) {
3399 		/* Transform logical writes to be a full physical block size. */
3400 		uint64_t asize = P2ROUNDUP(zio->io_size, align);
3401 		abd_t *abuf = abd_alloc_sametype(zio->io_abd, asize);
3402 		ASSERT(vd == vd->vdev_top);
3403 		if (zio->io_type == ZIO_TYPE_WRITE) {
3404 			abd_copy(abuf, zio->io_abd, zio->io_size);
3405 			abd_zero_off(abuf, zio->io_size, asize - zio->io_size);
3406 		}
3407 		zio_push_transform(zio, abuf, asize, asize, zio_subblock);
3408 	}
3409 
3410 	/*
3411 	 * If this is not a physical io, make sure that it is properly aligned
3412 	 * before proceeding.
3413 	 */
3414 	if (!(zio->io_flags & ZIO_FLAG_PHYSICAL)) {
3415 		ASSERT0(P2PHASE(zio->io_offset, align));
3416 		ASSERT0(P2PHASE(zio->io_size, align));
3417 	} else {
3418 		/*
3419 		 * For physical writes, we allow 512b aligned writes and assume
3420 		 * the device will perform a read-modify-write as necessary.
3421 		 */
3422 		ASSERT0(P2PHASE(zio->io_offset, SPA_MINBLOCKSIZE));
3423 		ASSERT0(P2PHASE(zio->io_size, SPA_MINBLOCKSIZE));
3424 	}
3425 
3426 	VERIFY(zio->io_type != ZIO_TYPE_WRITE || spa_writeable(spa));
3427 
3428 	/*
3429 	 * If this is a repair I/O, and there's no self-healing involved --
3430 	 * that is, we're just resilvering what we expect to resilver --
3431 	 * then don't do the I/O unless zio's txg is actually in vd's DTL.
3432 	 * This prevents spurious resilvering.
3433 	 *
3434 	 * There are a few ways that we can end up creating these spurious
3435 	 * resilver i/os:
3436 	 *
3437 	 * 1. A resilver i/o will be issued if any DVA in the BP has a
3438 	 * dirty DTL.  The mirror code will issue resilver writes to
3439 	 * each DVA, including the one(s) that are not on vdevs with dirty
3440 	 * DTLs.
3441 	 *
3442 	 * 2. With nested replication, which happens when we have a
3443 	 * "replacing" or "spare" vdev that's a child of a mirror or raidz.
3444 	 * For example, given mirror(replacing(A+B), C), it's likely that
3445 	 * only A is out of date (it's the new device). In this case, we'll
3446 	 * read from C, then use the data to resilver A+B -- but we don't
3447 	 * actually want to resilver B, just A. The top-level mirror has no
3448 	 * way to know this, so instead we just discard unnecessary repairs
3449 	 * as we work our way down the vdev tree.
3450 	 *
3451 	 * 3. ZTEST also creates mirrors of mirrors, mirrors of raidz, etc.
3452 	 * The same logic applies to any form of nested replication: ditto
3453 	 * + mirror, RAID-Z + replacing, etc.
3454 	 *
3455 	 * However, indirect vdevs point off to other vdevs which may have
3456 	 * DTL's, so we never bypass them.  The child i/os on concrete vdevs
3457 	 * will be properly bypassed instead.
3458 	 */
3459 	if ((zio->io_flags & ZIO_FLAG_IO_REPAIR) &&
3460 	    !(zio->io_flags & ZIO_FLAG_SELF_HEAL) &&
3461 	    zio->io_txg != 0 &&	/* not a delegated i/o */
3462 	    vd->vdev_ops != &vdev_indirect_ops &&
3463 	    !vdev_dtl_contains(vd, DTL_PARTIAL, zio->io_txg, 1)) {
3464 		ASSERT(zio->io_type == ZIO_TYPE_WRITE);
3465 		zio_vdev_io_bypass(zio);
3466 		return (ZIO_PIPELINE_CONTINUE);
3467 	}
3468 
3469 	if (vd->vdev_ops->vdev_op_leaf &&
3470 	    (zio->io_type == ZIO_TYPE_READ || zio->io_type == ZIO_TYPE_WRITE)) {
3471 
3472 		if (zio->io_type == ZIO_TYPE_READ && vdev_cache_read(zio))
3473 			return (ZIO_PIPELINE_CONTINUE);
3474 
3475 		if ((zio = vdev_queue_io(zio)) == NULL)
3476 			return (ZIO_PIPELINE_STOP);
3477 
3478 		if (!vdev_accessible(vd, zio)) {
3479 			zio->io_error = SET_ERROR(ENXIO);
3480 			zio_interrupt(zio);
3481 			return (ZIO_PIPELINE_STOP);
3482 		}
3483 	}
3484 
3485 	vd->vdev_ops->vdev_op_io_start(zio);
3486 	return (ZIO_PIPELINE_STOP);
3487 }
3488 
3489 static int
3490 zio_vdev_io_done(zio_t *zio)
3491 {
3492 	vdev_t *vd = zio->io_vd;
3493 	vdev_ops_t *ops = vd ? vd->vdev_ops : &vdev_mirror_ops;
3494 	boolean_t unexpected_error = B_FALSE;
3495 
3496 	if (zio_wait_for_children(zio, ZIO_CHILD_VDEV_BIT, ZIO_WAIT_DONE)) {
3497 		return (ZIO_PIPELINE_STOP);
3498 	}
3499 
3500 	ASSERT(zio->io_type == ZIO_TYPE_READ || zio->io_type == ZIO_TYPE_WRITE);
3501 
3502 	if (vd != NULL && vd->vdev_ops->vdev_op_leaf) {
3503 
3504 		vdev_queue_io_done(zio);
3505 
3506 		if (zio->io_type == ZIO_TYPE_WRITE)
3507 			vdev_cache_write(zio);
3508 
3509 		if (zio_injection_enabled && zio->io_error == 0)
3510 			zio->io_error = zio_handle_device_injection(vd,
3511 			    zio, EIO);
3512 
3513 		if (zio_injection_enabled && zio->io_error == 0)
3514 			zio->io_error = zio_handle_label_injection(zio, EIO);
3515 
3516 		if (zio->io_error) {
3517 			if (!vdev_accessible(vd, zio)) {
3518 				zio->io_error = SET_ERROR(ENXIO);
3519 			} else {
3520 				unexpected_error = B_TRUE;
3521 			}
3522 		}
3523 	}
3524 
3525 	ops->vdev_op_io_done(zio);
3526 
3527 	if (unexpected_error)
3528 		VERIFY(vdev_probe(vd, zio) == NULL);
3529 
3530 	return (ZIO_PIPELINE_CONTINUE);
3531 }
3532 
3533 /*
3534  * This function is used to change the priority of an existing zio that is
3535  * currently in-flight. This is used by the arc to upgrade priority in the
3536  * event that a demand read is made for a block that is currently queued
3537  * as a scrub or async read IO. Otherwise, the high priority read request
3538  * would end up having to wait for the lower priority IO.
3539  */
3540 void
3541 zio_change_priority(zio_t *pio, zio_priority_t priority)
3542 {
3543 	zio_t *cio, *cio_next;
3544 	zio_link_t *zl = NULL;
3545 
3546 	ASSERT3U(priority, <, ZIO_PRIORITY_NUM_QUEUEABLE);
3547 
3548 	if (pio->io_vd != NULL && pio->io_vd->vdev_ops->vdev_op_leaf) {
3549 		vdev_queue_change_io_priority(pio, priority);
3550 	} else {
3551 		pio->io_priority = priority;
3552 	}
3553 
3554 	mutex_enter(&pio->io_lock);
3555 	for (cio = zio_walk_children(pio, &zl); cio != NULL; cio = cio_next) {
3556 		cio_next = zio_walk_children(pio, &zl);
3557 		zio_change_priority(cio, priority);
3558 	}
3559 	mutex_exit(&pio->io_lock);
3560 }
3561 
3562 /*
3563  * For non-raidz ZIOs, we can just copy aside the bad data read from the
3564  * disk, and use that to finish the checksum ereport later.
3565  */
3566 static void
3567 zio_vsd_default_cksum_finish(zio_cksum_report_t *zcr,
3568     const abd_t *good_buf)
3569 {
3570 	/* no processing needed */
3571 	zfs_ereport_finish_checksum(zcr, good_buf, zcr->zcr_cbdata, B_FALSE);
3572 }
3573 
3574 /*ARGSUSED*/
3575 void
3576 zio_vsd_default_cksum_report(zio_t *zio, zio_cksum_report_t *zcr, void *ignored)
3577 {
3578 	void *abd = abd_alloc_sametype(zio->io_abd, zio->io_size);
3579 
3580 	abd_copy(abd, zio->io_abd, zio->io_size);
3581 
3582 	zcr->zcr_cbinfo = zio->io_size;
3583 	zcr->zcr_cbdata = abd;
3584 	zcr->zcr_finish = zio_vsd_default_cksum_finish;
3585 	zcr->zcr_free = zio_abd_free;
3586 }
3587 
3588 static int
3589 zio_vdev_io_assess(zio_t *zio)
3590 {
3591 	vdev_t *vd = zio->io_vd;
3592 
3593 	if (zio_wait_for_children(zio, ZIO_CHILD_VDEV_BIT, ZIO_WAIT_DONE)) {
3594 		return (ZIO_PIPELINE_STOP);
3595 	}
3596 
3597 	if (vd == NULL && !(zio->io_flags & ZIO_FLAG_CONFIG_WRITER))
3598 		spa_config_exit(zio->io_spa, SCL_ZIO, zio);
3599 
3600 	if (zio->io_vsd != NULL) {
3601 		zio->io_vsd_ops->vsd_free(zio);
3602 		zio->io_vsd = NULL;
3603 	}
3604 
3605 	if (zio_injection_enabled && zio->io_error == 0)
3606 		zio->io_error = zio_handle_fault_injection(zio, EIO);
3607 
3608 	/*
3609 	 * If the I/O failed, determine whether we should attempt to retry it.
3610 	 *
3611 	 * On retry, we cut in line in the issue queue, since we don't want
3612 	 * compression/checksumming/etc. work to prevent our (cheap) IO reissue.
3613 	 */
3614 	if (zio->io_error && vd == NULL &&
3615 	    !(zio->io_flags & (ZIO_FLAG_DONT_RETRY | ZIO_FLAG_IO_RETRY))) {
3616 		ASSERT(!(zio->io_flags & ZIO_FLAG_DONT_QUEUE));	/* not a leaf */
3617 		ASSERT(!(zio->io_flags & ZIO_FLAG_IO_BYPASS));	/* not a leaf */
3618 		zio->io_error = 0;
3619 		zio->io_flags |= ZIO_FLAG_IO_RETRY |
3620 		    ZIO_FLAG_DONT_CACHE | ZIO_FLAG_DONT_AGGREGATE;
3621 		zio->io_stage = ZIO_STAGE_VDEV_IO_START >> 1;
3622 		zio_taskq_dispatch(zio, ZIO_TASKQ_ISSUE,
3623 		    zio_requeue_io_start_cut_in_line);
3624 		return (ZIO_PIPELINE_STOP);
3625 	}
3626 
3627 	/*
3628 	 * If we got an error on a leaf device, convert it to ENXIO
3629 	 * if the device is not accessible at all.
3630 	 */
3631 	if (zio->io_error && vd != NULL && vd->vdev_ops->vdev_op_leaf &&
3632 	    !vdev_accessible(vd, zio))
3633 		zio->io_error = SET_ERROR(ENXIO);
3634 
3635 	/*
3636 	 * If we can't write to an interior vdev (mirror or RAID-Z),
3637 	 * set vdev_cant_write so that we stop trying to allocate from it.
3638 	 */
3639 	if (zio->io_error == ENXIO && zio->io_type == ZIO_TYPE_WRITE &&
3640 	    vd != NULL && !vd->vdev_ops->vdev_op_leaf) {
3641 		vd->vdev_cant_write = B_TRUE;
3642 	}
3643 
3644 	/*
3645 	 * If a cache flush returns ENOTSUP or ENOTTY, we know that no future
3646 	 * attempts will ever succeed. In this case we set a persistent bit so
3647 	 * that we don't bother with it in the future.
3648 	 */
3649 	if ((zio->io_error == ENOTSUP || zio->io_error == ENOTTY) &&
3650 	    zio->io_type == ZIO_TYPE_IOCTL &&
3651 	    zio->io_cmd == DKIOCFLUSHWRITECACHE && vd != NULL)
3652 		vd->vdev_nowritecache = B_TRUE;
3653 
3654 	if (zio->io_error)
3655 		zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
3656 
3657 	if (vd != NULL && vd->vdev_ops->vdev_op_leaf &&
3658 	    zio->io_physdone != NULL) {
3659 		ASSERT(!(zio->io_flags & ZIO_FLAG_DELEGATED));
3660 		ASSERT(zio->io_child_type == ZIO_CHILD_VDEV);
3661 		zio->io_physdone(zio->io_logical);
3662 	}
3663 
3664 	return (ZIO_PIPELINE_CONTINUE);
3665 }
3666 
3667 void
3668 zio_vdev_io_reissue(zio_t *zio)
3669 {
3670 	ASSERT(zio->io_stage == ZIO_STAGE_VDEV_IO_START);
3671 	ASSERT(zio->io_error == 0);
3672 
3673 	zio->io_stage >>= 1;
3674 }
3675 
3676 void
3677 zio_vdev_io_redone(zio_t *zio)
3678 {
3679 	ASSERT(zio->io_stage == ZIO_STAGE_VDEV_IO_DONE);
3680 
3681 	zio->io_stage >>= 1;
3682 }
3683 
3684 void
3685 zio_vdev_io_bypass(zio_t *zio)
3686 {
3687 	ASSERT(zio->io_stage == ZIO_STAGE_VDEV_IO_START);
3688 	ASSERT(zio->io_error == 0);
3689 
3690 	zio->io_flags |= ZIO_FLAG_IO_BYPASS;
3691 	zio->io_stage = ZIO_STAGE_VDEV_IO_ASSESS >> 1;
3692 }
3693 
3694 /*
3695  * ==========================================================================
3696  * Encrypt and store encryption parameters
3697  * ==========================================================================
3698  */
3699 
3700 
3701 /*
3702  * This function is used for ZIO_STAGE_ENCRYPT. It is responsible for
3703  * managing the storage of encryption parameters and passing them to the
3704  * lower-level encryption functions.
3705  */
3706 static int
3707 zio_encrypt(zio_t *zio)
3708 {
3709 	zio_prop_t *zp = &zio->io_prop;
3710 	spa_t *spa = zio->io_spa;
3711 	blkptr_t *bp = zio->io_bp;
3712 	uint64_t psize = BP_GET_PSIZE(bp);
3713 	uint64_t dsobj = zio->io_bookmark.zb_objset;
3714 	dmu_object_type_t ot = BP_GET_TYPE(bp);
3715 	void *enc_buf = NULL;
3716 	abd_t *eabd = NULL;
3717 	uint8_t salt[ZIO_DATA_SALT_LEN];
3718 	uint8_t iv[ZIO_DATA_IV_LEN];
3719 	uint8_t mac[ZIO_DATA_MAC_LEN];
3720 	boolean_t no_crypt = B_FALSE;
3721 
3722 	/* the root zio already encrypted the data */
3723 	if (zio->io_child_type == ZIO_CHILD_GANG)
3724 		return (ZIO_PIPELINE_CONTINUE);
3725 
3726 	/* only ZIL blocks are re-encrypted on rewrite */
3727 	if (!IO_IS_ALLOCATING(zio) && ot != DMU_OT_INTENT_LOG)
3728 		return (ZIO_PIPELINE_CONTINUE);
3729 
3730 	if (!(zp->zp_encrypt || BP_IS_ENCRYPTED(bp))) {
3731 		BP_SET_CRYPT(bp, B_FALSE);
3732 		return (ZIO_PIPELINE_CONTINUE);
3733 	}
3734 
3735 	/* if we are doing raw encryption set the provided encryption params */
3736 	if (zio->io_flags & ZIO_FLAG_RAW_ENCRYPT) {
3737 		ASSERT0(BP_GET_LEVEL(bp));
3738 		BP_SET_CRYPT(bp, B_TRUE);
3739 		BP_SET_BYTEORDER(bp, zp->zp_byteorder);
3740 		if (ot != DMU_OT_OBJSET)
3741 			zio_crypt_encode_mac_bp(bp, zp->zp_mac);
3742 
3743 		/* dnode blocks must be written out in the provided byteorder */
3744 		if (zp->zp_byteorder != ZFS_HOST_BYTEORDER &&
3745 		    ot == DMU_OT_DNODE) {
3746 			void *bswap_buf = zio_buf_alloc(psize);
3747 			abd_t *babd = abd_get_from_buf(bswap_buf, psize);
3748 
3749 			ASSERT3U(BP_GET_COMPRESS(bp), ==, ZIO_COMPRESS_OFF);
3750 			abd_copy_to_buf(bswap_buf, zio->io_abd, psize);
3751 			dmu_ot_byteswap[DMU_OT_BYTESWAP(ot)].ob_func(bswap_buf,
3752 			    psize);
3753 
3754 			abd_take_ownership_of_buf(babd, B_TRUE);
3755 			zio_push_transform(zio, babd, psize, psize, NULL);
3756 		}
3757 
3758 		if (DMU_OT_IS_ENCRYPTED(ot))
3759 			zio_crypt_encode_params_bp(bp, zp->zp_salt, zp->zp_iv);
3760 		return (ZIO_PIPELINE_CONTINUE);
3761 	}
3762 
3763 	/* indirect blocks only maintain a cksum of the lower level MACs */
3764 	if (BP_GET_LEVEL(bp) > 0) {
3765 		BP_SET_CRYPT(bp, B_TRUE);
3766 		VERIFY0(zio_crypt_do_indirect_mac_checksum_abd(B_TRUE,
3767 		    zio->io_orig_abd, BP_GET_LSIZE(bp), BP_SHOULD_BYTESWAP(bp),
3768 		    mac));
3769 		zio_crypt_encode_mac_bp(bp, mac);
3770 		return (ZIO_PIPELINE_CONTINUE);
3771 	}
3772 
3773 	/*
3774 	 * Objset blocks are a special case since they have 2 256-bit MACs
3775 	 * embedded within them.
3776 	 */
3777 	if (ot == DMU_OT_OBJSET) {
3778 		ASSERT0(DMU_OT_IS_ENCRYPTED(ot));
3779 		ASSERT3U(BP_GET_COMPRESS(bp), ==, ZIO_COMPRESS_OFF);
3780 		BP_SET_CRYPT(bp, B_TRUE);
3781 		VERIFY0(spa_do_crypt_objset_mac_abd(B_TRUE, spa, dsobj,
3782 		    zio->io_abd, psize, BP_SHOULD_BYTESWAP(bp)));
3783 		return (ZIO_PIPELINE_CONTINUE);
3784 	}
3785 
3786 	/* unencrypted object types are only authenticated with a MAC */
3787 	if (!DMU_OT_IS_ENCRYPTED(ot)) {
3788 		BP_SET_CRYPT(bp, B_TRUE);
3789 		VERIFY0(spa_do_crypt_mac_abd(B_TRUE, spa, dsobj,
3790 		    zio->io_abd, psize, mac));
3791 		zio_crypt_encode_mac_bp(bp, mac);
3792 		return (ZIO_PIPELINE_CONTINUE);
3793 	}
3794 
3795 	/*
3796 	 * Later passes of sync-to-convergence may decide to rewrite data
3797 	 * in place to avoid more disk reallocations. This presents a problem
3798 	 * for encryption because this constitutes rewriting the new data with
3799 	 * the same encryption key and IV. However, this only applies to blocks
3800 	 * in the MOS (particularly the spacemaps) and we do not encrypt the
3801 	 * MOS. We assert that the zio is allocating or an intent log write
3802 	 * to enforce this.
3803 	 */
3804 	ASSERT(IO_IS_ALLOCATING(zio) || ot == DMU_OT_INTENT_LOG);
3805 	ASSERT(BP_GET_LEVEL(bp) == 0 || ot == DMU_OT_INTENT_LOG);
3806 	ASSERT(spa_feature_is_active(spa, SPA_FEATURE_ENCRYPTION));
3807 	ASSERT3U(psize, !=, 0);
3808 
3809 	enc_buf = zio_buf_alloc(psize);
3810 	eabd = abd_get_from_buf(enc_buf, psize);
3811 	abd_take_ownership_of_buf(eabd, B_TRUE);
3812 
3813 	/*
3814 	 * For an explanation of what encryption parameters are stored
3815 	 * where, see the block comment in zio_crypt.c.
3816 	 */
3817 	if (ot == DMU_OT_INTENT_LOG) {
3818 		zio_crypt_decode_params_bp(bp, salt, iv);
3819 	} else {
3820 		BP_SET_CRYPT(bp, B_TRUE);
3821 	}
3822 
3823 	/* Perform the encryption. This should not fail */
3824 	VERIFY0(spa_do_crypt_abd(B_TRUE, spa, &zio->io_bookmark,
3825 	    BP_GET_TYPE(bp), BP_GET_DEDUP(bp), BP_SHOULD_BYTESWAP(bp),
3826 	    salt, iv, mac, psize, zio->io_abd, eabd, &no_crypt));
3827 
3828 	/* encode encryption metadata into the bp */
3829 	if (ot == DMU_OT_INTENT_LOG) {
3830 		/*
3831 		 * ZIL blocks store the MAC in the embedded checksum, so the
3832 		 * transform must always be applied.
3833 		 */
3834 		zio_crypt_encode_mac_zil(enc_buf, mac);
3835 		zio_push_transform(zio, eabd, psize, psize, NULL);
3836 	} else {
3837 		BP_SET_CRYPT(bp, B_TRUE);
3838 		zio_crypt_encode_params_bp(bp, salt, iv);
3839 		zio_crypt_encode_mac_bp(bp, mac);
3840 
3841 		if (no_crypt) {
3842 			ASSERT3U(ot, ==, DMU_OT_DNODE);
3843 			abd_free(eabd);
3844 		} else {
3845 			zio_push_transform(zio, eabd, psize, psize, NULL);
3846 		}
3847 	}
3848 
3849 	return (ZIO_PIPELINE_CONTINUE);
3850 }
3851 
3852 /*
3853  * ==========================================================================
3854  * Generate and verify checksums
3855  * ==========================================================================
3856  */
3857 static int
3858 zio_checksum_generate(zio_t *zio)
3859 {
3860 	blkptr_t *bp = zio->io_bp;
3861 	enum zio_checksum checksum;
3862 
3863 	if (bp == NULL) {
3864 		/*
3865 		 * This is zio_write_phys().
3866 		 * We're either generating a label checksum, or none at all.
3867 		 */
3868 		checksum = zio->io_prop.zp_checksum;
3869 
3870 		if (checksum == ZIO_CHECKSUM_OFF)
3871 			return (ZIO_PIPELINE_CONTINUE);
3872 
3873 		ASSERT(checksum == ZIO_CHECKSUM_LABEL);
3874 	} else {
3875 		if (BP_IS_GANG(bp) && zio->io_child_type == ZIO_CHILD_GANG) {
3876 			ASSERT(!IO_IS_ALLOCATING(zio));
3877 			checksum = ZIO_CHECKSUM_GANG_HEADER;
3878 		} else {
3879 			checksum = BP_GET_CHECKSUM(bp);
3880 		}
3881 	}
3882 
3883 	zio_checksum_compute(zio, checksum, zio->io_abd, zio->io_size);
3884 
3885 	return (ZIO_PIPELINE_CONTINUE);
3886 }
3887 
3888 static int
3889 zio_checksum_verify(zio_t *zio)
3890 {
3891 	zio_bad_cksum_t info;
3892 	blkptr_t *bp = zio->io_bp;
3893 	int error;
3894 
3895 	ASSERT(zio->io_vd != NULL);
3896 
3897 	if (bp == NULL) {
3898 		/*
3899 		 * This is zio_read_phys().
3900 		 * We're either verifying a label checksum, or nothing at all.
3901 		 */
3902 		if (zio->io_prop.zp_checksum == ZIO_CHECKSUM_OFF)
3903 			return (ZIO_PIPELINE_CONTINUE);
3904 
3905 		ASSERT(zio->io_prop.zp_checksum == ZIO_CHECKSUM_LABEL);
3906 	}
3907 
3908 	if ((error = zio_checksum_error(zio, &info)) != 0) {
3909 		zio->io_error = error;
3910 		if (error == ECKSUM &&
3911 		    !(zio->io_flags & ZIO_FLAG_SPECULATIVE)) {
3912 			zfs_ereport_start_checksum(zio->io_spa,
3913 			    zio->io_vd, &zio->io_bookmark, zio,
3914 			    zio->io_offset, zio->io_size, NULL, &info);
3915 		}
3916 	}
3917 
3918 	return (ZIO_PIPELINE_CONTINUE);
3919 }
3920 
3921 /*
3922  * Called by RAID-Z to ensure we don't compute the checksum twice.
3923  */
3924 void
3925 zio_checksum_verified(zio_t *zio)
3926 {
3927 	zio->io_pipeline &= ~ZIO_STAGE_CHECKSUM_VERIFY;
3928 }
3929 
/*
 * ==========================================================================
 * Error rank.  Errors are ranked in the order 0, ENXIO, ECKSUM, EIO, other.
 * An error of 0 indicates success.  ENXIO indicates whole-device failure,
 * which may be transient (e.g. unplugged) or permanent.  ECKSUM and EIO
 * indicate errors that are specific to one I/O, and most likely permanent.
 * Any other error is presumed to be worse because we weren't expecting it.
 *
 * zio_worst_error() returns whichever of e1 and e2 ranks higher.  When both
 * have the same rank (including two different unranked errors), e2 is
 * returned.
 * ==========================================================================
 */
int
zio_worst_error(int e1, int e2)
{
	static int zio_error_rank[] = { 0, ENXIO, ECKSUM, EIO };
	const int nranks = sizeof (zio_error_rank) / sizeof (int);
	int r1 = 0;
	int r2 = 0;

	/* An error not found in the table ends up with rank == nranks. */
	while (r1 < nranks && zio_error_rank[r1] != e1)
		r1++;

	while (r2 < nranks && zio_error_rank[r2] != e2)
		r2++;

	if (r1 > r2)
		return (e1);

	return (e2);
}
3955 
3956 /*
3957  * ==========================================================================
3958  * I/O completion
3959  * ==========================================================================
3960  */
3961 static int
3962 zio_ready(zio_t *zio)
3963 {
3964 	blkptr_t *bp = zio->io_bp;
3965 	zio_t *pio, *pio_next;
3966 	zio_link_t *zl = NULL;
3967 
3968 	if (zio_wait_for_children(zio, ZIO_CHILD_GANG_BIT | ZIO_CHILD_DDT_BIT,
3969 	    ZIO_WAIT_READY)) {
3970 		return (ZIO_PIPELINE_STOP);
3971 	}
3972 
3973 	if (zio->io_ready) {
3974 		ASSERT(IO_IS_ALLOCATING(zio));
3975 		ASSERT(bp->blk_birth == zio->io_txg || BP_IS_HOLE(bp) ||
3976 		    (zio->io_flags & ZIO_FLAG_NOPWRITE));
3977 		ASSERT(zio->io_children[ZIO_CHILD_GANG][ZIO_WAIT_READY] == 0);
3978 
3979 		zio->io_ready(zio);
3980 	}
3981 
3982 	if (bp != NULL && bp != &zio->io_bp_copy)
3983 		zio->io_bp_copy = *bp;
3984 
3985 	if (zio->io_error != 0) {
3986 		zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
3987 
3988 		if (zio->io_flags & ZIO_FLAG_IO_ALLOCATING) {
3989 			ASSERT(IO_IS_ALLOCATING(zio));
3990 			ASSERT(zio->io_priority == ZIO_PRIORITY_ASYNC_WRITE);
3991 			ASSERT(zio->io_metaslab_class != NULL);
3992 
3993 			/*
3994 			 * We were unable to allocate anything, unreserve and
3995 			 * issue the next I/O to allocate.
3996 			 */
3997 			metaslab_class_throttle_unreserve(
3998 			    zio->io_metaslab_class, zio->io_prop.zp_copies,
3999 			    zio->io_allocator, zio);
4000 			zio_allocate_dispatch(zio->io_spa, zio->io_allocator);
4001 		}
4002 	}
4003 
4004 	mutex_enter(&zio->io_lock);
4005 	zio->io_state[ZIO_WAIT_READY] = 1;
4006 	pio = zio_walk_parents(zio, &zl);
4007 	mutex_exit(&zio->io_lock);
4008 
4009 	/*
4010 	 * As we notify zio's parents, new parents could be added.
4011 	 * New parents go to the head of zio's io_parent_list, however,
4012 	 * so we will (correctly) not notify them.  The remainder of zio's
4013 	 * io_parent_list, from 'pio_next' onward, cannot change because
4014 	 * all parents must wait for us to be done before they can be done.
4015 	 */
4016 	for (; pio != NULL; pio = pio_next) {
4017 		pio_next = zio_walk_parents(zio, &zl);
4018 		zio_notify_parent(pio, zio, ZIO_WAIT_READY);
4019 	}
4020 
4021 	if (zio->io_flags & ZIO_FLAG_NODATA) {
4022 		if (BP_IS_GANG(bp)) {
4023 			zio->io_flags &= ~ZIO_FLAG_NODATA;
4024 		} else {
4025 			ASSERT((uintptr_t)zio->io_abd < SPA_MAXBLOCKSIZE);
4026 			zio->io_pipeline &= ~ZIO_VDEV_IO_STAGES;
4027 		}
4028 	}
4029 
4030 	if (zio_injection_enabled &&
4031 	    zio->io_spa->spa_syncing_txg == zio->io_txg)
4032 		zio_handle_ignored_writes(zio);
4033 
4034 	return (ZIO_PIPELINE_CONTINUE);
4035 }
4036 
4037 /*
4038  * Update the allocation throttle accounting.
4039  */
4040 static void
4041 zio_dva_throttle_done(zio_t *zio)
4042 {
4043 	zio_t *lio = zio->io_logical;
4044 	zio_t *pio = zio_unique_parent(zio);
4045 	vdev_t *vd = zio->io_vd;
4046 	int flags = METASLAB_ASYNC_ALLOC;
4047 
4048 	ASSERT3P(zio->io_bp, !=, NULL);
4049 	ASSERT3U(zio->io_type, ==, ZIO_TYPE_WRITE);
4050 	ASSERT3U(zio->io_priority, ==, ZIO_PRIORITY_ASYNC_WRITE);
4051 	ASSERT3U(zio->io_child_type, ==, ZIO_CHILD_VDEV);
4052 	ASSERT(vd != NULL);
4053 	ASSERT3P(vd, ==, vd->vdev_top);
4054 	ASSERT(!(zio->io_flags & (ZIO_FLAG_IO_REPAIR | ZIO_FLAG_IO_RETRY)));
4055 	ASSERT(zio->io_flags & ZIO_FLAG_IO_ALLOCATING);
4056 	ASSERT(!(lio->io_flags & ZIO_FLAG_IO_REWRITE));
4057 	ASSERT(!(lio->io_orig_flags & ZIO_FLAG_NODATA));
4058 
4059 	/*
4060 	 * Parents of gang children can have two flavors -- ones that
4061 	 * allocated the gang header (will have ZIO_FLAG_IO_REWRITE set)
4062 	 * and ones that allocated the constituent blocks. The allocation
4063 	 * throttle needs to know the allocating parent zio so we must find
4064 	 * it here.
4065 	 */
4066 	if (pio->io_child_type == ZIO_CHILD_GANG) {
4067 		/*
4068 		 * If our parent is a rewrite gang child then our grandparent
4069 		 * would have been the one that performed the allocation.
4070 		 */
4071 		if (pio->io_flags & ZIO_FLAG_IO_REWRITE)
4072 			pio = zio_unique_parent(pio);
4073 		flags |= METASLAB_GANG_CHILD;
4074 	}
4075 
4076 	ASSERT(IO_IS_ALLOCATING(pio));
4077 	ASSERT3P(zio, !=, zio->io_logical);
4078 	ASSERT(zio->io_logical != NULL);
4079 	ASSERT(!(zio->io_flags & ZIO_FLAG_IO_REPAIR));
4080 	ASSERT0(zio->io_flags & ZIO_FLAG_NOPWRITE);
4081 	ASSERT(zio->io_metaslab_class != NULL);
4082 
4083 	mutex_enter(&pio->io_lock);
4084 	metaslab_group_alloc_decrement(zio->io_spa, vd->vdev_id, pio, flags,
4085 	    pio->io_allocator, B_TRUE);
4086 	mutex_exit(&pio->io_lock);
4087 
4088 	metaslab_class_throttle_unreserve(zio->io_metaslab_class, 1,
4089 	    pio->io_allocator, pio);
4090 
4091 	/*
4092 	 * Call into the pipeline to see if there is more work that
4093 	 * needs to be done. If there is work to be done it will be
4094 	 * dispatched to another taskq thread.
4095 	 */
4096 	zio_allocate_dispatch(zio->io_spa, pio->io_allocator);
4097 }
4098 
/*
 * Final pipeline stage for a zio.
 *
 * Once every child has completed, this stage:
 *  - updates and verifies the allocation throttle accounting for
 *    ZIO_FLAG_IO_ALLOCATING zios;
 *  - inherits vdev/gang/ddt child errors, finishes pending checksum
 *    reports, pops transforms and updates vdev stats;
 *  - posts ereports for failures and, for a failed logical zio, decides
 *    whether it must be reexecuted now or suspended;
 *  - inherits logical child errors and unallocates DVAs of a failed
 *    allocating gang leader;
 *  - if reexecution is wanted, notifies the parent, suspends the pool I/O
 *    or dispatches zio_reexecute(), leaving the zio in place;
 *  - otherwise runs the io_done callback, detaches from and notifies all
 *    parents, and either wakes the waiter or destroys the zio.
 *
 * Every return path yields ZIO_PIPELINE_STOP.
 */
4099 static int
4100 zio_done(zio_t *zio)
4101 {
4102 	spa_t *spa = zio->io_spa;
4103 	zio_t *lio = zio->io_logical;
4104 	blkptr_t *bp = zio->io_bp;
4105 	vdev_t *vd = zio->io_vd;
4106 	uint64_t psize = zio->io_size;
4107 	zio_t *pio, *pio_next;
4108 	zio_link_t *zl = NULL;
4109 
4110 	/*
4111 	 * If our children haven't all completed,
4112 	 * wait for them and then repeat this pipeline stage.
4113 	 */
4114 	if (zio_wait_for_children(zio, ZIO_CHILD_ALL_BITS, ZIO_WAIT_DONE)) {
4115 		return (ZIO_PIPELINE_STOP);
4116 	}
4117 
4118 	/*
4119 	 * If the allocation throttle is enabled, then update the accounting.
4120 	 * We only track child I/Os that are part of an allocating async
4121 	 * write. We must do this since the allocation is performed
4122 	 * by the logical I/O but the actual write is done by child I/Os.
4123 	 */
4124 	if (zio->io_flags & ZIO_FLAG_IO_ALLOCATING &&
4125 	    zio->io_child_type == ZIO_CHILD_VDEV) {
4126 		ASSERT(zio->io_metaslab_class != NULL);
4127 		ASSERT(zio->io_metaslab_class->mc_alloc_throttle_enabled);
4128 		zio_dva_throttle_done(zio);
4129 	}
4130 
4131 	/*
4132 	 * If the allocation throttle is enabled, verify that
4133 	 * we have decremented the refcounts for every I/O that was throttled.
4134 	 */
4135 	if (zio->io_flags & ZIO_FLAG_IO_ALLOCATING) {
4136 		ASSERT(zio->io_type == ZIO_TYPE_WRITE);
4137 		ASSERT(zio->io_priority == ZIO_PRIORITY_ASYNC_WRITE);
4138 		ASSERT(bp != NULL);
4139 
4140 		metaslab_group_alloc_verify(spa, zio->io_bp, zio,
4141 		    zio->io_allocator);
4142 		VERIFY(zfs_refcount_not_held(
4143 		    &zio->io_metaslab_class->mc_alloc_slots[zio->io_allocator],
4144 		    zio));
4145 	}
4146 
	/* Every per-type, per-wait child counter must have drained by now. */
4147 	for (int c = 0; c < ZIO_CHILD_TYPES; c++)
4148 		for (int w = 0; w < ZIO_WAIT_TYPES; w++)
4149 			ASSERT(zio->io_children[c][w] == 0);
4150 
	/*
	 * Sanity-check a non-embedded block pointer: the padding words are
	 * zero, and bp either matches zio->io_bp_copy or is the unique
	 * parent's bp.
	 */
4151 	if (bp != NULL && !BP_IS_EMBEDDED(bp)) {
4152 		ASSERT(bp->blk_pad[0] == 0);
4153 		ASSERT(bp->blk_pad[1] == 0);
4154 		ASSERT(bcmp(bp, &zio->io_bp_copy, sizeof (blkptr_t)) == 0 ||
4155 		    (bp == zio_unique_parent(zio)->io_bp));
4156 		if (zio->io_type == ZIO_TYPE_WRITE && !BP_IS_HOLE(bp) &&
4157 		    zio->io_bp_override == NULL &&
4158 		    !(zio->io_flags & ZIO_FLAG_IO_REPAIR)) {
4159 			ASSERT3U(zio->io_prop.zp_copies, <=, BP_GET_NDVAS(bp));
4160 			ASSERT(BP_COUNT_GANG(bp) == 0 ||
4161 			    (BP_COUNT_GANG(bp) == BP_GET_NDVAS(bp)));
4162 		}
4163 		if (zio->io_flags & ZIO_FLAG_NOPWRITE)
4164 			VERIFY(BP_EQUAL(bp, &zio->io_bp_orig));
4165 	}
4166 
4167 	/*
4168 	 * If there were child vdev/gang/ddt errors, they apply to us now.
4169 	 */
4170 	zio_inherit_child_errors(zio, ZIO_CHILD_VDEV);
4171 	zio_inherit_child_errors(zio, ZIO_CHILD_GANG);
4172 	zio_inherit_child_errors(zio, ZIO_CHILD_DDT);
4173 
4174 	/*
4175 	 * If the I/O on the transformed data was successful, generate any
4176 	 * checksum reports now while we still have the transformed data.
4177 	 */
4178 	if (zio->io_error == 0) {
4179 		while (zio->io_cksum_report != NULL) {
4180 			zio_cksum_report_t *zcr = zio->io_cksum_report;
4181 			uint64_t align = zcr->zcr_align;
4182 			uint64_t asize = P2ROUNDUP(psize, align);
4183 			abd_t *adata = zio->io_abd;
4184 
			/*
			 * When the report needs a larger, aligned size, hand
			 * it a zero-padded temporary copy of the data.
			 */
4185 			if (asize != psize) {
4186 				adata = abd_alloc(asize, B_TRUE);
4187 				abd_copy(adata, zio->io_abd, psize);
4188 				abd_zero_off(adata, psize, asize - psize);
4189 			}
4190 
4191 			zio->io_cksum_report = zcr->zcr_next;
4192 			zcr->zcr_next = NULL;
4193 			zcr->zcr_finish(zcr, adata);
4194 			zfs_ereport_free_checksum(zcr);
4195 
4196 			if (asize != psize)
4197 				abd_free(adata);
4198 		}
4199 	}
4200 
4201 	zio_pop_transforms(zio);	/* note: may set zio->io_error */
4202 
4203 	vdev_stat_update(zio, psize);
4204 
4205 	if (zio->io_error) {
4206 		/*
4207 		 * If this I/O is attached to a particular vdev,
4208 		 * generate an error message describing the I/O failure
4209 		 * at the block level.  We ignore these errors if the
4210 		 * device is currently unavailable.
4211 		 */
4212 		if (zio->io_error != ECKSUM && vd != NULL && !vdev_is_dead(vd))
4213 			zfs_ereport_post(FM_EREPORT_ZFS_IO, spa, vd,
4214 			    &zio->io_bookmark, zio, 0, 0);
4215 
4216 		if ((zio->io_error == EIO || !(zio->io_flags &
4217 		    (ZIO_FLAG_SPECULATIVE | ZIO_FLAG_DONT_PROPAGATE))) &&
4218 		    zio == lio) {
4219 			/*
4220 			 * For logical I/O requests, tell the SPA to log the
4221 			 * error and generate a logical data ereport.
4222 			 */
4223 			spa_log_error(spa, &zio->io_bookmark);
4224 			zfs_ereport_post(FM_EREPORT_ZFS_DATA, spa, NULL,
4225 			    &zio->io_bookmark, zio, 0, 0);
4226 		}
4227 	}
4228 
4229 	if (zio->io_error && zio == lio) {
4230 		/*
4231 		 * Determine whether zio should be reexecuted.  This will
4232 		 * propagate all the way to the root via zio_notify_parent().
4233 		 */
4234 		ASSERT(vd == NULL && bp != NULL);
4235 		ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
4236 
		/*
		 * A failed allocating zio that may not fail is retried
		 * immediately, except for ENOSPC, which suspends instead.
		 */
4237 		if (IO_IS_ALLOCATING(zio) &&
4238 		    !(zio->io_flags & ZIO_FLAG_CANFAIL)) {
4239 			if (zio->io_error != ENOSPC)
4240 				zio->io_reexecute |= ZIO_REEXECUTE_NOW;
4241 			else
4242 				zio->io_reexecute |= ZIO_REEXECUTE_SUSPEND;
4243 		}
4244 
4245 		if ((zio->io_type == ZIO_TYPE_READ ||
4246 		    zio->io_type == ZIO_TYPE_FREE) &&
4247 		    !(zio->io_flags & ZIO_FLAG_SCAN_THREAD) &&
4248 		    zio->io_error == ENXIO &&
4249 		    spa_load_state(spa) == SPA_LOAD_NONE &&
4250 		    spa_get_failmode(spa) != ZIO_FAILURE_MODE_CONTINUE)
4251 			zio->io_reexecute |= ZIO_REEXECUTE_SUSPEND;
4252 
		/* Any other failure that is not allowed to fail suspends. */
4253 		if (!(zio->io_flags & ZIO_FLAG_CANFAIL) && !zio->io_reexecute)
4254 			zio->io_reexecute |= ZIO_REEXECUTE_SUSPEND;
4255 
4256 		/*
4257 		 * Here is a possibly good place to attempt to do
4258 		 * either combinatorial reconstruction or error correction
4259 		 * based on checksums.  It also might be a good place
4260 		 * to send out preliminary ereports before we suspend
4261 		 * processing.
4262 		 */
4263 	}
4264 
4265 	/*
4266 	 * If there were logical child errors, they apply to us now.
4267 	 * We defer this until now to avoid conflating logical child
4268 	 * errors with errors that happened to the zio itself when
4269 	 * updating vdev stats and reporting FMA events above.
4270 	 */
4271 	zio_inherit_child_errors(zio, ZIO_CHILD_LOGICAL);
4272 
	/*
	 * A failed or to-be-reexecuted allocating gang leader gives back
	 * the DVAs it allocated, unless it was a rewrite or a nopwrite.
	 */
4273 	if ((zio->io_error || zio->io_reexecute) &&
4274 	    IO_IS_ALLOCATING(zio) && zio->io_gang_leader == zio &&
4275 	    !(zio->io_flags & (ZIO_FLAG_IO_REWRITE | ZIO_FLAG_NOPWRITE)))
4276 		zio_dva_unallocate(zio, zio->io_gang_tree, bp);
4277 
4278 	zio_gang_tree_free(&zio->io_gang_tree);
4279 
4280 	/*
4281 	 * Godfather I/Os should never suspend.
4282 	 */
4283 	if ((zio->io_flags & ZIO_FLAG_GODFATHER) &&
4284 	    (zio->io_reexecute & ZIO_REEXECUTE_SUSPEND))
4285 		zio->io_reexecute = 0;
4286 
4287 	if (zio->io_reexecute) {
4288 		/*
4289 		 * This is a logical I/O that wants to reexecute.
4290 		 *
4291 		 * Reexecute is top-down.  When an i/o fails, if it's not
4292 		 * the root, it simply notifies its parent and sticks around.
4293 		 * The parent, seeing that it still has children in zio_done(),
4294 		 * does the same.  This percolates all the way up to the root.
4295 		 * The root i/o will reexecute or suspend the entire tree.
4296 		 *
4297 		 * This approach ensures that zio_reexecute() honors
4298 		 * all the original i/o dependency relationships, e.g.
4299 		 * parents not executing until children are ready.
4300 		 */
4301 		ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
4302 
4303 		zio->io_gang_leader = NULL;
4304 
4305 		mutex_enter(&zio->io_lock);
4306 		zio->io_state[ZIO_WAIT_DONE] = 1;
4307 		mutex_exit(&zio->io_lock);
4308 
4309 		/*
4310 		 * "The Godfather" I/O monitors its children but is
4311 		 * not a true parent to them. It will track them through
4312 		 * the pipeline but severs its ties whenever they get into
4313 		 * trouble (e.g. suspended). This allows "The Godfather"
4314 		 * I/O to return status without blocking.
4315 		 */
4316 		zl = NULL;
4317 		for (pio = zio_walk_parents(zio, &zl); pio != NULL;
4318 		    pio = pio_next) {
			/* Fetch the next parent before this link may go away. */
4319 			zio_link_t *remove_zl = zl;
4320 			pio_next = zio_walk_parents(zio, &zl);
4321 
4322 			if ((pio->io_flags & ZIO_FLAG_GODFATHER) &&
4323 			    (zio->io_reexecute & ZIO_REEXECUTE_SUSPEND)) {
4324 				zio_remove_child(pio, zio, remove_zl);
4325 				zio_notify_parent(pio, zio, ZIO_WAIT_DONE);
4326 			}
4327 		}
4328 
4329 		if ((pio = zio_unique_parent(zio)) != NULL) {
4330 			/*
4331 			 * We're not a root i/o, so there's nothing to do
4332 			 * but notify our parent.  Don't propagate errors
4333 			 * upward since we haven't permanently failed yet.
4334 			 */
4335 			ASSERT(!(zio->io_flags & ZIO_FLAG_GODFATHER));
4336 			zio->io_flags |= ZIO_FLAG_DONT_PROPAGATE;
4337 			zio_notify_parent(pio, zio, ZIO_WAIT_DONE);
4338 		} else if (zio->io_reexecute & ZIO_REEXECUTE_SUSPEND) {
4339 			/*
4340 			 * We'd fail again if we reexecuted now, so suspend
4341 			 * until conditions improve (e.g. device comes online).
4342 			 */
4343 			zio_suspend(zio->io_spa, zio, ZIO_SUSPEND_IOERR);
4344 		} else {
4345 			/*
4346 			 * Reexecution is potentially a huge amount of work.
4347 			 * Hand it off to the otherwise-unused claim taskq.
4348 			 */
4349 			ASSERT(zio->io_tqent.tqent_next == NULL);
4350 			spa_taskq_dispatch_ent(spa, ZIO_TYPE_CLAIM,
4351 			    ZIO_TASKQ_ISSUE, (task_func_t *)zio_reexecute, zio,
4352 			    0, &zio->io_tqent);
4353 		}
4354 		return (ZIO_PIPELINE_STOP);
4355 	}
4356 
	/* From here on the zio completes for good; nothing will be retried. */
4357 	ASSERT(zio->io_child_count == 0);
4358 	ASSERT(zio->io_reexecute == 0);
4359 	ASSERT(zio->io_error == 0 || (zio->io_flags & ZIO_FLAG_CANFAIL));
4360 
4361 	/*
4362 	 * Report any checksum errors, since the I/O is complete.
4363 	 */
4364 	while (zio->io_cksum_report != NULL) {
4365 		zio_cksum_report_t *zcr = zio->io_cksum_report;
4366 		zio->io_cksum_report = zcr->zcr_next;
4367 		zcr->zcr_next = NULL;
4368 		zcr->zcr_finish(zcr, NULL);
4369 		zfs_ereport_free_checksum(zcr);
4370 	}
4371 
4372 	/*
4373 	 * It is the responsibility of the done callback to ensure that this
4374 	 * particular zio is no longer discoverable for adoption, and as
4375 	 * such, cannot acquire any new parents.
4376 	 */
4377 	if (zio->io_done)
4378 		zio->io_done(zio);
4379 
4380 	mutex_enter(&zio->io_lock);
4381 	zio->io_state[ZIO_WAIT_DONE] = 1;
4382 	mutex_exit(&zio->io_lock);
4383 
	/* Detach from every parent and tell each one that we are done. */
4384 	zl = NULL;
4385 	for (pio = zio_walk_parents(zio, &zl); pio != NULL; pio = pio_next) {
4386 		zio_link_t *remove_zl = zl;
4387 		pio_next = zio_walk_parents(zio, &zl);
4388 		zio_remove_child(pio, zio, remove_zl);
4389 		zio_notify_parent(pio, zio, ZIO_WAIT_DONE);
4390 	}
4391 
	/*
	 * If a waiter is registered, clear io_executor and wake it through
	 * io_cv; otherwise the zio is destroyed here.
	 */
4392 	if (zio->io_waiter != NULL) {
4393 		mutex_enter(&zio->io_lock);
4394 		zio->io_executor = NULL;
4395 		cv_broadcast(&zio->io_cv);
4396 		mutex_exit(&zio->io_lock);
4397 	} else {
4398 		zio_destroy(zio);
4399 	}
4400 
4401 	return (ZIO_PIPELINE_STOP);
4402 }
4403 
4404 /*
4405  * ==========================================================================
4406  * I/O pipeline definition
4407  * ==========================================================================
4408  */
/*
 * Table of pipeline stage handlers.  Slot 0 is NULL and the last entry is
 * zio_done.
 * NOTE(review): presumably indexed by stage number, in the same order as
 * the ZIO_STAGE_* bits declared in zio_impl.h -- confirm before adding,
 * removing or reordering entries.
 */
4409 static zio_pipe_stage_t *zio_pipeline[] = {
4410 	NULL,
4411 	zio_read_bp_init,
4412 	zio_write_bp_init,
4413 	zio_free_bp_init,
4414 	zio_issue_async,
4415 	zio_write_compress,
4416 	zio_encrypt,
4417 	zio_checksum_generate,
4418 	zio_nop_write,
4419 	zio_ddt_read_start,
4420 	zio_ddt_read_done,
4421 	zio_ddt_write,
4422 	zio_ddt_free,
4423 	zio_gang_assemble,
4424 	zio_gang_issue,
4425 	zio_dva_throttle,
4426 	zio_dva_allocate,
4427 	zio_dva_free,
4428 	zio_dva_claim,
4429 	zio_ready,
4430 	zio_vdev_io_start,
4431 	zio_vdev_io_done,
4432 	zio_vdev_io_assess,
4433 	zio_checksum_verify,
4434 	zio_done
4435 };
4436 
4437 
4438 
4439 
4440 /*
4441  * Compare two zbookmark_phys_t's to see which we would reach first in a
4442  * pre-order traversal of the object tree.
4443  *
4444  * This is simple in every case aside from the meta-dnode object. For all other
4445  * objects, we traverse them in order (object 1 before object 2, and so on).
4446  * However, all of these objects are traversed while traversing object 0, since
4447  * the data it points to is the list of objects.  Thus, we need to convert to a
4448  * canonical representation so we can compare meta-dnode bookmarks to
4449  * non-meta-dnode bookmarks.
4450  *
4451  * We do this by calculating "equivalents" for each field of the zbookmark.
4452  * zbookmarks outside of the meta-dnode use their own object and level, and
4453  * calculate the level 0 equivalent (the first L0 blkid that is contained in the
4454  * blocks this bookmark refers to) by multiplying their blkid by their span
4455  * (the number of L0 blocks contained within one block at their level).
4456  * zbookmarks inside the meta-dnode calculate their object equivalent
4457  * (which is L0equiv * dnodes per data block), use 0 for their L0equiv, and use
4458  * level + 1<<31 (any value larger than a level could ever be) for their level.
4459  * This causes them to always compare before a bookmark in their object
4460  * equivalent, compare appropriately to bookmarks in other objects, and to
4461  * compare appropriately to other bookmarks in the meta-dnode.
4462  */
4463 int
4464 zbookmark_compare(uint16_t dbss1, uint8_t ibs1, uint16_t dbss2, uint8_t ibs2,
4465     const zbookmark_phys_t *zb1, const zbookmark_phys_t *zb2)
4466 {
4467 	/*
4468 	 * These variables represent the "equivalent" values for the zbookmark,
4469 	 * after converting zbookmarks inside the meta dnode to their
4470 	 * normal-object equivalents.
4471 	 */
4472 	uint64_t zb1obj, zb2obj;
4473 	uint64_t zb1L0, zb2L0;
4474 	uint64_t zb1level, zb2level;
4475 
4476 	if (zb1->zb_object == zb2->zb_object &&
4477 	    zb1->zb_level == zb2->zb_level &&
4478 	    zb1->zb_blkid == zb2->zb_blkid)
4479 		return (0);
4480 
4481 	/*
4482 	 * BP_SPANB calculates the span in blocks.
4483 	 */
4484 	zb1L0 = (zb1->zb_blkid) * BP_SPANB(ibs1, zb1->zb_level);
4485 	zb2L0 = (zb2->zb_blkid) * BP_SPANB(ibs2, zb2->zb_level);
4486 
4487 	if (zb1->zb_object == DMU_META_DNODE_OBJECT) {
4488 		zb1obj = zb1L0 * (dbss1 << (SPA_MINBLOCKSHIFT - DNODE_SHIFT));
4489 		zb1L0 = 0;
4490 		zb1level = zb1->zb_level + COMPARE_META_LEVEL;
4491 	} else {
4492 		zb1obj = zb1->zb_object;
4493 		zb1level = zb1->zb_level;
4494 	}
4495 
4496 	if (zb2->zb_object == DMU_META_DNODE_OBJECT) {
4497 		zb2obj = zb2L0 * (dbss2 << (SPA_MINBLOCKSHIFT - DNODE_SHIFT));
4498 		zb2L0 = 0;
4499 		zb2level = zb2->zb_level + COMPARE_META_LEVEL;
4500 	} else {
4501 		zb2obj = zb2->zb_object;
4502 		zb2level = zb2->zb_level;
4503 	}
4504 
4505 	/* Now that we have a canonical representation, do the comparison. */
4506 	if (zb1obj != zb2obj)
4507 		return (zb1obj < zb2obj ? -1 : 1);
4508 	else if (zb1L0 != zb2L0)
4509 		return (zb1L0 < zb2L0 ? -1 : 1);
4510 	else if (zb1level != zb2level)
4511 		return (zb1level > zb2level ? -1 : 1);
4512 	/*
4513 	 * This can (theoretically) happen if the bookmarks have the same object
4514 	 * and level, but different blkids, if the block sizes are not the same.
4515 	 * There is presently no way to change the indirect block sizes
4516 	 */
4517 	return (0);
4518 }
4519 
4520 /*
4521  *  This function checks the following: given that last_block is the place that
4522  *  our traversal stopped last time, does that guarantee that we've visited
4523  *  every node under subtree_root?  Therefore, we can't just use the raw output
4524  *  of zbookmark_compare.  We have to pass in a modified version of
4525  *  subtree_root; by incrementing the block id, and then checking whether
4526  *  last_block is before or equal to that, we can tell whether or not having
4527  *  visited last_block implies that all of subtree_root's children have been
4528  *  visited.
4529  */
4530 boolean_t
4531 zbookmark_subtree_completed(const dnode_phys_t *dnp,
4532     const zbookmark_phys_t *subtree_root, const zbookmark_phys_t *last_block)
4533 {
4534 	zbookmark_phys_t mod_zb = *subtree_root;
4535 	mod_zb.zb_blkid++;
4536 	ASSERT(last_block->zb_level == 0);
4537 
4538 	/* The objset_phys_t isn't before anything. */
4539 	if (dnp == NULL)
4540 		return (B_FALSE);
4541 
4542 	/*
4543 	 * We pass in 1ULL << (DNODE_BLOCK_SHIFT - SPA_MINBLOCKSHIFT) for the
4544 	 * data block size in sectors, because that variable is only used if
4545 	 * the bookmark refers to a block in the meta-dnode.  Since we don't
4546 	 * know without examining it what object it refers to, and there's no
4547 	 * harm in passing in this value in other cases, we always pass it in.
4548 	 *
4549 	 * We pass in 0 for the indirect block size shift because zb2 must be
4550 	 * level 0.  The indirect block size is only used to calculate the span
4551 	 * of the bookmark, but since the bookmark must be level 0, the span is
4552 	 * always 1, so the math works out.
4553 	 *
4554 	 * If you make changes to how the zbookmark_compare code works, be sure
4555 	 * to make sure that this code still works afterwards.
4556 	 */
4557 	return (zbookmark_compare(dnp->dn_datablkszsec, dnp->dn_indblkshift,
4558 	    1ULL << (DNODE_BLOCK_SHIFT - SPA_MINBLOCKSHIFT), 0, &mod_zb,
4559 	    last_block) <= 0);
4560 }
4561