xref: /titanic_50/usr/src/uts/common/fs/zfs/zil.c (revision 65cd9f2809a015b46790a9c5c2ef992d56177624)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License, Version 1.0 only
6  * (the "License").  You may not use this file except in compliance
7  * with the License.
8  *
9  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
10  * or http://www.opensolaris.org/os/licensing.
11  * See the License for the specific language governing permissions
12  * and limitations under the License.
13  *
14  * When distributing Covered Code, include this CDDL HEADER in each
15  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
16  * If applicable, add the following below this CDDL HEADER, with the
17  * fields enclosed by brackets "[]" replaced with your own identifying
18  * information: Portions Copyright [yyyy] [name of copyright owner]
19  *
20  * CDDL HEADER END
21  */
22 /*
23  * Copyright 2006 Sun Microsystems, Inc.  All rights reserved.
24  * Use is subject to license terms.
25  */
26 
27 #pragma ident	"%Z%%M%	%I%	%E% SMI"
28 
29 #include <sys/zfs_context.h>
30 #include <sys/spa.h>
31 #include <sys/dmu.h>
32 #include <sys/zap.h>
33 #include <sys/arc.h>
34 #include <sys/stat.h>
35 #include <sys/resource.h>
36 #include <sys/zil.h>
37 #include <sys/zil_impl.h>
38 #include <sys/dsl_dataset.h>
39 #include <sys/vdev.h>
40 
41 /*
42  * The zfs intent log (ZIL) saves transaction records of system calls
43  * that change the file system in memory with enough information
44  * to be able to replay them. These are stored in memory until
45  * either the DMU transaction group (txg) commits them to the stable pool
46  * and they can be discarded, or they are flushed to the stable log
47  * (also in the pool) due to a fsync, O_DSYNC or other synchronous
 * requirement. In the event of a panic or power failure, those log
 * records (transactions) are replayed.
50  *
51  * There is one ZIL per file system. Its on-disk (pool) format consists
52  * of 3 parts:
53  *
54  * 	- ZIL header
55  * 	- ZIL blocks
56  * 	- ZIL records
57  *
58  * A log record holds a system call transaction. Log blocks can
59  * hold many log records and the blocks are chained together.
60  * Each ZIL block contains a block pointer (blkptr_t) to the next
61  * ZIL block in the chain. The ZIL header points to the first
62  * block in the chain. Note there is not a fixed place in the pool
63  * to hold blocks. They are dynamically allocated and freed as
64  * needed from the blocks available. Figure X shows the ZIL structure:
65  */
66 
/*
 * These global ZIL switches affect all pools.  Each one is initialized
 * to 0 here.  Within this part of the file only zil_always (consulted by
 * zil_commit()) and zil_noflush (consulted by zil_add_vdev() and
 * zil_flush_vdevs()) are referenced.
 */
int zil_disable = 0;	/* disable intent logging */
int zil_always = 0;	/* make every transaction synchronous */
int zil_purge = 0;	/* at pool open, just throw everything away */
int zil_noflush = 0;	/* don't flush write cache buffers on disks */

/*
 * kmem cache from which lwb structures are allocated; created by
 * zil_init() and destroyed by zil_fini().
 */
