xref: /titanic_52/usr/src/uts/common/fs/zfs/dmu_tx.c (revision c7bf320592a8c4a952d9855186d4b38524c7484d)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2006 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 
26 #pragma ident	"%Z%%M%	%I%	%E% SMI"
27 
28 #include <sys/dmu.h>
29 #include <sys/dmu_impl.h>
30 #include <sys/dbuf.h>
31 #include <sys/dmu_tx.h>
32 #include <sys/dmu_objset.h>
33 #include <sys/dsl_dataset.h> /* for dsl_dataset_block_freeable() */
34 #include <sys/dsl_dir.h> /* for dsl_dir_tempreserve_*() */
35 #include <sys/dsl_pool.h>
36 #include <sys/zap_impl.h>	/* for ZAP_BLOCK_SHIFT */
37 #include <sys/spa.h>
38 #include <sys/zfs_context.h>
39 
/*
 * Callback invoked by dmu_tx_hold_object_impl() once a hold has been
 * recorded on a transaction.  'dn' is the held dnode, or NULL when the hold
 * is for DMU_NEW_OBJECT.  The meaning of arg1/arg2 depends on the hold type
 * (e.g. offset/length for write and free holds).
 */
typedef void (*dmu_tx_hold_func_t)(dmu_tx_t *tx, struct dnode *dn,
    uint64_t arg1, uint64_t arg2);
42 
#ifdef ZFS_DEBUG
/*
 * Debug-only tunable, enabled by default.  It is not referenced in this
 * file; presumably it controls use of the per-tx debug buffer
 * (tx_debug_buf) elsewhere -- TODO confirm against its consumers.
 */
