xref: /illumos-gate/usr/src/uts/common/fs/zfs/dmu_tx.c (revision 65a89a64c60f3061bbe2381edaacc81660af9a95)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License, Version 1.0 only
6  * (the "License").  You may not use this file except in compliance
7  * with the License.
8  *
9  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
10  * or http://www.opensolaris.org/os/licensing.
11  * See the License for the specific language governing permissions
12  * and limitations under the License.
13  *
14  * When distributing Covered Code, include this CDDL HEADER in each
15  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
16  * If applicable, add the following below this CDDL HEADER, with the
17  * fields enclosed by brackets "[]" replaced with your own identifying
18  * information: Portions Copyright [yyyy] [name of copyright owner]
19  *
20  * CDDL HEADER END
21  */
22 /*
23  * Copyright 2005 Sun Microsystems, Inc.  All rights reserved.
24  * Use is subject to license terms.
25  */
26 
27 #pragma ident	"%Z%%M%	%I%	%E% SMI"
28 
29 #include <sys/dmu.h>
30 #include <sys/dmu_impl.h>
31 #include <sys/dbuf.h>
32 #include <sys/dmu_tx.h>
33 #include <sys/dmu_objset.h>
34 #include <sys/dsl_dataset.h> /* for dsl_dataset_block_freeable() */
35 #include <sys/dsl_dir.h> /* for dsl_dir_tempreserve_*() */
36 #include <sys/dsl_pool.h>
37 #include <sys/zap_impl.h>	/* for ZAP_BLOCK_SHIFT */
38 #include <sys/spa.h>
39 #include <sys/zfs_context.h>
40 
#ifdef ZFS_DEBUG
/*
 * Debug-build tunable, enabled (1) by default.  It is not referenced
 * anywhere else in this file; presumably it controls use of the per-tx
 * debug buffer (tx_debug_buf) -- TODO confirm against its consumers.
 */
