xref: /illumos-gate/usr/src/uts/common/fs/zfs/zil.c (revision 7f7322febbcfe774b7270abc3b191c094bfcc517)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License, Version 1.0 only
6  * (the "License").  You may not use this file except in compliance
7  * with the License.
8  *
9  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
10  * or http://www.opensolaris.org/os/licensing.
11  * See the License for the specific language governing permissions
12  * and limitations under the License.
13  *
14  * When distributing Covered Code, include this CDDL HEADER in each
15  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
16  * If applicable, add the following below this CDDL HEADER, with the
17  * fields enclosed by brackets "[]" replaced with your own identifying
18  * information: Portions Copyright [yyyy] [name of copyright owner]
19  *
20  * CDDL HEADER END
21  */
22 /*
23  * Copyright 2005 Sun Microsystems, Inc.  All rights reserved.
24  * Use is subject to license terms.
25  */
26 
27 #pragma ident	"%Z%%M%	%I%	%E% SMI"
28 
29 #include <sys/zfs_context.h>
30 #include <sys/spa.h>
31 #include <sys/dmu.h>
32 #include <sys/zap.h>
33 #include <sys/arc.h>
34 #include <sys/stat.h>
35 #include <sys/resource.h>
36 #include <sys/zil.h>
37 #include <sys/zil_impl.h>
38 #include <sys/dsl_dataset.h>
39 #include <sys/vdev.h>
40 
41 /*
42  * The zfs intent log (ZIL) saves transaction records of system calls
43  * that change the file system, with enough information to be able to
44  * replay them. These records are held in memory until either the DMU
45  * transaction group (txg) commits them to the stable pool and they can
46  * be discarded, or they are flushed to the stable log (also in the
47  * pool) due to an fsync, O_DSYNC or other synchronous requirement.
48  * In the event of a panic or power failure, those log records
49  * (transactions) are replayed.
50  *
51  * There is one ZIL per file system. Its on-disk (pool) format consists
52  * of 3 parts:
53  *
54  * 	- ZIL header
55  * 	- ZIL blocks
56  * 	- ZIL records
57  *
58  * A log record holds a system call transaction. Log blocks can
59  * hold many log records and the blocks are chained together.
60  * Each ZIL block contains a block pointer (blkptr_t) to the next
61  * ZIL block in the chain. The ZIL header points to the first
62  * block in the chain. Note that there is no fixed place in the pool
63  * to hold ZIL blocks; they are dynamically allocated and freed as
64  * needed from the blocks available. The sketch below shows the ZIL structure:
65  */
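/*
 * The following sketch is an editorial illustration of that structure
 * (the field names are the ones used in this file; the drawing itself is
 * not from the original source):
 *
 *	ZIL header          first log block           next log block
 *	+----------+    +---------------------+    +---------------------+
 *	| zh_log --+--->| lr | lr | ... | tr -+--->| lr | lr | ... | tr -+--> ...
 *	+----------+    +---------------------+    +---------------------+
 *
 *	Each block's trailer (tr, a zil_trailer_t) records zit_nused, the
 *	number of record bytes used in the block, and zit_next_blk, the
 *	blkptr_t of the next block in the chain.
 */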
66 
67 /*
68  * These global ZIL switches affect all pools
69  */
70 int zil_disable = 0;	/* disable intent logging */
71 int zil_always = 0;	/* make every transaction synchronous */
72 int zil_purge = 0;	/* at pool open, just throw everything away */
73 int zil_noflush = 0;	/* don't flush write cache buffers on disks */
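/*
 * (Editorial aside: like other kernel tunables, these can be set at boot
 * from /etc/system, e.g. "set zfs:zil_disable = 1"; the example is for
 * illustration only.)
 */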
74 
75 static kmem_cache_t *zil_lwb_cache;
76 
77 static int
78 zil_dva_compare(const void *x1, const void *x2)
79 {
80 	const dva_t *dva1 = x1;
81 	const dva_t *dva2 = x2;
82 
83 	if (DVA_GET_VDEV(dva1) < DVA_GET_VDEV(dva2))
84 		return (-1);
85 	if (DVA_GET_VDEV(dva1) > DVA_GET_VDEV(dva2))
86 		return (1);
87 
88 	if (DVA_GET_OFFSET(dva1) < DVA_GET_OFFSET(dva2))
89 		return (-1);
90 	if (DVA_GET_OFFSET(dva1) > DVA_GET_OFFSET(dva2))
91 		return (1);
92 
93 	return (0);
94 }
95 
96 static void
97 zil_dva_tree_init(avl_tree_t *t)
98 {
99 	avl_create(t, zil_dva_compare, sizeof (zil_dva_node_t),
100 	    offsetof(zil_dva_node_t, zn_node));
101 }
102 
103 static void
104 zil_dva_tree_fini(avl_tree_t *t)
105 {
106 	zil_dva_node_t *zn;
107 	void *cookie = NULL;
108 
109 	while ((zn = avl_destroy_nodes(t, &cookie)) != NULL)
110 		kmem_free(zn, sizeof (zil_dva_node_t));
111 
112 	avl_destroy(t);
113 }
114 
115 static int
116 zil_dva_tree_add(avl_tree_t *t, dva_t *dva)
117 {
118 	zil_dva_node_t *zn;
119 	avl_index_t where;
120 
121 	if (avl_find(t, dva, &where) != NULL)
122 		return (EEXIST);
123 
124 	zn = kmem_alloc(sizeof (zil_dva_node_t), KM_SLEEP);
125 	zn->zn_dva = *dva;
126 	avl_insert(t, zn, where);
127 
128 	return (0);
129 }
130 
131 /*
132  * Read a log block, make sure it's valid, and byteswap it if necessary.
133  */
134 static int
135 zil_read_log_block(zilog_t *zilog, blkptr_t *bp, char *buf)
136 {
137 	uint64_t blksz = BP_GET_LSIZE(bp);
138 	zil_trailer_t *ztp = (zil_trailer_t *)(buf + blksz) - 1;
139 	zio_cksum_t cksum;
140 	int error;
141 
142 	error = zio_wait(zio_read(NULL, zilog->zl_spa, bp, buf, blksz,
143 	    NULL, NULL, ZIO_PRIORITY_SYNC_READ,
144 	    ZIO_FLAG_CANFAIL | ZIO_FLAG_SPECULATIVE));
145 	if (error) {
146 		dprintf_bp(bp, "zilog %p bp %p read failed, error %d: ",
147 		    zilog, bp, error);
148 		return (error);
149 	}
150 
151 	if (BP_SHOULD_BYTESWAP(bp))
152 		byteswap_uint64_array(buf, blksz);
153 
154 	/*
155 	 * Sequence numbers should be... sequential.  The checksum verifier for
156 	 * the next block should be: <logid[0], logid[1], objset id, seq + 1>.
157 	 */
158 	cksum = bp->blk_cksum;
159 	cksum.zc_word[3]++;
160 	if (bcmp(&cksum, &ztp->zit_next_blk.blk_cksum, sizeof (cksum)) != 0) {
161 		dprintf_bp(bp, "zilog %p bp %p stale pointer: ", zilog, bp);
162 		return (ESTALE);
163 	}
164 
165 	if (BP_IS_HOLE(&ztp->zit_next_blk)) {
166 		dprintf_bp(bp, "zilog %p bp %p hole: ", zilog, bp);
167 		return (ENOENT);
168 	}
169 
170 	if (ztp->zit_nused > (blksz - sizeof (zil_trailer_t))) {
171 		dprintf("zilog %p bp %p nused exceeds blksz\n", zilog, bp);
172 		return (EOVERFLOW);
173 	}
174 
175 	dprintf_bp(bp, "zilog %p bp %p good block: ", zilog, bp);
176 
177 	return (0);
178 }
179 
180 /*
181  * Parse the intent log, and call parse_func for each valid record within.
182  */
183 void
184 zil_parse(zilog_t *zilog, zil_parse_blk_func_t *parse_blk_func,
185     zil_parse_lr_func_t *parse_lr_func, void *arg, uint64_t txg)
186 {
187 	blkptr_t blk;
188 	char *lrbuf, *lrp;
189 	zil_trailer_t *ztp;
190 	int reclen, error;
191 
192 	blk = zilog->zl_header->zh_log;
193 	if (BP_IS_HOLE(&blk))
194 		return;
195 
196 	/*
197 	 * Starting at the block pointed to by zh_log we read the log chain.
198 	 * For each block in the chain we strongly check that block to
199 	 * ensure its validity.  We stop when an invalid block is found.
200 	 * For each block pointer in the chain we call parse_blk_func().
201 	 * For each record in each valid block we call parse_lr_func().
202 	 */
203 	zil_dva_tree_init(&zilog->zl_dva_tree);
204 	lrbuf = zio_buf_alloc(SPA_MAXBLOCKSIZE);
205 	for (;;) {
206 		error = zil_read_log_block(zilog, &blk, lrbuf);
207 
208 		if (parse_blk_func != NULL)
209 			parse_blk_func(zilog, &blk, arg, txg);
210 
211 		if (error)
212 			break;
213 
214 		ztp = (zil_trailer_t *)(lrbuf + BP_GET_LSIZE(&blk)) - 1;
215 		blk = ztp->zit_next_blk;
216 
217 		if (parse_lr_func == NULL)
218 			continue;
219 
220 		for (lrp = lrbuf; lrp < lrbuf + ztp->zit_nused; lrp += reclen) {
221 			lr_t *lr = (lr_t *)lrp;
222 			reclen = lr->lrc_reclen;
223 			ASSERT3U(reclen, >=, sizeof (lr_t));
224 			parse_lr_func(zilog, lr, arg, txg);
225 		}
226 	}
227 	zio_buf_free(lrbuf, SPA_MAXBLOCKSIZE);
228 	zil_dva_tree_fini(&zilog->zl_dva_tree);
229 }
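/*
 * Editorial sketch of how a caller might drive zil_parse(); zil_count_t
 * and the callbacks below are hypothetical, patterned on the way
 * zil_claim() and zil_destroy() walk the chain.
 */
#if 0
typedef struct zil_count {
	uint64_t zc_blocks;
	uint64_t zc_records;
} zil_count_t;

/* ARGSUSED */
static void
zil_count_blk(zilog_t *zilog, blkptr_t *bp, void *arg, uint64_t txg)
{
	((zil_count_t *)arg)->zc_blocks++;
}

/* ARGSUSED */
static void
zil_count_lr(zilog_t *zilog, lr_t *lrc, void *arg, uint64_t txg)
{
	((zil_count_t *)arg)->zc_records++;
}

static void
zil_count(zilog_t *zilog, zil_count_t *zc)
{
	zc->zc_blocks = zc->zc_records = 0;
	zil_parse(zilog, zil_count_blk, zil_count_lr, zc,
	    zilog->zl_header->zh_claim_txg);
}
#endif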
230 
231 /* ARGSUSED */
232 static void
233 zil_claim_log_block(zilog_t *zilog, blkptr_t *bp, void *tx, uint64_t first_txg)
234 {
235 	spa_t *spa = zilog->zl_spa;
236 	int err;
237 
238 	dprintf_bp(bp, "first_txg %llu: ", first_txg);
239 
240 	/*
241 	 * Claim log block if not already committed and not already claimed.
242 	 */
243 	if (bp->blk_birth >= first_txg &&
244 	    zil_dva_tree_add(&zilog->zl_dva_tree, BP_IDENTITY(bp)) == 0) {
245 		err = zio_wait(zio_claim(NULL, spa, first_txg, bp, NULL, NULL));
246 		ASSERT(err == 0);
247 	}
248 }
249 
250 static void
251 zil_claim_log_record(zilog_t *zilog, lr_t *lrc, void *tx, uint64_t first_txg)
252 {
253 	if (lrc->lrc_txtype == TX_WRITE) {
254 		lr_write_t *lr = (lr_write_t *)lrc;
255 		zil_claim_log_block(zilog, &lr->lr_blkptr, tx, first_txg);
256 	}
257 }
258 
259 /* ARGSUSED */
260 static void
261 zil_free_log_block(zilog_t *zilog, blkptr_t *bp, void *tx, uint64_t claim_txg)
262 {
263 	zio_free_blk(zilog->zl_spa, bp, dmu_tx_get_txg(tx));
264 }
265 
266 static void
267 zil_free_log_record(zilog_t *zilog, lr_t *lrc, void *tx, uint64_t claim_txg)
268 {
269 	/*
270 	 * If we previously claimed it, we need to free it.
271 	 */
272 	if (claim_txg != 0 && lrc->lrc_txtype == TX_WRITE) {
273 		lr_write_t *lr = (lr_write_t *)lrc;
274 		blkptr_t *bp = &lr->lr_blkptr;
275 		if (bp->blk_birth >= claim_txg &&
276 		    !zil_dva_tree_add(&zilog->zl_dva_tree, BP_IDENTITY(bp))) {
277 			(void) arc_free(NULL, zilog->zl_spa,
278 			    dmu_tx_get_txg(tx), bp, NULL, NULL, ARC_WAIT);
279 		}
280 	}
281 }
282 
283 /*
284  * Create an on-disk intent log.
285  */
286 static void
287 zil_create(zilog_t *zilog)
288 {
289 	lwb_t *lwb;
290 	uint64_t txg;
291 	dmu_tx_t *tx;
292 	blkptr_t blk;
293 	int error;
294 
295 	ASSERT(zilog->zl_header->zh_claim_txg == 0);
296 	ASSERT(zilog->zl_header->zh_replay_seq == 0);
297 
298 	/*
299 	 * Initialize the log header block.
300 	 */
301 	tx = dmu_tx_create(zilog->zl_os);
302 	(void) dmu_tx_assign(tx, TXG_WAIT);
303 	dsl_dataset_dirty(dmu_objset_ds(zilog->zl_os), tx);
304 	txg = dmu_tx_get_txg(tx);
305 
306 	/*
307 	 * Allocate the first log block and assign its checksum verifier.
308 	 */
309 	error = zio_alloc_blk(zilog->zl_spa, ZIO_CHECKSUM_ZILOG,
310 	    ZIL_MIN_BLKSZ, &blk, txg);
311 	if (error == 0) {
312 		ZIO_SET_CHECKSUM(&blk.blk_cksum,
313 		    spa_get_random(-1ULL), spa_get_random(-1ULL),
314 		    dmu_objset_id(zilog->zl_os), 1ULL);
315 
316 		/*
317 		 * Allocate a log write buffer (lwb) for the first log block.
318 		 */
319 		lwb = kmem_cache_alloc(zil_lwb_cache, KM_SLEEP);
320 		lwb->lwb_zilog = zilog;
321 		lwb->lwb_blk = blk;
322 		lwb->lwb_nused = 0;
323 		lwb->lwb_sz = BP_GET_LSIZE(&lwb->lwb_blk);
324 		lwb->lwb_buf = zio_buf_alloc(lwb->lwb_sz);
325 		lwb->lwb_max_txg = txg;
326 		lwb->lwb_seq = 0;
327 		lwb->lwb_state = UNWRITTEN;
328 		mutex_enter(&zilog->zl_lock);
329 		list_insert_tail(&zilog->zl_lwb_list, lwb);
330 		mutex_exit(&zilog->zl_lock);
331 	}
332 
333 	dmu_tx_commit(tx);
334 	txg_wait_synced(zilog->zl_dmu_pool, txg);
335 }
336 
337 /*
338  * In one tx, free all log blocks and clear the log header.
339  */
340 void
341 zil_destroy(zilog_t *zilog)
342 {
343 	dmu_tx_t *tx;
344 	uint64_t txg;
345 
346 	mutex_enter(&zilog->zl_destroy_lock);
347 
348 	if (BP_IS_HOLE(&zilog->zl_header->zh_log)) {
349 		mutex_exit(&zilog->zl_destroy_lock);
350 		return;
351 	}
352 
353 	tx = dmu_tx_create(zilog->zl_os);
354 	(void) dmu_tx_assign(tx, TXG_WAIT);
355 	dsl_dataset_dirty(dmu_objset_ds(zilog->zl_os), tx);
356 	txg = dmu_tx_get_txg(tx);
357 
358 	zil_parse(zilog, zil_free_log_block, zil_free_log_record, tx,
359 	    zilog->zl_header->zh_claim_txg);
360 	zilog->zl_destroy_txg = txg;
361 
362 	dmu_tx_commit(tx);
363 	txg_wait_synced(zilog->zl_dmu_pool, txg);
364 
365 	mutex_exit(&zilog->zl_destroy_lock);
366 }
367 
368 void
369 zil_claim(char *osname, void *txarg)
370 {
371 	dmu_tx_t *tx = txarg;
372 	uint64_t first_txg = dmu_tx_get_txg(tx);
373 	zilog_t *zilog;
374 	zil_header_t *zh;
375 	objset_t *os;
376 	int error;
377 
378 	error = dmu_objset_open(osname, DMU_OST_ANY, DS_MODE_STANDARD, &os);
379 	if (error) {
380 		cmn_err(CE_WARN, "can't process intent log for %s", osname);
381 		return;
382 	}
383 
384 	zilog = dmu_objset_zil(os);
385 	zh = zilog->zl_header;
386 
387 	/*
388 	 * Claim all log blocks if we haven't already done so.
389 	 */
390 	ASSERT3U(zh->zh_claim_txg, <=, first_txg);
391 	if (zh->zh_claim_txg == 0 && !BP_IS_HOLE(&zh->zh_log)) {
392 		zh->zh_claim_txg = first_txg;
393 		zil_parse(zilog, zil_claim_log_block, zil_claim_log_record,
394 		    tx, first_txg);
395 		dsl_dataset_dirty(dmu_objset_ds(os), tx);
396 	}
397 	ASSERT3U(first_txg, ==, (spa_last_synced_txg(zilog->zl_spa) + 1));
398 	dmu_objset_close(os);
399 }
400 
401 void
402 zil_add_vdev(zilog_t *zilog, uint64_t vdev, uint64_t seq)
403 {
404 	zil_vdev_t *zv;
405 
406 	if (zil_noflush)
407 		return;
408 
409 	ASSERT(MUTEX_HELD(&zilog->zl_lock));
410 	zv = kmem_alloc(sizeof (zil_vdev_t), KM_SLEEP);
411 	zv->vdev = vdev;
412 	zv->seq = seq;
413 	list_insert_tail(&zilog->zl_vdev_list, zv);
414 }
415 
416 void
417 zil_flush_vdevs(zilog_t *zilog, uint64_t seq)
418 {
419 	vdev_t *vd;
420 	zil_vdev_t *zv, *zv2;
421 	zio_t *zio;
422 	spa_t *spa;
423 	uint64_t vdev;
424 
425 	if (zil_noflush)
426 		return;
427 
428 	ASSERT(MUTEX_HELD(&zilog->zl_lock));
429 
430 	spa = zilog->zl_spa;
431 	zio = NULL;
432 
433 	while ((zv = list_head(&zilog->zl_vdev_list)) != NULL &&
434 	    zv->seq <= seq) {
435 		vdev = zv->vdev;
436 		list_remove(&zilog->zl_vdev_list, zv);
437 		kmem_free(zv, sizeof (zil_vdev_t));
438 
439 		/*
440 		 * remove all chained entries <= seq with same vdev
441 		 */
442 		zv = list_head(&zilog->zl_vdev_list);
443 		while (zv && zv->seq <= seq) {
444 			zv2 = list_next(&zilog->zl_vdev_list, zv);
445 			if (zv->vdev == vdev) {
446 				list_remove(&zilog->zl_vdev_list, zv);
447 				kmem_free(zv, sizeof (zil_vdev_t));
448 			}
449 			zv = zv2;
450 		}
451 
452 		/* flush the write cache for this vdev */
453 		mutex_exit(&zilog->zl_lock);
454 		if (zio == NULL)
455 			zio = zio_root(spa, NULL, NULL, ZIO_FLAG_CANFAIL);
456 		vd = vdev_lookup_top(spa, vdev);
457 		ASSERT(vd);
458 		(void) zio_nowait(zio_ioctl(zio, spa, vd, DKIOCFLUSHWRITECACHE,
459 		    NULL, NULL, ZIO_PRIORITY_NOW,
460 		    ZIO_FLAG_CANFAIL | ZIO_FLAG_DONT_RETRY));
461 		mutex_enter(&zilog->zl_lock);
462 	}
463 
464 	/*
465 	 * Wait for all the flushes to complete.  Not all devices actually
466 	 * support the DKIOCFLUSHWRITECACHE ioctl, so it's OK if it fails.
467 	 */
468 	if (zio != NULL) {
469 		mutex_exit(&zilog->zl_lock);
470 		(void) zio_wait(zio);
471 		mutex_enter(&zilog->zl_lock);
472 	}
473 }
474 
475 /*
476  * Function called when a log block write completes
477  */
478 static void
479 zil_lwb_write_done(zio_t *zio)
480 {
481 	lwb_t *prev;
482 	lwb_t *lwb = zio->io_private;
483 	zilog_t *zilog = lwb->lwb_zilog;
484 	uint64_t max_seq;
485 
486 	/*
487 	 * Now that we've written this log block, we have a stable pointer
488 	 * to the next block in the chain, so it's OK to let the txg in
489 	 * which we allocated the next block sync.
490 	 */
491 	txg_rele_to_sync(&lwb->lwb_txgh);
492 
493 	zio_buf_free(lwb->lwb_buf, lwb->lwb_sz);
494 	mutex_enter(&zilog->zl_lock);
495 	lwb->lwb_buf = NULL;
496 	if (zio->io_error) {
497 		zilog->zl_log_error = B_TRUE;
498 		mutex_exit(&zilog->zl_lock);
499 		cv_broadcast(&zilog->zl_cv_seq);
500 		return;
501 	}
502 
503 	prev = list_prev(&zilog->zl_lwb_list, lwb);
504 	if (prev && prev->lwb_state != SEQ_COMPLETE) {
505 		/* There's an unwritten buffer in the chain before this one */
506 		lwb->lwb_state = SEQ_INCOMPLETE;
507 		mutex_exit(&zilog->zl_lock);
508 		return;
509 	}
510 
511 	max_seq = lwb->lwb_seq;
512 	lwb->lwb_state = SEQ_COMPLETE;
513 	/*
514 	 * We must also follow up the chain for already written buffers
515 	 * to see if we can set zl_ss_seq even higher.
516 	 */
517 	while (lwb = list_next(&zilog->zl_lwb_list, lwb)) {
518 		if (lwb->lwb_state != SEQ_INCOMPLETE)
519 			break;
520 		lwb->lwb_state = SEQ_COMPLETE;
521 		/* lwb_seq will be zero if we've written an empty buffer */
522 		if (lwb->lwb_seq) {
523 			ASSERT3U(max_seq, <, lwb->lwb_seq);
524 			max_seq = lwb->lwb_seq;
525 		}
526 	}
527 	zilog->zl_ss_seq = MAX(max_seq, zilog->zl_ss_seq);
528 	mutex_exit(&zilog->zl_lock);
529 	cv_broadcast(&zilog->zl_cv_seq);
530 }
531 
532 /*
533  * Start a log block write and advance to the next log block.
534  * Calls are serialized.
535  */
536 static lwb_t *
537 zil_lwb_write_start(zilog_t *zilog, lwb_t *lwb)
538 {
539 	lwb_t *nlwb;
540 	zil_trailer_t *ztp = (zil_trailer_t *)(lwb->lwb_buf + lwb->lwb_sz) - 1;
541 	uint64_t txg;
542 	uint64_t zil_blksz;
543 	int error;
544 
545 	ASSERT(lwb->lwb_nused <= ZIL_BLK_DATA_SZ(lwb));
546 
547 	/*
548 	 * Allocate the next block and save its address in this block
549 	 * before writing it in order to establish the log chain.
550 	 * Note that if the allocation of nlwb synced before we wrote
551 	 * the block that points at it (lwb), we'd leak it if we crashed.
552 	 * Therefore, we don't do txg_rele_to_sync() until zil_lwb_write_done().
553 	 */
554 	txg = txg_hold_open(zilog->zl_dmu_pool, &lwb->lwb_txgh);
555 	txg_rele_to_quiesce(&lwb->lwb_txgh);
556 
557 	/*
558 	 * Pick a ZIL blocksize. We request a size that is the
559 	 * maximum of the previous used size, the current used size and
560 	 * the amount waiting in the queue.
561 	 */
562 	zil_blksz = MAX(zilog->zl_cur_used, zilog->zl_prev_used);
563 	zil_blksz = MAX(zil_blksz, zilog->zl_itx_list_sz + sizeof (*ztp));
564 	zil_blksz = P2ROUNDUP(zil_blksz, ZIL_MIN_BLKSZ);
565 	if (zil_blksz > ZIL_MAX_BLKSZ)
566 		zil_blksz = ZIL_MAX_BLKSZ;
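	/*
	 * Worked example (editorial, assuming ZIL_MIN_BLKSZ is 4K): with
	 * zl_prev_used = 6000, zl_cur_used = 2000 and 9000 bytes of itxs
	 * queued, the request is P2ROUNDUP(9000 + sizeof (*ztp), 4K),
	 * i.e. 12K, unless that would exceed ZIL_MAX_BLKSZ.
	 */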
567 
568 	error = zio_alloc_blk(zilog->zl_spa, ZIO_CHECKSUM_ZILOG,
569 	    zil_blksz, &ztp->zit_next_blk, txg);
570 	if (error) {
571 		txg_rele_to_sync(&lwb->lwb_txgh);
572 		return (NULL);
573 	}
574 
575 	ASSERT3U(ztp->zit_next_blk.blk_birth, ==, txg);
576 	ztp->zit_nused = lwb->lwb_nused;
577 	ztp->zit_bt.zbt_cksum = lwb->lwb_blk.blk_cksum;
578 	ztp->zit_next_blk.blk_cksum = lwb->lwb_blk.blk_cksum;
579 	ztp->zit_next_blk.blk_cksum.zc_word[3]++;
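	/*
	 * (The zc_word[3] increment above seeds the sequence component of
	 * the next block's checksum verifier; zil_read_log_block() expects
	 * exactly this value when it walks and validates the chain.)
	 */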
580 
581 	/*
582 	 * Allocate a new log write buffer (lwb).
583 	 */
584 	nlwb = kmem_cache_alloc(zil_lwb_cache, KM_SLEEP);
585 
586 	nlwb->lwb_zilog = zilog;
587 	nlwb->lwb_blk = ztp->zit_next_blk;
588 	nlwb->lwb_nused = 0;
589 	nlwb->lwb_sz = BP_GET_LSIZE(&nlwb->lwb_blk);
590 	nlwb->lwb_buf = zio_buf_alloc(nlwb->lwb_sz);
591 	nlwb->lwb_max_txg = txg;
592 	nlwb->lwb_seq = 0;
593 	nlwb->lwb_state = UNWRITTEN;
594 
595 	/*
596 	 * Put new lwb at the end of the log chain,
597 	 * and record the vdev for later flushing
598 	 */
599 	mutex_enter(&zilog->zl_lock);
600 	list_insert_tail(&zilog->zl_lwb_list, nlwb);
601 	zil_add_vdev(zilog, DVA_GET_VDEV(BP_IDENTITY(&(lwb->lwb_blk))),
602 	    lwb->lwb_seq);
603 	mutex_exit(&zilog->zl_lock);
604 
605 	/*
606 	 * write the old log block
607 	 */
608 	dprintf_bp(&lwb->lwb_blk, "lwb %p txg %llu: ", lwb, txg);
609 	zio_nowait(zio_rewrite(NULL, zilog->zl_spa, ZIO_CHECKSUM_ZILOG, 0,
610 	    &lwb->lwb_blk, lwb->lwb_buf, lwb->lwb_sz, zil_lwb_write_done, lwb,
611 	    ZIO_PRIORITY_LOG_WRITE, ZIO_FLAG_MUSTSUCCEED));
612 
613 	return (nlwb);
614 }
615 
616 static lwb_t *
617 zil_lwb_commit(zilog_t *zilog, itx_t *itx, lwb_t *lwb)
618 {
619 	lr_t *lrc = &itx->itx_lr; /* common log record */
620 	uint64_t seq = lrc->lrc_seq;
621 	uint64_t txg = lrc->lrc_txg;
622 	uint64_t reclen = lrc->lrc_reclen;
623 	int error;
624 
625 	if (lwb == NULL)
626 		return (NULL);
627 	ASSERT(lwb->lwb_buf != NULL);
628 
629 	/*
630 	 * If it's a write, fetch the data or get its blkptr as appropriate.
631 	 */
632 	if (lrc->lrc_txtype == TX_WRITE) {
633 		lr_write_t *lr = (lr_write_t *)lrc;
634 		if (txg > spa_freeze_txg(zilog->zl_spa))
635 			txg_wait_synced(zilog->zl_dmu_pool, txg);
636 
637 		if (!itx->itx_data_copied &&
638 		    (error = zilog->zl_get_data(itx->itx_private, lr)) != 0) {
639 			if (error != ENOENT && error != EALREADY) {
640 				txg_wait_synced(zilog->zl_dmu_pool, txg);
641 				mutex_enter(&zilog->zl_lock);
642 				zilog->zl_ss_seq = MAX(seq, zilog->zl_ss_seq);
643 				zil_add_vdev(zilog,
644 				    DVA_GET_VDEV(BP_IDENTITY(&(lr->lr_blkptr))),
645 				    seq);
646 				mutex_exit(&zilog->zl_lock);
647 				return (lwb);
648 			}
649 			mutex_enter(&zilog->zl_lock);
650 			zil_add_vdev(zilog,
651 			    DVA_GET_VDEV(BP_IDENTITY(&(lr->lr_blkptr))), seq);
652 			mutex_exit(&zilog->zl_lock);
653 			return (lwb);
654 		}
655 	}
656 
657 	zilog->zl_cur_used += reclen;
658 
659 	/*
660 	 * If this record won't fit in the current log block, start a new one.
661 	 */
662 	if (lwb->lwb_nused + reclen > ZIL_BLK_DATA_SZ(lwb)) {
663 		lwb = zil_lwb_write_start(zilog, lwb);
664 		if (lwb == NULL)
665 			return (NULL);
666 		if (lwb->lwb_nused + reclen > ZIL_BLK_DATA_SZ(lwb)) {
667 			txg_wait_synced(zilog->zl_dmu_pool, txg);
668 			mutex_enter(&zilog->zl_lock);
669 			zilog->zl_ss_seq = MAX(seq, zilog->zl_ss_seq);
670 			mutex_exit(&zilog->zl_lock);
671 			return (lwb);
672 		}
673 	}
674 
675 	bcopy(lrc, lwb->lwb_buf + lwb->lwb_nused, reclen);
676 	lwb->lwb_nused += reclen;
677 	lwb->lwb_max_txg = MAX(lwb->lwb_max_txg, txg);
678 	ASSERT3U(lwb->lwb_seq, <, seq);
679 	lwb->lwb_seq = seq;
680 	ASSERT3U(lwb->lwb_nused, <=, ZIL_BLK_DATA_SZ(lwb));
681 	ASSERT3U(P2PHASE(lwb->lwb_nused, sizeof (uint64_t)), ==, 0);
682 
683 	return (lwb);
684 }
685 
686 itx_t *
687 zil_itx_create(int txtype, size_t lrsize)
688 {
689 	itx_t *itx;
690 
691 	lrsize = P2ROUNDUP(lrsize, sizeof (uint64_t));
692 
693 	itx = kmem_alloc(offsetof(itx_t, itx_lr) + lrsize, KM_SLEEP);
694 	itx->itx_lr.lrc_txtype = txtype;
695 	itx->itx_lr.lrc_reclen = lrsize;
696 	itx->itx_lr.lrc_seq = 0;	/* defensive */
697 
698 	return (itx);
699 }
700 
701 uint64_t
702 zil_itx_assign(zilog_t *zilog, itx_t *itx, dmu_tx_t *tx)
703 {
704 	uint64_t seq;
705 
706 	ASSERT(itx->itx_lr.lrc_seq == 0);
707 
708 	mutex_enter(&zilog->zl_lock);
709 	list_insert_tail(&zilog->zl_itx_list, itx);
710 	zilog->zl_itx_list_sz += itx->itx_lr.lrc_reclen;
711 	itx->itx_lr.lrc_txg = dmu_tx_get_txg(tx);
712 	itx->itx_lr.lrc_seq = seq = ++zilog->zl_itx_seq;
713 	mutex_exit(&zilog->zl_lock);
714 
715 	return (seq);
716 }
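/*
 * Editorial sketch of the itx life cycle as seen by a ZPL-style caller;
 * zp, zfsvfs, off, len, ioflag and the surrounding transaction are
 * assumptions standing in for code like zfs_write()/zfs_log_write().
 */
#if 0
	itx_t *itx;
	lr_write_t *lr;
	uint64_t seq;

	itx = zil_itx_create(TX_WRITE, sizeof (lr_write_t));
	lr = (lr_write_t *)&itx->itx_lr;
	lr->lr_foid = zp->z_id;			/* object being written */
	lr->lr_offset = off;
	lr->lr_length = len;
	itx->itx_private = zfsvfs;		/* handed back to zl_get_data */
	itx->itx_data_copied = 0;		/* data fetched at commit time */
	seq = zil_itx_assign(zilog, itx, tx);	/* while the dmu tx is open */
	dmu_tx_commit(tx);
	zil_commit(zilog, seq, ioflag);		/* pushes only if sync needed */
#endif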
717 
718 /*
719  * Free up all in-memory intent log transactions that have now been synced.
720  */
721 static void
722 zil_itx_clean(zilog_t *zilog)
723 {
724 	uint64_t synced_txg = spa_last_synced_txg(zilog->zl_spa);
725 	uint64_t freeze_txg = spa_freeze_txg(zilog->zl_spa);
726 	uint64_t max_seq = 0;
727 	itx_t *itx;
728 
729 	mutex_enter(&zilog->zl_lock);
730 	while ((itx = list_head(&zilog->zl_itx_list)) != NULL &&
731 	    itx->itx_lr.lrc_txg <= MIN(synced_txg, freeze_txg)) {
732 		list_remove(&zilog->zl_itx_list, itx);
733 		zilog->zl_itx_list_sz -= itx->itx_lr.lrc_reclen;
734 		ASSERT3U(max_seq, <, itx->itx_lr.lrc_seq);
735 		max_seq = itx->itx_lr.lrc_seq;
736 		kmem_free(itx, offsetof(itx_t, itx_lr)
737 		    + itx->itx_lr.lrc_reclen);
738 	}
739 	if (max_seq > zilog->zl_ss_seq) {
740 		zilog->zl_ss_seq = max_seq;
741 		cv_broadcast(&zilog->zl_cv_seq);
742 	}
743 	mutex_exit(&zilog->zl_lock);
744 }
745 
746 void
747 zil_clean(zilog_t *zilog)
748 {
749 	/*
750 	 * Check for any log blocks that can be freed.
751 	 * Log blocks are only freed when the log block allocation and
752 	 * log records contained within are both known to be committed.
753 	 */
754 	mutex_enter(&zilog->zl_lock);
755 	if (list_head(&zilog->zl_itx_list) != NULL)
756 		(void) taskq_dispatch(zilog->zl_clean_taskq,
757 		    (void (*)(void *))zil_itx_clean, zilog, TQ_NOSLEEP);
758 	mutex_exit(&zilog->zl_lock);
759 }
760 
761 /*
762  * Push zfs transactions to stable storage up to the supplied sequence number.
763  */
764 void
765 zil_commit(zilog_t *zilog, uint64_t seq, int ioflag)
766 {
767 	uint64_t txg;
768 	uint64_t max_seq;
769 	uint64_t reclen;
770 	itx_t *itx;
771 	lwb_t *lwb;
772 	spa_t *spa;
773 
774 	if (zilog == NULL || seq == 0 ||
775 	    ((ioflag & (FSYNC | FDSYNC | FRSYNC)) == 0 && !zil_always))
776 		return;
777 
778 	spa = zilog->zl_spa;
779 	mutex_enter(&zilog->zl_lock);
780 
781 	seq = MIN(seq, zilog->zl_itx_seq);	/* cap seq at largest itx seq */
782 
783 	for (;;) {
784 		if (zilog->zl_ss_seq >= seq) {	/* already on stable storage */
785 			cv_signal(&zilog->zl_cv_write);
786 			mutex_exit(&zilog->zl_lock);
787 			return;
788 		}
789 
790 		if (zilog->zl_writer == B_FALSE) /* no one writing, do it */
791 			break;
792 
793 		cv_wait(&zilog->zl_cv_write, &zilog->zl_lock);
794 	}
795 
796 	zilog->zl_writer = B_TRUE;
797 	max_seq = 0;
798 
799 	if (zilog->zl_suspend) {
800 		lwb = NULL;
801 	} else {
802 		lwb = list_tail(&zilog->zl_lwb_list);
803 		if (lwb == NULL) {
804 			mutex_exit(&zilog->zl_lock);
805 			zil_create(zilog);
806 			mutex_enter(&zilog->zl_lock);
807 			lwb = list_tail(&zilog->zl_lwb_list);
808 		}
809 	}
810 
811 	/*
812 	 * Loop through in-memory log transactions filling log blocks,
813 	 * until we reach the given sequence number and there's no more
814 	 * room in the write buffer.
815 	 */
816 	for (;;) {
817 		itx = list_head(&zilog->zl_itx_list);
818 		if (itx == NULL)
819 			break;
820 
821 		reclen = itx->itx_lr.lrc_reclen;
822 		if ((itx->itx_lr.lrc_seq > seq) &&
823 		    ((lwb == NULL) || (lwb->lwb_nused + reclen >
824 		    ZIL_BLK_DATA_SZ(lwb))))
825 			break;
826 
827 		list_remove(&zilog->zl_itx_list, itx);
828 		txg = itx->itx_lr.lrc_txg;
829 		ASSERT(txg);
830 
831 		mutex_exit(&zilog->zl_lock);
832 		if (txg > spa_last_synced_txg(spa) ||
833 		    txg > spa_freeze_txg(spa))
834 			lwb = zil_lwb_commit(zilog, itx, lwb);
835 		else
836 			max_seq = itx->itx_lr.lrc_seq;
837 		kmem_free(itx, offsetof(itx_t, itx_lr)
838 		    + itx->itx_lr.lrc_reclen);
839 		mutex_enter(&zilog->zl_lock);
840 		zilog->zl_itx_list_sz -= reclen;
841 	}
842 
843 	mutex_exit(&zilog->zl_lock);
844 
845 	/* write the last block out */
846 	if (lwb != NULL && lwb->lwb_nused != 0)
847 		lwb = zil_lwb_write_start(zilog, lwb);
848 
849 	zilog->zl_prev_used = zilog->zl_cur_used;
850 	zilog->zl_cur_used = 0;
851 
852 	mutex_enter(&zilog->zl_lock);
853 	if (max_seq > zilog->zl_ss_seq) {
854 		zilog->zl_ss_seq = max_seq;
855 		cv_broadcast(&zilog->zl_cv_seq);
856 	}
857 	/*
858 	 * Wait if necessary for our seq to be committed.
859 	 */
860 	if (lwb) {
861 		while (zilog->zl_ss_seq < seq && zilog->zl_log_error == 0)
862 			cv_wait(&zilog->zl_cv_seq, &zilog->zl_lock);
863 		zil_flush_vdevs(zilog, seq);
864 	}
865 
866 	if (zilog->zl_log_error || lwb == NULL) {
867 		zilog->zl_log_error = 0;
868 		max_seq = zilog->zl_itx_seq;
869 		mutex_exit(&zilog->zl_lock);
870 		txg_wait_synced(zilog->zl_dmu_pool, 0);
871 		mutex_enter(&zilog->zl_lock);
872 		zilog->zl_ss_seq = MAX(max_seq, zilog->zl_ss_seq);
873 		cv_broadcast(&zilog->zl_cv_seq);
874 	}
875 	/* wake up others waiting to start a write */
876 	zilog->zl_writer = B_FALSE;
877 	mutex_exit(&zilog->zl_lock);
878 	cv_signal(&zilog->zl_cv_write);
879 }
880 
881 /*
882  * Called in syncing context to free committed log blocks and update log header.
883  */
884 void
885 zil_sync(zilog_t *zilog, dmu_tx_t *tx)
886 {
887 	uint64_t txg = dmu_tx_get_txg(tx);
888 	spa_t *spa = zilog->zl_spa;
889 	lwb_t *lwb;
890 
891 	ASSERT(zilog->zl_stop_sync == 0);
892 
893 	zilog->zl_header->zh_replay_seq = zilog->zl_replay_seq[txg & TXG_MASK];
894 
895 	if (zilog->zl_destroy_txg == txg) {
896 		bzero(zilog->zl_header, sizeof (zil_header_t));
897 		bzero(zilog->zl_replay_seq, sizeof (zilog->zl_replay_seq));
898 		zilog->zl_destroy_txg = 0;
899 	}
900 
901 	mutex_enter(&zilog->zl_lock);
902 	for (;;) {
903 		lwb = list_head(&zilog->zl_lwb_list);
904 		if (lwb == NULL) {
905 			mutex_exit(&zilog->zl_lock);
906 			return;
907 		}
908 		if (lwb->lwb_buf != NULL || lwb->lwb_max_txg > txg)
909 			break;
910 		list_remove(&zilog->zl_lwb_list, lwb);
911 		zio_free_blk(spa, &lwb->lwb_blk, txg);
912 		kmem_cache_free(zil_lwb_cache, lwb);
913 	}
914 	zilog->zl_header->zh_log = lwb->lwb_blk;
915 	mutex_exit(&zilog->zl_lock);
916 }
917 
918 void
919 zil_init(void)
920 {
921 	zil_lwb_cache = kmem_cache_create("zil_lwb_cache",
922 	    sizeof (struct lwb), NULL, NULL, NULL, NULL, NULL, NULL, 0);
923 }
924 
925 void
926 zil_fini(void)
927 {
928 	kmem_cache_destroy(zil_lwb_cache);
929 }
930 
931 zilog_t *
932 zil_alloc(objset_t *os, zil_header_t *zh_phys)
933 {
934 	zilog_t *zilog;
935 
936 	zilog = kmem_zalloc(sizeof (zilog_t), KM_SLEEP);
937 
938 	zilog->zl_header = zh_phys;
939 	zilog->zl_os = os;
940 	zilog->zl_spa = dmu_objset_spa(os);
941 	zilog->zl_dmu_pool = dmu_objset_pool(os);
942 
943 	list_create(&zilog->zl_itx_list, sizeof (itx_t),
944 	    offsetof(itx_t, itx_node));
945 
946 	list_create(&zilog->zl_lwb_list, sizeof (lwb_t),
947 	    offsetof(lwb_t, lwb_node));
948 
949 	list_create(&zilog->zl_vdev_list, sizeof (zil_vdev_t),
950 	    offsetof(zil_vdev_t, vdev_seq_node));
951 
952 	return (zilog);
953 }
954 
955 void
956 zil_free(zilog_t *zilog)
957 {
958 	lwb_t *lwb;
959 	zil_vdev_t *zv;
960 
961 	zilog->zl_stop_sync = 1;
962 
963 	while ((lwb = list_head(&zilog->zl_lwb_list)) != NULL) {
964 		list_remove(&zilog->zl_lwb_list, lwb);
965 		if (lwb->lwb_buf != NULL)
966 			zio_buf_free(lwb->lwb_buf, lwb->lwb_sz);
967 		kmem_cache_free(zil_lwb_cache, lwb);
968 	}
969 	list_destroy(&zilog->zl_lwb_list);
970 
971 	while ((zv = list_head(&zilog->zl_vdev_list)) != NULL) {
972 		list_remove(&zilog->zl_vdev_list, zv);
973 		kmem_free(zv, sizeof (zil_vdev_t));
974 	}
975 	list_destroy(&zilog->zl_vdev_list);
976 
977 	ASSERT(list_head(&zilog->zl_itx_list) == NULL);
978 	list_destroy(&zilog->zl_itx_list);
979 
980 	kmem_free(zilog, sizeof (zilog_t));
981 }
982 
983 /*
984  * Open an intent log.
985  */
986 zilog_t *
987 zil_open(objset_t *os, zil_get_data_t *get_data)
988 {
989 	zilog_t *zilog = dmu_objset_zil(os);
990 
991 	zilog->zl_get_data = get_data;
992 	zilog->zl_clean_taskq = taskq_create("zil_clean", 1, minclsyspri,
993 	    2, 2, TASKQ_PREPOPULATE);
994 
995 	return (zilog);
996 }
997 
998 /*
999  * Close an intent log.
1000  */
1001 void
1002 zil_close(zilog_t *zilog)
1003 {
1004 	txg_wait_synced(zilog->zl_dmu_pool, 0);
1005 	taskq_destroy(zilog->zl_clean_taskq);
1006 	zilog->zl_clean_taskq = NULL;
1007 	zilog->zl_get_data = NULL;
1008 
1009 	zil_itx_clean(zilog);
1010 	ASSERT(list_head(&zilog->zl_itx_list) == NULL);
1011 }
1012 
1013 /*
1014  * Suspend an intent log.  While in suspended mode, we still honor
1015  * synchronous semantics, but we rely on txg_wait_synced() to do it.
1016  * We suspend the log briefly when taking a snapshot so that the snapshot
1017  * contains all the data it's supposed to, and has an empty intent log.
1018  */
1019 int
1020 zil_suspend(zilog_t *zilog)
1021 {
1022 	lwb_t *lwb;
1023 
1024 	mutex_enter(&zilog->zl_lock);
1025 	if (zilog->zl_header->zh_claim_txg != 0) {	/* claimed, not yet replayed */
1026 		mutex_exit(&zilog->zl_lock);
1027 		return (EBUSY);
1028 	}
1029 	zilog->zl_suspend++;
1030 	mutex_exit(&zilog->zl_lock);
1031 
1032 	zil_commit(zilog, UINT64_MAX, FSYNC);
1033 
1034 	mutex_enter(&zilog->zl_lock);
1035 	while ((lwb = list_head(&zilog->zl_lwb_list)) != NULL) {
1036 		if (lwb->lwb_buf != NULL) {
1037 			/*
1038 			 * Wait for the buffer if it's in the process of
1039 			 * being written.
1040 			 */
1041 			if ((lwb->lwb_seq != 0) &&
1042 			    (lwb->lwb_state != SEQ_COMPLETE)) {
1043 				cv_wait(&zilog->zl_cv_seq, &zilog->zl_lock);
1044 				continue;
1045 			}
1046 			zio_buf_free(lwb->lwb_buf, lwb->lwb_sz);
1047 		}
1048 		list_remove(&zilog->zl_lwb_list, lwb);
1049 		kmem_cache_free(zil_lwb_cache, lwb);
1050 	}
1051 	mutex_exit(&zilog->zl_lock);
1052 
1053 	zil_destroy(zilog);
1054 
1055 	return (0);
1056 }
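/*
 * Editorial sketch of the suspend/resume pairing; take_snapshot() is a
 * hypothetical stand-in for whatever must observe an empty intent log.
 */
#if 0
	if ((error = zil_suspend(zilog)) != 0)
		return (error);		/* EBUSY: log claimed, not replayed */
	error = take_snapshot(ds);
	zil_resume(zilog);
#endif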
1057 
1058 void
1059 zil_resume(zilog_t *zilog)
1060 {
1061 	mutex_enter(&zilog->zl_lock);
1062 	ASSERT(zilog->zl_suspend != 0);
1063 	zilog->zl_suspend--;
1064 	mutex_exit(&zilog->zl_lock);
1065 }
1066 
1067 typedef struct zil_replay_arg {
1068 	objset_t	*zr_os;
1069 	zil_replay_func_t **zr_replay;
1070 	void		*zr_arg;
1071 	void		(*zr_rm_sync)(void *arg);
1072 	uint64_t	*zr_txgp;
1073 	boolean_t	zr_byteswap;
1074 	char		*zr_lrbuf;
1075 } zil_replay_arg_t;
1076 
1077 static void
1078 zil_replay_log_record(zilog_t *zilog, lr_t *lr, void *zra, uint64_t claim_txg)
1079 {
1080 	zil_replay_arg_t *zr = zra;
1081 	zil_header_t *zh = zilog->zl_header;
1082 	uint64_t reclen = lr->lrc_reclen;
1083 	uint64_t txtype = lr->lrc_txtype;
1084 	int pass, error;
1085 
1086 	if (zilog->zl_stop_replay)
1087 		return;
1088 
1089 	if (lr->lrc_txg < claim_txg)		/* already committed */
1090 		return;
1091 
1092 	if (lr->lrc_seq <= zh->zh_replay_seq)	/* already replayed */
1093 		return;
1094 
1095 	/*
1096 	 * Make a copy of the data so we can revise and extend it.
1097 	 */
1098 	bcopy(lr, zr->zr_lrbuf, reclen);
1099 
1100 	/*
1101 	 * The log block containing this lr may have been byteswapped
1102 	 * so that we can easily examine common fields like lrc_txtype.
1103 	 * However, the log is a mix of different data types, and only the
1104 	 * replay vectors know how to byteswap their records.  Therefore, if
1105 	 * the lr was byteswapped, undo it before invoking the replay vector.
1106 	 */
1107 	if (zr->zr_byteswap)
1108 		byteswap_uint64_array(zr->zr_lrbuf, reclen);
1109 
1110 	/*
1111 	 * If this is a TX_WRITE with a blkptr, suck in the data.
1112 	 */
1113 	if (txtype == TX_WRITE && reclen == sizeof (lr_write_t)) {
1114 		lr_write_t *lrw = (lr_write_t *)lr;
1115 		blkptr_t *wbp = &lrw->lr_blkptr;
1116 		uint64_t wlen = lrw->lr_length;
1117 		char *wbuf = zr->zr_lrbuf + reclen;
1118 
1119 		if (BP_IS_HOLE(wbp)) {	/* compressed to a hole */
1120 			bzero(wbuf, wlen);
1121 		} else {
1122 			/*
1123 			 * A subsequent write may have overwritten this block,
1124 			 * in which case wbp may have been freed and
1125 			 * reallocated, and our read of wbp may fail with a
1126 			 * checksum error.  We can safely ignore this because
1127 			 * the later write will provide the correct data.
1128 			 */
1129 			(void) zio_wait(zio_read(NULL, zilog->zl_spa,
1130 			    wbp, wbuf, BP_GET_LSIZE(wbp), NULL, NULL,
1131 			    ZIO_PRIORITY_SYNC_READ,
1132 			    ZIO_FLAG_CANFAIL | ZIO_FLAG_SPECULATIVE));
1133 			(void) memmove(wbuf, wbuf + lrw->lr_blkoff, wlen);
1134 		}
1135 	}
1136 
1137 	/*
1138 	 * We must now do two things atomically: replay this log record,
1139 	 * and update the log header to reflect the fact that we did so.
1140 	 * We use the DMU's ability to assign into a specific txg to do this.
1141 	 */
1142 	for (pass = 1; /* CONSTANTCONDITION */; pass++) {
1143 		uint64_t replay_txg;
1144 		dmu_tx_t *replay_tx;
1145 
1146 		replay_tx = dmu_tx_create(zr->zr_os);
1147 		error = dmu_tx_assign(replay_tx, TXG_WAIT);
1148 		if (error) {
1149 			dmu_tx_abort(replay_tx);
1150 			break;
1151 		}
1152 
1153 		replay_txg = dmu_tx_get_txg(replay_tx);
1154 
1155 		if (txtype == 0 || txtype >= TX_MAX_TYPE) {
1156 			error = EINVAL;
1157 		} else {
1158 			/*
1159 			 * On the first pass, arrange for the replay vector
1160 			 * to fail its dmu_tx_assign().  That's the only way
1161 			 * to ensure that those code paths remain well tested.
1162 			 */
1163 			*zr->zr_txgp = replay_txg - (pass == 1);
1164 			error = zr->zr_replay[txtype](zr->zr_arg, zr->zr_lrbuf,
1165 			    zr->zr_byteswap);
1166 			*zr->zr_txgp = TXG_NOWAIT;
1167 		}
1168 
1169 		if (error == 0) {
1170 			dsl_dataset_dirty(dmu_objset_ds(zr->zr_os), replay_tx);
1171 			zilog->zl_replay_seq[replay_txg & TXG_MASK] =
1172 			    lr->lrc_seq;
1173 		}
1174 
1175 		dmu_tx_commit(replay_tx);
1176 
1177 		if (error != ERESTART)
1178 			break;
1179 
1180 		if (pass != 1)
1181 			txg_wait_open(spa_get_dsl(zilog->zl_spa),
1182 			    replay_txg + 1);
1183 
1184 		dprintf("pass %d, retrying\n", pass);
1185 	}
1186 
1187 	if (error) {
1188 		char *name = kmem_alloc(MAXNAMELEN, KM_SLEEP);
1189 		dmu_objset_name(zr->zr_os, name);
1190 		cmn_err(CE_WARN, "ZFS replay transaction error %d, "
1191 		    "dataset %s, seq 0x%llx, txtype %llu\n",
1192 		    error, name,
1193 		    (u_longlong_t)lr->lrc_seq, (u_longlong_t)txtype);
1194 		zilog->zl_stop_replay = 1;
1195 		kmem_free(name, MAXNAMELEN);
1196 	}
1197 
1198 	/*
1199 	 * The DMU's dnode layer doesn't see removes until the txg commits,
1200 	 * so a subsequent claim can spuriously fail with EEXIST.
1201 	 * To prevent this, if we might have removed an object,
1202 	 * wait for the delete thread to delete it, and then
1203 	 * wait for the transaction group to sync.
1204 	 */
1205 	if (txtype == TX_REMOVE || txtype == TX_RMDIR || txtype == TX_RENAME) {
1206 		if (zr->zr_rm_sync != NULL)
1207 			zr->zr_rm_sync(zr->zr_arg);
1208 		txg_wait_synced(spa_get_dsl(zilog->zl_spa), 0);
1209 	}
1210 }
1211 
1212 /*
1213  * If this dataset has an intent log, replay it and destroy it.
1214  */
1215 void
1216 zil_replay(objset_t *os, void *arg, uint64_t *txgp,
1217 	zil_replay_func_t *replay_func[TX_MAX_TYPE], void (*rm_sync)(void *arg))
1218 {
1219 	zilog_t *zilog = dmu_objset_zil(os);
1220 	zil_replay_arg_t zr;
1221 
1222 	zr.zr_os = os;
1223 	zr.zr_replay = replay_func;
1224 	zr.zr_arg = arg;
1225 	zr.zr_rm_sync = rm_sync;
1226 	zr.zr_txgp = txgp;
1227 	zr.zr_byteswap = BP_SHOULD_BYTESWAP(&zilog->zl_header->zh_log);
1228 	zr.zr_lrbuf = kmem_alloc(2 * SPA_MAXBLOCKSIZE, KM_SLEEP);
1229 
1230 	/*
1231 	 * Wait for in-progress removes to sync before starting replay.
1232 	 */
1233 	if (rm_sync != NULL)
1234 		rm_sync(arg);
1235 	txg_wait_synced(zilog->zl_dmu_pool, 0);
1236 
1237 	zilog->zl_stop_replay = 0;
1238 	zil_parse(zilog, NULL, zil_replay_log_record, &zr,
1239 	    zilog->zl_header->zh_claim_txg);
1240 	kmem_free(zr.zr_lrbuf, 2 * SPA_MAXBLOCKSIZE);
1241 
1242 	zil_destroy(zilog);
1243 }
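/*
 * Editorial sketch of a consumer wiring up replay; the table, fs handle
 * and rm_sync helper names below are hypothetical.
 */
#if 0
	static zil_replay_func_t *my_replay_vector[TX_MAX_TYPE];
	uint64_t my_assign_txg = TXG_NOWAIT;

	zil_replay(os, my_fs_private, &my_assign_txg, my_replay_vector,
	    my_rm_sync);
#endif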
1244