int dmu_use_tx_debug_bufs = 1;
#endif
46 
47 dmu_tx_t *
48 dmu_tx_create_ds(dsl_dir_t *dd)
49 {
50 	dmu_tx_t *tx = kmem_zalloc(sizeof (dmu_tx_t), KM_SLEEP);
51 	tx->tx_dir = dd;
52 	if (dd)
53 		tx->tx_pool = dd->dd_pool;
54 	list_create(&tx->tx_holds, sizeof (dmu_tx_hold_t),
55 	    offsetof(dmu_tx_hold_t, dth_node));
56 	refcount_create(&tx->tx_space_written);
57 	refcount_create(&tx->tx_space_freed);
58 	return (tx);
59 }
60 
61 dmu_tx_t *
62 dmu_tx_create(objset_t *os)
63 {
64 	dmu_tx_t *tx = dmu_tx_create_ds(os->os->os_dsl_dataset->ds_dir);
65 	tx->tx_objset = os;
66 	tx->tx_lastsnap_txg = dsl_dataset_prev_snap_txg(os->os->os_dsl_dataset);
67 	return (tx);
68 }
69 
70 dmu_tx_t *
71 dmu_tx_create_assigned(struct dsl_pool *dp, uint64_t txg)
72 {
73 	dmu_tx_t *tx = dmu_tx_create_ds(NULL);
74 
75 	ASSERT3U(txg, <=, dp->dp_tx.tx_open_txg);
76 	tx->tx_pool = dp;
77 	tx->tx_txg = txg;
78 	tx->tx_anyobj = TRUE;
79 
80 	return (tx);
81 }
82 
83 int
84 dmu_tx_is_syncing(dmu_tx_t *tx)
85 {
86 	return (tx->tx_anyobj);
87 }
88 
89 int
90 dmu_tx_private_ok(dmu_tx_t *tx)
91 {
92 	return (tx->tx_anyobj);
93 }
94 
95 static void
96 dmu_tx_hold_object_impl(dmu_tx_t *tx, objset_t *os, uint64_t object,
97     enum dmu_tx_hold_type type, dmu_tx_hold_func_t func,
98     uint64_t arg1, uint64_t arg2)
99 {
100 	dmu_tx_hold_t *dth;
101 	dnode_t *dn = NULL;
102 	int err;
103 
104 	if (object != DMU_NEW_OBJECT) {
105 		err = dnode_hold(os->os, object, tx, &dn);
106 		if (err) {
107 			tx->tx_err = err;
108 			return;
109 		}
110 
111 		if (err == 0 && tx->tx_txg != 0) {
112 			mutex_enter(&dn->dn_mtx);
113 			/*
114 			 * dn->dn_assigned_txg == tx->tx_txg doesn't pose a
115 			 * problem, but there's no way for it to happen (for
116 			 * now, at least).
117 			 */
118 			ASSERT(dn->dn_assigned_txg == 0);
119 			ASSERT(dn->dn_assigned_tx == NULL);
120 			dn->dn_assigned_txg = tx->tx_txg;
121 			dn->dn_assigned_tx = tx;
122 			(void) refcount_add(&dn->dn_tx_holds, tx);
123 			mutex_exit(&dn->dn_mtx);
124 		}
125 	}
126 
127 	dth = kmem_zalloc(sizeof (dmu_tx_hold_t), KM_SLEEP);
128 	dth->dth_dnode = dn;
129 	dth->dth_type = type;
130 	dth->dth_arg1 = arg1;
131 	dth->dth_arg2 = arg2;
132 	list_insert_tail(&tx->tx_holds, dth);
133 
134 	if (func)
135 		func(tx, dn, arg1, arg2);
136 }
137 
138 void
139 dmu_tx_add_new_object(dmu_tx_t *tx, objset_t *os, uint64_t object)
140 {
141 	/*
142 	 * If we're syncing, they can manipulate any object anyhow, and
143 	 * the hold on the dnode_t can cause problems.
144 	 */
145 	if (!dmu_tx_is_syncing(tx)) {
146 		dmu_tx_hold_object_impl(tx, os, object, THT_NEWOBJECT,
147 		    NULL, 0, 0);
148 	}
149 }
150 
151 static int
152 dmu_tx_check_ioerr(zio_t *zio, dnode_t *dn, int level, uint64_t blkid)
153 {
154 	int err;
155 	dmu_buf_impl_t *db;
156 
157 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
158 	db = dbuf_hold_level(dn, level, blkid, FTAG);
159 	rw_exit(&dn->dn_struct_rwlock);
160 	if (db == NULL)
161 		return (EIO);
162 	err = dbuf_read(db, zio, DB_RF_CANFAIL);
163 	dbuf_rele(db, FTAG);
164 	return (err);
165 }
166 
167 /* ARGSUSED */
168 static void
169 dmu_tx_count_write(dmu_tx_t *tx, dnode_t *dn, uint64_t off, uint64_t len)
170 {
171 	uint64_t start, end, i, space;
172 	int min_bs, max_bs, min_ibs, max_ibs, epbs, bits;
173 
174 	if (len == 0)
175 		return;
176 
177 	min_bs = SPA_MINBLOCKSHIFT;
178 	max_bs = SPA_MAXBLOCKSHIFT;
179 	min_ibs = DN_MIN_INDBLKSHIFT;
180 	max_ibs = DN_MAX_INDBLKSHIFT;
181 
182 	/*
183 	 * For i/o error checking, read the first and last level-0
184 	 * blocks, and all the level-1 blocks.  We needn't do this on
185 	 * the meta-dnode, because we've already read it in.
186 	 */
187 
188 	if (dn && dn->dn_object != DMU_META_DNODE_OBJECT) {
189 		int err;
190 
191 		if (dn->dn_maxblkid == 0) {
192 			err = dmu_tx_check_ioerr(NULL, dn, 0, 0);
193 			if (err) {
194 				tx->tx_err = err;
195 				return;
196 			}
197 		} else {
198 			zio_t *zio = zio_root(tx->tx_pool->dp_spa,
199 			    NULL, NULL, ZIO_FLAG_CANFAIL);
200 
201 			/* first level-0 block */
202 			start = off/dn->dn_datablksz;
203 			err = dmu_tx_check_ioerr(zio, dn, 0, start);
204 			if (err) {
205 				tx->tx_err = err;
206 				return;
207 			}
208 
209 			/* last level-0 block */
210 			end = (off+len)/dn->dn_datablksz;
211 			if (end != start) {
212 				err = dmu_tx_check_ioerr(zio, dn, 0, end);
213 				if (err) {
214 					tx->tx_err = err;
215 					return;
216 				}
217 			}
218 
219 			/* level-1 blocks */
220 			if (dn->dn_nlevels > 1) {
221 				start >>= dn->dn_indblkshift - SPA_BLKPTRSHIFT;
222 				end >>= dn->dn_indblkshift - SPA_BLKPTRSHIFT;
223 				for (i = start+1; i < end; i++) {
224 					err = dmu_tx_check_ioerr(zio, dn, 1, i);
225 					if (err) {
226 						tx->tx_err = err;
227 						return;
228 					}
229 				}
230 			}
231 
232 			err = zio_wait(zio);
233 			if (err) {
234 				tx->tx_err = err;
235 				return;
236 			}
237 		}
238 	}
239 
240 	/*
241 	 * If there's more than one block, the blocksize can't change,
242 	 * so we can make a more precise estimate.  Alternatively,
243 	 * if the dnode's ibs is larger than max_ibs, always use that.
244 	 * This ensures that if we reduce DN_MAX_INDBLKSHIFT,
245 	 * the code will still work correctly on existing pools.
246 	 */
247 	if (dn && (dn->dn_maxblkid != 0 || dn->dn_indblkshift > max_ibs)) {
248 		min_ibs = max_ibs = dn->dn_indblkshift;
249 		if (dn->dn_datablkshift != 0)
250 			min_bs = max_bs = dn->dn_datablkshift;
251 	}
252 
253 	/*
254 	 * 'end' is the last thing we will access, not one past.
255 	 * This way we won't overflow when accessing the last byte.
256 	 */
257 	start = P2ALIGN(off, 1ULL << max_bs);
258 	end = P2ROUNDUP(off + len, 1ULL << max_bs) - 1;
259 	space = end - start + 1;
260 
261 	start >>= min_bs;
262 	end >>= min_bs;
263 
264 	epbs = min_ibs - SPA_BLKPTRSHIFT;
265 
266 	/*
267 	 * The object contains at most 2^(64 - min_bs) blocks,
268 	 * and each indirect level maps 2^epbs.
269 	 */
270 	for (bits = 64 - min_bs; bits >= 0; bits -= epbs) {
271 		start >>= epbs;
272 		end >>= epbs;
273 		/*
274 		 * If we increase the number of levels of indirection,
275 		 * we'll need new blkid=0 indirect blocks.  If start == 0,
276 		 * we're already accounting for that blocks; and if end == 0,
277 		 * we can't increase the number of levels beyond that.
278 		 */
279 		if (start != 0 && end != 0)
280 			space += 1ULL << max_ibs;
281 		space += (end - start + 1) << max_ibs;
282 	}
283 
284 	ASSERT(space < 2 * DMU_MAX_ACCESS);
285 
286 	tx->tx_space_towrite += space;
287 }
288 
289 static void
290 dmu_tx_count_dnode(dmu_tx_t *tx, dnode_t *dn)
291 {
292 	dnode_t *mdn = tx->tx_objset->os->os_meta_dnode;
293 	uint64_t object = dn ? dn->dn_object : DN_MAX_OBJECT - 1;
294 	uint64_t pre_write_space;
295 
296 	ASSERT(object < DN_MAX_OBJECT);
297 	pre_write_space = tx->tx_space_towrite;
298 	dmu_tx_count_write(tx, mdn, object << DNODE_SHIFT, 1 << DNODE_SHIFT);
299 	if (dn && dn->dn_dbuf->db_blkptr &&
300 	    dsl_dataset_block_freeable(dn->dn_objset->os_dsl_dataset,
301 	    dn->dn_dbuf->db_blkptr->blk_birth)) {
302 		tx->tx_space_tooverwrite +=
303 			tx->tx_space_towrite - pre_write_space;
304 		tx->tx_space_towrite = pre_write_space;
305 	}
306 }
307 
308 /* ARGSUSED */
309 static void
310 dmu_tx_hold_write_impl(dmu_tx_t *tx, dnode_t *dn, uint64_t off, uint64_t len)
311 {
312 	dmu_tx_count_write(tx, dn, off, len);
313 	dmu_tx_count_dnode(tx, dn);
314 }
315 
316 void
317 dmu_tx_hold_write(dmu_tx_t *tx, uint64_t object, uint64_t off, int len)
318 {
319 	ASSERT(tx->tx_txg == 0);
320 	ASSERT(len < DMU_MAX_ACCESS);
321 	ASSERT(UINT64_MAX - off >= len - 1);
322 
323 	dmu_tx_hold_object_impl(tx, tx->tx_objset, object, THT_WRITE,
324 	    dmu_tx_hold_write_impl, off, len);
325 }
326 
327 static void
328 dmu_tx_count_free(dmu_tx_t *tx, dnode_t *dn, uint64_t off, uint64_t len)
329 {
330 	uint64_t blkid, nblks;
331 	uint64_t space = 0;
332 	dsl_dataset_t *ds = dn->dn_objset->os_dsl_dataset;
333 
334 	/*
335 	 * We don't use any locking to check for dirtyness because it's
336 	 * OK if we get stale data -- the dnode may become dirty
337 	 * immediately after our check anyway.  This is just a means to
338 	 * avoid the expensive count when we aren't sure we need it.  We
339 	 * need to be able to deal with a dirty dnode.
340 	 */
341 	if ((uintptr_t)dn->dn_assigned_tx |
342 	    list_link_active(&dn->dn_dirty_link[0]) |
343 	    list_link_active(&dn->dn_dirty_link[1]) |
344 	    list_link_active(&dn->dn_dirty_link[2]) |
345 	    list_link_active(&dn->dn_dirty_link[3]))
346 		return;
347 
348 	/*
349 	 * the struct_rwlock protects us against dn_phys->dn_nlevels
350 	 * changing, in case (against all odds) we manage to dirty &
351 	 * sync out the changes after we check for being dirty.
352 	 * also, dbuf_hold_impl() wants us to have the struct_rwlock.
353 	 *
354 	 * It's fine to use dn_datablkshift rather than the dn_phys
355 	 * equivalent because if it is changing, maxblkid==0 and we will
356 	 * bail.
357 	 */
358 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
359 	if (dn->dn_phys->dn_maxblkid == 0) {
360 		if (off == 0 && len >= dn->dn_datablksz) {
361 			blkid = 0;
362 			nblks = 1;
363 		} else {
364 			rw_exit(&dn->dn_struct_rwlock);
365 			return;
366 		}
367 	} else {
368 		blkid = off >> dn->dn_datablkshift;
369 		nblks = (off + len) >> dn->dn_datablkshift;
370 
371 		if (blkid >= dn->dn_phys->dn_maxblkid) {
372 			rw_exit(&dn->dn_struct_rwlock);
373 			return;
374 		}
375 		if (blkid + nblks > dn->dn_phys->dn_maxblkid)
376 			nblks = dn->dn_phys->dn_maxblkid - blkid;
377 
378 		/* don't bother after 128,000 blocks */
379 		nblks = MIN(nblks, 128*1024);
380 	}
381 
382 	if (dn->dn_phys->dn_nlevels == 1) {
383 		int i;
384 		for (i = 0; i < nblks; i++) {
385 			blkptr_t *bp = dn->dn_phys->dn_blkptr;
386 			ASSERT3U(blkid + i, <, dn->dn_phys->dn_nblkptr);
387 			bp += blkid + i;
388 			if (dsl_dataset_block_freeable(ds, bp->blk_birth)) {
389 				dprintf_bp(bp, "can free old%s", "");
390 				space += BP_GET_ASIZE(bp);
391 			}
392 		}
393 		nblks = 0;
394 	}
395 
396 	while (nblks) {
397 		dmu_buf_impl_t *dbuf;
398 		int err, epbs, blkoff, tochk;
399 
400 		epbs = dn->dn_indblkshift - SPA_BLKPTRSHIFT;
401 		blkoff = P2PHASE(blkid, 1<<epbs);
402 		tochk = MIN((1<<epbs) - blkoff, nblks);
403 
404 		err = dbuf_hold_impl(dn, 1, blkid >> epbs, TRUE, FTAG, &dbuf);
405 		if (err == 0) {
406 			int i;
407 			blkptr_t *bp;
408 
409 			err = dbuf_read(dbuf, NULL,
410 			    DB_RF_HAVESTRUCT | DB_RF_CANFAIL);
411 			if (err != 0) {
412 				tx->tx_err = err;
413 				dbuf_rele(dbuf, FTAG);
414 				break;
415 			}
416 
417 			bp = dbuf->db.db_data;
418 			bp += blkoff;
419 
420 			for (i = 0; i < tochk; i++) {
421 				if (dsl_dataset_block_freeable(ds,
422 				    bp[i].blk_birth)) {
423 					dprintf_bp(&bp[i],
424 					    "can free old%s", "");
425 					space += BP_GET_ASIZE(&bp[i]);
426 				}
427 			}
428 			dbuf_rele(dbuf, FTAG);
429 		}
430 		if (err != 0 && err != ENOENT) {
431 			tx->tx_err = err;
432 			break;
433 		}
434 
435 		blkid += tochk;
436 		nblks -= tochk;
437 	}
438 	rw_exit(&dn->dn_struct_rwlock);
439 
440 	tx->tx_space_tofree += space;
441 }
442 
443 static void
444 dmu_tx_hold_free_impl(dmu_tx_t *tx, dnode_t *dn, uint64_t off, uint64_t len)
445 {
446 	uint64_t start, end, i;
447 	int err, shift;
448 	zio_t *zio;
449 
450 	/* first block */
451 	if (off != 0 /* || dn->dn_maxblkid == 0 */)
452 		dmu_tx_count_write(tx, dn, off, 1);
453 	/* last block */
454 	if (len != DMU_OBJECT_END)
455 		dmu_tx_count_write(tx, dn, off+len, 1);
456 
457 	if (off >= (dn->dn_maxblkid+1) * dn->dn_datablksz)
458 		return;
459 	if (len == DMU_OBJECT_END)
460 		len = (dn->dn_maxblkid+1) * dn->dn_datablksz - off;
461 
462 	/*
463 	 * For i/o error checking, read the first and last level-0
464 	 * blocks, and all the level-1 blocks.  The above count_write's
465 	 * will take care of the level-0 blocks.
466 	 */
467 	shift = dn->dn_datablkshift + dn->dn_indblkshift - SPA_BLKPTRSHIFT;
468 	start = off >> shift;
469 	end = dn->dn_datablkshift ? ((off+len) >> shift) : 0;
470 
471 	zio = zio_root(tx->tx_pool->dp_spa, NULL, NULL, ZIO_FLAG_CANFAIL);
472 	for (i = start+1; i < end; i++) {
473 		uint64_t ibyte = i << shift;
474 		err = dnode_next_offset(dn, FALSE, &ibyte, 2, 1);
475 		i = ibyte >> shift;
476 		if (err == ESRCH)
477 			break;
478 		if (err) {
479 			tx->tx_err = err;
480 			return;
481 		}
482 
483 		err = dmu_tx_check_ioerr(zio, dn, 1, i);
484 		if (err) {
485 			tx->tx_err = err;
486 			return;
487 		}
488 	}
489 	err = zio_wait(zio);
490 	if (err) {
491 		tx->tx_err = err;
492 		return;
493 	}
494 
495 	dmu_tx_count_dnode(tx, dn);
496 	dmu_tx_count_free(tx, dn, off, len);
497 }
498 
499 void
500 dmu_tx_hold_free(dmu_tx_t *tx, uint64_t object, uint64_t off, uint64_t len)
501 {
502 	ASSERT(tx->tx_txg == 0);
503 
504 	dmu_tx_hold_object_impl(tx, tx->tx_objset, object, THT_FREE,
505 	    dmu_tx_hold_free_impl, off, len);
506 }
507 
508 /* ARGSUSED */
509 static void
510 dmu_tx_hold_zap_impl(dmu_tx_t *tx, dnode_t *dn, uint64_t add, uint64_t iname)
511 {
512 	uint64_t nblocks;
513 	int epbs, err;
514 	char *name = (char *)(uintptr_t)iname;
515 
516 	dmu_tx_count_dnode(tx, dn);
517 
518 	if (dn == NULL) {
519 		/*
520 		 * We will be able to fit a new object's entries into one leaf
521 		 * block.  So there will be at most 2 blocks total,
522 		 * including the header block.
523 		 */
524 		dmu_tx_count_write(tx, dn, 0, 2 << fzap_default_block_shift);
525 		return;
526 	}
527 
528 	ASSERT3P(dmu_ot[dn->dn_type].ot_byteswap, ==, zap_byteswap);
529 
530 	if (dn->dn_maxblkid == 0 && !add) {
531 		/*
532 		 * If there is only one block  (i.e. this is a micro-zap)
533 		 * and we are not adding anything, the accounting is simple.
534 		 */
535 		err = dmu_tx_check_ioerr(NULL, dn, 0, 0);
536 		if (err) {
537 			tx->tx_err = err;
538 			return;
539 		}
540 
541 		if (dsl_dataset_block_freeable(dn->dn_objset->os_dsl_dataset,
542 		    dn->dn_phys->dn_blkptr[0].blk_birth))
543 			tx->tx_space_tooverwrite += dn->dn_datablksz;
544 		else
545 			tx->tx_space_towrite += dn->dn_datablksz;
546 		return;
547 	}
548 
549 	if (dn->dn_maxblkid > 0 && name) {
550 		/*
551 		 * access the name in this fat-zap so that we'll check
552 		 * for i/o errors to the leaf blocks, etc.
553 		 */
554 		err = zap_lookup(&dn->dn_objset->os, dn->dn_object, name,
555 		    8, 0, NULL);
556 		if (err == EIO) {
557 			tx->tx_err = err;
558 			return;
559 		}
560 	}
561 
562 	/*
563 	 * 3 blocks overwritten: target leaf, ptrtbl block, header block
564 	 * 3 new blocks written if adding: new split leaf, 2 grown ptrtbl blocks
565 	 */
566 	dmu_tx_count_write(tx, dn, dn->dn_maxblkid * dn->dn_datablksz,
567 	    (3 + add ? 3 : 0) << dn->dn_datablkshift);
568 
569 	/*
570 	 * If the modified blocks are scattered to the four winds,
571 	 * we'll have to modify an indirect twig for each.
572 	 */
573 	epbs = dn->dn_indblkshift - SPA_BLKPTRSHIFT;
574 	for (nblocks = dn->dn_maxblkid >> epbs; nblocks != 0; nblocks >>= epbs)
575 		tx->tx_space_towrite += 3 << dn->dn_indblkshift;
576 }
577 
578 void
579 dmu_tx_hold_zap(dmu_tx_t *tx, uint64_t object, int add, char *name)
580 {
581 	ASSERT(tx->tx_txg == 0);
582 
583 	dmu_tx_hold_object_impl(tx, tx->tx_objset, object, THT_ZAP,
584 	    dmu_tx_hold_zap_impl, add, (uintptr_t)name);
585 }
586 
587 void
588 dmu_tx_hold_bonus(dmu_tx_t *tx, uint64_t object)
589 {
590 	ASSERT(tx->tx_txg == 0);
591 
592 	dmu_tx_hold_object_impl(tx, tx->tx_objset, object, THT_BONUS,
593 	    dmu_tx_hold_write_impl, 0, 0);
594 }
595 
596 
597 /* ARGSUSED */
598 static void
599 dmu_tx_hold_space_impl(dmu_tx_t *tx, dnode_t *dn,
600     uint64_t space, uint64_t unused)
601 {
602 	tx->tx_space_towrite += space;
603 }
604 
605 void
606 dmu_tx_hold_space(dmu_tx_t *tx, uint64_t space)
607 {
608 	ASSERT(tx->tx_txg == 0);
609 
610 	dmu_tx_hold_object_impl(tx, tx->tx_objset, DMU_NEW_OBJECT, THT_SPACE,
611 	    dmu_tx_hold_space_impl, space, 0);
612 }
613 
614 int
615 dmu_tx_holds(dmu_tx_t *tx, uint64_t object)
616 {
617 	dmu_tx_hold_t *dth;
618 	int holds = 0;
619 
620 	/*
621 	 * By asserting that the tx is assigned, we're counting the
622 	 * number of dn_tx_holds, which is the same as the number of
623 	 * dn_holds.  Otherwise, we'd be counting dn_holds, but
624 	 * dn_tx_holds could be 0.
625 	 */
626 	ASSERT(tx->tx_txg != 0);
627 
628 	/* if (tx->tx_anyobj == TRUE) */
629 		/* return (0); */
630 
631 	for (dth = list_head(&tx->tx_holds); dth;
632 	    dth = list_next(&tx->tx_holds, dth)) {
633 		if (dth->dth_dnode && dth->dth_dnode->dn_object == object)
634 			holds++;
635 	}
636 
637 	return (holds);
638 }
639 
#ifdef ZFS_DEBUG
/*
 * Debug check: verify that dirtying dbuf 'db' is covered by a hold in 'tx',
 * and panic if it is not.
 *
 * A tx flagged tx_anyobj is exempt, as are dbufs of the meta-dnode.
 * Otherwise the hold list is scanned until both an object match and an
 * offset match have been seen; the rules for an offset match depend on the
 * hold type.
 */
