xref: /titanic_51/usr/src/uts/common/fs/zfs/zil.c (revision d485aa23b5e424dd136afdf657683389f93f72d6)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License, Version 1.0 only
6  * (the "License").  You may not use this file except in compliance
7  * with the License.
8  *
9  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
10  * or http://www.opensolaris.org/os/licensing.
11  * See the License for the specific language governing permissions
12  * and limitations under the License.
13  *
14  * When distributing Covered Code, include this CDDL HEADER in each
15  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
16  * If applicable, add the following below this CDDL HEADER, with the
17  * fields enclosed by brackets "[]" replaced with your own identifying
18  * information: Portions Copyright [yyyy] [name of copyright owner]
19  *
20  * CDDL HEADER END
21  */
22 /*
23  * Copyright 2005 Sun Microsystems, Inc.  All rights reserved.
24  * Use is subject to license terms.
25  */
26 
27 #pragma ident	"%Z%%M%	%I%	%E% SMI"
28 
29 #include <sys/zfs_context.h>
30 #include <sys/spa.h>
31 #include <sys/dmu.h>
32 #include <sys/zap.h>
33 #include <sys/arc.h>
34 #include <sys/stat.h>
35 #include <sys/resource.h>
36 #include <sys/zil.h>
37 #include <sys/zil_impl.h>
38 #include <sys/dsl_dataset.h>
39 #include <sys/vdev.h>
40 
41 
42 /*
43  * The zfs intent log (ZIL) saves transaction records of system calls
44  * that change the file system in memory with enough information
45  * to be able to replay them. These are stored in memory until
46  * either the DMU transaction group (txg) commits them to the stable pool
47  * and they can be discarded, or they are flushed to the stable log
48  * (also in the pool) due to a fsync, O_DSYNC or other synchronous
49  * requirement. In the event of a panic or power fail then those log
50  * records (transactions) are replayed.
51  *
52  * There is one ZIL per file system. Its on-disk (pool) format consists
53  * of 3 parts:
54  *
55  * 	- ZIL header
56  * 	- ZIL blocks
57  * 	- ZIL records
58  *
59  * A log record holds a system call transaction. Log blocks can
60  * hold many log records and the blocks are chained together.
61  * Each ZIL block contains a block pointer (blkptr_t) to the next
62  * ZIL block in the chain. The ZIL header points to the first
63  * block in the chain. Note there is not a fixed place in the pool
64  * to hold blocks. They are dynamically allocated and freed as
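 * needed from the blocks available. In outline, the on-disk structure is:
 *
 *	ZIL header -> ZIL block -> ZIL block -> ...
 *
 * where each ZIL block holds zero or more log records.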
66  */
67 
/*
 * These global ZIL switches affect all pools
 */
int zil_disable = 0;	/* disable intent logging */
/*
 * make every transaction synchronous: zil_commit() proceeds even when
 * the caller's ioflag carries none of FSYNC, FDSYNC or FRSYNC
 */
int zil_always = 0;
int zil_purge = 0;	/* at pool open, just throw everything away */
/*
 * don't flush write cache buffers on disks: zil_add_vdev() and
 * zil_flush_vdevs() return immediately when this is set
 */
int zil_noflush = 0;

/* kmem cache for lwb_t (log write buffer) structures; see zil_init() */
static kmem_cache_t *zil_lwb_cache;
77 
78 static int
79 zil_dva_compare(const void *x1, const void *x2)
80 {
81 	const dva_t *dva1 = x1;
82 	const dva_t *dva2 = x2;
83 
84 	if (DVA_GET_VDEV(dva1) < DVA_GET_VDEV(dva2))
85 		return (-1);
86 	if (DVA_GET_VDEV(dva1) > DVA_GET_VDEV(dva2))
87 		return (1);
88 
89 	if (DVA_GET_OFFSET(dva1) < DVA_GET_OFFSET(dva2))
90 		return (-1);
91 	if (DVA_GET_OFFSET(dva1) > DVA_GET_OFFSET(dva2))
92 		return (1);
93 
94 	return (0);
95 }
96 
97 static void
98 zil_dva_tree_init(avl_tree_t *t)
99 {
100 	avl_create(t, zil_dva_compare, sizeof (zil_dva_node_t),
101 	    offsetof(zil_dva_node_t, zn_node));
102 }
103 
104 static void
105 zil_dva_tree_fini(avl_tree_t *t)
106 {
107 	zil_dva_node_t *zn;
108 	void *cookie = NULL;
109 
110 	while ((zn = avl_destroy_nodes(t, &cookie)) != NULL)
111 		kmem_free(zn, sizeof (zil_dva_node_t));
112 
113 	avl_destroy(t);
114 }
115 
116 static int
117 zil_dva_tree_add(avl_tree_t *t, dva_t *dva)
118 {
119 	zil_dva_node_t *zn;
120 	avl_index_t where;
121 
122 	if (avl_find(t, dva, &where) != NULL)
123 		return (EEXIST);
124 
125 	zn = kmem_alloc(sizeof (zil_dva_node_t), KM_SLEEP);
126 	zn->zn_dva = *dva;
127 	avl_insert(t, zn, where);
128 
129 	return (0);
130 }
131 
132 /*
133  * Read a log block, make sure it's valid, and byteswap it if necessary.
134  */
135 static int
136 zil_read_log_block(zilog_t *zilog, blkptr_t *bp, char *buf)
137 {
138 	uint64_t blksz = BP_GET_LSIZE(bp);
139 	zil_trailer_t *ztp = (zil_trailer_t *)(buf + blksz) - 1;
140 	zio_cksum_t cksum;
141 	int error;
142 
143 	error = zio_wait(zio_read(NULL, zilog->zl_spa, bp, buf, blksz,
144 	    NULL, NULL, ZIO_PRIORITY_SYNC_READ,
145 	    ZIO_FLAG_CANFAIL | ZIO_FLAG_SPECULATIVE));
146 	if (error) {
147 		dprintf_bp(bp, "zilog %p bp %p read failed, error %d: ",
148 		    zilog, bp, error);
149 		return (error);
150 	}
151 
152 	if (BP_SHOULD_BYTESWAP(bp))
153 		byteswap_uint64_array(buf, blksz);
154 
155 	/*
156 	 * Sequence numbers should be... sequential.  The checksum verifier for
157 	 * the next block should be: <logid[0], logid[1], objset id, seq + 1>.
158 	 */
159 	cksum = bp->blk_cksum;
160 	cksum.zc_word[3]++;
161 	if (bcmp(&cksum, &ztp->zit_next_blk.blk_cksum, sizeof (cksum)) != 0) {
162 		dprintf_bp(bp, "zilog %p bp %p stale pointer: ", zilog, bp);
163 		return (ESTALE);
164 	}
165 
166 	if (BP_IS_HOLE(&ztp->zit_next_blk)) {
167 		dprintf_bp(bp, "zilog %p bp %p hole: ", zilog, bp);
168 		return (ENOENT);
169 	}
170 
171 	if (ztp->zit_nused > (blksz - sizeof (zil_trailer_t))) {
172 		dprintf("zilog %p bp %p nused exceeds blksz\n", zilog, bp);
173 		return (EOVERFLOW);
174 	}
175 
176 	dprintf_bp(bp, "zilog %p bp %p good block: ", zilog, bp);
177 
178 	return (0);
179 }
180 
181 /*
182  * Parse the intent log, and call parse_func for each valid record within.
183  */
184 void
185 zil_parse(zilog_t *zilog, zil_parse_blk_func_t *parse_blk_func,
186     zil_parse_lr_func_t *parse_lr_func, void *arg, uint64_t txg)
187 {
188 	blkptr_t blk;
189 	char *lrbuf, *lrp;
190 	zil_trailer_t *ztp;
191 	int reclen, error;
192 
193 	blk = zilog->zl_header->zh_log;
194 	if (BP_IS_HOLE(&blk))
195 		return;
196 
197 	/*
198 	 * Starting at the block pointed to by zh_log we read the log chain.
199 	 * For each block in the chain we strongly check that block to
200 	 * ensure its validity.  We stop when an invalid block is found.
201 	 * For each block pointer in the chain we call parse_blk_func().
202 	 * For each record in each valid block we call parse_lr_func().
203 	 */
204 	zil_dva_tree_init(&zilog->zl_dva_tree);
205 	lrbuf = zio_buf_alloc(SPA_MAXBLOCKSIZE);
206 	for (;;) {
207 		error = zil_read_log_block(zilog, &blk, lrbuf);
208 
209 		if (parse_blk_func != NULL)
210 			parse_blk_func(zilog, &blk, arg, txg);
211 
212 		if (error)
213 			break;
214 
215 		ztp = (zil_trailer_t *)(lrbuf + BP_GET_LSIZE(&blk)) - 1;
216 		blk = ztp->zit_next_blk;
217 
218 		if (parse_lr_func == NULL)
219 			continue;
220 
221 		for (lrp = lrbuf; lrp < lrbuf + ztp->zit_nused; lrp += reclen) {
222 			lr_t *lr = (lr_t *)lrp;
223 			reclen = lr->lrc_reclen;
224 			ASSERT3U(reclen, >=, sizeof (lr_t));
225 			parse_lr_func(zilog, lr, arg, txg);
226 		}
227 	}
228 	zio_buf_free(lrbuf, SPA_MAXBLOCKSIZE);
229 	zil_dva_tree_fini(&zilog->zl_dva_tree);
230 }
231 
232 /* ARGSUSED */
233 static void
234 zil_claim_log_block(zilog_t *zilog, blkptr_t *bp, void *tx, uint64_t first_txg)
235 {
236 	spa_t *spa = zilog->zl_spa;
237 	int err;
238 
239 	dprintf_bp(bp, "first_txg %llu: ", first_txg);
240 
241 	/*
242 	 * Claim log block if not already committed and not already claimed.
243 	 */
244 	if (bp->blk_birth >= first_txg &&
245 	    zil_dva_tree_add(&zilog->zl_dva_tree, BP_IDENTITY(bp)) == 0) {
246 		err = zio_wait(zio_claim(NULL, spa, first_txg, bp, NULL, NULL));
247 		ASSERT(err == 0);
248 	}
249 }
250 
251 static void
252 zil_claim_log_record(zilog_t *zilog, lr_t *lrc, void *tx, uint64_t first_txg)
253 {
254 	if (lrc->lrc_txtype == TX_WRITE) {
255 		lr_write_t *lr = (lr_write_t *)lrc;
256 		zil_claim_log_block(zilog, &lr->lr_blkptr, tx, first_txg);
257 	}
258 }
259 
260 /* ARGSUSED */
261 static void
262 zil_free_log_block(zilog_t *zilog, blkptr_t *bp, void *tx, uint64_t claim_txg)
263 {
264 	zio_free_blk(zilog->zl_spa, bp, dmu_tx_get_txg(tx));
265 }
266 
267 static void
268 zil_free_log_record(zilog_t *zilog, lr_t *lrc, void *tx, uint64_t claim_txg)
269 {
270 	/*
271 	 * If we previously claimed it, we need to free it.
272 	 */
273 	if (claim_txg != 0 && lrc->lrc_txtype == TX_WRITE) {
274 		lr_write_t *lr = (lr_write_t *)lrc;
275 		blkptr_t *bp = &lr->lr_blkptr;
276 		if (bp->blk_birth >= claim_txg &&
277 		    !zil_dva_tree_add(&zilog->zl_dva_tree, BP_IDENTITY(bp))) {
278 			(void) arc_free(NULL, zilog->zl_spa,
279 			    dmu_tx_get_txg(tx), bp, NULL, NULL, ARC_WAIT);
280 		}
281 	}
282 }
283 
284 /*
285  * Create an on-disk intent log.
286  */
287 static void
288 zil_create(zilog_t *zilog)
289 {
290 	lwb_t *lwb;
291 	uint64_t txg;
292 	dmu_tx_t *tx;
293 	blkptr_t blk;
294 	int error;
295 
296 	ASSERT(zilog->zl_header->zh_claim_txg == 0);
297 	ASSERT(zilog->zl_header->zh_replay_seq == 0);
298 
299 	/*
300 	 * Initialize the log header block.
301 	 */
302 	tx = dmu_tx_create(zilog->zl_os);
303 	(void) dmu_tx_assign(tx, TXG_WAIT);
304 	dsl_dataset_dirty(dmu_objset_ds(zilog->zl_os), tx);
305 	txg = dmu_tx_get_txg(tx);
306 
307 	/*
308 	 * Allocate the first log block and assign its checksum verifier.
309 	 */
310 	error = zio_alloc_blk(zilog->zl_spa, ZIO_CHECKSUM_ZILOG,
311 	    ZIL_MIN_BLKSZ, &blk, txg);
312 	if (error == 0) {
313 		ZIO_SET_CHECKSUM(&blk.blk_cksum,
314 		    spa_get_random(-1ULL), spa_get_random(-1ULL),
315 		    dmu_objset_id(zilog->zl_os), 1ULL);
316 
317 		/*
318 		 * Allocate a log write buffer (lwb) for the first log block.
319 		 */
320 		lwb = kmem_cache_alloc(zil_lwb_cache, KM_SLEEP);
321 		lwb->lwb_zilog = zilog;
322 		lwb->lwb_blk = blk;
323 		lwb->lwb_nused = 0;
324 		lwb->lwb_sz = BP_GET_LSIZE(&lwb->lwb_blk);
325 		lwb->lwb_buf = zio_buf_alloc(lwb->lwb_sz);
326 		lwb->lwb_max_txg = txg;
327 		lwb->lwb_seq = 0;
328 		lwb->lwb_state = UNWRITTEN;
329 		mutex_enter(&zilog->zl_lock);
330 		list_insert_tail(&zilog->zl_lwb_list, lwb);
331 		mutex_exit(&zilog->zl_lock);
332 	}
333 
334 	dmu_tx_commit(tx);
335 	txg_wait_synced(zilog->zl_dmu_pool, txg);
336 }
337 
338 /*
339  * In one tx, free all log blocks and clear the log header.
340  */
341 void
342 zil_destroy(zilog_t *zilog)
343 {
344 	dmu_tx_t *tx;
345 	uint64_t txg;
346 
347 	mutex_enter(&zilog->zl_destroy_lock);
348 
349 	if (BP_IS_HOLE(&zilog->zl_header->zh_log)) {
350 		mutex_exit(&zilog->zl_destroy_lock);
351 		return;
352 	}
353 
354 	tx = dmu_tx_create(zilog->zl_os);
355 	(void) dmu_tx_assign(tx, TXG_WAIT);
356 	dsl_dataset_dirty(dmu_objset_ds(zilog->zl_os), tx);
357 	txg = dmu_tx_get_txg(tx);
358 
359 	zil_parse(zilog, zil_free_log_block, zil_free_log_record, tx,
360 	    zilog->zl_header->zh_claim_txg);
361 	zilog->zl_destroy_txg = txg;
362 
363 	dmu_tx_commit(tx);
364 	txg_wait_synced(zilog->zl_dmu_pool, txg);
365 
366 	mutex_exit(&zilog->zl_destroy_lock);
367 }
368 
369 void
370 zil_claim(char *osname, void *txarg)
371 {
372 	dmu_tx_t *tx = txarg;
373 	uint64_t first_txg = dmu_tx_get_txg(tx);
374 	zilog_t *zilog;
375 	zil_header_t *zh;
376 	objset_t *os;
377 	int error;
378 
379 	error = dmu_objset_open(osname, DMU_OST_ANY, DS_MODE_STANDARD, &os);
380 	if (error) {
381 		cmn_err(CE_WARN, "can't process intent log for %s", osname);
382 		return;
383 	}
384 
385 	zilog = dmu_objset_zil(os);
386 	zh = zilog->zl_header;
387 
388 	/*
389 	 * Claim all log blocks if we haven't already done so.
390 	 */
391 	ASSERT3U(zh->zh_claim_txg, <=, first_txg);
392 	if (zh->zh_claim_txg == 0 && !BP_IS_HOLE(&zh->zh_log)) {
393 		zh->zh_claim_txg = first_txg;
394 		zil_parse(zilog, zil_claim_log_block, zil_claim_log_record,
395 		    tx, first_txg);
396 		dsl_dataset_dirty(dmu_objset_ds(os), tx);
397 	}
398 	ASSERT3U(first_txg, ==, (spa_last_synced_txg(zilog->zl_spa) + 1));
399 	dmu_objset_close(os);
400 }
401 
402 void
403 zil_add_vdev(zilog_t *zilog, uint64_t vdev, uint64_t seq)
404 {
405 	zil_vdev_t *zv;
406 
407 	if (zil_noflush)
408 		return;
409 
410 	ASSERT(MUTEX_HELD(&zilog->zl_lock));
411 	zv = kmem_alloc(sizeof (zil_vdev_t), KM_SLEEP);
412 	zv->vdev = vdev;
413 	zv->seq = seq;
414 	list_insert_tail(&zilog->zl_vdev_list, zv);
415 }
416 
417 
/*
 * Flush the write cache of every vdev recorded on zl_vdev_list (see
 * zil_add_vdev()) for a sequence number <= seq, and wait for the flushes.
 *
 * The caller must hold zl_lock.  The lock is dropped while each flush
 * ioctl is issued and re-taken afterwards; it is held again on return
 * (and while waiting for the flushes to complete).
 * Does nothing when zil_noflush is set.
 */
void
zil_flush_vdevs(zilog_t *zilog, uint64_t seq)
{
	vdev_t *vd;
	zil_vdev_t *zv, *zv2;
	zio_t *zio;
	spa_t *spa;
	uint64_t vdev;

	if (zil_noflush)
		return;

	ASSERT(MUTEX_HELD(&zilog->zl_lock));

	spa = zilog->zl_spa;
	zio = NULL;

	/* Consume entries from the head while they are covered by seq. */
	while ((zv = list_head(&zilog->zl_vdev_list)) != NULL &&
	    zv->seq <= seq) {
		vdev = zv->vdev;
		list_remove(&zilog->zl_vdev_list, zv);
		kmem_free(zv, sizeof (zil_vdev_t));

		/*
		 * remove all chained entries <= seq with same vdev
		 */
		zv = list_head(&zilog->zl_vdev_list);
		while (zv && zv->seq <= seq) {
			/* fetch the successor before zv may be freed */
			zv2 = list_next(&zilog->zl_vdev_list, zv);
			if (zv->vdev == vdev) {
				list_remove(&zilog->zl_vdev_list, zv);
				kmem_free(zv, sizeof (zil_vdev_t));
			}
			zv = zv2;
		}

		/* flush the write cache for this vdev */
		mutex_exit(&zilog->zl_lock);
		/* the root zio is created lazily, on the first flush */
		if (zio == NULL)
			zio = zio_root(spa, NULL, NULL, ZIO_FLAG_CANFAIL);
		vd = vdev_lookup_top(spa, vdev);
		ASSERT(vd);
		(void) zio_nowait(zio_ioctl(zio, spa, vd, DKIOCFLUSHWRITECACHE,
		    NULL, NULL, ZIO_PRIORITY_NOW,
		    ZIO_FLAG_CANFAIL | ZIO_FLAG_DONT_RETRY));
		mutex_enter(&zilog->zl_lock);
	}

	/*
	 * Wait for all the flushes to complete.  Not all devices actually
	 * support the DKIOCFLUSHWRITECACHE ioctl, so it's OK if it fails.
	 */
	if (zio != NULL)
		(void) zio_wait(zio);
}
473 
474 /*
475  * Function called when a log block write completes
476  */
477 static void
478 zil_lwb_write_done(zio_t *zio)
479 {
480 	lwb_t *prev;
481 	lwb_t *lwb = zio->io_private;
482 	zilog_t *zilog = lwb->lwb_zilog;
483 	uint64_t max_seq;
484 
485 	/*
486 	 * Now that we've written this log block, we have a stable pointer
487 	 * to the next block in the chain, so it's OK to let the txg in
488 	 * which we allocated the next block sync.
489 	 */
490 	txg_rele_to_sync(&lwb->lwb_txgh);
491 
492 	zio_buf_free(lwb->lwb_buf, lwb->lwb_sz);
493 	mutex_enter(&zilog->zl_lock);
494 	lwb->lwb_buf = NULL;
495 	if (zio->io_error) {
496 		zilog->zl_log_error = B_TRUE;
497 		mutex_exit(&zilog->zl_lock);
498 		cv_broadcast(&zilog->zl_cv_seq);
499 		return;
500 	}
501 
502 	prev = list_prev(&zilog->zl_lwb_list, lwb);
503 	if (prev && prev->lwb_state != SEQ_COMPLETE) {
504 		/* There's an unwritten buffer in the chain before this one */
505 		lwb->lwb_state = SEQ_INCOMPLETE;
506 		mutex_exit(&zilog->zl_lock);
507 		return;
508 	}
509 
510 	max_seq = lwb->lwb_seq;
511 	lwb->lwb_state = SEQ_COMPLETE;
512 	/*
513 	 * We must also follow up the chain for already written buffers
514 	 * to see if we can set zl_ss_seq even higher.
515 	 */
516 	while (lwb = list_next(&zilog->zl_lwb_list, lwb)) {
517 		if (lwb->lwb_state != SEQ_INCOMPLETE)
518 			break;
519 		lwb->lwb_state = SEQ_COMPLETE;
520 		/* lwb_seq will be zero if we've written an empty buffer */
521 		if (lwb->lwb_seq) {
522 			ASSERT3U(max_seq, <, lwb->lwb_seq);
523 			max_seq = lwb->lwb_seq;
524 		}
525 	}
526 	zilog->zl_ss_seq = MAX(max_seq, zilog->zl_ss_seq);
527 	mutex_exit(&zilog->zl_lock);
528 	cv_broadcast(&zilog->zl_cv_seq);
529 }
530 
/*
 * Start a log block write and advance to the next log block.
 * Calls are serialized.
 *
 * Allocates the next log block, records its pointer in the trailer of
 * 'lwb', creates a new lwb for it at the tail of zl_lwb_list, and issues
 * the (asynchronous) write of 'lwb'; zil_lwb_write_done() runs when that
 * write completes.
 *
 * Returns the new lwb, or NULL if the next block could not be allocated
 * (in which case 'lwb' is not written).
 */
static lwb_t *
zil_lwb_write_start(zilog_t *zilog, lwb_t *lwb)
{
	lwb_t *nlwb;
	/* the trailer lives in the last sizeof (zil_trailer_t) bytes */
	zil_trailer_t *ztp = (zil_trailer_t *)(lwb->lwb_buf + lwb->lwb_sz) - 1;
	uint64_t txg;
	uint64_t zil_blksz;
	int error;

	ASSERT(lwb->lwb_nused <= ZIL_BLK_DATA_SZ(lwb));

	/*
	 * Allocate the next block and save its address in this block
	 * before writing it in order to establish the log chain.
	 * Note that if the allocation of nlwb synced before we wrote
	 * the block that points at it (lwb), we'd leak it if we crashed.
	 * Therefore, we don't do txg_rele_to_sync() until zil_lwb_write_done().
	 */
	txg = txg_hold_open(zilog->zl_dmu_pool, &lwb->lwb_txgh);
	txg_rele_to_quiesce(&lwb->lwb_txgh);

	/*
	 * Pick a ZIL blocksize based upon the size of the outstanding
	 * in-memory transactions, or if none the same size as the
	 * last block.
	 */
	if (zilog->zl_itx_list_sz) {
		/* room for the pending records plus the trailer ... */
		zil_blksz = zilog->zl_itx_list_sz + sizeof (*ztp);
		/* ... rounded up to a multiple of ZIL_MIN_BLKSZ, and capped */
		zil_blksz = P2ROUNDUP(zil_blksz, ZIL_MIN_BLKSZ);
		if (zil_blksz > ZIL_MAX_BLKSZ)
			zil_blksz = ZIL_MAX_BLKSZ;
		zilog->zl_prev_blk_sz = zil_blksz;
	} else {
		zil_blksz = zilog->zl_prev_blk_sz;
	}

	error = zio_alloc_blk(zilog->zl_spa, ZIO_CHECKSUM_ZILOG,
	    zil_blksz, &ztp->zit_next_blk, txg);
	if (error) {
		/* nothing will be written, so drop the txg hold here */
		txg_rele_to_sync(&lwb->lwb_txgh);
		return (NULL);
	}

	ASSERT3U(ztp->zit_next_blk.blk_birth, ==, txg);
	ztp->zit_nused = lwb->lwb_nused;
	ztp->zit_bt.zbt_cksum = lwb->lwb_blk.blk_cksum;
	/*
	 * The next block's checksum verifier is this block's verifier with
	 * the last word incremented; zil_read_log_block() checks for this.
	 */
	ztp->zit_next_blk.blk_cksum = lwb->lwb_blk.blk_cksum;
	ztp->zit_next_blk.blk_cksum.zc_word[3]++;

	/*
	 * Allocate a new log write buffer (lwb).
	 */
	nlwb = kmem_cache_alloc(zil_lwb_cache, KM_SLEEP);

	nlwb->lwb_zilog = zilog;
	nlwb->lwb_blk = ztp->zit_next_blk;
	nlwb->lwb_nused = 0;
	nlwb->lwb_sz = BP_GET_LSIZE(&nlwb->lwb_blk);
	nlwb->lwb_buf = zio_buf_alloc(nlwb->lwb_sz);
	nlwb->lwb_max_txg = txg;
	nlwb->lwb_seq = 0;
	nlwb->lwb_state = UNWRITTEN;

	/*
	 * Put new lwb at the end of the log chain,
	 * and record the vdev for later flushing
	 */
	mutex_enter(&zilog->zl_lock);
	list_insert_tail(&zilog->zl_lwb_list, nlwb);
	zil_add_vdev(zilog, DVA_GET_VDEV(BP_IDENTITY(&(lwb->lwb_blk))),
	    lwb->lwb_seq);
	mutex_exit(&zilog->zl_lock);

	/*
	 * write the old log block
	 */
	dprintf_bp(&lwb->lwb_blk, "lwb %p txg %llu: ", lwb, txg);
	zio_nowait(zio_rewrite(NULL, zilog->zl_spa, ZIO_CHECKSUM_ZILOG, 0,
	    &lwb->lwb_blk, lwb->lwb_buf, lwb->lwb_sz, zil_lwb_write_done, lwb,
	    ZIO_PRIORITY_LOG_WRITE, ZIO_FLAG_MUSTSUCCEED));

	return (nlwb);
}
618 
/*
 * Copy the log record of 'itx' into the log write buffer 'lwb', starting
 * a new log block first if the record does not fit.
 *
 * Returns the lwb subsequent records should be committed to.  Returns
 * NULL if 'lwb' is NULL or if a needed new log block can't be started.
 *
 * The record is not copied into the log when:
 *	- it is a TX_WRITE whose data was not already copied and
 *	  zl_get_data() fails; for errors other than ENOENT and EALREADY
 *	  the record's txg is waited for and zl_ss_seq is advanced to the
 *	  record's sequence number;
 *	- it does not fit even in a freshly started log block; the record's
 *	  txg is then waited for and zl_ss_seq advanced likewise.
 */
static lwb_t *
zil_lwb_commit(zilog_t *zilog, itx_t *itx, lwb_t *lwb)
{
	lr_t *lrc = &itx->itx_lr; /* common log record */
	uint64_t seq = lrc->lrc_seq;
	uint64_t txg = lrc->lrc_txg;
	uint64_t reclen = lrc->lrc_reclen;
	int error;

	if (lwb == NULL)
		return (NULL);
	ASSERT(lwb->lwb_buf != NULL);

	/*
	 * If it's a write, fetch the data or get its blkptr as appropriate.
	 */
	if (lrc->lrc_txtype == TX_WRITE) {
		lr_write_t *lr = (lr_write_t *)lrc;
		if (txg > spa_freeze_txg(zilog->zl_spa))
			txg_wait_synced(zilog->zl_dmu_pool, txg);

		if (!itx->itx_data_copied &&
		    (error = zilog->zl_get_data(itx->itx_private, lr)) != 0) {
			if (error != ENOENT && error != EALREADY) {
				/*
				 * Fall back to syncing the record's txg,
				 * after which seq counts as stable.
				 */
				txg_wait_synced(zilog->zl_dmu_pool, txg);
				mutex_enter(&zilog->zl_lock);
				zilog->zl_ss_seq = MAX(seq, zilog->zl_ss_seq);
				zil_add_vdev(zilog,
				    DVA_GET_VDEV(BP_IDENTITY(&(lr->lr_blkptr))),
				    seq);
				mutex_exit(&zilog->zl_lock);
				return (lwb);
			}
			/* ENOENT or EALREADY: just note the vdev to flush */
			mutex_enter(&zilog->zl_lock);
			zil_add_vdev(zilog,
			    DVA_GET_VDEV(BP_IDENTITY(&(lr->lr_blkptr))), seq);
			mutex_exit(&zilog->zl_lock);
			return (lwb);
		}
	}

	/*
	 * If this record won't fit in the current log block, start a new one.
	 */
	if (lwb->lwb_nused + reclen > ZIL_BLK_DATA_SZ(lwb)) {
		lwb = zil_lwb_write_start(zilog, lwb);
		if (lwb == NULL)
			return (NULL);
		/* still too big for the new, empty block: sync instead */
		if (lwb->lwb_nused + reclen > ZIL_BLK_DATA_SZ(lwb)) {
			txg_wait_synced(zilog->zl_dmu_pool, txg);
			mutex_enter(&zilog->zl_lock);
			zilog->zl_ss_seq = MAX(seq, zilog->zl_ss_seq);
			mutex_exit(&zilog->zl_lock);
			return (lwb);
		}
	}

	/* append the record and update the block's bookkeeping */
	bcopy(lrc, lwb->lwb_buf + lwb->lwb_nused, reclen);
	lwb->lwb_nused += reclen;
	lwb->lwb_max_txg = MAX(lwb->lwb_max_txg, txg);
	ASSERT3U(lwb->lwb_seq, <, seq);
	lwb->lwb_seq = seq;
	ASSERT3U(lwb->lwb_nused, <=, ZIL_BLK_DATA_SZ(lwb));
	ASSERT3U(P2PHASE(lwb->lwb_nused, sizeof (uint64_t)), ==, 0);

	return (lwb);
}
686 
687 itx_t *
688 zil_itx_create(int txtype, size_t lrsize)
689 {
690 	itx_t *itx;
691 
692 	lrsize = P2ROUNDUP(lrsize, sizeof (uint64_t));
693 
694 	itx = kmem_alloc(offsetof(itx_t, itx_lr) + lrsize, KM_SLEEP);
695 	itx->itx_lr.lrc_txtype = txtype;
696 	itx->itx_lr.lrc_reclen = lrsize;
697 	itx->itx_lr.lrc_seq = 0;	/* defensive */
698 
699 	return (itx);
700 }
701 
702 uint64_t
703 zil_itx_assign(zilog_t *zilog, itx_t *itx, dmu_tx_t *tx)
704 {
705 	uint64_t seq;
706 
707 	ASSERT(itx->itx_lr.lrc_seq == 0);
708 
709 	mutex_enter(&zilog->zl_lock);
710 	list_insert_tail(&zilog->zl_itx_list, itx);
711 	zilog->zl_itx_list_sz += itx->itx_lr.lrc_reclen;
712 	itx->itx_lr.lrc_txg = dmu_tx_get_txg(tx);
713 	itx->itx_lr.lrc_seq = seq = ++zilog->zl_itx_seq;
714 	mutex_exit(&zilog->zl_lock);
715 
716 	return (seq);
717 }
718 
719 /*
720  * Free up all in-memory intent log transactions that have now been synced.
721  */
722 static void
723 zil_itx_clean(zilog_t *zilog)
724 {
725 	uint64_t synced_txg = spa_last_synced_txg(zilog->zl_spa);
726 	uint64_t freeze_txg = spa_freeze_txg(zilog->zl_spa);
727 	uint64_t max_seq = 0;
728 	itx_t *itx;
729 
730 	mutex_enter(&zilog->zl_lock);
731 	while ((itx = list_head(&zilog->zl_itx_list)) != NULL &&
732 	    itx->itx_lr.lrc_txg <= MIN(synced_txg, freeze_txg)) {
733 		list_remove(&zilog->zl_itx_list, itx);
734 		zilog->zl_itx_list_sz -= itx->itx_lr.lrc_reclen;
735 		ASSERT3U(max_seq, <, itx->itx_lr.lrc_seq);
736 		max_seq = itx->itx_lr.lrc_seq;
737 		kmem_free(itx, offsetof(itx_t, itx_lr)
738 		    + itx->itx_lr.lrc_reclen);
739 	}
740 	if (max_seq > zilog->zl_ss_seq) {
741 		zilog->zl_ss_seq = max_seq;
742 		cv_broadcast(&zilog->zl_cv_seq);
743 	}
744 	mutex_exit(&zilog->zl_lock);
745 }
746 
747 void
748 zil_clean(zilog_t *zilog)
749 {
750 	/*
751 	 * Check for any log blocks that can be freed.
752 	 * Log blocks are only freed when the log block allocation and
753 	 * log records contained within are both known to be committed.
754 	 */
755 	mutex_enter(&zilog->zl_lock);
756 	if (list_head(&zilog->zl_itx_list) != NULL)
757 		(void) taskq_dispatch(zilog->zl_clean_taskq,
758 		    (void (*)(void *))zil_itx_clean, zilog, TQ_NOSLEEP);
759 	mutex_exit(&zilog->zl_lock);
760 }
761 
/*
 * Push zfs transactions to stable storage up to the supplied sequence number.
 *
 * Returns immediately if zilog is NULL, seq is 0, or ioflag has none of
 * FSYNC, FDSYNC and FRSYNC set while zil_always is off.
 *
 * Only one thread writes the log at a time (zl_writer).  A caller either
 * finds its seq already stable (zl_ss_seq >= seq) or becomes the writer:
 * it moves queued itxs into log blocks via zil_lwb_commit(), starts the
 * write of the last block, gives up the writer role, then waits for
 * zl_ss_seq to reach seq and flushes the affected vdevs.
 *
 * If there is no usable log block (log suspended, block allocation
 * failed) or a log write failed, it falls back to txg_wait_synced().
 */
void
zil_commit(zilog_t *zilog, uint64_t seq, int ioflag)
{
	uint64_t txg;
	uint64_t max_seq;
	uint64_t reclen;
	itx_t *itx;
	lwb_t *lwb;
	spa_t *spa;

	if (zilog == NULL || seq == 0 ||
	    ((ioflag & (FSYNC | FDSYNC | FRSYNC)) == 0 && !zil_always))
		return;

	spa = zilog->zl_spa;
	mutex_enter(&zilog->zl_lock);

	seq = MIN(seq, zilog->zl_itx_seq);	/* cap seq at largest itx seq */

	for (;;) {
		if (zilog->zl_ss_seq >= seq) {	/* already on stable storage */
			cv_signal(&zilog->zl_cv_write);
			mutex_exit(&zilog->zl_lock);
			return;
		}

		if (zilog->zl_writer == B_FALSE) /* no one writing, do it */
			break;

		cv_wait(&zilog->zl_cv_write, &zilog->zl_lock);
	}

	zilog->zl_writer = B_TRUE;
	max_seq = 0;

	if (zilog->zl_suspend) {
		/* suspended: write nothing, rely on txg sync below */
		lwb = NULL;
	} else {
		lwb = list_tail(&zilog->zl_lwb_list);
		if (lwb == NULL) {
			/* no log yet; zil_create() takes zl_lock itself */
			mutex_exit(&zilog->zl_lock);
			zil_create(zilog);
			mutex_enter(&zilog->zl_lock);
			lwb = list_tail(&zilog->zl_lwb_list);
		}
	}

	/*
	 * Loop through in-memory log transactions filling log blocks,
	 * until we reach the given sequence number and there's no more
	 * room in the write buffer.
	 */
	for (;;) {
		itx = list_head(&zilog->zl_itx_list);
		if (itx == NULL)
			break;

		reclen = itx->itx_lr.lrc_reclen;
		if ((itx->itx_lr.lrc_seq > seq) &&
		    ((lwb == NULL) || (lwb->lwb_nused + reclen >
		    ZIL_BLK_DATA_SZ(lwb))))
			break;

		list_remove(&zilog->zl_itx_list, itx);
		txg = itx->itx_lr.lrc_txg;
		ASSERT(txg);

		/* the itx is off the list, so it is ours; drop the lock */
		mutex_exit(&zilog->zl_lock);
		if (txg > spa_last_synced_txg(spa) ||
		    txg > spa_freeze_txg(spa))
			lwb = zil_lwb_commit(zilog, itx, lwb);
		else
			/* its txg already synced; nothing to log */
			max_seq = itx->itx_lr.lrc_seq;
		kmem_free(itx, offsetof(itx_t, itx_lr)
		    + itx->itx_lr.lrc_reclen);
		mutex_enter(&zilog->zl_lock);
		zilog->zl_itx_list_sz -= reclen;
	}

	mutex_exit(&zilog->zl_lock);

	/* write the last block out */
	if (lwb != NULL && lwb->lwb_nused != 0)
		lwb = zil_lwb_write_start(zilog, lwb);

	/* wake up others waiting to start a write */
	mutex_enter(&zilog->zl_lock);
	zilog->zl_writer = B_FALSE;
	cv_signal(&zilog->zl_cv_write);

	if (max_seq > zilog->zl_ss_seq) {
		zilog->zl_ss_seq = max_seq;
		cv_broadcast(&zilog->zl_cv_seq);
	}
	/*
	 * Wait if necessary for our seq to be committed.
	 */
	if (lwb) {
		while (zilog->zl_ss_seq < seq && zilog->zl_log_error == 0)
			cv_wait(&zilog->zl_cv_seq, &zilog->zl_lock);
		zil_flush_vdevs(zilog, seq);
	}
	if (zilog->zl_log_error || lwb == NULL) {
		/*
		 * Fall back to a txg sync.  The itx sequence number is
		 * sampled before the lock is dropped and becomes the new
		 * stable sequence number once the sync has completed.
		 */
		zilog->zl_log_error = 0;
		max_seq = zilog->zl_itx_seq;
		mutex_exit(&zilog->zl_lock);
		txg_wait_synced(zilog->zl_dmu_pool, 0);
		mutex_enter(&zilog->zl_lock);
		zilog->zl_ss_seq = MAX(max_seq, zilog->zl_ss_seq);
		cv_broadcast(&zilog->zl_cv_seq);
	}
	mutex_exit(&zilog->zl_lock);
}
878 
879 /*
880  * Called in syncing context to free committed log blocks and update log header.
881  */
882 void
883 zil_sync(zilog_t *zilog, dmu_tx_t *tx)
884 {
885 	uint64_t txg = dmu_tx_get_txg(tx);
886 	spa_t *spa = zilog->zl_spa;
887 	lwb_t *lwb;
888 
889 	ASSERT(zilog->zl_stop_sync == 0);
890 
891 	zilog->zl_header->zh_replay_seq = zilog->zl_replay_seq[txg & TXG_MASK];
892 
893 	if (zilog->zl_destroy_txg == txg) {
894 		bzero(zilog->zl_header, sizeof (zil_header_t));
895 		bzero(zilog->zl_replay_seq, sizeof (zilog->zl_replay_seq));
896 		zilog->zl_destroy_txg = 0;
897 	}
898 
899 	mutex_enter(&zilog->zl_lock);
900 	for (;;) {
901 		lwb = list_head(&zilog->zl_lwb_list);
902 		if (lwb == NULL) {
903 			mutex_exit(&zilog->zl_lock);
904 			return;
905 		}
906 		if (lwb->lwb_buf != NULL || lwb->lwb_max_txg > txg)
907 			break;
908 		list_remove(&zilog->zl_lwb_list, lwb);
909 		zio_free_blk(spa, &lwb->lwb_blk, txg);
910 		kmem_cache_free(zil_lwb_cache, lwb);
911 	}
912 	zilog->zl_header->zh_log = lwb->lwb_blk;
913 	mutex_exit(&zilog->zl_lock);
914 }
915 
916 void
917 zil_init(void)
918 {
919 	zil_lwb_cache = kmem_cache_create("zil_lwb_cache",
920 	    sizeof (struct lwb), NULL, NULL, NULL, NULL, NULL, NULL, 0);
921 }
922 
923 void
924 zil_fini(void)
925 {
926 	kmem_cache_destroy(zil_lwb_cache);
927 }
928 
929 zilog_t *
930 zil_alloc(objset_t *os, zil_header_t *zh_phys)
931 {
932 	zilog_t *zilog;
933 
934 	zilog = kmem_zalloc(sizeof (zilog_t), KM_SLEEP);
935 
936 	zilog->zl_header = zh_phys;
937 	zilog->zl_os = os;
938 	zilog->zl_spa = dmu_objset_spa(os);
939 	zilog->zl_dmu_pool = dmu_objset_pool(os);
940 	zilog->zl_prev_blk_sz = ZIL_MIN_BLKSZ;
941 
942 	list_create(&zilog->zl_itx_list, sizeof (itx_t),
943 	    offsetof(itx_t, itx_node));
944 
945 	list_create(&zilog->zl_lwb_list, sizeof (lwb_t),
946 	    offsetof(lwb_t, lwb_node));
947 
948 	list_create(&zilog->zl_vdev_list, sizeof (zil_vdev_t),
949 	    offsetof(zil_vdev_t, vdev_seq_node));
950 
951 	return (zilog);
952 }
953 
954 void
955 zil_free(zilog_t *zilog)
956 {
957 	lwb_t *lwb;
958 	zil_vdev_t *zv;
959 
960 	zilog->zl_stop_sync = 1;
961 
962 	while ((lwb = list_head(&zilog->zl_lwb_list)) != NULL) {
963 		list_remove(&zilog->zl_lwb_list, lwb);
964 		if (lwb->lwb_buf != NULL)
965 			zio_buf_free(lwb->lwb_buf, lwb->lwb_sz);
966 		kmem_cache_free(zil_lwb_cache, lwb);
967 	}
968 	list_destroy(&zilog->zl_lwb_list);
969 
970 	while ((zv = list_head(&zilog->zl_vdev_list)) != NULL) {
971 		list_remove(&zilog->zl_vdev_list, zv);
972 		kmem_free(zv, sizeof (zil_vdev_t));
973 	}
974 	list_destroy(&zilog->zl_vdev_list);
975 
976 	ASSERT(list_head(&zilog->zl_itx_list) == NULL);
977 	list_destroy(&zilog->zl_itx_list);
978 
979 	kmem_free(zilog, sizeof (zilog_t));
980 }
981 
982 /*
983  * Open an intent log.
984  */
985 zilog_t *
986 zil_open(objset_t *os, zil_get_data_t *get_data)
987 {
988 	zilog_t *zilog = dmu_objset_zil(os);
989 
990 	zilog->zl_get_data = get_data;
991 	zilog->zl_clean_taskq = taskq_create("zil_clean", 1, minclsyspri,
992 	    2, 2, TASKQ_PREPOPULATE);
993 
994 	return (zilog);
995 }
996 
997 /*
998  * Close an intent log.
999  */
1000 void
1001 zil_close(zilog_t *zilog)
1002 {
1003 	txg_wait_synced(zilog->zl_dmu_pool, 0);
1004 	taskq_destroy(zilog->zl_clean_taskq);
1005 	zilog->zl_clean_taskq = NULL;
1006 	zilog->zl_get_data = NULL;
1007 
1008 	zil_itx_clean(zilog);
1009 	ASSERT(list_head(&zilog->zl_itx_list) == NULL);
1010 }
1011 
/*
 * Suspend an intent log.  While in suspended mode, we still honor
 * synchronous semantics, but we rely on txg_wait_synced() to do it.
 * We suspend the log briefly when taking a snapshot so that the snapshot
 * contains all the data it's supposed to, and has an empty intent log.
 *
 * Returns EBUSY, without suspending, if the log has a non-zero claim txg
 * (it has not been replayed yet); otherwise returns 0 after committing
 * everything outstanding, discarding all lwbs, and destroying the
 * on-disk log.  Suspension nests: each successful call increments
 * zl_suspend, and zil_resume() decrements it.
 */
int
zil_suspend(zilog_t *zilog)
{
	lwb_t *lwb;

	mutex_enter(&zilog->zl_lock);
	if (zilog->zl_header->zh_claim_txg != 0) {	/* unplayed log */
		mutex_exit(&zilog->zl_lock);
		return (EBUSY);
	}
	zilog->zl_suspend++;
	mutex_exit(&zilog->zl_lock);

	/* push everything queued so far; seq is capped inside zil_commit() */
	zil_commit(zilog, UINT64_MAX, FSYNC);

	mutex_enter(&zilog->zl_lock);
	while ((lwb = list_head(&zilog->zl_lwb_list)) != NULL) {
		if (lwb->lwb_buf != NULL) {
			/*
			 * Wait for the buffer if it's in the process of
			 * being written.
			 */
			if ((lwb->lwb_seq != 0) &&
			    (lwb->lwb_state != SEQ_COMPLETE)) {
				cv_wait(&zilog->zl_cv_seq, &zilog->zl_lock);
				/* re-examine the list head after waking */
				continue;
			}
			zio_buf_free(lwb->lwb_buf, lwb->lwb_sz);
		}
		list_remove(&zilog->zl_lwb_list, lwb);
		kmem_cache_free(zil_lwb_cache, lwb);
	}
	mutex_exit(&zilog->zl_lock);

	zil_destroy(zilog);

	return (0);
}
1056 
1057 void
1058 zil_resume(zilog_t *zilog)
1059 {
1060 	mutex_enter(&zilog->zl_lock);
1061 	ASSERT(zilog->zl_suspend != 0);
1062 	zilog->zl_suspend--;
1063 	mutex_exit(&zilog->zl_lock);
1064 }
1065 
/*
 * Replay state handed to zil_replay_log_record() through its opaque
 * 'zra' argument.
 */
typedef struct zil_replay_arg {
	/* NOTE(review): presumably the objset being replayed -- confirm */
	objset_t	*zr_os;
	/* NOTE(review): looks like a table of replay functions -- confirm */
	zil_replay_func_t **zr_replay;
	/* NOTE(review): presumably passed to the replay functions -- confirm */
	void		*zr_arg;
	/* callback taking a single void * argument; use not visible here */
	void		(*zr_rm_sync)(void *arg);
	/* NOTE(review): presumably where the replay txg is stored -- confirm */
	uint64_t	*zr_txgp;
	/* if set, the copied record is byteswapped back before replay */
	boolean_t	zr_byteswap;
	/* scratch buffer: record copy, with TX_WRITE data placed after it */
	char		*zr_lrbuf;
} zil_replay_arg_t;
1075 
/*
 * Replay a single log record.  zil_replay() passes this function to
 * zil_parse() as the per-record callback.
 *
 *	zilog		the intent log being replayed
 *	lr		the log record to replay
 *	zra		the zil_replay_arg_t set up by zil_replay()
 *	claim_txg	records with an lrc_txg below this are skipped
 *
 * Records are also skipped once zl_stop_replay has been set, or when
 * their sequence number is not beyond the header's zh_replay_seq.
 * Otherwise the record is copied into zr_lrbuf, the matching replay
 * vector is invoked inside a DMU transaction, and on success the record's
 * sequence number is stored in the zl_replay_seq slot for that txg.
 * Any error other than a retried ERESTART is logged with cmn_err() and
 * sets zl_stop_replay.
 */
static void
zil_replay_log_record(zilog_t *zilog, lr_t *lr, void *zra, uint64_t claim_txg)
{
	zil_replay_arg_t *zr = zra;
	zil_header_t *zh = zilog->zl_header;
	uint64_t reclen = lr->lrc_reclen;
	uint64_t txtype = lr->lrc_txtype;
	int pass, error;

	if (zilog->zl_stop_replay)
		return;

	if (lr->lrc_txg < claim_txg)		/* already committed */
		return;

	if (lr->lrc_seq <= zh->zh_replay_seq)	/* already replayed */
		return;

	/*
	 * Make a copy of the data so we can revise and extend it.
	 */
	bcopy(lr, zr->zr_lrbuf, reclen);

	/*
	 * The log block containing this lr may have been byteswapped
	 * so that we can easily examine common fields like lrc_txtype.
	 * However, the log is a mix of different data types, and only the
	 * replay vectors know how to byteswap their records.  Therefore, if
	 * the lr was byteswapped, undo it before invoking the replay vector.
	 */
	if (zr->zr_byteswap)
		byteswap_uint64_array(zr->zr_lrbuf, reclen);

	/*
	 * If this is a TX_WRITE with a blkptr, suck in the data.
	 * The data is placed in zr_lrbuf directly after the record copy.
	 */
	if (txtype == TX_WRITE && reclen == sizeof (lr_write_t)) {
		lr_write_t *lrw = (lr_write_t *)lr;
		blkptr_t *wbp = &lrw->lr_blkptr;
		uint64_t wlen = lrw->lr_length;
		char *wbuf = zr->zr_lrbuf + reclen;

		if (BP_IS_HOLE(wbp)) {	/* compressed to a hole */
			bzero(wbuf, wlen);
		} else {
			/*
			 * A subsequent write may have overwritten this block,
			 * in which case wbp may have been freed and
			 * reallocated, and our read of wbp may fail with a
			 * checksum error.  We can safely ignore this because
			 * the later write will provide the correct data.
			 */
			(void) zio_wait(zio_read(NULL, zilog->zl_spa,
			    wbp, wbuf, BP_GET_LSIZE(wbp), NULL, NULL,
			    ZIO_PRIORITY_SYNC_READ,
			    ZIO_FLAG_CANFAIL | ZIO_FLAG_SPECULATIVE));
			/*
			 * The whole block was read; slide the wlen bytes
			 * that start at lr_blkoff down to the front of wbuf.
			 */
			(void) memmove(wbuf, wbuf + lrw->lr_blkoff, wlen);
		}
	}

	/*
	 * We must now do two things atomically: replay this log record,
	 * and update the log header to reflect the fact that we did so.
	 * We use the DMU's ability to assign into a specific txg to do this.
	 */
	for (pass = 1; /* CONSTANTCONDITION */; pass++) {
		uint64_t replay_txg;
		dmu_tx_t *replay_tx;

		replay_tx = dmu_tx_create(zr->zr_os);
		error = dmu_tx_assign(replay_tx, TXG_WAIT);
		if (error) {
			dmu_tx_abort(replay_tx);
			break;
		}

		replay_txg = dmu_tx_get_txg(replay_tx);

		if (txtype == 0 || txtype >= TX_MAX_TYPE) {
			error = EINVAL;
		} else {
			/*
			 * On the first pass, arrange for the replay vector
			 * to fail its dmu_tx_assign().  That's the only way
			 * to ensure that those code paths remain well tested.
			 */
			*zr->zr_txgp = replay_txg - (pass == 1);
			error = zr->zr_replay[txtype](zr->zr_arg, zr->zr_lrbuf,
			    zr->zr_byteswap);
			*zr->zr_txgp = TXG_NOWAIT;
		}

		/*
		 * On success, dirty the dataset in this tx and remember
		 * the record's sequence number in this txg's slot.
		 */
		if (error == 0) {
			dsl_dataset_dirty(dmu_objset_ds(zr->zr_os), replay_tx);
			zilog->zl_replay_seq[replay_txg & TXG_MASK] =
			    lr->lrc_seq;
		}

		dmu_tx_commit(replay_tx);

		/* Only ERESTART is retried; anything else ends the loop. */
		if (error != ERESTART)
			break;

		/*
		 * The first-pass failure was arranged above, so retry at
		 * once; on later passes wait for txg replay_txg + 1 first.
		 */
		if (pass != 1)
			txg_wait_open(spa_get_dsl(zilog->zl_spa),
			    replay_txg + 1);

		dprintf("pass %d, retrying\n", pass);
	}

	if (error) {
		char *name = kmem_alloc(MAXNAMELEN, KM_SLEEP);
		dmu_objset_name(zr->zr_os, name);
		cmn_err(CE_WARN, "ZFS replay transaction error %d, "
		    "dataset %s, seq 0x%llx, txtype %llu\n",
		    error, name,
		    (u_longlong_t)lr->lrc_seq, (u_longlong_t)txtype);
		/* Every later record is skipped by the check at the top. */
		zilog->zl_stop_replay = 1;
		kmem_free(name, MAXNAMELEN);
	}

	/*
	 * The DMU's dnode layer doesn't see removes until the txg commits,
	 * so a subsequent claim can spuriously fail with EEXIST.
	 * To prevent this, if we might have removed an object,
	 * wait for the delete thread to delete it, and then
	 * wait for the transaction group to sync.
	 */
	if (txtype == TX_REMOVE || txtype == TX_RMDIR || txtype == TX_RENAME) {
		if (zr->zr_rm_sync != NULL)
			zr->zr_rm_sync(zr->zr_arg);
		txg_wait_synced(spa_get_dsl(zilog->zl_spa), 0);
	}
}
1210 
1211 /*
1212  * If this dataset has an intent log, replay it and destroy it.
1213  */
1214 void
1215 zil_replay(objset_t *os, void *arg, uint64_t *txgp,
1216 	zil_replay_func_t *replay_func[TX_MAX_TYPE], void (*rm_sync)(void *arg))
1217 {
1218 	zilog_t *zilog = dmu_objset_zil(os);
1219 	zil_replay_arg_t zr;
1220 
1221 	zr.zr_os = os;
1222 	zr.zr_replay = replay_func;
1223 	zr.zr_arg = arg;
1224 	zr.zr_rm_sync = rm_sync;
1225 	zr.zr_txgp = txgp;
1226 	zr.zr_byteswap = BP_SHOULD_BYTESWAP(&zilog->zl_header->zh_log);
1227 	zr.zr_lrbuf = kmem_alloc(2 * SPA_MAXBLOCKSIZE, KM_SLEEP);
1228 
1229 	/*
1230 	 * Wait for in-progress removes to sync before starting replay.
1231 	 */
1232 	if (rm_sync != NULL)
1233 		rm_sync(arg);
1234 	txg_wait_synced(zilog->zl_dmu_pool, 0);
1235 
1236 	zilog->zl_stop_replay = 0;
1237 	zil_parse(zilog, NULL, zil_replay_log_record, &zr,
1238 	    zilog->zl_header->zh_claim_txg);
1239 	kmem_free(zr.zr_lrbuf, 2 * SPA_MAXBLOCKSIZE);
1240 
1241 	zil_destroy(zilog);
1242 }
1243