static kmem_cache_t *zil_lwb_cache;
76 
77 static int
78 zil_dva_compare(const void *x1, const void *x2)
79 {
80 	const dva_t *dva1 = x1;
81 	const dva_t *dva2 = x2;
82 
83 	if (DVA_GET_VDEV(dva1) < DVA_GET_VDEV(dva2))
84 		return (-1);
85 	if (DVA_GET_VDEV(dva1) > DVA_GET_VDEV(dva2))
86 		return (1);
87 
88 	if (DVA_GET_OFFSET(dva1) < DVA_GET_OFFSET(dva2))
89 		return (-1);
90 	if (DVA_GET_OFFSET(dva1) > DVA_GET_OFFSET(dva2))
91 		return (1);
92 
93 	return (0);
94 }
95 
96 static void
97 zil_dva_tree_init(avl_tree_t *t)
98 {
99 	avl_create(t, zil_dva_compare, sizeof (zil_dva_node_t),
100 	    offsetof(zil_dva_node_t, zn_node));
101 }
102 
103 static void
104 zil_dva_tree_fini(avl_tree_t *t)
105 {
106 	zil_dva_node_t *zn;
107 	void *cookie = NULL;
108 
109 	while ((zn = avl_destroy_nodes(t, &cookie)) != NULL)
110 		kmem_free(zn, sizeof (zil_dva_node_t));
111 
112 	avl_destroy(t);
113 }
114 
115 static int
116 zil_dva_tree_add(avl_tree_t *t, dva_t *dva)
117 {
118 	zil_dva_node_t *zn;
119 	avl_index_t where;
120 
121 	if (avl_find(t, dva, &where) != NULL)
122 		return (EEXIST);
123 
124 	zn = kmem_alloc(sizeof (zil_dva_node_t), KM_SLEEP);
125 	zn->zn_dva = *dva;
126 	avl_insert(t, zn, where);
127 
128 	return (0);
129 }
130 
131 /*
132  * Read a log block, make sure it's valid, and byteswap it if necessary.
133  */
134 static int
135 zil_read_log_block(zilog_t *zilog, blkptr_t *bp, char *buf)
136 {
137 	uint64_t blksz = BP_GET_LSIZE(bp);
138 	zil_trailer_t *ztp = (zil_trailer_t *)(buf + blksz) - 1;
139 	zio_cksum_t cksum;
140 	int error;
141 
142 	error = zio_wait(zio_read(NULL, zilog->zl_spa, bp, buf, blksz,
143 	    NULL, NULL, ZIO_PRIORITY_SYNC_READ,
144 	    ZIO_FLAG_CANFAIL | ZIO_FLAG_SPECULATIVE));
145 	if (error) {
146 		dprintf_bp(bp, "zilog %p bp %p read failed, error %d: ",
147 		    zilog, bp, error);
148 		return (error);
149 	}
150 
151 	if (BP_SHOULD_BYTESWAP(bp))
152 		byteswap_uint64_array(buf, blksz);
153 
154 	/*
155 	 * Sequence numbers should be... sequential.  The checksum verifier for
156 	 * the next block should be: <logid[0], logid[1], objset id, seq + 1>.
157 	 */
158 	cksum = bp->blk_cksum;
159 	cksum.zc_word[3]++;
160 	if (bcmp(&cksum, &ztp->zit_next_blk.blk_cksum, sizeof (cksum)) != 0) {
161 		dprintf_bp(bp, "zilog %p bp %p stale pointer: ", zilog, bp);
162 		return (ESTALE);
163 	}
164 
165 	if (BP_IS_HOLE(&ztp->zit_next_blk)) {
166 		dprintf_bp(bp, "zilog %p bp %p hole: ", zilog, bp);
167 		return (ENOENT);
168 	}
169 
170 	if (ztp->zit_nused > (blksz - sizeof (zil_trailer_t))) {
171 		dprintf("zilog %p bp %p nused exceeds blksz\n", zilog, bp);
172 		return (EOVERFLOW);
173 	}
174 
175 	dprintf_bp(bp, "zilog %p bp %p good block: ", zilog, bp);
176 
177 	return (0);
178 }
179 
180 /*
181  * Parse the intent log, and call parse_func for each valid record within.
182  */
183 void
184 zil_parse(zilog_t *zilog, zil_parse_blk_func_t *parse_blk_func,
185     zil_parse_lr_func_t *parse_lr_func, void *arg, uint64_t txg)
186 {
187 	blkptr_t blk;
188 	char *lrbuf, *lrp;
189 	zil_trailer_t *ztp;
190 	int reclen, error;
191 
192 	blk = zilog->zl_header->zh_log;
193 	if (BP_IS_HOLE(&blk))
194 		return;
195 
196 	/*
197 	 * Starting at the block pointed to by zh_log we read the log chain.
198 	 * For each block in the chain we strongly check that block to
199 	 * ensure its validity.  We stop when an invalid block is found.
200 	 * For each block pointer in the chain we call parse_blk_func().
201 	 * For each record in each valid block we call parse_lr_func().
202 	 */
203 	zil_dva_tree_init(&zilog->zl_dva_tree);
204 	lrbuf = zio_buf_alloc(SPA_MAXBLOCKSIZE);
205 	for (;;) {
206 		error = zil_read_log_block(zilog, &blk, lrbuf);
207 
208 		if (parse_blk_func != NULL)
209 			parse_blk_func(zilog, &blk, arg, txg);
210 
211 		if (error)
212 			break;
213 
214 		ztp = (zil_trailer_t *)(lrbuf + BP_GET_LSIZE(&blk)) - 1;
215 		blk = ztp->zit_next_blk;
216 
217 		if (parse_lr_func == NULL)
218 			continue;
219 
220 		for (lrp = lrbuf; lrp < lrbuf + ztp->zit_nused; lrp += reclen) {
221 			lr_t *lr = (lr_t *)lrp;
222 			reclen = lr->lrc_reclen;
223 			ASSERT3U(reclen, >=, sizeof (lr_t));
224 			parse_lr_func(zilog, lr, arg, txg);
225 		}
226 	}
227 	zio_buf_free(lrbuf, SPA_MAXBLOCKSIZE);
228 	zil_dva_tree_fini(&zilog->zl_dva_tree);
229 }
230 
231 /* ARGSUSED */
232 static void
233 zil_claim_log_block(zilog_t *zilog, blkptr_t *bp, void *tx, uint64_t first_txg)
234 {
235 	spa_t *spa = zilog->zl_spa;
236 	int err;
237 
238 	dprintf_bp(bp, "first_txg %llu: ", first_txg);
239 
240 	/*
241 	 * Claim log block if not already committed and not already claimed.
242 	 */
243 	if (bp->blk_birth >= first_txg &&
244 	    zil_dva_tree_add(&zilog->zl_dva_tree, BP_IDENTITY(bp)) == 0) {
245 		err = zio_wait(zio_claim(NULL, spa, first_txg, bp, NULL, NULL));
246 		ASSERT(err == 0);
247 	}
248 }
249 
250 static void
251 zil_claim_log_record(zilog_t *zilog, lr_t *lrc, void *tx, uint64_t first_txg)
252 {
253 	if (lrc->lrc_txtype == TX_WRITE) {
254 		lr_write_t *lr = (lr_write_t *)lrc;
255 		zil_claim_log_block(zilog, &lr->lr_blkptr, tx, first_txg);
256 	}
257 }
258 
259 /* ARGSUSED */
260 static void
261 zil_free_log_block(zilog_t *zilog, blkptr_t *bp, void *tx, uint64_t claim_txg)
262 {
263 	zio_free_blk(zilog->zl_spa, bp, dmu_tx_get_txg(tx));
264 }
265 
266 static void
267 zil_free_log_record(zilog_t *zilog, lr_t *lrc, void *tx, uint64_t claim_txg)
268 {
269 	/*
270 	 * If we previously claimed it, we need to free it.
271 	 */
272 	if (claim_txg != 0 && lrc->lrc_txtype == TX_WRITE) {
273 		lr_write_t *lr = (lr_write_t *)lrc;
274 		blkptr_t *bp = &lr->lr_blkptr;
275 		if (bp->blk_birth >= claim_txg &&
276 		    !zil_dva_tree_add(&zilog->zl_dva_tree, BP_IDENTITY(bp))) {
277 			(void) arc_free(NULL, zilog->zl_spa,
278 			    dmu_tx_get_txg(tx), bp, NULL, NULL, ARC_WAIT);
279 		}
280 	}
281 }
282 
283 /*
284  * Create an on-disk intent log.
285  */
286 static void
287 zil_create(zilog_t *zilog)
288 {
289 	lwb_t *lwb;
290 	uint64_t txg;
291 	dmu_tx_t *tx;
292 	blkptr_t blk;
293 	int error;
294 	int no_blk;
295 
296 	ASSERT(zilog->zl_header->zh_claim_txg == 0);
297 	ASSERT(zilog->zl_header->zh_replay_seq == 0);
298 
299 	/*
300 	 * Initialize the log header block.
301 	 */
302 	tx = dmu_tx_create(zilog->zl_os);
303 	(void) dmu_tx_assign(tx, TXG_WAIT);
304 	dsl_dataset_dirty(dmu_objset_ds(zilog->zl_os), tx);
305 	txg = dmu_tx_get_txg(tx);
306 
307 	/*
308 	 * If we don't have a log block already then
309 	 * allocate the first log block and assign its checksum verifier.
310 	 */
311 	no_blk = BP_IS_HOLE(&zilog->zl_header->zh_log);
312 	if (no_blk) {
313 		error = zio_alloc_blk(zilog->zl_spa, ZIO_CHECKSUM_ZILOG,
314 		    ZIL_MIN_BLKSZ, &blk, txg);
315 	} else {
316 		blk = zilog->zl_header->zh_log;
317 		error = 0;
318 	}
319 	if (error == 0) {
320 		ZIO_SET_CHECKSUM(&blk.blk_cksum,
321 		    spa_get_random(-1ULL), spa_get_random(-1ULL),
322 		    dmu_objset_id(zilog->zl_os), 1ULL);
323 
324 		/*
325 		 * Allocate a log write buffer (lwb) for the first log block.
326 		 */
327 		lwb = kmem_cache_alloc(zil_lwb_cache, KM_SLEEP);
328 		lwb->lwb_zilog = zilog;
329 		lwb->lwb_blk = blk;
330 		lwb->lwb_nused = 0;
331 		lwb->lwb_sz = BP_GET_LSIZE(&lwb->lwb_blk);
332 		lwb->lwb_buf = zio_buf_alloc(lwb->lwb_sz);
333 		lwb->lwb_max_txg = txg;
334 		lwb->lwb_seq = 0;
335 		lwb->lwb_state = UNWRITTEN;
336 		mutex_enter(&zilog->zl_lock);
337 		list_insert_tail(&zilog->zl_lwb_list, lwb);
338 		mutex_exit(&zilog->zl_lock);
339 	}
340 
341 	dmu_tx_commit(tx);
342 	if (no_blk)
343 		txg_wait_synced(zilog->zl_dmu_pool, txg);
344 }
345 
346 /*
347  * In one tx, free all log blocks and clear the log header.
348  */
349 void
350 zil_destroy(zilog_t *zilog)
351 {
352 	dmu_tx_t *tx;
353 	uint64_t txg;
354 
355 	mutex_enter(&zilog->zl_destroy_lock);
356 
357 	if (BP_IS_HOLE(&zilog->zl_header->zh_log)) {
358 		mutex_exit(&zilog->zl_destroy_lock);
359 		return;
360 	}
361 
362 	tx = dmu_tx_create(zilog->zl_os);
363 	(void) dmu_tx_assign(tx, TXG_WAIT);
364 	dsl_dataset_dirty(dmu_objset_ds(zilog->zl_os), tx);
365 	txg = dmu_tx_get_txg(tx);
366 
367 	zil_parse(zilog, zil_free_log_block, zil_free_log_record, tx,
368 	    zilog->zl_header->zh_claim_txg);
369 	/*
370 	 * zil_sync clears the zil header as soon as the zl_destroy_txg commits
371 	 */
372 	zilog->zl_destroy_txg = txg;
373 
374 	dmu_tx_commit(tx);
375 	txg_wait_synced(zilog->zl_dmu_pool, txg);
376 
377 	mutex_exit(&zilog->zl_destroy_lock);
378 }
379 
380 void
381 zil_claim(char *osname, void *txarg)
382 {
383 	dmu_tx_t *tx = txarg;
384 	uint64_t first_txg = dmu_tx_get_txg(tx);
385 	zilog_t *zilog;
386 	zil_header_t *zh;
387 	objset_t *os;
388 	int error;
389 
390 	error = dmu_objset_open(osname, DMU_OST_ANY, DS_MODE_STANDARD, &os);
391 	if (error) {
392 		cmn_err(CE_WARN, "can't process intent log for %s", osname);
393 		return;
394 	}
395 
396 	zilog = dmu_objset_zil(os);
397 	zh = zilog->zl_header;
398 
399 	/*
400 	 * Claim all log blocks if we haven't already done so.
401 	 */
402 	ASSERT3U(zh->zh_claim_txg, <=, first_txg);
403 	if (zh->zh_claim_txg == 0 && !BP_IS_HOLE(&zh->zh_log)) {
404 		zh->zh_claim_txg = first_txg;
405 		zil_parse(zilog, zil_claim_log_block, zil_claim_log_record,
406 		    tx, first_txg);
407 		dsl_dataset_dirty(dmu_objset_ds(os), tx);
408 	}
409 	ASSERT3U(first_txg, ==, (spa_last_synced_txg(zilog->zl_spa) + 1));
410 	dmu_objset_close(os);
411 }
412 
413 void
414 zil_add_vdev(zilog_t *zilog, uint64_t vdev, uint64_t seq)
415 {
416 	zil_vdev_t *zv;
417 
418 	if (zil_noflush)
419 		return;
420 
421 	ASSERT(MUTEX_HELD(&zilog->zl_lock));
422 	zv = kmem_alloc(sizeof (zil_vdev_t), KM_SLEEP);
423 	zv->vdev = vdev;
424 	zv->seq = seq;
425 	list_insert_tail(&zilog->zl_vdev_list, zv);
426 }
427 
428 void
429 zil_flush_vdevs(zilog_t *zilog, uint64_t seq)
430 {
431 	vdev_t *vd;
432 	zil_vdev_t *zv, *zv2;
433 	zio_t *zio;
434 	spa_t *spa;
435 	uint64_t vdev;
436 
437 	if (zil_noflush)
438 		return;
439 
440 	ASSERT(MUTEX_HELD(&zilog->zl_lock));
441 
442 	spa = zilog->zl_spa;
443 	zio = NULL;
444 
445 	while ((zv = list_head(&zilog->zl_vdev_list)) != NULL &&
446 	    zv->seq <= seq) {
447 		vdev = zv->vdev;
448 		list_remove(&zilog->zl_vdev_list, zv);
449 		kmem_free(zv, sizeof (zil_vdev_t));
450 
451 		/*
452 		 * remove all chained entries <= seq with same vdev
453 		 */
454 		zv = list_head(&zilog->zl_vdev_list);
455 		while (zv && zv->seq <= seq) {
456 			zv2 = list_next(&zilog->zl_vdev_list, zv);
457 			if (zv->vdev == vdev) {
458 				list_remove(&zilog->zl_vdev_list, zv);
459 				kmem_free(zv, sizeof (zil_vdev_t));
460 			}
461 			zv = zv2;
462 		}
463 
464 		/* flush the write cache for this vdev */
465 		mutex_exit(&zilog->zl_lock);
466 		if (zio == NULL)
467 			zio = zio_root(spa, NULL, NULL, ZIO_FLAG_CANFAIL);
468 		vd = vdev_lookup_top(spa, vdev);
469 		ASSERT(vd);
470 		(void) zio_nowait(zio_ioctl(zio, spa, vd, DKIOCFLUSHWRITECACHE,
471 		    NULL, NULL, ZIO_PRIORITY_NOW,
472 		    ZIO_FLAG_CANFAIL | ZIO_FLAG_DONT_RETRY));
473 		mutex_enter(&zilog->zl_lock);
474 	}
475 
476 	/*
477 	 * Wait for all the flushes to complete.  Not all devices actually
478 	 * support the DKIOCFLUSHWRITECACHE ioctl, so it's OK if it fails.
479 	 */
480 	if (zio != NULL) {
481 		mutex_exit(&zilog->zl_lock);
482 		(void) zio_wait(zio);
483 		mutex_enter(&zilog->zl_lock);
484 	}
485 }
486 
487 /*
488  * Function called when a log block write completes
489  */
490 static void
491 zil_lwb_write_done(zio_t *zio)
492 {
493 	lwb_t *prev;
494 	lwb_t *lwb = zio->io_private;
495 	zilog_t *zilog = lwb->lwb_zilog;
496 	uint64_t max_seq;
497 
498 	/*
499 	 * Now that we've written this log block, we have a stable pointer
500 	 * to the next block in the chain, so it's OK to let the txg in
501 	 * which we allocated the next block sync.
502 	 */
503 	txg_rele_to_sync(&lwb->lwb_txgh);
504 
505 	zio_buf_free(lwb->lwb_buf, lwb->lwb_sz);
506 	mutex_enter(&zilog->zl_lock);
507 	lwb->lwb_buf = NULL;
508 	if (zio->io_error) {
509 		zilog->zl_log_error = B_TRUE;
510 		mutex_exit(&zilog->zl_lock);
511 		cv_broadcast(&zilog->zl_cv_seq);
512 		return;
513 	}
514 
515 	prev = list_prev(&zilog->zl_lwb_list, lwb);
516 	if (prev && prev->lwb_state != SEQ_COMPLETE) {
517 		/* There's an unwritten buffer in the chain before this one */
518 		lwb->lwb_state = SEQ_INCOMPLETE;
519 		mutex_exit(&zilog->zl_lock);
520 		return;
521 	}
522 
523 	max_seq = lwb->lwb_seq;
524 	lwb->lwb_state = SEQ_COMPLETE;
525 	/*
526 	 * We must also follow up the chain for already written buffers
527 	 * to see if we can set zl_ss_seq even higher.
528 	 */
529 	while (lwb = list_next(&zilog->zl_lwb_list, lwb)) {
530 		if (lwb->lwb_state != SEQ_INCOMPLETE)
531 			break;
532 		lwb->lwb_state = SEQ_COMPLETE;
533 		/* lwb_seq will be zero if we've written an empty buffer */
534 		if (lwb->lwb_seq) {
535 			ASSERT3U(max_seq, <, lwb->lwb_seq);
536 			max_seq = lwb->lwb_seq;
537 		}
538 	}
539 	zilog->zl_ss_seq = MAX(max_seq, zilog->zl_ss_seq);
540 	mutex_exit(&zilog->zl_lock);
541 	cv_broadcast(&zilog->zl_cv_seq);
542 }
543 
544 /*
545  * Start a log block write and advance to the next log block.
546  * Calls are serialized.
547  */
548 static lwb_t *
549 zil_lwb_write_start(zilog_t *zilog, lwb_t *lwb)
550 {
551 	lwb_t *nlwb;
552 	zil_trailer_t *ztp = (zil_trailer_t *)(lwb->lwb_buf + lwb->lwb_sz) - 1;
553 	uint64_t txg;
554 	uint64_t zil_blksz;
555 	int error;
556 
557 	ASSERT(lwb->lwb_nused <= ZIL_BLK_DATA_SZ(lwb));
558 
559 	/*
560 	 * Allocate the next block and save its address in this block
561 	 * before writing it in order to establish the log chain.
562 	 * Note that if the allocation of nlwb synced before we wrote
563 	 * the block that points at it (lwb), we'd leak it if we crashed.
564 	 * Therefore, we don't do txg_rele_to_sync() until zil_lwb_write_done().
565 	 */
566 	txg = txg_hold_open(zilog->zl_dmu_pool, &lwb->lwb_txgh);
567 	txg_rele_to_quiesce(&lwb->lwb_txgh);
568 
569 	/*
570 	 * Pick a ZIL blocksize. We request a size that is the
571 	 * maximum of the previous used size, the current used size and
572 	 * the amount waiting in the queue.
573 	 */
574 	zil_blksz = MAX(zilog->zl_cur_used, zilog->zl_prev_used);
575 	zil_blksz = MAX(zil_blksz, zilog->zl_itx_list_sz + sizeof (*ztp));
576 	zil_blksz = P2ROUNDUP(zil_blksz, ZIL_MIN_BLKSZ);
577 	if (zil_blksz > ZIL_MAX_BLKSZ)
578 		zil_blksz = ZIL_MAX_BLKSZ;
579 
580 	error = zio_alloc_blk(zilog->zl_spa, ZIO_CHECKSUM_ZILOG,
581 	    zil_blksz, &ztp->zit_next_blk, txg);
582 	if (error) {
583 		txg_rele_to_sync(&lwb->lwb_txgh);
584 		return (NULL);
585 	}
586 
587 	ASSERT3U(ztp->zit_next_blk.blk_birth, ==, txg);
588 	ztp->zit_nused = lwb->lwb_nused;
589 	ztp->zit_bt.zbt_cksum = lwb->lwb_blk.blk_cksum;
590 	ztp->zit_next_blk.blk_cksum = lwb->lwb_blk.blk_cksum;
591 	ztp->zit_next_blk.blk_cksum.zc_word[3]++;
592 
593 	/*
594 	 * Allocate a new log write buffer (lwb).
595 	 */
596 	nlwb = kmem_cache_alloc(zil_lwb_cache, KM_SLEEP);
597 
598 	nlwb->lwb_zilog = zilog;
599 	nlwb->lwb_blk = ztp->zit_next_blk;
600 	nlwb->lwb_nused = 0;
601 	nlwb->lwb_sz = BP_GET_LSIZE(&nlwb->lwb_blk);
602 	nlwb->lwb_buf = zio_buf_alloc(nlwb->lwb_sz);
603 	nlwb->lwb_max_txg = txg;
604 	nlwb->lwb_seq = 0;
605 	nlwb->lwb_state = UNWRITTEN;
606 
607 	/*
608 	 * Put new lwb at the end of the log chain,
609 	 * and record the vdev for later flushing
610 	 */
611 	mutex_enter(&zilog->zl_lock);
612 	list_insert_tail(&zilog->zl_lwb_list, nlwb);
613 	zil_add_vdev(zilog, DVA_GET_VDEV(BP_IDENTITY(&(lwb->lwb_blk))),
614 	    lwb->lwb_seq);
615 	mutex_exit(&zilog->zl_lock);
616 
617 	/*
618 	 * write the old log block
619 	 */
620 	dprintf_bp(&lwb->lwb_blk, "lwb %p txg %llu: ", lwb, txg);
621 	zio_nowait(zio_rewrite(NULL, zilog->zl_spa, ZIO_CHECKSUM_ZILOG, 0,
622 	    &lwb->lwb_blk, lwb->lwb_buf, lwb->lwb_sz, zil_lwb_write_done, lwb,
623 	    ZIO_PRIORITY_LOG_WRITE, ZIO_FLAG_MUSTSUCCEED));
624 
625 	return (nlwb);
626 }
627 
628 static lwb_t *
629 zil_lwb_commit(zilog_t *zilog, itx_t *itx, lwb_t *lwb)
630 {
631 	lr_t *lrc = &itx->itx_lr; /* common log record */
632 	uint64_t seq = lrc->lrc_seq;
633 	uint64_t txg = lrc->lrc_txg;
634 	uint64_t reclen = lrc->lrc_reclen;
635 	int error;
636 
637 	if (lwb == NULL)
638 		return (NULL);
639 	ASSERT(lwb->lwb_buf != NULL);
640 
641 	/*
642 	 * If it's a write, fetch the data or get its blkptr as appropriate.
643 	 */
644 	if (lrc->lrc_txtype == TX_WRITE) {
645 		lr_write_t *lr = (lr_write_t *)lrc;
646 		if (txg > spa_freeze_txg(zilog->zl_spa))
647 			txg_wait_synced(zilog->zl_dmu_pool, txg);
648 
649 		if (!itx->itx_data_copied &&
650 		    (error = zilog->zl_get_data(itx->itx_private, lr)) != 0) {
651 			if (error != ENOENT && error != EALREADY) {
652 				txg_wait_synced(zilog->zl_dmu_pool, txg);
653 				mutex_enter(&zilog->zl_lock);
654 				zilog->zl_ss_seq = MAX(seq, zilog->zl_ss_seq);
655 				zil_add_vdev(zilog,
656 				    DVA_GET_VDEV(BP_IDENTITY(&(lr->lr_blkptr))),
657 				    seq);
658 				mutex_exit(&zilog->zl_lock);
659 				return (lwb);
660 			}
661 			mutex_enter(&zilog->zl_lock);
662 			zil_add_vdev(zilog,
663 			    DVA_GET_VDEV(BP_IDENTITY(&(lr->lr_blkptr))), seq);
664 			mutex_exit(&zilog->zl_lock);
665 			return (lwb);
666 		}
667 	}
668 
669 	zilog->zl_cur_used += reclen;
670 
671 	/*
672 	 * If this record won't fit in the current log block, start a new one.
673 	 */
674 	if (lwb->lwb_nused + reclen > ZIL_BLK_DATA_SZ(lwb)) {
675 		lwb = zil_lwb_write_start(zilog, lwb);
676 		if (lwb == NULL)
677 			return (NULL);
678 		if (lwb->lwb_nused + reclen > ZIL_BLK_DATA_SZ(lwb)) {
679 			txg_wait_synced(zilog->zl_dmu_pool, txg);
680 			mutex_enter(&zilog->zl_lock);
681 			zilog->zl_ss_seq = MAX(seq, zilog->zl_ss_seq);
682 			mutex_exit(&zilog->zl_lock);
683 			return (lwb);
684 		}
685 	}
686 
687 	bcopy(lrc, lwb->lwb_buf + lwb->lwb_nused, reclen);
688 	lwb->lwb_nused += reclen;
689 	lwb->lwb_max_txg = MAX(lwb->lwb_max_txg, txg);
690 	ASSERT3U(lwb->lwb_seq, <, seq);
691 	lwb->lwb_seq = seq;
692 	ASSERT3U(lwb->lwb_nused, <=, ZIL_BLK_DATA_SZ(lwb));
693 	ASSERT3U(P2PHASE(lwb->lwb_nused, sizeof (uint64_t)), ==, 0);
694 
695 	return (lwb);
696 }
697 
698 itx_t *
699 zil_itx_create(int txtype, size_t lrsize)
700 {
701 	itx_t *itx;
702 
703 	lrsize = P2ROUNDUP(lrsize, sizeof (uint64_t));
704 
705 	itx = kmem_alloc(offsetof(itx_t, itx_lr) + lrsize, KM_SLEEP);
706 	itx->itx_lr.lrc_txtype = txtype;
707 	itx->itx_lr.lrc_reclen = lrsize;
708 	itx->itx_lr.lrc_seq = 0;	/* defensive */
709 
710 	return (itx);
711 }
712 
713 uint64_t
714 zil_itx_assign(zilog_t *zilog, itx_t *itx, dmu_tx_t *tx)
715 {
716 	uint64_t seq;
717 
718 	ASSERT(itx->itx_lr.lrc_seq == 0);
719 
720 	mutex_enter(&zilog->zl_lock);
721 	list_insert_tail(&zilog->zl_itx_list, itx);
722 	zilog->zl_itx_list_sz += itx->itx_lr.lrc_reclen;
723 	itx->itx_lr.lrc_txg = dmu_tx_get_txg(tx);
724 	itx->itx_lr.lrc_seq = seq = ++zilog->zl_itx_seq;
725 	mutex_exit(&zilog->zl_lock);
726 
727 	return (seq);
728 }
729 
730 /*
731  * Free up all in-memory intent log transactions that have now been synced.
732  */
733 static void
734 zil_itx_clean(zilog_t *zilog)
735 {
736 	uint64_t synced_txg = spa_last_synced_txg(zilog->zl_spa);
737 	uint64_t freeze_txg = spa_freeze_txg(zilog->zl_spa);
738 	uint64_t max_seq = 0;
739 	itx_t *itx;
740 
741 	mutex_enter(&zilog->zl_lock);
742 	while ((itx = list_head(&zilog->zl_itx_list)) != NULL &&
743 	    itx->itx_lr.lrc_txg <= MIN(synced_txg, freeze_txg)) {
744 		list_remove(&zilog->zl_itx_list, itx);
745 		zilog->zl_itx_list_sz -= itx->itx_lr.lrc_reclen;
746 		ASSERT3U(max_seq, <, itx->itx_lr.lrc_seq);
747 		max_seq = itx->itx_lr.lrc_seq;
748 		kmem_free(itx, offsetof(itx_t, itx_lr)
749 		    + itx->itx_lr.lrc_reclen);
750 	}
751 	if (max_seq > zilog->zl_ss_seq) {
752 		zilog->zl_ss_seq = max_seq;
753 		cv_broadcast(&zilog->zl_cv_seq);
754 	}
755 	mutex_exit(&zilog->zl_lock);
756 }
757 
758 void
759 zil_clean(zilog_t *zilog)
760 {
761 	/*
762 	 * Check for any log blocks that can be freed.
763 	 * Log blocks are only freed when the log block allocation and
764 	 * log records contained within are both known to be committed.
765 	 */
766 	mutex_enter(&zilog->zl_lock);
767 	if (list_head(&zilog->zl_itx_list) != NULL)
768 		(void) taskq_dispatch(zilog->zl_clean_taskq,
769 		    (void (*)(void *))zil_itx_clean, zilog, TQ_NOSLEEP);
770 	mutex_exit(&zilog->zl_lock);
771 }
772 
773 /*
774  * Push zfs transactions to stable storage up to the supplied sequence number.
775  */
776 void
777 zil_commit(zilog_t *zilog, uint64_t seq, int ioflag)
778 {
779 	uint64_t txg;
780 	uint64_t max_seq;
781 	uint64_t reclen;
782 	itx_t *itx;
783 	lwb_t *lwb;
784 	spa_t *spa;
785 
786 	if (zilog == NULL || seq == 0 ||
787 	    ((ioflag & (FSYNC | FDSYNC | FRSYNC)) == 0 && !zil_always))
788 		return;
789 
790 	spa = zilog->zl_spa;
791 	mutex_enter(&zilog->zl_lock);
792 
793 	seq = MIN(seq, zilog->zl_itx_seq);	/* cap seq at largest itx seq */
794 
795 	for (;;) {
796 		if (zilog->zl_ss_seq >= seq) {	/* already on stable storage */
797 			cv_signal(&zilog->zl_cv_write);
798 			mutex_exit(&zilog->zl_lock);
799 			return;
800 		}
801 
802 		if (zilog->zl_writer == B_FALSE) /* no one writing, do it */
803 			break;
804 
805 		cv_wait(&zilog->zl_cv_write, &zilog->zl_lock);
806 	}
807 
808 	zilog->zl_writer = B_TRUE;
809 	max_seq = 0;
810 
811 	if (zilog->zl_suspend) {
812 		lwb = NULL;
813 	} else {
814 		lwb = list_tail(&zilog->zl_lwb_list);
815 		if (lwb == NULL) {
816 			mutex_exit(&zilog->zl_lock);
817 			zil_create(zilog);
818 			mutex_enter(&zilog->zl_lock);
819 			lwb = list_tail(&zilog->zl_lwb_list);
820 		}
821 	}
822 
823 	/*
824 	 * Loop through in-memory log transactions filling log blocks,
825 	 * until we reach the given sequence number and there's no more
826 	 * room in the write buffer.
827 	 */
828 	for (;;) {
829 		itx = list_head(&zilog->zl_itx_list);
830 		if (itx == NULL)
831 			break;
832 
833 		reclen = itx->itx_lr.lrc_reclen;
834 		if ((itx->itx_lr.lrc_seq > seq) &&
835 		    ((lwb == NULL) || (lwb->lwb_nused + reclen >
836 		    ZIL_BLK_DATA_SZ(lwb))))
837 			break;
838 
839 		list_remove(&zilog->zl_itx_list, itx);
840 		txg = itx->itx_lr.lrc_txg;
841 		ASSERT(txg);
842 
843 		mutex_exit(&zilog->zl_lock);
844 		if (txg > spa_last_synced_txg(spa) ||
845 		    txg > spa_freeze_txg(spa))
846 			lwb = zil_lwb_commit(zilog, itx, lwb);
847 		else
848 			max_seq = itx->itx_lr.lrc_seq;
849 		kmem_free(itx, offsetof(itx_t, itx_lr)
850 		    + itx->itx_lr.lrc_reclen);
851 		mutex_enter(&zilog->zl_lock);
852 		zilog->zl_itx_list_sz -= reclen;
853 	}
854 
855 	mutex_exit(&zilog->zl_lock);
856 
857 	/* write the last block out */
858 	if (lwb != NULL && lwb->lwb_nused != 0)
859 		lwb = zil_lwb_write_start(zilog, lwb);
860 
861 	zilog->zl_prev_used = zilog->zl_cur_used;
862 	zilog->zl_cur_used = 0;
863 
864 	mutex_enter(&zilog->zl_lock);
865 	if (max_seq > zilog->zl_ss_seq) {
866 		zilog->zl_ss_seq = max_seq;
867 		cv_broadcast(&zilog->zl_cv_seq);
868 	}
869 	/*
870 	 * Wait if necessary for our seq to be committed.
871 	 */
872 	if (lwb) {
873 		while (zilog->zl_ss_seq < seq && zilog->zl_log_error == 0)
874 			cv_wait(&zilog->zl_cv_seq, &zilog->zl_lock);
875 		zil_flush_vdevs(zilog, seq);
876 	}
877 
878 	if (zilog->zl_log_error || lwb == NULL) {
879 		zilog->zl_log_error = 0;
880 		max_seq = zilog->zl_itx_seq;
881 		mutex_exit(&zilog->zl_lock);
882 		txg_wait_synced(zilog->zl_dmu_pool, 0);
883 		mutex_enter(&zilog->zl_lock);
884 		zilog->zl_ss_seq = MAX(max_seq, zilog->zl_ss_seq);
885 		cv_broadcast(&zilog->zl_cv_seq);
886 	}
887 	/* wake up others waiting to start a write */
888 	zilog->zl_writer = B_FALSE;
889 	mutex_exit(&zilog->zl_lock);
890 	cv_signal(&zilog->zl_cv_write);
891 }
892 
893 /*
894  * Called in syncing context to free committed log blocks and update log header.
895  */
896 void
897 zil_sync(zilog_t *zilog, dmu_tx_t *tx)
898 {
899 	uint64_t txg = dmu_tx_get_txg(tx);
900 	spa_t *spa = zilog->zl_spa;
901 	lwb_t *lwb;
902 
903 	ASSERT(zilog->zl_stop_sync == 0);
904 
905 	zilog->zl_header->zh_replay_seq = zilog->zl_replay_seq[txg & TXG_MASK];
906 
907 	if (zilog->zl_destroy_txg == txg) {
908 		bzero(zilog->zl_header, sizeof (zil_header_t));
909 		bzero(zilog->zl_replay_seq, sizeof (zilog->zl_replay_seq));
910 		zilog->zl_destroy_txg = 0;
911 	}
912 
913 	mutex_enter(&zilog->zl_lock);
914 	for (;;) {
915 		lwb = list_head(&zilog->zl_lwb_list);
916 		if (lwb == NULL) {
917 			mutex_exit(&zilog->zl_lock);
918 			return;
919 		}
920 		if (lwb->lwb_buf != NULL || lwb->lwb_max_txg > txg)
921 			break;
922 		list_remove(&zilog->zl_lwb_list, lwb);
923 		zio_free_blk(spa, &lwb->lwb_blk, txg);
924 		kmem_cache_free(zil_lwb_cache, lwb);
925 	}
926 	zilog->zl_header->zh_log = lwb->lwb_blk;
927 	mutex_exit(&zilog->zl_lock);
928 }
929 
930 void
931 zil_init(void)
932 {
933 	zil_lwb_cache = kmem_cache_create("zil_lwb_cache",
934 	    sizeof (struct lwb), NULL, NULL, NULL, NULL, NULL, NULL, 0);
935 }
936 
937 void
938 zil_fini(void)
939 {
940 	kmem_cache_destroy(zil_lwb_cache);
941 }
942 
943 zilog_t *
944 zil_alloc(objset_t *os, zil_header_t *zh_phys)
945 {
946 	zilog_t *zilog;
947 
948 	zilog = kmem_zalloc(sizeof (zilog_t), KM_SLEEP);
949 
950 	zilog->zl_header = zh_phys;
951 	zilog->zl_os = os;
952 	zilog->zl_spa = dmu_objset_spa(os);
953 	zilog->zl_dmu_pool = dmu_objset_pool(os);
954 
955 	list_create(&zilog->zl_itx_list, sizeof (itx_t),
956 	    offsetof(itx_t, itx_node));
957 
958 	list_create(&zilog->zl_lwb_list, sizeof (lwb_t),
959 	    offsetof(lwb_t, lwb_node));
960 
961 	list_create(&zilog->zl_vdev_list, sizeof (zil_vdev_t),
962 	    offsetof(zil_vdev_t, vdev_seq_node));
963 
964 	return (zilog);
965 }
966 
967 void
968 zil_free(zilog_t *zilog)
969 {
970 	lwb_t *lwb;
971 	zil_vdev_t *zv;
972 
973 	zilog->zl_stop_sync = 1;
974 
975 	while ((lwb = list_head(&zilog->zl_lwb_list)) != NULL) {
976 		list_remove(&zilog->zl_lwb_list, lwb);
977 		if (lwb->lwb_buf != NULL)
978 			zio_buf_free(lwb->lwb_buf, lwb->lwb_sz);
979 		kmem_cache_free(zil_lwb_cache, lwb);
980 	}
981 	list_destroy(&zilog->zl_lwb_list);
982 
983 	while ((zv = list_head(&zilog->zl_vdev_list)) != NULL) {
984 		list_remove(&zilog->zl_vdev_list, zv);
985 		kmem_free(zv, sizeof (zil_vdev_t));
986 	}
987 	list_destroy(&zilog->zl_vdev_list);
988 
989 	ASSERT(list_head(&zilog->zl_itx_list) == NULL);
990 	list_destroy(&zilog->zl_itx_list);
991 
992 	kmem_free(zilog, sizeof (zilog_t));
993 }
994 
995 /*
996  * return true if there is a valid initial zil log block
997  */
998 static int
999 zil_empty(zilog_t *zilog)
1000 {
1001 	blkptr_t blk;
1002 	char *lrbuf;
1003 	int error;
1004 
1005 	blk = zilog->zl_header->zh_log;
1006 	if (BP_IS_HOLE(&blk))
1007 		return (1);
1008 
1009 	lrbuf = zio_buf_alloc(SPA_MAXBLOCKSIZE);
1010 	error = zil_read_log_block(zilog, &blk, lrbuf);
1011 	zio_buf_free(lrbuf, SPA_MAXBLOCKSIZE);
1012 	return (error ? 1 : 0);
1013 }
1014 
1015 /*
1016  * Open an intent log.
1017  */
1018 zilog_t *
1019 zil_open(objset_t *os, zil_get_data_t *get_data)
1020 {
1021 	zilog_t *zilog = dmu_objset_zil(os);
1022 
1023 	zilog->zl_get_data = get_data;
1024 	zilog->zl_clean_taskq = taskq_create("zil_clean", 1, minclsyspri,
1025 	    2, 2, TASKQ_PREPOPULATE);
1026 
1027 	return (zilog);
1028 }
1029 
1030 /*
1031  * Close an intent log.
1032  */
1033 void
1034 zil_close(zilog_t *zilog)
1035 {
1036 	if (!zil_empty(zilog))
1037 		txg_wait_synced(zilog->zl_dmu_pool, 0);
1038 	taskq_destroy(zilog->zl_clean_taskq);
1039 	zilog->zl_clean_taskq = NULL;
1040 	zilog->zl_get_data = NULL;
1041 
1042 	zil_itx_clean(zilog);
1043 	ASSERT(list_head(&zilog->zl_itx_list) == NULL);
1044 }
1045 
1046 /*
1047  * Suspend an intent log.  While in suspended mode, we still honor
1048  * synchronous semantics, but we rely on txg_wait_synced() to do it.
1049  * We suspend the log briefly when taking a snapshot so that the snapshot
1050  * contains all the data it's supposed to, and has an empty intent log.
1051  */
1052 int
1053 zil_suspend(zilog_t *zilog)
1054 {
1055 	lwb_t *lwb;
1056 
1057 	mutex_enter(&zilog->zl_lock);
1058 	if (zilog->zl_header->zh_claim_txg != 0) {	/* unplayed log */
1059 		mutex_exit(&zilog->zl_lock);
1060 		return (EBUSY);
1061 	}
1062 	zilog->zl_suspend++;
1063 	mutex_exit(&zilog->zl_lock);
1064 
1065 	zil_commit(zilog, UINT64_MAX, FSYNC);
1066 
1067 	mutex_enter(&zilog->zl_lock);
1068 	while ((lwb = list_head(&zilog->zl_lwb_list)) != NULL) {
1069 		if (lwb->lwb_buf != NULL) {
1070 			/*
1071 			 * Wait for the buffer if it's in the process of
1072 			 * being written.
1073 			 */
1074 			if ((lwb->lwb_seq != 0) &&
1075 			    (lwb->lwb_state != SEQ_COMPLETE)) {
1076 				cv_wait(&zilog->zl_cv_seq, &zilog->zl_lock);
1077 				continue;
1078 			}
1079 			zio_buf_free(lwb->lwb_buf, lwb->lwb_sz);
1080 		}
1081 		list_remove(&zilog->zl_lwb_list, lwb);
1082 		kmem_cache_free(zil_lwb_cache, lwb);
1083 	}
1084 	mutex_exit(&zilog->zl_lock);
1085 
1086 	zil_destroy(zilog);
1087 
1088 	return (0);
1089 }
1090 
1091 void
1092 zil_resume(zilog_t *zilog)
1093 {
1094 	mutex_enter(&zilog->zl_lock);
1095 	ASSERT(zilog->zl_suspend != 0);
1096 	zilog->zl_suspend--;
1097 	mutex_exit(&zilog->zl_lock);
1098 }
1099 
/*
 * Per-replay context.  zil_replay() fills one of these in and hands it
 * to zil_replay_log_record() through zil_parse().
 */