int dmu_use_tx_debug_bufs = 1;
#endif
44 
45 dmu_tx_t *
46 dmu_tx_create_ds(dsl_dir_t *dd)
47 {
48 	dmu_tx_t *tx = kmem_zalloc(sizeof (dmu_tx_t), KM_SLEEP);
49 	tx->tx_dir = dd;
50 	if (dd)
51 		tx->tx_pool = dd->dd_pool;
52 	list_create(&tx->tx_holds, sizeof (dmu_tx_hold_t),
53 	    offsetof(dmu_tx_hold_t, dth_node));
54 	refcount_create(&tx->tx_space_written);
55 	refcount_create(&tx->tx_space_freed);
56 	return (tx);
57 }
58 
59 dmu_tx_t *
60 dmu_tx_create(objset_t *os)
61 {
62 	dmu_tx_t *tx = dmu_tx_create_ds(os->os->os_dsl_dataset->ds_dir);
63 	tx->tx_objset = os;
64 	return (tx);
65 }
66 
67 dmu_tx_t *
68 dmu_tx_create_assigned(struct dsl_pool *dp, uint64_t txg)
69 {
70 	dmu_tx_t *tx = dmu_tx_create_ds(NULL);
71 
72 	ASSERT3U(txg, <=, dp->dp_tx.tx_open_txg);
73 	tx->tx_pool = dp;
74 	tx->tx_txg = txg;
75 	tx->tx_anyobj = TRUE;
76 
77 	return (tx);
78 }
79 
80 int
81 dmu_tx_is_syncing(dmu_tx_t *tx)
82 {
83 	return (tx->tx_anyobj);
84 }
85 
86 int
87 dmu_tx_private_ok(dmu_tx_t *tx)
88 {
89 	return (tx->tx_anyobj || tx->tx_privateobj);
90 }
91 
92 static void
93 dmu_tx_hold_object_impl(dmu_tx_t *tx, objset_t *os, uint64_t object,
94     enum dmu_tx_hold_type type, dmu_tx_hold_func_t func,
95     uint64_t arg1, uint64_t arg2)
96 {
97 	dmu_tx_hold_t *dth;
98 	dnode_t *dn = NULL;
99 
100 	if (object != DMU_NEW_OBJECT) {
101 		dn = dnode_hold(os->os, object, tx);
102 
103 		if (tx->tx_txg != 0) {
104 			mutex_enter(&dn->dn_mtx);
105 			/*
106 			 * dn->dn_assigned_txg == tx->tx_txg doesn't pose a
107 			 * problem, but there's no way for it to happen (for
108 			 * now, at least).
109 			 */
110 			ASSERT(dn->dn_assigned_txg == 0);
111 			ASSERT(dn->dn_assigned_tx == NULL);
112 			dn->dn_assigned_txg = tx->tx_txg;
113 			dn->dn_assigned_tx = tx;
114 			(void) refcount_add(&dn->dn_tx_holds, tx);
115 			mutex_exit(&dn->dn_mtx);
116 		}
117 	}
118 
119 	dth = kmem_zalloc(sizeof (dmu_tx_hold_t), KM_SLEEP);
120 	dth->dth_dnode = dn;
121 	dth->dth_type = type;
122 	dth->dth_func = func;
123 	dth->dth_arg1 = arg1;
124 	dth->dth_arg2 = arg2;
125 	/*
126 	 * XXX Investigate using a different data structure to keep
127 	 * track of dnodes in a tx.  Maybe array, since there will
128 	 * generally not be many entries?
129 	 */
130 	list_insert_tail(&tx->tx_holds, dth);
131 }
132 
133 void
134 dmu_tx_add_new_object(dmu_tx_t *tx, objset_t *os, uint64_t object)
135 {
136 	/*
137 	 * If we're syncing, they can manipulate any object anyhow, and
138 	 * the hold on the dnode_t can cause problems.
139 	 */
140 	if (!dmu_tx_is_syncing(tx)) {
141 		dmu_tx_hold_object_impl(tx, os, object, THT_NEWOBJECT,
142 		    NULL, 0, 0);
143 	}
144 }
145 
146 /* ARGSUSED */
147 static void
148 dmu_tx_count_write(dmu_tx_t *tx, dnode_t *dn, uint64_t off, uint64_t len)
149 {
150 	uint64_t start, end, space;
151 	int min_bs, max_bs, min_ibs, max_ibs, epbs, bits;
152 
153 	if (len == 0)
154 		return;
155 
156 	min_bs = SPA_MINBLOCKSHIFT;
157 	max_bs = SPA_MAXBLOCKSHIFT;
158 	min_ibs = DN_MIN_INDBLKSHIFT;
159 	max_ibs = DN_MAX_INDBLKSHIFT;
160 
161 	/*
162 	 * If there's more than one block, the blocksize can't change,
163 	 * so we can make a more precise estimate.  Alternatively,
164 	 * if the dnode's ibs is larger than max_ibs, always use that.
165 	 * This ensures that if we reduce DN_MAX_INDBLKSHIFT,
166 	 * the code will still work correctly on existing pools.
167 	 */
168 	if (dn && (dn->dn_maxblkid != 0 || dn->dn_indblkshift > max_ibs)) {
169 		min_ibs = max_ibs = dn->dn_indblkshift;
170 		if (dn->dn_datablkshift != 0)
171 			min_bs = max_bs = dn->dn_datablkshift;
172 	}
173 
174 	/*
175 	 * 'end' is the last thing we will access, not one past.
176 	 * This way we won't overflow when accessing the last byte.
177 	 */
178 	start = P2ALIGN(off, 1ULL << max_bs);
179 	end = P2ROUNDUP(off + len, 1ULL << max_bs) - 1;
180 	space = end - start + 1;
181 
182 	start >>= min_bs;
183 	end >>= min_bs;
184 
185 	epbs = min_ibs - SPA_BLKPTRSHIFT;
186 
187 	/*
188 	 * The object contains at most 2^(64 - min_bs) blocks,
189 	 * and each indirect level maps 2^epbs.
190 	 */
191 	for (bits = 64 - min_bs; bits >= 0; bits -= epbs) {
192 		start >>= epbs;
193 		end >>= epbs;
194 		/*
195 		 * If we increase the number of levels of indirection,
196 		 * we'll need new blkid=0 indirect blocks.  If start == 0,
197 		 * we're already accounting for that blocks; and if end == 0,
198 		 * we can't increase the number of levels beyond that.
199 		 */
200 		if (start != 0 && end != 0)
201 			space += 1ULL << max_ibs;
202 		space += (end - start + 1) << max_ibs;
203 	}
204 
205 	ASSERT(space < 2 * DMU_MAX_ACCESS);
206 
207 	tx->tx_space_towrite += space;
208 }
209 
210 static void
211 dmu_tx_count_dnode(dmu_tx_t *tx, dnode_t *dn)
212 {
213 	dnode_t *mdn = tx->tx_objset->os->os_meta_dnode;
214 	uint64_t object = dn ? dn->dn_object : DN_MAX_OBJECT - 1;
215 	uint64_t pre_write_space;
216 
217 	ASSERT(object < DN_MAX_OBJECT);
218 	pre_write_space = tx->tx_space_towrite;
219 	dmu_tx_count_write(tx, mdn, object << DNODE_SHIFT, 1 << DNODE_SHIFT);
220 	if (dn && dn->dn_dbuf->db_blkptr &&
221 	    dsl_dataset_block_freeable(dn->dn_objset->os_dsl_dataset,
222 	    dn->dn_dbuf->db_blkptr->blk_birth, tx)) {
223 		tx->tx_space_tooverwrite +=
224 			tx->tx_space_towrite - pre_write_space;
225 		tx->tx_space_towrite = pre_write_space;
226 	}
227 }
228 
229 /* ARGSUSED */
230 static void
231 dmu_tx_hold_write_impl(dmu_tx_t *tx, dnode_t *dn, uint64_t off, uint64_t len)
232 {
233 	dmu_tx_count_write(tx, dn, off, len);
234 	dmu_tx_count_dnode(tx, dn);
235 }
236 
237 void
238 dmu_tx_hold_write(dmu_tx_t *tx, uint64_t object, uint64_t off, int len)
239 {
240 	ASSERT(tx->tx_txg == 0);
241 	ASSERT(len > 0 && len < DMU_MAX_ACCESS);
242 	ASSERT(UINT64_MAX - off >= len - 1);
243 
244 	dmu_tx_hold_object_impl(tx, tx->tx_objset, object, THT_WRITE,
245 	    dmu_tx_hold_write_impl, off, len);
246 }
247 
248 static void
249 dmu_tx_count_free(dmu_tx_t *tx, dnode_t *dn, uint64_t off, uint64_t len)
250 {
251 	uint64_t blkid, nblks;
252 	uint64_t space = 0;
253 	dsl_dataset_t *ds = dn->dn_objset->os_dsl_dataset;
254 
255 	ASSERT(dn->dn_assigned_tx == tx || dn->dn_assigned_tx == NULL);
256 
257 	if (dn->dn_datablkshift == 0)
258 		return;
259 	/*
260 	 * not that the dnode can change, since it isn't dirty, but
261 	 * dbuf_hold_impl() wants us to have the struct_rwlock.
262 	 * also need it to protect dn_maxblkid.
263 	 */
264 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
265 	blkid = off >> dn->dn_datablkshift;
266 	nblks = (off + len) >> dn->dn_datablkshift;
267 
268 	if (blkid >= dn->dn_maxblkid)
269 		goto out;
270 	if (blkid + nblks > dn->dn_maxblkid)
271 		nblks = dn->dn_maxblkid - blkid;
272 
273 	/* don't bother after the 100,000 blocks */
274 	nblks = MIN(nblks, 128*1024);
275 
276 	if (dn->dn_phys->dn_nlevels == 1) {
277 		int i;
278 		for (i = 0; i < nblks; i++) {
279 			blkptr_t *bp = dn->dn_phys->dn_blkptr;
280 			ASSERT3U(blkid + i, <, dn->dn_phys->dn_nblkptr);
281 			bp += blkid + i;
282 			if (dsl_dataset_block_freeable(ds, bp->blk_birth, tx)) {
283 				dprintf_bp(bp, "can free old%s", "");
284 				space += BP_GET_ASIZE(bp);
285 			}
286 		}
287 		goto out;
288 	}
289 
290 	while (nblks) {
291 		dmu_buf_impl_t *dbuf;
292 		int err, epbs, blkoff, tochk;
293 
294 		epbs = dn->dn_indblkshift - SPA_BLKPTRSHIFT;
295 		blkoff = P2PHASE(blkid, 1<<epbs);
296 		tochk = MIN((1<<epbs) - blkoff, nblks);
297 
298 		err = dbuf_hold_impl(dn, 1, blkid >> epbs, TRUE, FTAG, &dbuf);
299 		if (err == 0) {
300 			int i;
301 			blkptr_t *bp;
302 
303 			dbuf_read_havestruct(dbuf);
304 
305 			bp = dbuf->db.db_data;
306 			bp += blkoff;
307 
308 			for (i = 0; i < tochk; i++) {
309 				if (dsl_dataset_block_freeable(ds,
310 				    bp[i].blk_birth, tx)) {
311 					dprintf_bp(&bp[i],
312 					    "can free old%s", "");
313 					space += BP_GET_ASIZE(&bp[i]);
314 				}
315 			}
316 			dbuf_remove_ref(dbuf, FTAG);
317 		} else {
318 			/* the indirect block is sparse */
319 			ASSERT(err == ENOENT);
320 		}
321 
322 		blkid += tochk;
323 		nblks -= tochk;
324 	}
325 out:
326 	rw_exit(&dn->dn_struct_rwlock);
327 
328 	tx->tx_space_tofree += space;
329 }
330 
331 static void
332 dmu_tx_hold_free_impl(dmu_tx_t *tx, dnode_t *dn, uint64_t off, uint64_t len)
333 {
334 	int dirty;
335 
336 	/* first block */
337 	if (off != 0 /* || dn->dn_maxblkid == 0 */)
338 		dmu_tx_count_write(tx, dn, off, 1);
339 	/* last block */
340 	if (len != DMU_OBJECT_END)
341 		dmu_tx_count_write(tx, dn, off+len, 1);
342 
343 	dmu_tx_count_dnode(tx, dn);
344 
345 	if (off >= (dn->dn_maxblkid+1) * dn->dn_datablksz)
346 		return;
347 	if (len == DMU_OBJECT_END)
348 		len = (dn->dn_maxblkid+1) * dn->dn_datablksz - off;
349 
350 	/* XXX locking */
351 	dirty = dn->dn_dirtyblksz[0] | dn->dn_dirtyblksz[1] |
352 	    dn->dn_dirtyblksz[2] | dn->dn_dirtyblksz[3];
353 	if (dn->dn_assigned_tx != NULL && !dirty)
354 		dmu_tx_count_free(tx, dn, off, len);
355 }
356 
357 void
358 dmu_tx_hold_free(dmu_tx_t *tx, uint64_t object, uint64_t off, uint64_t len)
359 {
360 	ASSERT(tx->tx_txg == 0);
361 
362 	dmu_tx_hold_object_impl(tx, tx->tx_objset, object, THT_FREE,
363 	    dmu_tx_hold_free_impl, off, len);
364 }
365 
366 /* ARGSUSED */
367 static void
368 dmu_tx_hold_zap_impl(dmu_tx_t *tx, dnode_t *dn, uint64_t nops, uint64_t cops)
369 {
370 	uint64_t nblocks;
371 	int epbs;
372 
373 	dmu_tx_count_dnode(tx, dn);
374 
375 	if (dn == NULL) {
376 		/*
377 		 * Assuming that nops+cops is not super huge, we will be
378 		 * able to fit a new object's entries into one leaf
379 		 * block.  So there will be at most 2 blocks total,
380 		 * including the header block.
381 		 */
382 		dmu_tx_count_write(tx, dn, 0, 2 << ZAP_BLOCK_SHIFT);
383 		return;
384 	}
385 
386 	ASSERT3P(dmu_ot[dn->dn_type].ot_byteswap, ==, zap_byteswap);
387 
388 	if (dn->dn_maxblkid == 0 && nops == 0) {
389 		/*
390 		 * If there is only one block  (i.e. this is a micro-zap)
391 		 * and we are only doing updates, the accounting is simple.
392 		 */
393 		if (dsl_dataset_block_freeable(dn->dn_objset->os_dsl_dataset,
394 		    dn->dn_phys->dn_blkptr[0].blk_birth, tx))
395 			tx->tx_space_tooverwrite += dn->dn_datablksz;
396 		else
397 			tx->tx_space_towrite += dn->dn_datablksz;
398 		return;
399 	}
400 
401 	/*
402 	 * 3 blocks overwritten per op: target leaf, ptrtbl block, header block
403 	 * 3 new blocks written per op: new split leaf, 2 grown ptrtbl blocks
404 	 */
405 	dmu_tx_count_write(tx, dn, dn->dn_maxblkid * dn->dn_datablksz,
406 	    (nops * 6ULL + cops * 3ULL) << ZAP_BLOCK_SHIFT);
407 
408 	/*
409 	 * If the modified blocks are scattered to the four winds,
410 	 * we'll have to modify an indirect twig for each.
411 	 */
412 	epbs = dn->dn_indblkshift - SPA_BLKPTRSHIFT;
413 	for (nblocks = dn->dn_maxblkid >> epbs; nblocks != 0; nblocks >>= epbs)
414 		tx->tx_space_towrite +=
415 		    ((nops + cops) * 3ULL) << dn->dn_indblkshift;
416 }
417 
418 void
419 dmu_tx_hold_zap(dmu_tx_t *tx, uint64_t object, int ops)
420 {
421 	ASSERT(tx->tx_txg == 0);
422 
423 	dmu_tx_hold_object_impl(tx, tx->tx_objset, object, THT_ZAP,
424 	    dmu_tx_hold_zap_impl, (ops > 0?ops:0), (ops < 0?-ops:0));
425 }
426 
427 void
428 dmu_tx_hold_bonus(dmu_tx_t *tx, uint64_t object)
429 {
430 	ASSERT(tx->tx_txg == 0);
431 
432 	dmu_tx_hold_object_impl(tx, tx->tx_objset, object, THT_BONUS,
433 	    dmu_tx_hold_write_impl, 0, 0);
434 }
435 
436 
437 /* ARGSUSED */
438 static void
439 dmu_tx_hold_space_impl(dmu_tx_t *tx, dnode_t *dn,
440     uint64_t space, uint64_t unused)
441 {
442 	tx->tx_space_towrite += space;
443 }
444 
445 void
446 dmu_tx_hold_space(dmu_tx_t *tx, uint64_t space)
447 {
448 	ASSERT(tx->tx_txg == 0);
449 
450 	dmu_tx_hold_object_impl(tx, tx->tx_objset, DMU_NEW_OBJECT, THT_SPACE,
451 	    dmu_tx_hold_space_impl, space, 0);
452 }
453 
454 int
455 dmu_tx_holds(dmu_tx_t *tx, uint64_t object)
456 {
457 	dmu_tx_hold_t *dth;
458 	int holds = 0;
459 
460 	/*
461 	 * By asserting that the tx is assigned, we're counting the
462 	 * number of dn_tx_holds, which is the same as the number of
463 	 * dn_holds.  Otherwise, we'd be counting dn_holds, but
464 	 * dn_tx_holds could be 0.
465 	 */
466 	ASSERT(tx->tx_txg != 0);
467 
468 	/* if (tx->tx_anyobj == TRUE) */
469 		/* return (0); */
470 
471 	for (dth = list_head(&tx->tx_holds); dth;
472 	    dth = list_next(&tx->tx_holds, dth)) {
473 		if (dth->dth_dnode && dth->dth_dnode->dn_object == object)
474 			holds++;
475 	}
476 
477 	return (holds);
478 }
479 
480 #ifdef ZFS_DEBUG
481 void
482 dmu_tx_dirty_buf(dmu_tx_t *tx, dmu_buf_impl_t *db)
483 {
484 	dmu_tx_hold_t *dth;
485 	int match_object = FALSE, match_offset = FALSE;
486 	dnode_t *dn = db->db_dnode;
487 
488 	ASSERT(tx->tx_txg != 0);
489 	ASSERT(tx->tx_objset == NULL || dn->dn_objset == tx->tx_objset->os);
490 	ASSERT3U(dn->dn_object, ==, db->db.db_object);
491 
492 	if (tx->tx_anyobj)
493 		return;
494 
495 	/* XXX No checking on the meta dnode for now */
496 	if (db->db.db_object & DMU_PRIVATE_OBJECT)
497 		return;
498 
499 	for (dth = list_head(&tx->tx_holds); dth;
500 	    dth = list_next(&tx->tx_holds, dth)) {
501 		ASSERT(dn == NULL || dn->dn_assigned_txg == tx->tx_txg);
502 		if (dth->dth_dnode == dn && dth->dth_type != THT_NEWOBJECT)
503 			match_object = TRUE;
504 		if (dth->dth_dnode == NULL || dth->dth_dnode == dn) {
505 			int datablkshift = dn->dn_datablkshift ?
506 			    dn->dn_datablkshift : SPA_MAXBLOCKSHIFT;
507 			int epbs = dn->dn_indblkshift - SPA_BLKPTRSHIFT;
508 			int shift = datablkshift + epbs * db->db_level;
509 			uint64_t beginblk = shift >= 64 ? 0 :
510 			    (dth->dth_arg1 >> shift);
511 			uint64_t endblk = shift >= 64 ? 0 :
512 			    ((dth->dth_arg1 + dth->dth_arg2 - 1) >> shift);
513 			uint64_t blkid = db->db_blkid;
514 
515 			/* XXX dth_arg2 better not be zero... */
516 
517 			dprintf("found dth type %x beginblk=%llx endblk=%llx\n",
518 			    dth->dth_type, beginblk, endblk);
519 
520 			switch (dth->dth_type) {
521 			case THT_WRITE:
522 				if (blkid >= beginblk && blkid <= endblk)
523 					match_offset = TRUE;
524 				/*
525 				 * We will let this hold work for the bonus
526 				 * buffer so that we don't need to hold it
527 				 * when creating a new object.
528 				 */
529 				if (blkid == DB_BONUS_BLKID)
530 					match_offset = TRUE;
531 				/*
532 				 * They might have to increase nlevels,
533 				 * thus dirtying the new TLIBs.  Or the
534 				 * might have to change the block size,
535 				 * thus dirying the new lvl=0 blk=0.
536 				 */
537 				if (blkid == 0)
538 					match_offset = TRUE;
539 				break;
540 			case THT_FREE:
541 				if (blkid == beginblk &&
542 				    (dth->dth_arg1 != 0 ||
543 				    dn->dn_maxblkid == 0))
544 					match_offset = TRUE;
545 				if (blkid == endblk &&
546 				    dth->dth_arg2 != DMU_OBJECT_END)
547 					match_offset = TRUE;
548 				break;
549 			case THT_BONUS:
550 				if (blkid == DB_BONUS_BLKID)
551 					match_offset = TRUE;
552 				break;
553 			case THT_ZAP:
554 				match_offset = TRUE;
555 				break;
556 			case THT_NEWOBJECT:
557 				match_object = TRUE;
558 				break;
559 			default:
560 				ASSERT(!"bad dth_type");
561 			}
562 		}
563 		if (match_object && match_offset)
564 			return;
565 	}
566 	panic("dirtying dbuf obj=%llx lvl=%u blkid=%llx but not tx_held\n",
567 	    (u_longlong_t)db->db.db_object, db->db_level,
568 	    (u_longlong_t)db->db_blkid);
569 }
570 #endif
571 
572 static int
573 dmu_tx_try_assign(dmu_tx_t *tx, uint64_t txg_how, dmu_tx_hold_t **last_dth)
574 {
575 	dmu_tx_hold_t *dth;
576 	uint64_t lsize, asize, fsize;
577 
578 	*last_dth = NULL;
579 
580 	tx->tx_space_towrite = 0;
581 	tx->tx_space_tofree = 0;
582 	tx->tx_space_tooverwrite = 0;
583 	tx->tx_txg = txg_hold_open(tx->tx_pool, &tx->tx_txgh);
584 
585 	if (txg_how >= TXG_INITIAL && txg_how != tx->tx_txg)
586 		return (ERESTART);
587 
588 	for (dth = list_head(&tx->tx_holds); dth;
589 	    *last_dth = dth, dth = list_next(&tx->tx_holds, dth)) {
590 		dnode_t *dn = dth->dth_dnode;
591 		if (dn != NULL) {
592 			mutex_enter(&dn->dn_mtx);
593 			while (dn->dn_assigned_txg == tx->tx_txg - 1) {
594 				if (txg_how != TXG_WAIT) {
595 					mutex_exit(&dn->dn_mtx);
596 					return (ERESTART);
597 				}
598 				cv_wait(&dn->dn_notxholds, &dn->dn_mtx);
599 			}
600 			if (dn->dn_assigned_txg == 0) {
601 				ASSERT(dn->dn_assigned_tx == NULL);
602 				dn->dn_assigned_txg = tx->tx_txg;
603 				dn->dn_assigned_tx = tx;
604 			} else {
605 				ASSERT(dn->dn_assigned_txg == tx->tx_txg);
606 				if (dn->dn_assigned_tx != tx)
607 					dn->dn_assigned_tx = NULL;
608 			}
609 			(void) refcount_add(&dn->dn_tx_holds, tx);
610 			mutex_exit(&dn->dn_mtx);
611 		}
612 		if (dth->dth_func)
613 			dth->dth_func(tx, dn, dth->dth_arg1, dth->dth_arg2);
614 	}
615 
616 	/*
617 	 * Convert logical size to worst-case allocated size.
618 	 */
619 	fsize = spa_get_asize(tx->tx_pool->dp_spa, tx->tx_space_tooverwrite) +
620 	    tx->tx_space_tofree;
621 	lsize = tx->tx_space_towrite + tx->tx_space_tooverwrite;
622 	asize = spa_get_asize(tx->tx_pool->dp_spa, lsize);
623 	tx->tx_space_towrite = asize;
624 
625 	if (tx->tx_dir && asize != 0) {
626 		int err = dsl_dir_tempreserve_space(tx->tx_dir,
627 		    lsize, asize, fsize, &tx->tx_tempreserve_cookie, tx);
628 		if (err)
629 			return (err);
630 	}
631 
632 	return (0);
633 }
634 
635 static uint64_t
636 dmu_tx_unassign(dmu_tx_t *tx, dmu_tx_hold_t *last_dth)
637 {
638 	uint64_t txg = tx->tx_txg;
639 	dmu_tx_hold_t *dth;
640 
641 	ASSERT(txg != 0);
642 
643 	txg_rele_to_quiesce(&tx->tx_txgh);
644 
645 	for (dth = last_dth; dth; dth = list_prev(&tx->tx_holds, dth)) {
646 		dnode_t *dn = dth->dth_dnode;
647 
648 		if (dn == NULL)
649 			continue;
650 		mutex_enter(&dn->dn_mtx);
651 		ASSERT3U(dn->dn_assigned_txg, ==, txg);
652 
653 		if (refcount_remove(&dn->dn_tx_holds, tx) == 0) {
654 			dn->dn_assigned_txg = 0;
655 			dn->dn_assigned_tx = NULL;
656 			cv_broadcast(&dn->dn_notxholds);
657 		}
658 		mutex_exit(&dn->dn_mtx);
659 	}
660 
661 	txg_rele_to_sync(&tx->tx_txgh);
662 
663 	tx->tx_txg = 0;
664 	return (txg);
665 }
666 
667 /*
668  * Assign tx to a transaction group.  txg_how can be one of:
669  *
670  * (1)	TXG_WAIT.  If the current open txg is full, waits until there's
671  *	a new one.  This should be used when you're not holding locks.
672  *	If will only fail if we're truly out of space (or over quota).
673  *
674  * (2)	TXG_NOWAIT.  If we can't assign into the current open txg without
675  *	blocking, returns immediately with ERESTART.  This should be used
676  *	whenever you're holding locks.  On an ERESTART error, the caller
677  *	should drop locks, do a txg_wait_open(dp, 0), and try again.
678  *
679  * (3)	A specific txg.  Use this if you need to ensure that multiple
680  *	transactions all sync in the same txg.  Like TXG_NOWAIT, it
681  *	returns ERESTART if it can't assign you into the requested txg.
682  */
683 int
684 dmu_tx_assign(dmu_tx_t *tx, uint64_t txg_how)
685 {
686 	dmu_tx_hold_t *last_dth;
687 	int err;
688 
689 	ASSERT(tx->tx_txg == 0);
690 	ASSERT(txg_how != 0);
691 	ASSERT(!dsl_pool_sync_context(tx->tx_pool));
692 	ASSERT3U(tx->tx_space_towrite, ==, 0);
693 	ASSERT3U(tx->tx_space_tofree, ==, 0);
694 
695 	while ((err = dmu_tx_try_assign(tx, txg_how, &last_dth)) != 0) {
696 		uint64_t txg = dmu_tx_unassign(tx, last_dth);
697 
698 		if (err != ERESTART || txg_how != TXG_WAIT)
699 			return (err);
700 
701 		txg_wait_open(tx->tx_pool, txg + 1);
702 	}
703 
704 	txg_rele_to_quiesce(&tx->tx_txgh);
705 
706 	return (0);
707 }
708 
709 void
710 dmu_tx_willuse_space(dmu_tx_t *tx, int64_t delta)
711 {
712 	if (tx->tx_dir == NULL || delta == 0)
713 		return;
714 
715 	if (delta > 0) {
716 		ASSERT3U(refcount_count(&tx->tx_space_written) + delta, <=,
717 		    tx->tx_space_towrite);
718 		(void) refcount_add_many(&tx->tx_space_written, delta, NULL);
719 	} else {
720 		(void) refcount_add_many(&tx->tx_space_freed, -delta, NULL);
721 	}
722 }
723 
724 void
725 dmu_tx_commit(dmu_tx_t *tx)
726 {
727 	dmu_tx_hold_t *dth;
728 
729 	ASSERT(tx->tx_txg != 0);
730 
731 	while (dth = list_head(&tx->tx_holds)) {
732 		dnode_t *dn = dth->dth_dnode;
733 
734 		list_remove(&tx->tx_holds, dth);
735 		kmem_free(dth, sizeof (dmu_tx_hold_t));
736 		if (dn == NULL)
737 			continue;
738 		mutex_enter(&dn->dn_mtx);
739 		ASSERT3U(dn->dn_assigned_txg, ==, tx->tx_txg);
740 
741 		if (refcount_remove(&dn->dn_tx_holds, tx) == 0) {
742 			dn->dn_assigned_txg = 0;
743 			dn->dn_assigned_tx = NULL;
744 			cv_broadcast(&dn->dn_notxholds);
745 		}
746 		mutex_exit(&dn->dn_mtx);
747 		dnode_rele(dn, tx);
748 	}
749 
750 	if (tx->tx_dir && tx->tx_space_towrite > 0) {
751 		dsl_dir_tempreserve_clear(tx->tx_tempreserve_cookie, tx);
752 	}
753 
754 	if (tx->tx_anyobj == FALSE)
755 		txg_rele_to_sync(&tx->tx_txgh);
756 	dprintf("towrite=%llu written=%llu tofree=%llu freed=%llu\n",
757 	    tx->tx_space_towrite, refcount_count(&tx->tx_space_written),
758 	    tx->tx_space_tofree, refcount_count(&tx->tx_space_freed));
759 	refcount_destroy_many(&tx->tx_space_written,
760 	    refcount_count(&tx->tx_space_written));
761 	refcount_destroy_many(&tx->tx_space_freed,
762 	    refcount_count(&tx->tx_space_freed));
763 #ifdef ZFS_DEBUG
764 	if (tx->tx_debug_buf)
765 		kmem_free(tx->tx_debug_buf, 4096);
766 #endif
767 	kmem_free(tx, sizeof (dmu_tx_t));
768 }
769 
770 void
771 dmu_tx_abort(dmu_tx_t *tx)
772 {
773 	dmu_tx_hold_t *dth;
774 
775 	ASSERT(tx->tx_txg == 0);
776 
777 	while (dth = list_head(&tx->tx_holds)) {
778 		dnode_t *dn = dth->dth_dnode;
779 
780 		list_remove(&tx->tx_holds, dth);
781 		kmem_free(dth, sizeof (dmu_tx_hold_t));
782 		if (dn != NULL)
783 			dnode_rele(dn, tx);
784 	}
785 	refcount_destroy_many(&tx->tx_space_written,
786 	    refcount_count(&tx->tx_space_written));
787 	refcount_destroy_many(&tx->tx_space_freed,
788 	    refcount_count(&tx->tx_space_freed));
789 #ifdef ZFS_DEBUG
790 	if (tx->tx_debug_buf)
791 		kmem_free(tx->tx_debug_buf, 4096);
792 #endif
793 	kmem_free(tx, sizeof (dmu_tx_t));
794 }
795 
796 uint64_t
797 dmu_tx_get_txg(dmu_tx_t *tx)
798 {
799 	ASSERT(tx->tx_txg != 0);
800 	return (tx->tx_txg);
801 }
802