xref: /titanic_52/usr/src/uts/common/fs/zfs/dmu_tx.c (revision 103b2b152ab1f30e081cd8f98b88e71e6cd6d2fc)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2006 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 
26 #pragma ident	"%Z%%M%	%I%	%E% SMI"
27 
28 #include <sys/dmu.h>
29 #include <sys/dmu_impl.h>
30 #include <sys/dbuf.h>
31 #include <sys/dmu_tx.h>
32 #include <sys/dmu_objset.h>
33 #include <sys/dsl_dataset.h> /* for dsl_dataset_block_freeable() */
34 #include <sys/dsl_dir.h> /* for dsl_dir_tempreserve_*() */
35 #include <sys/dsl_pool.h>
36 #include <sys/zap_impl.h>	/* for ZAP_BLOCK_SHIFT */
37 #include <sys/spa.h>
38 #include <sys/zfs_context.h>
39 
40 typedef void (*dmu_tx_hold_func_t)(dmu_tx_t *tx, struct dnode *dn,
41     uint64_t arg1, uint64_t arg2);
42 
43 #ifdef ZFS_DEBUG
44 int dmu_use_tx_debug_bufs = 1;
45 #endif
46 
47 dmu_tx_t *
48 dmu_tx_create_ds(dsl_dir_t *dd)
49 {
50 	dmu_tx_t *tx = kmem_zalloc(sizeof (dmu_tx_t), KM_SLEEP);
51 	tx->tx_dir = dd;
52 	if (dd)
53 		tx->tx_pool = dd->dd_pool;
54 	list_create(&tx->tx_holds, sizeof (dmu_tx_hold_t),
55 	    offsetof(dmu_tx_hold_t, dth_node));
56 	refcount_create(&tx->tx_space_written);
57 	refcount_create(&tx->tx_space_freed);
58 	return (tx);
59 }
60 
61 dmu_tx_t *
62 dmu_tx_create(objset_t *os)
63 {
64 	dmu_tx_t *tx = dmu_tx_create_ds(os->os->os_dsl_dataset->ds_dir);
65 	tx->tx_objset = os;
66 	tx->tx_lastsnap_txg = dsl_dataset_prev_snap_txg(os->os->os_dsl_dataset);
67 	return (tx);
68 }
69 
70 dmu_tx_t *
71 dmu_tx_create_assigned(struct dsl_pool *dp, uint64_t txg)
72 {
73 	dmu_tx_t *tx = dmu_tx_create_ds(NULL);
74 
75 	ASSERT3U(txg, <=, dp->dp_tx.tx_open_txg);
76 	tx->tx_pool = dp;
77 	tx->tx_txg = txg;
78 	tx->tx_anyobj = TRUE;
79 
80 	return (tx);
81 }
82 
83 int
84 dmu_tx_is_syncing(dmu_tx_t *tx)
85 {
86 	return (tx->tx_anyobj);
87 }
88 
89 int
90 dmu_tx_private_ok(dmu_tx_t *tx)
91 {
92 	return (tx->tx_anyobj);
93 }
94 
95 static void
96 dmu_tx_hold_object_impl(dmu_tx_t *tx, objset_t *os, uint64_t object,
97     enum dmu_tx_hold_type type, dmu_tx_hold_func_t func,
98     uint64_t arg1, uint64_t arg2)
99 {
100 	dmu_tx_hold_t *dth;
101 	dnode_t *dn = NULL;
102 	int err;
103 
104 	if (object != DMU_NEW_OBJECT) {
105 		err = dnode_hold(os->os, object, tx, &dn);
106 		if (err) {
107 			tx->tx_err = err;
108 			return;
109 		}
110 
111 		if (err == 0 && tx->tx_txg != 0) {
112 			mutex_enter(&dn->dn_mtx);
113 			/*
114 			 * dn->dn_assigned_txg == tx->tx_txg doesn't pose a
115 			 * problem, but there's no way for it to happen (for
116 			 * now, at least).
117 			 */
118 			ASSERT(dn->dn_assigned_txg == 0);
119 			ASSERT(dn->dn_assigned_tx == NULL);
120 			dn->dn_assigned_txg = tx->tx_txg;
121 			dn->dn_assigned_tx = tx;
122 			(void) refcount_add(&dn->dn_tx_holds, tx);
123 			mutex_exit(&dn->dn_mtx);
124 		}
125 	}
126 
127 	dth = kmem_zalloc(sizeof (dmu_tx_hold_t), KM_SLEEP);
128 	dth->dth_dnode = dn;
129 	dth->dth_type = type;
130 	dth->dth_arg1 = arg1;
131 	dth->dth_arg2 = arg2;
132 	list_insert_tail(&tx->tx_holds, dth);
133 
134 	if (func)
135 		func(tx, dn, arg1, arg2);
136 }
137 
138 void
139 dmu_tx_add_new_object(dmu_tx_t *tx, objset_t *os, uint64_t object)
140 {
141 	/*
142 	 * If we're syncing, they can manipulate any object anyhow, and
143 	 * the hold on the dnode_t can cause problems.
144 	 */
145 	if (!dmu_tx_is_syncing(tx)) {
146 		dmu_tx_hold_object_impl(tx, os, object, THT_NEWOBJECT,
147 		    NULL, 0, 0);
148 	}
149 }
150 
151 static int
152 dmu_tx_check_ioerr(zio_t *zio, dnode_t *dn, int level, uint64_t blkid)
153 {
154 	int err;
155 	dmu_buf_impl_t *db;
156 
157 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
158 	db = dbuf_hold_level(dn, level, blkid, FTAG);
159 	rw_exit(&dn->dn_struct_rwlock);
160 	if (db == NULL)
161 		return (EIO);
162 	err = dbuf_read(db, zio, DB_RF_CANFAIL);
163 	dbuf_rele(db, FTAG);
164 	return (err);
165 }
166 
167 /* ARGSUSED */
168 static void
169 dmu_tx_count_write(dmu_tx_t *tx, dnode_t *dn, uint64_t off, uint64_t len)
170 {
171 	uint64_t start, end, i, space;
172 	int min_bs, max_bs, min_ibs, max_ibs, epbs, bits;
173 
174 	if (len == 0)
175 		return;
176 
177 	min_bs = SPA_MINBLOCKSHIFT;
178 	max_bs = SPA_MAXBLOCKSHIFT;
179 	min_ibs = DN_MIN_INDBLKSHIFT;
180 	max_ibs = DN_MAX_INDBLKSHIFT;
181 
182 	/*
183 	 * For i/o error checking, read the first and last level-0
184 	 * blocks, and all the level-1 blocks.  We needn't do this on
185 	 * the meta-dnode, because we've already read it in.
186 	 */
187 
188 	if (dn && dn->dn_object != DMU_META_DNODE_OBJECT) {
189 		int err;
190 
191 		if (dn->dn_maxblkid == 0) {
192 			err = dmu_tx_check_ioerr(NULL, dn, 0, 0);
193 			if (err) {
194 				tx->tx_err = err;
195 				return;
196 			}
197 		} else {
198 			zio_t *zio = zio_root(tx->tx_pool->dp_spa,
199 			    NULL, NULL, ZIO_FLAG_CANFAIL);
200 
201 			/* first level-0 block */
202 			start = off/dn->dn_datablksz;
203 			err = dmu_tx_check_ioerr(zio, dn, 0, start);
204 			if (err) {
205 				tx->tx_err = err;
206 				return;
207 			}
208 
209 			/* last level-0 block */
210 			end = (off+len)/dn->dn_datablksz;
211 			if (end != start) {
212 				err = dmu_tx_check_ioerr(zio, dn, 0, end);
213 				if (err) {
214 					tx->tx_err = err;
215 					return;
216 				}
217 			}
218 
219 			/* level-1 blocks */
220 			if (dn->dn_nlevels > 1) {
221 				start >>= dn->dn_indblkshift - SPA_BLKPTRSHIFT;
222 				end >>= dn->dn_indblkshift - SPA_BLKPTRSHIFT;
223 				for (i = start+1; i < end; i++) {
224 					err = dmu_tx_check_ioerr(zio, dn, 1, i);
225 					if (err) {
226 						tx->tx_err = err;
227 						return;
228 					}
229 				}
230 			}
231 
232 			err = zio_wait(zio);
233 			if (err) {
234 				tx->tx_err = err;
235 				return;
236 			}
237 		}
238 	}
239 
240 	/*
241 	 * If there's more than one block, the blocksize can't change,
242 	 * so we can make a more precise estimate.  Alternatively,
243 	 * if the dnode's ibs is larger than max_ibs, always use that.
244 	 * This ensures that if we reduce DN_MAX_INDBLKSHIFT,
245 	 * the code will still work correctly on existing pools.
246 	 */
247 	if (dn && (dn->dn_maxblkid != 0 || dn->dn_indblkshift > max_ibs)) {
248 		min_ibs = max_ibs = dn->dn_indblkshift;
249 		if (dn->dn_datablkshift != 0)
250 			min_bs = max_bs = dn->dn_datablkshift;
251 	}
252 
253 	/*
254 	 * 'end' is the last thing we will access, not one past.
255 	 * This way we won't overflow when accessing the last byte.
256 	 */
257 	start = P2ALIGN(off, 1ULL << max_bs);
258 	end = P2ROUNDUP(off + len, 1ULL << max_bs) - 1;
259 	space = end - start + 1;
260 
261 	start >>= min_bs;
262 	end >>= min_bs;
263 
264 	epbs = min_ibs - SPA_BLKPTRSHIFT;
265 
266 	/*
267 	 * The object contains at most 2^(64 - min_bs) blocks,
268 	 * and each indirect level maps 2^epbs.
269 	 */
270 	for (bits = 64 - min_bs; bits >= 0; bits -= epbs) {
271 		start >>= epbs;
272 		end >>= epbs;
273 		/*
274 		 * If we increase the number of levels of indirection,
275 		 * we'll need new blkid=0 indirect blocks.  If start == 0,
276 		 * we're already accounting for that blocks; and if end == 0,
277 		 * we can't increase the number of levels beyond that.
278 		 */
279 		if (start != 0 && end != 0)
280 			space += 1ULL << max_ibs;
281 		space += (end - start + 1) << max_ibs;
282 	}
283 
284 	ASSERT(space < 2 * DMU_MAX_ACCESS);
285 
286 	tx->tx_space_towrite += space;
287 }
288 
289 static void
290 dmu_tx_count_dnode(dmu_tx_t *tx, dnode_t *dn)
291 {
292 	dnode_t *mdn = tx->tx_objset->os->os_meta_dnode;
293 	uint64_t object = dn ? dn->dn_object : DN_MAX_OBJECT - 1;
294 	uint64_t pre_write_space;
295 
296 	ASSERT(object < DN_MAX_OBJECT);
297 	pre_write_space = tx->tx_space_towrite;
298 	dmu_tx_count_write(tx, mdn, object << DNODE_SHIFT, 1 << DNODE_SHIFT);
299 	if (dn && dn->dn_dbuf->db_blkptr &&
300 	    dsl_dataset_block_freeable(dn->dn_objset->os_dsl_dataset,
301 	    dn->dn_dbuf->db_blkptr->blk_birth)) {
302 		tx->tx_space_tooverwrite +=
303 			tx->tx_space_towrite - pre_write_space;
304 		tx->tx_space_towrite = pre_write_space;
305 	}
306 }
307 
308 /* ARGSUSED */
309 static void
310 dmu_tx_hold_write_impl(dmu_tx_t *tx, dnode_t *dn, uint64_t off, uint64_t len)
311 {
312 	dmu_tx_count_write(tx, dn, off, len);
313 	dmu_tx_count_dnode(tx, dn);
314 }
315 
316 void
317 dmu_tx_hold_write(dmu_tx_t *tx, uint64_t object, uint64_t off, int len)
318 {
319 	ASSERT(tx->tx_txg == 0);
320 	ASSERT(len < DMU_MAX_ACCESS);
321 	ASSERT(UINT64_MAX - off >= len - 1);
322 
323 	dmu_tx_hold_object_impl(tx, tx->tx_objset, object, THT_WRITE,
324 	    dmu_tx_hold_write_impl, off, len);
325 }
326 
327 static void
328 dmu_tx_count_free(dmu_tx_t *tx, dnode_t *dn, uint64_t off, uint64_t len)
329 {
330 	uint64_t blkid, nblks;
331 	uint64_t space = 0;
332 	dsl_dataset_t *ds = dn->dn_objset->os_dsl_dataset;
333 	int dirty;
334 
335 	/*
336 	 * We don't need to use any locking to check for dirtyness
337 	 * because it's OK if we get stale data -- the dnode may become
338 	 * dirty immediately after our check anyway.  This is just a
339 	 * means to avoid the expensive count when we aren't sure we
340 	 * need it.  We need to be able to deal with a dirty dnode.
341 	 */
342 	dirty = list_link_active(&dn->dn_dirty_link[0]) |
343 	    list_link_active(&dn->dn_dirty_link[1]) |
344 	    list_link_active(&dn->dn_dirty_link[2]) |
345 	    list_link_active(&dn->dn_dirty_link[3]);
346 	if (dirty || dn->dn_assigned_tx || dn->dn_phys->dn_nlevels == 0)
347 		return;
348 
349 	/*
350 	 * the struct_rwlock protects us against dn_phys->dn_nlevels
351 	 * changing, in case (against all odds) we manage to dirty &
352 	 * sync out the changes after we check for being dirty.
353 	 * also, dbuf_hold_impl() wants us to have the struct_rwlock.
354 	 *
355 	 * It's fine to use dn_datablkshift rather than the dn_phys
356 	 * equivalent because if it is changing, maxblkid==0 and we will
357 	 * bail.
358 	 */
359 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
360 	if (dn->dn_phys->dn_maxblkid == 0) {
361 		if (off == 0 && len >= dn->dn_datablksz) {
362 			blkid = 0;
363 			nblks = 1;
364 		} else {
365 			rw_exit(&dn->dn_struct_rwlock);
366 			return;
367 		}
368 	} else {
369 		blkid = off >> dn->dn_datablkshift;
370 		nblks = (off + len) >> dn->dn_datablkshift;
371 
372 		if (blkid >= dn->dn_phys->dn_maxblkid) {
373 			rw_exit(&dn->dn_struct_rwlock);
374 			return;
375 		}
376 		if (blkid + nblks > dn->dn_phys->dn_maxblkid)
377 			nblks = dn->dn_phys->dn_maxblkid - blkid;
378 
379 		/* don't bother after 128,000 blocks */
380 		nblks = MIN(nblks, 128*1024);
381 	}
382 
383 	if (dn->dn_phys->dn_nlevels == 1) {
384 		int i;
385 		for (i = 0; i < nblks; i++) {
386 			blkptr_t *bp = dn->dn_phys->dn_blkptr;
387 			ASSERT3U(blkid + i, <, dn->dn_phys->dn_nblkptr);
388 			bp += blkid + i;
389 			if (dsl_dataset_block_freeable(ds, bp->blk_birth)) {
390 				dprintf_bp(bp, "can free old%s", "");
391 				space += BP_GET_ASIZE(bp);
392 			}
393 		}
394 		nblks = 0;
395 	}
396 
397 	while (nblks) {
398 		dmu_buf_impl_t *dbuf;
399 		int err, epbs, blkoff, tochk;
400 
401 		epbs = dn->dn_indblkshift - SPA_BLKPTRSHIFT;
402 		blkoff = P2PHASE(blkid, 1<<epbs);
403 		tochk = MIN((1<<epbs) - blkoff, nblks);
404 
405 		err = dbuf_hold_impl(dn, 1, blkid >> epbs, TRUE, FTAG, &dbuf);
406 		if (err == 0) {
407 			int i;
408 			blkptr_t *bp;
409 
410 			err = dbuf_read(dbuf, NULL,
411 			    DB_RF_HAVESTRUCT | DB_RF_CANFAIL);
412 			if (err != 0) {
413 				tx->tx_err = err;
414 				dbuf_rele(dbuf, FTAG);
415 				break;
416 			}
417 
418 			bp = dbuf->db.db_data;
419 			bp += blkoff;
420 
421 			for (i = 0; i < tochk; i++) {
422 				if (dsl_dataset_block_freeable(ds,
423 				    bp[i].blk_birth)) {
424 					dprintf_bp(&bp[i],
425 					    "can free old%s", "");
426 					space += BP_GET_ASIZE(&bp[i]);
427 				}
428 			}
429 			dbuf_rele(dbuf, FTAG);
430 		}
431 		if (err != 0 && err != ENOENT) {
432 			tx->tx_err = err;
433 			break;
434 		}
435 
436 		blkid += tochk;
437 		nblks -= tochk;
438 	}
439 	rw_exit(&dn->dn_struct_rwlock);
440 
441 	tx->tx_space_tofree += space;
442 }
443 
444 static void
445 dmu_tx_hold_free_impl(dmu_tx_t *tx, dnode_t *dn, uint64_t off, uint64_t len)
446 {
447 	uint64_t start, end, i;
448 	int err, shift;
449 	zio_t *zio;
450 
451 	/* first block */
452 	if (off != 0 /* || dn->dn_maxblkid == 0 */)
453 		dmu_tx_count_write(tx, dn, off, 1);
454 	/* last block */
455 	if (len != DMU_OBJECT_END)
456 		dmu_tx_count_write(tx, dn, off+len, 1);
457 
458 	if (off >= (dn->dn_maxblkid+1) * dn->dn_datablksz)
459 		return;
460 	if (len == DMU_OBJECT_END)
461 		len = (dn->dn_maxblkid+1) * dn->dn_datablksz - off;
462 
463 	/*
464 	 * For i/o error checking, read the first and last level-0
465 	 * blocks, and all the level-1 blocks.  The above count_write's
466 	 * will take care of the level-0 blocks.
467 	 */
468 	shift = dn->dn_datablkshift + dn->dn_indblkshift - SPA_BLKPTRSHIFT;
469 	start = off >> shift;
470 	end = dn->dn_datablkshift ? ((off+len) >> shift) : 0;
471 
472 	zio = zio_root(tx->tx_pool->dp_spa, NULL, NULL, ZIO_FLAG_CANFAIL);
473 	for (i = start+1; i < end; i++) {
474 		uint64_t ibyte = i << shift;
475 		err = dnode_next_offset(dn, FALSE, &ibyte, 2, 1);
476 		i = ibyte >> shift;
477 		if (err == ESRCH)
478 			break;
479 		if (err) {
480 			tx->tx_err = err;
481 			return;
482 		}
483 
484 		err = dmu_tx_check_ioerr(zio, dn, 1, i);
485 		if (err) {
486 			tx->tx_err = err;
487 			return;
488 		}
489 	}
490 	err = zio_wait(zio);
491 	if (err) {
492 		tx->tx_err = err;
493 		return;
494 	}
495 
496 	dmu_tx_count_dnode(tx, dn);
497 	dmu_tx_count_free(tx, dn, off, len);
498 }
499 
500 void
501 dmu_tx_hold_free(dmu_tx_t *tx, uint64_t object, uint64_t off, uint64_t len)
502 {
503 	ASSERT(tx->tx_txg == 0);
504 
505 	dmu_tx_hold_object_impl(tx, tx->tx_objset, object, THT_FREE,
506 	    dmu_tx_hold_free_impl, off, len);
507 }
508 
509 /* ARGSUSED */
510 static void
511 dmu_tx_hold_zap_impl(dmu_tx_t *tx, dnode_t *dn, uint64_t add, uint64_t iname)
512 {
513 	uint64_t nblocks;
514 	int epbs, err;
515 	char *name = (char *)(uintptr_t)iname;
516 
517 	dmu_tx_count_dnode(tx, dn);
518 
519 	if (dn == NULL) {
520 		/*
521 		 * We will be able to fit a new object's entries into one leaf
522 		 * block.  So there will be at most 2 blocks total,
523 		 * including the header block.
524 		 */
525 		dmu_tx_count_write(tx, dn, 0, 2 << fzap_default_block_shift);
526 		return;
527 	}
528 
529 	ASSERT3P(dmu_ot[dn->dn_type].ot_byteswap, ==, zap_byteswap);
530 
531 	if (dn->dn_maxblkid == 0 && !add) {
532 		/*
533 		 * If there is only one block  (i.e. this is a micro-zap)
534 		 * and we are not adding anything, the accounting is simple.
535 		 */
536 		err = dmu_tx_check_ioerr(NULL, dn, 0, 0);
537 		if (err) {
538 			tx->tx_err = err;
539 			return;
540 		}
541 
542 		if (dsl_dataset_block_freeable(dn->dn_objset->os_dsl_dataset,
543 		    dn->dn_phys->dn_blkptr[0].blk_birth))
544 			tx->tx_space_tooverwrite += dn->dn_datablksz;
545 		else
546 			tx->tx_space_towrite += dn->dn_datablksz;
547 		return;
548 	}
549 
550 	if (dn->dn_maxblkid > 0 && name) {
551 		/*
552 		 * access the name in this fat-zap so that we'll check
553 		 * for i/o errors to the leaf blocks, etc.
554 		 */
555 		err = zap_lookup(&dn->dn_objset->os, dn->dn_object, name,
556 		    8, 0, NULL);
557 		if (err == EIO) {
558 			tx->tx_err = err;
559 			return;
560 		}
561 	}
562 
563 	/*
564 	 * 3 blocks overwritten: target leaf, ptrtbl block, header block
565 	 * 3 new blocks written if adding: new split leaf, 2 grown ptrtbl blocks
566 	 */
567 	dmu_tx_count_write(tx, dn, dn->dn_maxblkid * dn->dn_datablksz,
568 	    (3 + add ? 3 : 0) << dn->dn_datablkshift);
569 
570 	/*
571 	 * If the modified blocks are scattered to the four winds,
572 	 * we'll have to modify an indirect twig for each.
573 	 */
574 	epbs = dn->dn_indblkshift - SPA_BLKPTRSHIFT;
575 	for (nblocks = dn->dn_maxblkid >> epbs; nblocks != 0; nblocks >>= epbs)
576 		tx->tx_space_towrite += 3 << dn->dn_indblkshift;
577 }
578 
579 void
580 dmu_tx_hold_zap(dmu_tx_t *tx, uint64_t object, int add, char *name)
581 {
582 	ASSERT(tx->tx_txg == 0);
583 
584 	dmu_tx_hold_object_impl(tx, tx->tx_objset, object, THT_ZAP,
585 	    dmu_tx_hold_zap_impl, add, (uintptr_t)name);
586 }
587 
588 void
589 dmu_tx_hold_bonus(dmu_tx_t *tx, uint64_t object)
590 {
591 	ASSERT(tx->tx_txg == 0);
592 
593 	dmu_tx_hold_object_impl(tx, tx->tx_objset, object, THT_BONUS,
594 	    dmu_tx_hold_write_impl, 0, 0);
595 }
596 
597 
598 /* ARGSUSED */
599 static void
600 dmu_tx_hold_space_impl(dmu_tx_t *tx, dnode_t *dn,
601     uint64_t space, uint64_t unused)
602 {
603 	tx->tx_space_towrite += space;
604 }
605 
606 void
607 dmu_tx_hold_space(dmu_tx_t *tx, uint64_t space)
608 {
609 	ASSERT(tx->tx_txg == 0);
610 
611 	dmu_tx_hold_object_impl(tx, tx->tx_objset, DMU_NEW_OBJECT, THT_SPACE,
612 	    dmu_tx_hold_space_impl, space, 0);
613 }
614 
615 int
616 dmu_tx_holds(dmu_tx_t *tx, uint64_t object)
617 {
618 	dmu_tx_hold_t *dth;
619 	int holds = 0;
620 
621 	/*
622 	 * By asserting that the tx is assigned, we're counting the
623 	 * number of dn_tx_holds, which is the same as the number of
624 	 * dn_holds.  Otherwise, we'd be counting dn_holds, but
625 	 * dn_tx_holds could be 0.
626 	 */
627 	ASSERT(tx->tx_txg != 0);
628 
629 	/* if (tx->tx_anyobj == TRUE) */
630 		/* return (0); */
631 
632 	for (dth = list_head(&tx->tx_holds); dth;
633 	    dth = list_next(&tx->tx_holds, dth)) {
634 		if (dth->dth_dnode && dth->dth_dnode->dn_object == object)
635 			holds++;
636 	}
637 
638 	return (holds);
639 }
640 
#ifdef ZFS_DEBUG
/*
 * Debug-only check that dirtying 'db' is covered by a hold in 'tx'.
 *
 * Every hold in the tx is examined; the function returns as soon as
 * both an object match and an offset match have been seen, and panics
 * if the list is exhausted first.  Transactions flagged tx_anyobj and
 * buffers of the meta-dnode are not checked.
 */