typedef struct zil_replay_arg {
	/* objset the replay transactions are created against */
	objset_t	*zr_os;
	/* replay vectors, indexed by the record's lrc_txtype */
	zil_replay_func_t **zr_replay;
	/* opaque argument passed to the replay vectors and to zr_rm_sync */
	void		*zr_arg;
	/* optional callback (may be NULL) run after remove-type records */
	void		(*zr_rm_sync)(void *arg);
	/* caller's txg variable, set around each replay vector call */
	uint64_t	*zr_txgp;
	/* set from BP_SHOULD_BYTESWAP() on the log header's block pointer */
	boolean_t	zr_byteswap;
	/* scratch buffer holding a private copy of the current record */
	char		*zr_lrbuf;
} zil_replay_arg_t;
1109 
/*
 * Replay a single log record; passed to zil_parse() by zil_replay().
 *
 *	zilog		log being replayed
 *	lr		the record to replay
 *	zra		the zil_replay_arg_t set up by zil_replay()
 *	claim_txg	records whose lrc_txg is below this are skipped
 *
 * A record is also skipped once zl_stop_replay has been set, or when
 * its sequence number is not beyond zh_replay_seq.  Any replay error
 * is reported with cmn_err() and sets zl_stop_replay, so the records
 * that follow are ignored.
 */