void
dmu_tx_dirty_buf(dmu_tx_t *tx, dmu_buf_impl_t *db)
{
	dnode_t *dn = db->db_dnode;
	int match_object = FALSE;
	int match_offset = FALSE;
	dmu_tx_hold_t *hold;

	ASSERT(tx->tx_txg != 0);
	ASSERT(tx->tx_objset == NULL || dn->dn_objset == tx->tx_objset->os);
	ASSERT3U(dn->dn_object, ==, db->db.db_object);

	if (tx->tx_anyobj)
		return;

	/* XXX No checking on the meta dnode for now */
	if (db->db.db_object == DMU_META_DNODE_OBJECT)
		return;

	hold = list_head(&tx->tx_holds);
	while (hold != NULL) {
		ASSERT(dn == NULL || dn->dn_assigned_txg == tx->tx_txg);

		if (hold->dth_dnode == dn && hold->dth_type != THT_NEWOBJECT)
			match_object = TRUE;

		if (hold->dth_dnode == NULL || hold->dth_dnode == dn) {
			int dshift, epbs, shift;
			uint64_t beginblk, endblk;
			uint64_t blkid = db->db_blkid;

			/*
			 * Translate the held byte range into block ids at
			 * this dbuf's level of indirection.
			 */
			if (dn->dn_datablkshift)
				dshift = dn->dn_datablkshift;
			else
				dshift = SPA_MAXBLOCKSHIFT;
			epbs = dn->dn_indblkshift - SPA_BLKPTRSHIFT;
			shift = dshift + epbs * db->db_level;

			if (shift >= 64) {
				beginblk = 0;
				endblk = 0;
			} else {
				beginblk = hold->dth_arg1 >> shift;
				endblk = (hold->dth_arg1 +
				    hold->dth_arg2 - 1) >> shift;
			}

			/* XXX dth_arg2 better not be zero... */

			dprintf("found dth type %x beginblk=%llx endblk=%llx\n",
			    hold->dth_type, beginblk, endblk);

			switch (hold->dth_type) {
			case THT_WRITE:
				/*
				 * Besides blocks inside the held range, a
				 * write hold also covers:
				 *  - the bonus buffer, so it need not be
				 *    held separately when creating a new
				 *    object;
				 *  - blkid 0, because the writer may have to
				 *    increase nlevels (dirtying the new
				 *    TLIBs) or change the block size
				 *    (dirtying the new lvl=0 blk=0).
				 */
				if ((blkid >= beginblk && blkid <= endblk) ||
				    blkid == DB_BONUS_BLKID ||
				    blkid == 0)
					match_offset = TRUE;
				break;
			case THT_FREE:
				if ((blkid == beginblk &&
				    (hold->dth_arg1 != 0 ||
				    dn->dn_maxblkid == 0)) ||
				    (blkid == endblk &&
				    hold->dth_arg2 != DMU_OBJECT_END))
					match_offset = TRUE;
				break;
			case THT_BONUS:
				if (blkid == DB_BONUS_BLKID)
					match_offset = TRUE;
				break;
			case THT_ZAP:
				match_offset = TRUE;
				break;
			case THT_NEWOBJECT:
				match_object = TRUE;
				break;
			default:
				ASSERT(!"bad dth_type");
			}
		}

		if (match_object && match_offset)
			return;

		hold = list_next(&tx->tx_holds, hold);
	}

	panic("dirtying dbuf obj=%llx lvl=%u blkid=%llx but not tx_held\n",
	    (u_longlong_t)db->db.db_object, db->db_level,
	    (u_longlong_t)db->db_blkid);
}
#endif
731 
732 static int
733 dmu_tx_try_assign(dmu_tx_t *tx, uint64_t txg_how, dmu_tx_hold_t **last_dth)
734 {
735 	dmu_tx_hold_t *dth;
736 	uint64_t lsize, asize, fsize, towrite;
737 
738 	*last_dth = NULL;
739 
740 	tx->tx_txg = txg_hold_open(tx->tx_pool, &tx->tx_txgh);
741 
742 	if (txg_how >= TXG_INITIAL && txg_how != tx->tx_txg)
743 		return (ERESTART);
744 	if (tx->tx_err)
745 		return (tx->tx_err);
746 
747 	for (dth = list_head(&tx->tx_holds); dth;
748 	    dth = list_next(&tx->tx_holds, dth)) {
749 		dnode_t *dn = dth->dth_dnode;
750 		if (dn != NULL) {
751 			mutex_enter(&dn->dn_mtx);
752 			while (dn->dn_assigned_txg == tx->tx_txg - 1) {
753 				if (txg_how != TXG_WAIT) {
754 					mutex_exit(&dn->dn_mtx);
755 					return (ERESTART);
756 				}
757 				cv_wait(&dn->dn_notxholds, &dn->dn_mtx);
758 			}
759 			if (dn->dn_assigned_txg == 0) {
760 				ASSERT(dn->dn_assigned_tx == NULL);
761 				dn->dn_assigned_txg = tx->tx_txg;
762 				dn->dn_assigned_tx = tx;
763 			} else {
764 				ASSERT(dn->dn_assigned_txg == tx->tx_txg);
765 				if (dn->dn_assigned_tx != tx)
766 					dn->dn_assigned_tx = NULL;
767 			}
768 			(void) refcount_add(&dn->dn_tx_holds, tx);
769 			mutex_exit(&dn->dn_mtx);
770 		}
771 		*last_dth = dth;
772 		if (tx->tx_err)
773 			return (tx->tx_err);
774 	}
775 
776 	/*
777 	 * If a snapshot has been taken since we made our estimates,
778 	 * assume that we won't be able to free or overwrite anything.
779 	 */
780 	if (tx->tx_objset &&
781 	    dsl_dataset_prev_snap_txg(tx->tx_objset->os->os_dsl_dataset) >
782 	    tx->tx_lastsnap_txg) {
783 		tx->tx_space_towrite += tx->tx_space_tooverwrite;
784 		tx->tx_space_tooverwrite = 0;
785 		tx->tx_space_tofree = 0;
786 	}
787 
788 	/*
789 	 * Convert logical size to worst-case allocated size.
790 	 */
791 	fsize = spa_get_asize(tx->tx_pool->dp_spa, tx->tx_space_tooverwrite) +
792 	    tx->tx_space_tofree;
793 	lsize = tx->tx_space_towrite + tx->tx_space_tooverwrite;
794 	asize = spa_get_asize(tx->tx_pool->dp_spa, lsize);
795 	towrite = tx->tx_space_towrite;
796 	tx->tx_space_towrite = asize;
797 
798 	if (tx->tx_dir && asize != 0) {
799 		int err = dsl_dir_tempreserve_space(tx->tx_dir,
800 		    lsize, asize, fsize, &tx->tx_tempreserve_cookie, tx);
801 		if (err) {
802 			tx->tx_space_towrite = towrite;
803 			return (err);
804 		}
805 	}
806 
807 	return (0);
808 }
809 
810 static uint64_t
811 dmu_tx_unassign(dmu_tx_t *tx, dmu_tx_hold_t *last_dth)
812 {
813 	uint64_t txg = tx->tx_txg;
814 	dmu_tx_hold_t *dth;
815 
816 	ASSERT(txg != 0);
817 
818 	txg_rele_to_quiesce(&tx->tx_txgh);
819 
820 	for (dth = last_dth; dth; dth = list_prev(&tx->tx_holds, dth)) {
821 		dnode_t *dn = dth->dth_dnode;
822 
823 		if (dn == NULL)
824 			continue;
825 		mutex_enter(&dn->dn_mtx);
826 		ASSERT3U(dn->dn_assigned_txg, ==, txg);
827 
828 		if (refcount_remove(&dn->dn_tx_holds, tx) == 0) {
829 			dn->dn_assigned_txg = 0;
830 			dn->dn_assigned_tx = NULL;
831 			cv_broadcast(&dn->dn_notxholds);
832 		}
833 		mutex_exit(&dn->dn_mtx);
834 	}
835 
836 	txg_rele_to_sync(&tx->tx_txgh);
837 
838 	tx->tx_txg = 0;
839 	return (txg);
840 }
841 
842 /*
843  * Assign tx to a transaction group.  txg_how can be one of:
844  *
845  * (1)	TXG_WAIT.  If the current open txg is full, waits until there's
846  *	a new one.  This should be used when you're not holding locks.
847  *	If will only fail if we're truly out of space (or over quota).
848  *
849  * (2)	TXG_NOWAIT.  If we can't assign into the current open txg without
850  *	blocking, returns immediately with ERESTART.  This should be used
851  *	whenever you're holding locks.  On an ERESTART error, the caller
852  *	should drop locks, do a txg_wait_open(dp, 0), and try again.
853  *
854  * (3)	A specific txg.  Use this if you need to ensure that multiple
855  *	transactions all sync in the same txg.  Like TXG_NOWAIT, it
856  *	returns ERESTART if it can't assign you into the requested txg.
857  */
858 int
859 dmu_tx_assign(dmu_tx_t *tx, uint64_t txg_how)
860 {
861 	dmu_tx_hold_t *last_dth;
862 	int err;
863 
864 	ASSERT(tx->tx_txg == 0);
865 	ASSERT(txg_how != 0);
866 	ASSERT(!dsl_pool_sync_context(tx->tx_pool));
867 
868 	while ((err = dmu_tx_try_assign(tx, txg_how, &last_dth)) != 0) {
869 		uint64_t txg = dmu_tx_unassign(tx, last_dth);
870 
871 		if (err != ERESTART || txg_how != TXG_WAIT)
872 			return (err);
873 
874 		txg_wait_open(tx->tx_pool, txg + 1);
875 	}
876 
877 	txg_rele_to_quiesce(&tx->tx_txgh);
878 
879 	return (0);
880 }
881 
882 void
883 dmu_tx_willuse_space(dmu_tx_t *tx, int64_t delta)
884 {
885 	if (tx->tx_dir == NULL || delta == 0)
886 		return;
887 
888 	if (delta > 0) {
889 		ASSERT3U(refcount_count(&tx->tx_space_written) + delta, <=,
890 		    tx->tx_space_towrite);
891 		(void) refcount_add_many(&tx->tx_space_written, delta, NULL);
892 	} else {
893 		(void) refcount_add_many(&tx->tx_space_freed, -delta, NULL);
894 	}
895 }
896 
897 void
898 dmu_tx_commit(dmu_tx_t *tx)
899 {
900 	dmu_tx_hold_t *dth;
901 
902 	ASSERT(tx->tx_txg != 0);
903 
904 	while (dth = list_head(&tx->tx_holds)) {
905 		dnode_t *dn = dth->dth_dnode;
906 
907 		list_remove(&tx->tx_holds, dth);
908 		kmem_free(dth, sizeof (dmu_tx_hold_t));
909 		if (dn == NULL)
910 			continue;
911 		mutex_enter(&dn->dn_mtx);
912 		ASSERT3U(dn->dn_assigned_txg, ==, tx->tx_txg);
913 
914 		if (refcount_remove(&dn->dn_tx_holds, tx) == 0) {
915 			dn->dn_assigned_txg = 0;
916 			dn->dn_assigned_tx = NULL;
917 			cv_broadcast(&dn->dn_notxholds);
918 		}
919 		mutex_exit(&dn->dn_mtx);
920 		dnode_rele(dn, tx);
921 	}
922 
923 	if (tx->tx_dir && tx->tx_space_towrite > 0) {
924 		dsl_dir_tempreserve_clear(tx->tx_tempreserve_cookie, tx);
925 	}
926 
927 	if (tx->tx_anyobj == FALSE)
928 		txg_rele_to_sync(&tx->tx_txgh);
929 	dprintf("towrite=%llu written=%llu tofree=%llu freed=%llu\n",
930 	    tx->tx_space_towrite, refcount_count(&tx->tx_space_written),
931 	    tx->tx_space_tofree, refcount_count(&tx->tx_space_freed));
932 	refcount_destroy_many(&tx->tx_space_written,
933 	    refcount_count(&tx->tx_space_written));
934 	refcount_destroy_many(&tx->tx_space_freed,
935 	    refcount_count(&tx->tx_space_freed));
936 #ifdef ZFS_DEBUG
937 	if (tx->tx_debug_buf)
938 		kmem_free(tx->tx_debug_buf, 4096);
939 #endif
940 	kmem_free(tx, sizeof (dmu_tx_t));
941 }
942 
943 void
944 dmu_tx_abort(dmu_tx_t *tx)
945 {
946 	dmu_tx_hold_t *dth;
947 
948 	ASSERT(tx->tx_txg == 0);
949 
950 	while (dth = list_head(&tx->tx_holds)) {
951 		dnode_t *dn = dth->dth_dnode;
952 
953 		list_remove(&tx->tx_holds, dth);
954 		kmem_free(dth, sizeof (dmu_tx_hold_t));
955 		if (dn != NULL)
956 			dnode_rele(dn, tx);
957 	}
958 	refcount_destroy_many(&tx->tx_space_written,
959 	    refcount_count(&tx->tx_space_written));
960 	refcount_destroy_many(&tx->tx_space_freed,
961 	    refcount_count(&tx->tx_space_freed));
962 #ifdef ZFS_DEBUG
963 	if (tx->tx_debug_buf)
964 		kmem_free(tx->tx_debug_buf, 4096);
965 #endif
966 	kmem_free(tx, sizeof (dmu_tx_t));
967 }
968 
969 uint64_t
970 dmu_tx_get_txg(dmu_tx_t *tx)
971 {
972 	ASSERT(tx->tx_txg != 0);
973 	return (tx->tx_txg);
974 }
975