void
dmu_tx_dirty_buf(dmu_tx_t *tx, dmu_buf_impl_t *db)
{
	dnode_t *dn = db->db_dnode;
	dmu_tx_hold_t *hold;
	int obj_ok = FALSE;
	int off_ok = FALSE;

	ASSERT(tx->tx_txg != 0);
	ASSERT(tx->tx_objset == NULL || dn->dn_objset == tx->tx_objset->os);
	ASSERT3U(dn->dn_object, ==, db->db.db_object);

	if (tx->tx_anyobj)
		return;

	/* XXX No checking on the meta dnode for now */
	if (db->db.db_object == DMU_META_DNODE_OBJECT)
		return;

	for (hold = list_head(&tx->tx_holds); hold != NULL;
	    hold = list_next(&tx->tx_holds, hold)) {
		ASSERT(dn == NULL || dn->dn_assigned_txg == tx->tx_txg);

		if (hold->dth_dnode == dn && hold->dth_type != THT_NEWOBJECT)
			obj_ok = TRUE;

		if (hold->dth_dnode == NULL || hold->dth_dnode == dn) {
			int datablkshift, epbs, shift;
			uint64_t beginblk, endblk;
			uint64_t blkid = db->db_blkid;

			if (dn->dn_datablkshift)
				datablkshift = dn->dn_datablkshift;
			else
				datablkshift = SPA_MAXBLOCKSHIFT;
			epbs = dn->dn_indblkshift - SPA_BLKPTRSHIFT;
			shift = datablkshift + epbs * db->db_level;

			/* Block range the hold covers at db's level. */
			if (shift >= 64) {
				beginblk = 0;
				endblk = 0;
			} else {
				beginblk = hold->dth_arg1 >> shift;
				endblk = (hold->dth_arg1 +
				    hold->dth_arg2 - 1) >> shift;
			}

			/* XXX dth_arg2 better not be zero... */

			dprintf("found dth type %x beginblk=%llx endblk=%llx\n",
			    hold->dth_type, beginblk, endblk);

			switch (hold->dth_type) {
			case THT_WRITE:
				/*
				 * Three ways a write hold covers the buffer:
				 * - it lies inside the held range;
				 * - it is the bonus buffer (allowed so that
				 *   creating a new object needs no separate
				 *   bonus hold);
				 * - it is blkid 0, which gets dirtied when
				 *   nlevels grows (new TLIBs) or when the
				 *   block size changes (new lvl=0 blk=0).
				 */
				if ((blkid >= beginblk && blkid <= endblk) ||
				    blkid == DB_BONUS_BLKID ||
				    blkid == 0)
					off_ok = TRUE;
				break;
			case THT_FREE:
				if (blkid == beginblk &&
				    (hold->dth_arg1 != 0 ||
				    dn->dn_maxblkid == 0))
					off_ok = TRUE;
				if (blkid == endblk &&
				    hold->dth_arg2 != DMU_OBJECT_END)
					off_ok = TRUE;
				break;
			case THT_BONUS:
				if (blkid == DB_BONUS_BLKID)
					off_ok = TRUE;
				break;
			case THT_ZAP:
				off_ok = TRUE;
				break;
			case THT_NEWOBJECT:
				obj_ok = TRUE;
				break;
			default:
				ASSERT(!"bad dth_type");
			}
		}

		if (obj_ok && off_ok)
			return;
	}

	panic("dirtying dbuf obj=%llx lvl=%u blkid=%llx but not tx_held\n",
	    (u_longlong_t)db->db.db_object, db->db_level,
	    (u_longlong_t)db->db_blkid);
}
#endif
732 
733 static int
734 dmu_tx_try_assign(dmu_tx_t *tx, uint64_t txg_how, dmu_tx_hold_t **last_dth)
735 {
736 	dmu_tx_hold_t *dth;
737 	uint64_t lsize, asize, fsize, towrite;
738 
739 	*last_dth = NULL;
740 
741 	tx->tx_txg = txg_hold_open(tx->tx_pool, &tx->tx_txgh);
742 
743 	if (txg_how >= TXG_INITIAL && txg_how != tx->tx_txg)
744 		return (ERESTART);
745 	if (tx->tx_err)
746 		return (tx->tx_err);
747 
748 	for (dth = list_head(&tx->tx_holds); dth;
749 	    dth = list_next(&tx->tx_holds, dth)) {
750 		dnode_t *dn = dth->dth_dnode;
751 		if (dn != NULL) {
752 			mutex_enter(&dn->dn_mtx);
753 			while (dn->dn_assigned_txg == tx->tx_txg - 1) {
754 				if (txg_how != TXG_WAIT) {
755 					mutex_exit(&dn->dn_mtx);
756 					return (ERESTART);
757 				}
758 				cv_wait(&dn->dn_notxholds, &dn->dn_mtx);
759 			}
760 			if (dn->dn_assigned_txg == 0) {
761 				ASSERT(dn->dn_assigned_tx == NULL);
762 				dn->dn_assigned_txg = tx->tx_txg;
763 				dn->dn_assigned_tx = tx;
764 			} else {
765 				ASSERT(dn->dn_assigned_txg == tx->tx_txg);
766 				if (dn->dn_assigned_tx != tx)
767 					dn->dn_assigned_tx = NULL;
768 			}
769 			(void) refcount_add(&dn->dn_tx_holds, tx);
770 			mutex_exit(&dn->dn_mtx);
771 		}
772 		*last_dth = dth;
773 		if (tx->tx_err)
774 			return (tx->tx_err);
775 	}
776 
777 	/*
778 	 * If a snapshot has been taken since we made our estimates,
779 	 * assume that we won't be able to free or overwrite anything.
780 	 */
781 	if (tx->tx_objset &&
782 	    dsl_dataset_prev_snap_txg(tx->tx_objset->os->os_dsl_dataset) >
783 	    tx->tx_lastsnap_txg) {
784 		tx->tx_space_towrite += tx->tx_space_tooverwrite;
785 		tx->tx_space_tooverwrite = 0;
786 		tx->tx_space_tofree = 0;
787 	}
788 
789 	/*
790 	 * Convert logical size to worst-case allocated size.
791 	 */
792 	fsize = spa_get_asize(tx->tx_pool->dp_spa, tx->tx_space_tooverwrite) +
793 	    tx->tx_space_tofree;
794 	lsize = tx->tx_space_towrite + tx->tx_space_tooverwrite;
795 	asize = spa_get_asize(tx->tx_pool->dp_spa, lsize);
796 	towrite = tx->tx_space_towrite;
797 	tx->tx_space_towrite = asize;
798 
799 	if (tx->tx_dir && asize != 0) {
800 		int err = dsl_dir_tempreserve_space(tx->tx_dir,
801 		    lsize, asize, fsize, &tx->tx_tempreserve_cookie, tx);
802 		if (err) {
803 			tx->tx_space_towrite = towrite;
804 			return (err);
805 		}
806 	}
807 
808 	return (0);
809 }
810 
811 static uint64_t
812 dmu_tx_unassign(dmu_tx_t *tx, dmu_tx_hold_t *last_dth)
813 {
814 	uint64_t txg = tx->tx_txg;
815 	dmu_tx_hold_t *dth;
816 
817 	ASSERT(txg != 0);
818 
819 	txg_rele_to_quiesce(&tx->tx_txgh);
820 
821 	for (dth = last_dth; dth; dth = list_prev(&tx->tx_holds, dth)) {
822 		dnode_t *dn = dth->dth_dnode;
823 
824 		if (dn == NULL)
825 			continue;
826 		mutex_enter(&dn->dn_mtx);
827 		ASSERT3U(dn->dn_assigned_txg, ==, txg);
828 
829 		if (refcount_remove(&dn->dn_tx_holds, tx) == 0) {
830 			dn->dn_assigned_txg = 0;
831 			dn->dn_assigned_tx = NULL;
832 			cv_broadcast(&dn->dn_notxholds);
833 		}
834 		mutex_exit(&dn->dn_mtx);
835 	}
836 
837 	txg_rele_to_sync(&tx->tx_txgh);
838 
839 	tx->tx_txg = 0;
840 	return (txg);
841 }
842 
843 /*
844  * Assign tx to a transaction group.  txg_how can be one of:
845  *
846  * (1)	TXG_WAIT.  If the current open txg is full, waits until there's
847  *	a new one.  This should be used when you're not holding locks.
848  *	If will only fail if we're truly out of space (or over quota).
849  *
850  * (2)	TXG_NOWAIT.  If we can't assign into the current open txg without
851  *	blocking, returns immediately with ERESTART.  This should be used
852  *	whenever you're holding locks.  On an ERESTART error, the caller
853  *	should drop locks, do a txg_wait_open(dp, 0), and try again.
854  *
855  * (3)	A specific txg.  Use this if you need to ensure that multiple
856  *	transactions all sync in the same txg.  Like TXG_NOWAIT, it
857  *	returns ERESTART if it can't assign you into the requested txg.
858  */
859 int
860 dmu_tx_assign(dmu_tx_t *tx, uint64_t txg_how)
861 {
862 	dmu_tx_hold_t *last_dth;
863 	int err;
864 
865 	ASSERT(tx->tx_txg == 0);
866 	ASSERT(txg_how != 0);
867 	ASSERT(!dsl_pool_sync_context(tx->tx_pool));
868 
869 	while ((err = dmu_tx_try_assign(tx, txg_how, &last_dth)) != 0) {
870 		uint64_t txg = dmu_tx_unassign(tx, last_dth);
871 
872 		if (err != ERESTART || txg_how != TXG_WAIT)
873 			return (err);
874 
875 		txg_wait_open(tx->tx_pool, txg + 1);
876 	}
877 
878 	txg_rele_to_quiesce(&tx->tx_txgh);
879 
880 	return (0);
881 }
882 
883 void
884 dmu_tx_willuse_space(dmu_tx_t *tx, int64_t delta)
885 {
886 	if (tx->tx_dir == NULL || delta == 0)
887 		return;
888 
889 	if (delta > 0) {
890 		ASSERT3U(refcount_count(&tx->tx_space_written) + delta, <=,
891 		    tx->tx_space_towrite);
892 		(void) refcount_add_many(&tx->tx_space_written, delta, NULL);
893 	} else {
894 		(void) refcount_add_many(&tx->tx_space_freed, -delta, NULL);
895 	}
896 }
897 
898 void
899 dmu_tx_commit(dmu_tx_t *tx)
900 {
901 	dmu_tx_hold_t *dth;
902 
903 	ASSERT(tx->tx_txg != 0);
904 
905 	while (dth = list_head(&tx->tx_holds)) {
906 		dnode_t *dn = dth->dth_dnode;
907 
908 		list_remove(&tx->tx_holds, dth);
909 		kmem_free(dth, sizeof (dmu_tx_hold_t));
910 		if (dn == NULL)
911 			continue;
912 		mutex_enter(&dn->dn_mtx);
913 		ASSERT3U(dn->dn_assigned_txg, ==, tx->tx_txg);
914 
915 		if (refcount_remove(&dn->dn_tx_holds, tx) == 0) {
916 			dn->dn_assigned_txg = 0;
917 			dn->dn_assigned_tx = NULL;
918 			cv_broadcast(&dn->dn_notxholds);
919 		}
920 		mutex_exit(&dn->dn_mtx);
921 		dnode_rele(dn, tx);
922 	}
923 
924 	if (tx->tx_dir && tx->tx_space_towrite > 0) {
925 		dsl_dir_tempreserve_clear(tx->tx_tempreserve_cookie, tx);
926 	}
927 
928 	if (tx->tx_anyobj == FALSE)
929 		txg_rele_to_sync(&tx->tx_txgh);
930 	dprintf("towrite=%llu written=%llu tofree=%llu freed=%llu\n",
931 	    tx->tx_space_towrite, refcount_count(&tx->tx_space_written),
932 	    tx->tx_space_tofree, refcount_count(&tx->tx_space_freed));
933 	refcount_destroy_many(&tx->tx_space_written,
934 	    refcount_count(&tx->tx_space_written));
935 	refcount_destroy_many(&tx->tx_space_freed,
936 	    refcount_count(&tx->tx_space_freed));
937 #ifdef ZFS_DEBUG
938 	if (tx->tx_debug_buf)
939 		kmem_free(tx->tx_debug_buf, 4096);
940 #endif
941 	kmem_free(tx, sizeof (dmu_tx_t));
942 }
943 
944 void
945 dmu_tx_abort(dmu_tx_t *tx)
946 {
947 	dmu_tx_hold_t *dth;
948 
949 	ASSERT(tx->tx_txg == 0);
950 
951 	while (dth = list_head(&tx->tx_holds)) {
952 		dnode_t *dn = dth->dth_dnode;
953 
954 		list_remove(&tx->tx_holds, dth);
955 		kmem_free(dth, sizeof (dmu_tx_hold_t));
956 		if (dn != NULL)
957 			dnode_rele(dn, tx);
958 	}
959 	refcount_destroy_many(&tx->tx_space_written,
960 	    refcount_count(&tx->tx_space_written));
961 	refcount_destroy_many(&tx->tx_space_freed,
962 	    refcount_count(&tx->tx_space_freed));
963 #ifdef ZFS_DEBUG
964 	if (tx->tx_debug_buf)
965 		kmem_free(tx->tx_debug_buf, 4096);
966 #endif
967 	kmem_free(tx, sizeof (dmu_tx_t));
968 }
969 
970 uint64_t
971 dmu_tx_get_txg(dmu_tx_t *tx)
972 {
973 	ASSERT(tx->tx_txg != 0);
974 	return (tx->tx_txg);
975 }
976