static void
zil_replay_log_record(zilog_t *zilog, lr_t *lr, void *zra, uint64_t claim_txg)
{
	zil_replay_arg_t *zr = zra;
	zil_header_t *zh = zilog->zl_header;
	uint64_t reclen = lr->lrc_reclen;
	uint64_t txtype = lr->lrc_txtype;
	int pass, error;

	if (zilog->zl_stop_replay)
		return;

	if (lr->lrc_txg < claim_txg)		/* already committed */
		return;

	if (lr->lrc_seq <= zh->zh_replay_seq)	/* already replayed */
		return;

	/*
	 * Make a copy of the data so we can revise and extend it.
	 */
	bcopy(lr, zr->zr_lrbuf, reclen);

	/*
	 * The log block containing this lr may have been byteswapped
	 * so that we can easily examine common fields like lrc_txtype.
	 * However, the log is a mix of different data types, and only the
	 * replay vectors know how to byteswap their records.  Therefore, if
	 * the lr was byteswapped, undo it before invoking the replay vector.
	 */
	if (zr->zr_byteswap)
		byteswap_uint64_array(zr->zr_lrbuf, reclen);

	/*
	 * If this is a TX_WRITE with a blkptr, suck in the data.
	 * The data is placed in zr_lrbuf directly after the record copy.
	 */
	if (txtype == TX_WRITE && reclen == sizeof (lr_write_t)) {
		lr_write_t *lrw = (lr_write_t *)lr;
		blkptr_t *wbp = &lrw->lr_blkptr;
		uint64_t wlen = lrw->lr_length;
		char *wbuf = zr->zr_lrbuf + reclen;

		if (BP_IS_HOLE(wbp)) {	/* compressed to a hole */
			bzero(wbuf, wlen);
		} else {
			/*
			 * A subsequent write may have overwritten this block,
			 * in which case wbp may have been freed and
			 * reallocated, and our read of wbp may fail with a
			 * checksum error.  We can safely ignore this because
			 * the later write will provide the correct data.
			 */
			(void) zio_wait(zio_read(NULL, zilog->zl_spa,
			    wbp, wbuf, BP_GET_LSIZE(wbp), NULL, NULL,
			    ZIO_PRIORITY_SYNC_READ,
			    ZIO_FLAG_CANFAIL | ZIO_FLAG_SPECULATIVE));
			/*
			 * The whole block was read; slide the wlen bytes
			 * starting at lr_blkoff down to the front of wbuf.
			 */
			(void) memmove(wbuf, wbuf + lrw->lr_blkoff, wlen);
		}
	}

	/*
	 * We must now do two things atomically: replay this log record,
	 * and update the log header to reflect the fact that we did so.
	 * We use the DMU's ability to assign into a specific txg to do this.
	 */
	for (pass = 1; /* CONSTANTCONDITION */; pass++) {
		uint64_t replay_txg;
		dmu_tx_t *replay_tx;

		replay_tx = dmu_tx_create(zr->zr_os);
		error = dmu_tx_assign(replay_tx, TXG_WAIT);
		if (error) {
			dmu_tx_abort(replay_tx);
			break;
		}

		replay_txg = dmu_tx_get_txg(replay_tx);

		if (txtype == 0 || txtype >= TX_MAX_TYPE) {
			error = EINVAL;
		} else {
			/*
			 * On the first pass, arrange for the replay vector
			 * to fail its dmu_tx_assign().  That's the only way
			 * to ensure that those code paths remain well tested.
			 */
			*zr->zr_txgp = replay_txg - (pass == 1);
			error = zr->zr_replay[txtype](zr->zr_arg, zr->zr_lrbuf,
			    zr->zr_byteswap);
			*zr->zr_txgp = TXG_NOWAIT;
		}

		if (error == 0) {
			/*
			 * Dirty the dataset in this tx and note the record's
			 * sequence number in the slot for the txg it was
			 * replayed in.
			 */
			dsl_dataset_dirty(dmu_objset_ds(zr->zr_os), replay_tx);
			zilog->zl_replay_seq[replay_txg & TXG_MASK] =
			    lr->lrc_seq;
		}

		dmu_tx_commit(replay_tx);

		/* Only ERESTART is retried; success or any other error ends. */
		if (error != ERESTART)
			break;

		/*
		 * The first pass retries immediately; later passes wait
		 * for the next txg to open before trying again.
		 */
		if (pass != 1)
			txg_wait_open(spa_get_dsl(zilog->zl_spa),
			    replay_txg + 1);

		dprintf("pass %d, retrying\n", pass);
	}

	if (error) {
		char *name = kmem_alloc(MAXNAMELEN, KM_SLEEP);
		dmu_objset_name(zr->zr_os, name);
		cmn_err(CE_WARN, "ZFS replay transaction error %d, "
		    "dataset %s, seq 0x%llx, txtype %llu\n",
		    error, name,
		    (u_longlong_t)lr->lrc_seq, (u_longlong_t)txtype);
		/* Make every later record in this log a no-op. */
		zilog->zl_stop_replay = 1;
		kmem_free(name, MAXNAMELEN);
	}

	/*
	 * The DMU's dnode layer doesn't see removes until the txg commits,
	 * so a subsequent claim can spuriously fail with EEXIST.
	 * To prevent this, if we might have removed an object,
	 * wait for the delete thread to delete it, and then
	 * wait for the transaction group to sync.
	 */
	if (txtype == TX_REMOVE || txtype == TX_RMDIR || txtype == TX_RENAME) {
		if (zr->zr_rm_sync != NULL)
			zr->zr_rm_sync(zr->zr_arg);
		txg_wait_synced(spa_get_dsl(zilog->zl_spa), 0);
	}
}
1244 
1245 /*
1246  * If this dataset has a non-empty intent log, replay it and destroy it.
1247  */
1248 void
1249 zil_replay(objset_t *os, void *arg, uint64_t *txgp,
1250 	zil_replay_func_t *replay_func[TX_MAX_TYPE], void (*rm_sync)(void *arg))
1251 {
1252 	zilog_t *zilog = dmu_objset_zil(os);
1253 		zil_replay_arg_t zr;
1254 
1255 	if (zil_empty(zilog)) {
1256 		/*
1257 		 * Initialise the log header but don't free the log block
1258 		 * which will get reused.
1259 		 */
1260 		zilog->zl_header->zh_claim_txg = 0;
1261 		zilog->zl_header->zh_replay_seq = 0;
1262 		return;
1263 	}
1264 
1265 	zr.zr_os = os;
1266 	zr.zr_replay = replay_func;
1267 	zr.zr_arg = arg;
1268 	zr.zr_rm_sync = rm_sync;
1269 	zr.zr_txgp = txgp;
1270 	zr.zr_byteswap = BP_SHOULD_BYTESWAP(&zilog->zl_header->zh_log);
1271 	zr.zr_lrbuf = kmem_alloc(2 * SPA_MAXBLOCKSIZE, KM_SLEEP);
1272 
1273 	/*
1274 	 * Wait for in-progress removes to sync before starting replay.
1275 	 */
1276 	if (rm_sync != NULL)
1277 		rm_sync(arg);
1278 	txg_wait_synced(zilog->zl_dmu_pool, 0);
1279 
1280 	zilog->zl_stop_replay = 0;
1281 	zil_parse(zilog, NULL, zil_replay_log_record, &zr,
1282 	    zilog->zl_header->zh_claim_txg);
1283 	kmem_free(zr.zr_lrbuf, 2 * SPA_MAXBLOCKSIZE);
1284 
1285 	zil_destroy(zilog);
1286 }
1287