xref: /titanic_51/usr/src/uts/common/fs/zfs/dmu_tx.c (revision 99653d4ee642c6528e88224f12409a5f23060994)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2006 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 
26 #pragma ident	"%Z%%M%	%I%	%E% SMI"
27 
28 #include <sys/dmu.h>
29 #include <sys/dmu_impl.h>
30 #include <sys/dbuf.h>
31 #include <sys/dmu_tx.h>
32 #include <sys/dmu_objset.h>
33 #include <sys/dsl_dataset.h> /* for dsl_dataset_block_freeable() */
34 #include <sys/dsl_dir.h> /* for dsl_dir_tempreserve_*() */
35 #include <sys/dsl_pool.h>
36 #include <sys/zap_impl.h>	/* for ZAP_BLOCK_SHIFT */
37 #include <sys/spa.h>
38 #include <sys/zfs_context.h>
39 
40 typedef void (*dmu_tx_hold_func_t)(dmu_tx_t *tx, struct dnode *dn,
41     uint64_t arg1, uint64_t arg2);
42 
43 #ifdef ZFS_DEBUG
44 int dmu_use_tx_debug_bufs = 1;
45 #endif
46 
47 dmu_tx_t *
48 dmu_tx_create_ds(dsl_dir_t *dd)
49 {
50 	dmu_tx_t *tx = kmem_zalloc(sizeof (dmu_tx_t), KM_SLEEP);
51 	tx->tx_dir = dd;
52 	if (dd)
53 		tx->tx_pool = dd->dd_pool;
54 	list_create(&tx->tx_holds, sizeof (dmu_tx_hold_t),
55 	    offsetof(dmu_tx_hold_t, dth_node));
56 	refcount_create(&tx->tx_space_written);
57 	refcount_create(&tx->tx_space_freed);
58 	return (tx);
59 }
60 
61 dmu_tx_t *
62 dmu_tx_create(objset_t *os)
63 {
64 	dmu_tx_t *tx = dmu_tx_create_ds(os->os->os_dsl_dataset->ds_dir);
65 	tx->tx_objset = os;
66 	tx->tx_lastsnap_txg = dsl_dataset_prev_snap_txg(os->os->os_dsl_dataset);
67 	return (tx);
68 }
69 
70 dmu_tx_t *
71 dmu_tx_create_assigned(struct dsl_pool *dp, uint64_t txg)
72 {
73 	dmu_tx_t *tx = dmu_tx_create_ds(NULL);
74 
75 	ASSERT3U(txg, <=, dp->dp_tx.tx_open_txg);
76 	tx->tx_pool = dp;
77 	tx->tx_txg = txg;
78 	tx->tx_anyobj = TRUE;
79 
80 	return (tx);
81 }
82 
83 int
84 dmu_tx_is_syncing(dmu_tx_t *tx)
85 {
86 	return (tx->tx_anyobj);
87 }
88 
89 int
90 dmu_tx_private_ok(dmu_tx_t *tx)
91 {
92 	return (tx->tx_anyobj);
93 }
94 
95 static void
96 dmu_tx_hold_object_impl(dmu_tx_t *tx, objset_t *os, uint64_t object,
97     enum dmu_tx_hold_type type, dmu_tx_hold_func_t func,
98     uint64_t arg1, uint64_t arg2)
99 {
100 	dmu_tx_hold_t *dth;
101 	dnode_t *dn = NULL;
102 	int err;
103 
104 	if (object != DMU_NEW_OBJECT) {
105 		err = dnode_hold(os->os, object, tx, &dn);
106 		if (err) {
107 			tx->tx_err = err;
108 			return;
109 		}
110 
111 		if (err == 0 && tx->tx_txg != 0) {
112 			mutex_enter(&dn->dn_mtx);
113 			/*
114 			 * dn->dn_assigned_txg == tx->tx_txg doesn't pose a
115 			 * problem, but there's no way for it to happen (for
116 			 * now, at least).
117 			 */
118 			ASSERT(dn->dn_assigned_txg == 0);
119 			ASSERT(dn->dn_assigned_tx == NULL);
120 			dn->dn_assigned_txg = tx->tx_txg;
121 			dn->dn_assigned_tx = tx;
122 			(void) refcount_add(&dn->dn_tx_holds, tx);
123 			mutex_exit(&dn->dn_mtx);
124 		}
125 	}
126 
127 	dth = kmem_zalloc(sizeof (dmu_tx_hold_t), KM_SLEEP);
128 	dth->dth_dnode = dn;
129 	dth->dth_type = type;
130 	dth->dth_arg1 = arg1;
131 	dth->dth_arg2 = arg2;
132 	list_insert_tail(&tx->tx_holds, dth);
133 
134 	if (func)
135 		func(tx, dn, arg1, arg2);
136 }
137 
138 void
139 dmu_tx_add_new_object(dmu_tx_t *tx, objset_t *os, uint64_t object)
140 {
141 	/*
142 	 * If we're syncing, they can manipulate any object anyhow, and
143 	 * the hold on the dnode_t can cause problems.
144 	 */
145 	if (!dmu_tx_is_syncing(tx)) {
146 		dmu_tx_hold_object_impl(tx, os, object, THT_NEWOBJECT,
147 		    NULL, 0, 0);
148 	}
149 }
150 
151 static int
152 dmu_tx_check_ioerr(zio_t *zio, dnode_t *dn, int level, uint64_t blkid)
153 {
154 	int err;
155 	dmu_buf_impl_t *db;
156 
157 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
158 	db = dbuf_hold_level(dn, level, blkid, FTAG);
159 	rw_exit(&dn->dn_struct_rwlock);
160 	if (db == NULL)
161 		return (EIO);
162 	err = dbuf_read(db, zio, DB_RF_CANFAIL);
163 	dbuf_rele(db, FTAG);
164 	return (err);
165 }
166 
167 /* ARGSUSED */
168 static void
169 dmu_tx_count_write(dmu_tx_t *tx, dnode_t *dn, uint64_t off, uint64_t len)
170 {
171 	uint64_t start, end, i, space;
172 	int min_bs, max_bs, min_ibs, max_ibs, epbs, bits;
173 
174 	if (len == 0)
175 		return;
176 
177 	min_bs = SPA_MINBLOCKSHIFT;
178 	max_bs = SPA_MAXBLOCKSHIFT;
179 	min_ibs = DN_MIN_INDBLKSHIFT;
180 	max_ibs = DN_MAX_INDBLKSHIFT;
181 
182 	/*
183 	 * For i/o error checking, read the first and last level-0
184 	 * blocks (if they are not aligned), and all the level-1 blocks.
185 	 * We needn't do this on the meta-dnode, because we've already
186 	 * read it in.
187 	 */
188 
189 	if (dn && dn->dn_object != DMU_META_DNODE_OBJECT) {
190 		int err;
191 
192 		if (dn->dn_maxblkid == 0) {
193 			err = dmu_tx_check_ioerr(NULL, dn, 0, 0);
194 			if (err) {
195 				tx->tx_err = err;
196 				return;
197 			}
198 		} else {
199 			zio_t *zio = zio_root(tx->tx_pool->dp_spa,
200 			    NULL, NULL, ZIO_FLAG_CANFAIL);
201 
202 			/* first level-0 block */
203 			start = off >> dn->dn_datablkshift;
204 			if (P2PHASE(off, dn->dn_datablksz) ||
205 			    len < dn->dn_datablksz) {
206 				err = dmu_tx_check_ioerr(zio, dn, 0, start);
207 				if (err) {
208 					tx->tx_err = err;
209 					return;
210 				}
211 			}
212 
213 			/* last level-0 block */
214 			end = (off+len-1) >> dn->dn_datablkshift;
215 			if (end != start &&
216 			    P2PHASE(off+len, dn->dn_datablksz)) {
217 				err = dmu_tx_check_ioerr(zio, dn, 0, end);
218 				if (err) {
219 					tx->tx_err = err;
220 					return;
221 				}
222 			}
223 
224 			/* level-1 blocks */
225 			if (dn->dn_nlevels > 1) {
226 				start >>= dn->dn_indblkshift - SPA_BLKPTRSHIFT;
227 				end >>= dn->dn_indblkshift - SPA_BLKPTRSHIFT;
228 				for (i = start+1; i < end; i++) {
229 					err = dmu_tx_check_ioerr(zio, dn, 1, i);
230 					if (err) {
231 						tx->tx_err = err;
232 						return;
233 					}
234 				}
235 			}
236 
237 			err = zio_wait(zio);
238 			if (err) {
239 				tx->tx_err = err;
240 				return;
241 			}
242 		}
243 	}
244 
245 	/*
246 	 * If there's more than one block, the blocksize can't change,
247 	 * so we can make a more precise estimate.  Alternatively,
248 	 * if the dnode's ibs is larger than max_ibs, always use that.
249 	 * This ensures that if we reduce DN_MAX_INDBLKSHIFT,
250 	 * the code will still work correctly on existing pools.
251 	 */
252 	if (dn && (dn->dn_maxblkid != 0 || dn->dn_indblkshift > max_ibs)) {
253 		min_ibs = max_ibs = dn->dn_indblkshift;
254 		if (dn->dn_datablkshift != 0)
255 			min_bs = max_bs = dn->dn_datablkshift;
256 	}
257 
258 	/*
259 	 * 'end' is the last thing we will access, not one past.
260 	 * This way we won't overflow when accessing the last byte.
261 	 */
262 	start = P2ALIGN(off, 1ULL << max_bs);
263 	end = P2ROUNDUP(off + len, 1ULL << max_bs) - 1;
264 	space = end - start + 1;
265 
266 	start >>= min_bs;
267 	end >>= min_bs;
268 
269 	epbs = min_ibs - SPA_BLKPTRSHIFT;
270 
271 	/*
272 	 * The object contains at most 2^(64 - min_bs) blocks,
273 	 * and each indirect level maps 2^epbs.
274 	 */
275 	for (bits = 64 - min_bs; bits >= 0; bits -= epbs) {
276 		start >>= epbs;
277 		end >>= epbs;
278 		/*
279 		 * If we increase the number of levels of indirection,
280 		 * we'll need new blkid=0 indirect blocks.  If start == 0,
281 		 * we're already accounting for those blocks; and if end == 0,
282 		 * we can't increase the number of levels beyond that.
283 		 */
284 		if (start != 0 && end != 0)
285 			space += 1ULL << max_ibs;
286 		space += (end - start + 1) << max_ibs;
287 	}
288 
289 	ASSERT(space < 2 * DMU_MAX_ACCESS);
290 
291 	tx->tx_space_towrite += space;
292 }
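/*
 * Illustrative only (not part of the original file): a rough worked
 * example of the estimate above.  Assume a dnode that already spans
 * several blocks, with 128K data blocks (datablkshift = 17) and 16K
 * indirect blocks (indblkshift = 14, so epbs = 14 - SPA_BLKPTRSHIFT = 7),
 * and a write with off = 0, len = 1M:
 *
 *	start = 0, end = 1M - 1, space = 1M	(eight 128K data blocks)
 *	start >> 17 = 0, end >> 17 = 7		(level-0 blkids 0 .. 7)
 *	the loop runs for bits = 47, 40, ... 5	(seven iterations); each
 *	iteration collapses start and end to 0 and charges one 16K
 *	indirect block, adding 7 * 16K = 112K in all
 *
 * for a worst-case charge of roughly 1M + 112K to tx_space_towrite.
 */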
293 
294 static void
295 dmu_tx_count_dnode(dmu_tx_t *tx, dnode_t *dn)
296 {
297 	dnode_t *mdn = tx->tx_objset->os->os_meta_dnode;
298 	uint64_t object = dn ? dn->dn_object : DN_MAX_OBJECT - 1;
299 	uint64_t pre_write_space;
300 
301 	ASSERT(object < DN_MAX_OBJECT);
302 	pre_write_space = tx->tx_space_towrite;
303 	dmu_tx_count_write(tx, mdn, object << DNODE_SHIFT, 1 << DNODE_SHIFT);
304 	if (dn && dn->dn_dbuf->db_blkptr &&
305 	    dsl_dataset_block_freeable(dn->dn_objset->os_dsl_dataset,
306 	    dn->dn_dbuf->db_blkptr->blk_birth)) {
307 		tx->tx_space_tooverwrite +=
308 			tx->tx_space_towrite - pre_write_space;
309 		tx->tx_space_towrite = pre_write_space;
310 	}
311 }
312 
313 /* ARGSUSED */
314 static void
315 dmu_tx_hold_write_impl(dmu_tx_t *tx, dnode_t *dn, uint64_t off, uint64_t len)
316 {
317 	dmu_tx_count_write(tx, dn, off, len);
318 	dmu_tx_count_dnode(tx, dn);
319 }
320 
321 void
322 dmu_tx_hold_write(dmu_tx_t *tx, uint64_t object, uint64_t off, int len)
323 {
324 	ASSERT(tx->tx_txg == 0);
325 	ASSERT(len < DMU_MAX_ACCESS);
326 	ASSERT(len == 0 || UINT64_MAX - off >= len - 1);
327 
328 	dmu_tx_hold_object_impl(tx, tx->tx_objset, object, THT_WRITE,
329 	    dmu_tx_hold_write_impl, off, len);
330 }
331 
332 static void
333 dmu_tx_count_free(dmu_tx_t *tx, dnode_t *dn, uint64_t off, uint64_t len)
334 {
335 	uint64_t blkid, nblks;
336 	uint64_t space = 0;
337 	dsl_dataset_t *ds = dn->dn_objset->os_dsl_dataset;
338 	spa_t *spa = tx->tx_pool->dp_spa;
339 	int dirty;
340 
341 	/*
342 	 * We don't need to use any locking to check for dirtiness
343 	 * because it's OK if we get stale data -- the dnode may become
344 	 * dirty immediately after our check anyway.  This is just a
345 	 * means to avoid the expensive count when we aren't sure we
346 	 * need it.  We need to be able to deal with a dirty dnode.
347 	 */
348 	dirty = list_link_active(&dn->dn_dirty_link[0]) |
349 	    list_link_active(&dn->dn_dirty_link[1]) |
350 	    list_link_active(&dn->dn_dirty_link[2]) |
351 	    list_link_active(&dn->dn_dirty_link[3]);
352 	if (dirty || dn->dn_assigned_tx || dn->dn_phys->dn_nlevels == 0)
353 		return;
354 
355 	/*
356 	 * the struct_rwlock protects us against dn_phys->dn_nlevels
357 	 * changing, in case (against all odds) we manage to dirty &
358 	 * sync out the changes after we check for being dirty.
359 	 * also, dbuf_hold_impl() wants us to have the struct_rwlock.
360 	 *
361 	 * It's fine to use dn_datablkshift rather than the dn_phys
362 	 * equivalent because if it is changing, maxblkid==0 and we will
363 	 * bail.
364 	 */
365 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
366 	if (dn->dn_phys->dn_maxblkid == 0) {
367 		if (off == 0 && len >= dn->dn_datablksz) {
368 			blkid = 0;
369 			nblks = 1;
370 		} else {
371 			rw_exit(&dn->dn_struct_rwlock);
372 			return;
373 		}
374 	} else {
375 		blkid = off >> dn->dn_datablkshift;
376 		nblks = (off + len) >> dn->dn_datablkshift;
377 
378 		if (blkid >= dn->dn_phys->dn_maxblkid) {
379 			rw_exit(&dn->dn_struct_rwlock);
380 			return;
381 		}
382 		if (blkid + nblks > dn->dn_phys->dn_maxblkid)
383 			nblks = dn->dn_phys->dn_maxblkid - blkid;
384 
385 		/* don't bother after the first 128K (131,072) blocks */
386 		nblks = MIN(nblks, 128*1024);
387 	}
388 
389 	if (dn->dn_phys->dn_nlevels == 1) {
390 		int i;
391 		for (i = 0; i < nblks; i++) {
392 			blkptr_t *bp = dn->dn_phys->dn_blkptr;
393 			ASSERT3U(blkid + i, <, dn->dn_phys->dn_nblkptr);
394 			bp += blkid + i;
395 			if (dsl_dataset_block_freeable(ds, bp->blk_birth)) {
396 				dprintf_bp(bp, "can free old%s", "");
397 				space += bp_get_dasize(spa, bp);
398 			}
399 		}
400 		nblks = 0;
401 	}
402 
403 	while (nblks) {
404 		dmu_buf_impl_t *dbuf;
405 		int err, epbs, blkoff, tochk;
406 
407 		epbs = dn->dn_indblkshift - SPA_BLKPTRSHIFT;
408 		blkoff = P2PHASE(blkid, 1<<epbs);
409 		tochk = MIN((1<<epbs) - blkoff, nblks);
410 
411 		err = dbuf_hold_impl(dn, 1, blkid >> epbs, TRUE, FTAG, &dbuf);
412 		if (err == 0) {
413 			int i;
414 			blkptr_t *bp;
415 
416 			err = dbuf_read(dbuf, NULL,
417 			    DB_RF_HAVESTRUCT | DB_RF_CANFAIL);
418 			if (err != 0) {
419 				tx->tx_err = err;
420 				dbuf_rele(dbuf, FTAG);
421 				break;
422 			}
423 
424 			bp = dbuf->db.db_data;
425 			bp += blkoff;
426 
427 			for (i = 0; i < tochk; i++) {
428 				if (dsl_dataset_block_freeable(ds,
429 				    bp[i].blk_birth)) {
430 					dprintf_bp(&bp[i],
431 					    "can free old%s", "");
432 					space += bp_get_dasize(spa, &bp[i]);
433 				}
434 			}
435 			dbuf_rele(dbuf, FTAG);
436 		}
437 		if (err != 0 && err != ENOENT) {
438 			tx->tx_err = err;
439 			break;
440 		}
441 
442 		blkid += tochk;
443 		nblks -= tochk;
444 	}
445 	rw_exit(&dn->dn_struct_rwlock);
446 
447 	tx->tx_space_tofree += space;
448 }
449 
450 static void
451 dmu_tx_hold_free_impl(dmu_tx_t *tx, dnode_t *dn, uint64_t off, uint64_t len)
452 {
453 	uint64_t start, end, i;
454 	int err, shift;
455 	zio_t *zio;
456 
457 	/* first block */
458 	if (off != 0)
459 		dmu_tx_count_write(tx, dn, off, 1);
460 	/* last block */
461 	if (len != DMU_OBJECT_END)
462 		dmu_tx_count_write(tx, dn, off+len, 1);
463 
464 	if (off >= (dn->dn_maxblkid+1) * dn->dn_datablksz)
465 		return;
466 	if (len == DMU_OBJECT_END)
467 		len = (dn->dn_maxblkid+1) * dn->dn_datablksz - off;
468 
469 	/*
470 	 * For i/o error checking, read the first and last level-0
471 	 * blocks, and all the level-1 blocks.  The above count_write's
472 	 * will take care of the level-0 blocks.
473 	 */
474 	if (dn->dn_nlevels > 1) {
475 		shift = dn->dn_datablkshift + dn->dn_indblkshift -
476 		    SPA_BLKPTRSHIFT;
477 		start = off >> shift;
478 		end = dn->dn_datablkshift ? ((off+len) >> shift) : 0;
479 
480 		zio = zio_root(tx->tx_pool->dp_spa,
481 		    NULL, NULL, ZIO_FLAG_CANFAIL);
482 		for (i = start; i <= end; i++) {
483 			uint64_t ibyte = i << shift;
484 			err = dnode_next_offset(dn, FALSE, &ibyte, 2, 1);
485 			i = ibyte >> shift;
486 			if (err == ESRCH)
487 				break;
488 			if (err) {
489 				tx->tx_err = err;
490 				return;
491 			}
492 
493 			err = dmu_tx_check_ioerr(zio, dn, 1, i);
494 			if (err) {
495 				tx->tx_err = err;
496 				return;
497 			}
498 		}
499 		err = zio_wait(zio);
500 		if (err) {
501 			tx->tx_err = err;
502 			return;
503 		}
504 	}
505 
506 	dmu_tx_count_dnode(tx, dn);
507 	dmu_tx_count_free(tx, dn, off, len);
508 }
509 
510 void
511 dmu_tx_hold_free(dmu_tx_t *tx, uint64_t object, uint64_t off, uint64_t len)
512 {
513 	ASSERT(tx->tx_txg == 0);
514 
515 	dmu_tx_hold_object_impl(tx, tx->tx_objset, object, THT_FREE,
516 	    dmu_tx_hold_free_impl, off, len);
517 }
518 
519 /* ARGSUSED */
520 static void
521 dmu_tx_hold_zap_impl(dmu_tx_t *tx, dnode_t *dn, uint64_t add, uint64_t iname)
522 {
523 	uint64_t nblocks;
524 	int epbs, err;
525 	char *name = (char *)(uintptr_t)iname;
526 
527 	dmu_tx_count_dnode(tx, dn);
528 
529 	if (dn == NULL) {
530 		/*
531 		 * We will be able to fit a new object's entries into one leaf
532 		 * block.  So there will be at most 2 blocks total,
533 		 * including the header block.
534 		 */
535 		dmu_tx_count_write(tx, dn, 0, 2 << fzap_default_block_shift);
536 		return;
537 	}
538 
539 	ASSERT3P(dmu_ot[dn->dn_type].ot_byteswap, ==, zap_byteswap);
540 
541 	if (dn->dn_maxblkid == 0 && !add) {
542 		/*
543 	 * If there is only one block (i.e. this is a micro-zap)
544 		 * and we are not adding anything, the accounting is simple.
545 		 */
546 		err = dmu_tx_check_ioerr(NULL, dn, 0, 0);
547 		if (err) {
548 			tx->tx_err = err;
549 			return;
550 		}
551 
552 		if (dsl_dataset_block_freeable(dn->dn_objset->os_dsl_dataset,
553 		    dn->dn_phys->dn_blkptr[0].blk_birth))
554 			tx->tx_space_tooverwrite += dn->dn_datablksz;
555 		else
556 			tx->tx_space_towrite += dn->dn_datablksz;
557 		return;
558 	}
559 
560 	if (dn->dn_maxblkid > 0 && name) {
561 		/*
562 		 * access the name in this fat-zap so that we'll check
563 		 * for i/o errors to the leaf blocks, etc.
564 		 */
565 		err = zap_lookup(&dn->dn_objset->os, dn->dn_object, name,
566 		    8, 0, NULL);
567 		if (err == EIO) {
568 			tx->tx_err = err;
569 			return;
570 		}
571 	}
572 
573 	/*
574 	 * 3 blocks overwritten: target leaf, ptrtbl block, header block
575 	 * 3 new blocks written if adding: new split leaf, 2 grown ptrtbl blocks
576 	 */
577 	dmu_tx_count_write(tx, dn, dn->dn_maxblkid * dn->dn_datablksz,
578 	    (3 + (add ? 3 : 0)) << dn->dn_datablkshift);
579 
580 	/*
581 	 * If the modified blocks are scattered to the four winds,
582 	 * we'll have to modify an indirect twig for each.
583 	 */
584 	epbs = dn->dn_indblkshift - SPA_BLKPTRSHIFT;
585 	for (nblocks = dn->dn_maxblkid >> epbs; nblocks != 0; nblocks >>= epbs)
586 		tx->tx_space_towrite += 3 << dn->dn_indblkshift;
587 }
588 
589 void
590 dmu_tx_hold_zap(dmu_tx_t *tx, uint64_t object, int add, char *name)
591 {
592 	ASSERT(tx->tx_txg == 0);
593 
594 	dmu_tx_hold_object_impl(tx, tx->tx_objset, object, THT_ZAP,
595 	    dmu_tx_hold_zap_impl, add, (uintptr_t)name);
596 }
597 
598 void
599 dmu_tx_hold_bonus(dmu_tx_t *tx, uint64_t object)
600 {
601 	ASSERT(tx->tx_txg == 0);
602 
603 	dmu_tx_hold_object_impl(tx, tx->tx_objset, object, THT_BONUS,
604 	    dmu_tx_hold_write_impl, 0, 0);
605 }
606 
607 
608 /* ARGSUSED */
609 static void
610 dmu_tx_hold_space_impl(dmu_tx_t *tx, dnode_t *dn,
611     uint64_t space, uint64_t unused)
612 {
613 	tx->tx_space_towrite += space;
614 }
615 
616 void
617 dmu_tx_hold_space(dmu_tx_t *tx, uint64_t space)
618 {
619 	ASSERT(tx->tx_txg == 0);
620 
621 	dmu_tx_hold_object_impl(tx, tx->tx_objset, DMU_NEW_OBJECT, THT_SPACE,
622 	    dmu_tx_hold_space_impl, space, 0);
623 }
624 
625 int
626 dmu_tx_holds(dmu_tx_t *tx, uint64_t object)
627 {
628 	dmu_tx_hold_t *dth;
629 	int holds = 0;
630 
631 	/*
632 	 * By asserting that the tx is assigned, we're counting the
633 	 * number of dn_tx_holds, which is the same as the number of
634 	 * dn_holds.  Otherwise, we'd be counting dn_holds, but
635 	 * dn_tx_holds could be 0.
636 	 */
637 	ASSERT(tx->tx_txg != 0);
638 
639 	/* if (tx->tx_anyobj == TRUE) */
640 		/* return (0); */
641 
642 	for (dth = list_head(&tx->tx_holds); dth;
643 	    dth = list_next(&tx->tx_holds, dth)) {
644 		if (dth->dth_dnode && dth->dth_dnode->dn_object == object)
645 			holds++;
646 	}
647 
648 	return (holds);
649 }
650 
651 #ifdef ZFS_DEBUG
652 void
653 dmu_tx_dirty_buf(dmu_tx_t *tx, dmu_buf_impl_t *db)
654 {
655 	dmu_tx_hold_t *dth;
656 	int match_object = FALSE, match_offset = FALSE;
657 	dnode_t *dn = db->db_dnode;
658 
659 	ASSERT(tx->tx_txg != 0);
660 	ASSERT(tx->tx_objset == NULL || dn->dn_objset == tx->tx_objset->os);
661 	ASSERT3U(dn->dn_object, ==, db->db.db_object);
662 
663 	if (tx->tx_anyobj)
664 		return;
665 
666 	/* XXX No checking on the meta dnode for now */
667 	if (db->db.db_object == DMU_META_DNODE_OBJECT)
668 		return;
669 
670 	for (dth = list_head(&tx->tx_holds); dth;
671 	    dth = list_next(&tx->tx_holds, dth)) {
672 		ASSERT(dn == NULL || dn->dn_assigned_txg == tx->tx_txg);
673 		if (dth->dth_dnode == dn && dth->dth_type != THT_NEWOBJECT)
674 			match_object = TRUE;
675 		if (dth->dth_dnode == NULL || dth->dth_dnode == dn) {
676 			int datablkshift = dn->dn_datablkshift ?
677 			    dn->dn_datablkshift : SPA_MAXBLOCKSHIFT;
678 			int epbs = dn->dn_indblkshift - SPA_BLKPTRSHIFT;
679 			int shift = datablkshift + epbs * db->db_level;
680 			uint64_t beginblk = shift >= 64 ? 0 :
681 			    (dth->dth_arg1 >> shift);
682 			uint64_t endblk = shift >= 64 ? 0 :
683 			    ((dth->dth_arg1 + dth->dth_arg2 - 1) >> shift);
684 			uint64_t blkid = db->db_blkid;
685 
686 			/* XXX dth_arg2 better not be zero... */
687 
688 			dprintf("found dth type %x beginblk=%llx endblk=%llx\n",
689 			    dth->dth_type, beginblk, endblk);
690 
691 			switch (dth->dth_type) {
692 			case THT_WRITE:
693 				if (blkid >= beginblk && blkid <= endblk)
694 					match_offset = TRUE;
695 				/*
696 				 * We will let this hold work for the bonus
697 				 * buffer so that we don't need to hold it
698 				 * when creating a new object.
699 				 */
700 				if (blkid == DB_BONUS_BLKID)
701 					match_offset = TRUE;
702 				/*
703 				 * They might have to increase nlevels,
704 				 * thus dirtying the new TLIBs.  Or they
705 				 * might have to change the block size,
706 				 * thus dirtying the new lvl=0 blk=0.
707 				 */
708 				if (blkid == 0)
709 					match_offset = TRUE;
710 				break;
711 			case THT_FREE:
712 				if (blkid == beginblk &&
713 				    (dth->dth_arg1 != 0 ||
714 				    dn->dn_maxblkid == 0))
715 					match_offset = TRUE;
716 				if (blkid == endblk &&
717 				    dth->dth_arg2 != DMU_OBJECT_END)
718 					match_offset = TRUE;
719 				break;
720 			case THT_BONUS:
721 				if (blkid == DB_BONUS_BLKID)
722 					match_offset = TRUE;
723 				break;
724 			case THT_ZAP:
725 				match_offset = TRUE;
726 				break;
727 			case THT_NEWOBJECT:
728 				match_object = TRUE;
729 				break;
730 			default:
731 				ASSERT(!"bad dth_type");
732 			}
733 		}
734 		if (match_object && match_offset)
735 			return;
736 	}
737 	panic("dirtying dbuf obj=%llx lvl=%u blkid=%llx but not tx_held\n",
738 	    (u_longlong_t)db->db.db_object, db->db_level,
739 	    (u_longlong_t)db->db_blkid);
740 }
741 #endif
742 
743 static int
744 dmu_tx_try_assign(dmu_tx_t *tx, uint64_t txg_how, dmu_tx_hold_t **last_dth)
745 {
746 	dmu_tx_hold_t *dth;
747 	uint64_t lsize, asize, fsize, towrite;
748 
749 	*last_dth = NULL;
750 
751 	tx->tx_txg = txg_hold_open(tx->tx_pool, &tx->tx_txgh);
752 
753 	if (txg_how >= TXG_INITIAL && txg_how != tx->tx_txg)
754 		return (ERESTART);
755 	if (tx->tx_err)
756 		return (tx->tx_err);
757 
758 	for (dth = list_head(&tx->tx_holds); dth;
759 	    dth = list_next(&tx->tx_holds, dth)) {
760 		dnode_t *dn = dth->dth_dnode;
761 		if (dn != NULL) {
762 			mutex_enter(&dn->dn_mtx);
763 			while (dn->dn_assigned_txg == tx->tx_txg - 1) {
764 				if (txg_how != TXG_WAIT) {
765 					mutex_exit(&dn->dn_mtx);
766 					return (ERESTART);
767 				}
768 				cv_wait(&dn->dn_notxholds, &dn->dn_mtx);
769 			}
770 			if (dn->dn_assigned_txg == 0) {
771 				ASSERT(dn->dn_assigned_tx == NULL);
772 				dn->dn_assigned_txg = tx->tx_txg;
773 				dn->dn_assigned_tx = tx;
774 			} else {
775 				ASSERT(dn->dn_assigned_txg == tx->tx_txg);
776 				if (dn->dn_assigned_tx != tx)
777 					dn->dn_assigned_tx = NULL;
778 			}
779 			(void) refcount_add(&dn->dn_tx_holds, tx);
780 			mutex_exit(&dn->dn_mtx);
781 		}
782 		*last_dth = dth;
783 		if (tx->tx_err)
784 			return (tx->tx_err);
785 	}
786 
787 	/*
788 	 * If a snapshot has been taken since we made our estimates,
789 	 * assume that we won't be able to free or overwrite anything.
790 	 */
791 	if (tx->tx_objset &&
792 	    dsl_dataset_prev_snap_txg(tx->tx_objset->os->os_dsl_dataset) >
793 	    tx->tx_lastsnap_txg) {
794 		tx->tx_space_towrite += tx->tx_space_tooverwrite;
795 		tx->tx_space_tooverwrite = 0;
796 		tx->tx_space_tofree = 0;
797 	}
798 
799 	/*
800 	 * Convert logical size to worst-case allocated size.
801 	 */
802 	fsize = spa_get_asize(tx->tx_pool->dp_spa, tx->tx_space_tooverwrite) +
803 	    tx->tx_space_tofree;
804 	lsize = tx->tx_space_towrite + tx->tx_space_tooverwrite;
805 	asize = spa_get_asize(tx->tx_pool->dp_spa, lsize);
806 	towrite = tx->tx_space_towrite;
807 	tx->tx_space_towrite = asize;
808 
809 	if (tx->tx_dir && asize != 0) {
810 		int err = dsl_dir_tempreserve_space(tx->tx_dir,
811 		    lsize, asize, fsize, &tx->tx_tempreserve_cookie, tx);
812 		if (err) {
813 			tx->tx_space_towrite = towrite;
814 			return (err);
815 		}
816 	}
817 
818 	return (0);
819 }
820 
821 static uint64_t
822 dmu_tx_unassign(dmu_tx_t *tx, dmu_tx_hold_t *last_dth)
823 {
824 	uint64_t txg = tx->tx_txg;
825 	dmu_tx_hold_t *dth;
826 
827 	ASSERT(txg != 0);
828 
829 	txg_rele_to_quiesce(&tx->tx_txgh);
830 
831 	for (dth = last_dth; dth; dth = list_prev(&tx->tx_holds, dth)) {
832 		dnode_t *dn = dth->dth_dnode;
833 
834 		if (dn == NULL)
835 			continue;
836 		mutex_enter(&dn->dn_mtx);
837 		ASSERT3U(dn->dn_assigned_txg, ==, txg);
838 
839 		if (refcount_remove(&dn->dn_tx_holds, tx) == 0) {
840 			dn->dn_assigned_txg = 0;
841 			dn->dn_assigned_tx = NULL;
842 			cv_broadcast(&dn->dn_notxholds);
843 		}
844 		mutex_exit(&dn->dn_mtx);
845 	}
846 
847 	txg_rele_to_sync(&tx->tx_txgh);
848 
849 	tx->tx_txg = 0;
850 	return (txg);
851 }
852 
853 /*
854  * Assign tx to a transaction group.  txg_how can be one of:
855  *
856  * (1)	TXG_WAIT.  If the current open txg is full, waits until there's
857  *	a new one.  This should be used when you're not holding locks.
858  *	It will only fail if we're truly out of space (or over quota).
859  *
860  * (2)	TXG_NOWAIT.  If we can't assign into the current open txg without
861  *	blocking, returns immediately with ERESTART.  This should be used
862  *	whenever you're holding locks.  On an ERESTART error, the caller
863  *	should drop locks, do a txg_wait_open(dp, 0), and try again.
864  *
865  * (3)	A specific txg.  Use this if you need to ensure that multiple
866  *	transactions all sync in the same txg.  Like TXG_NOWAIT, it
867  *	returns ERESTART if it can't assign you into the requested txg.
868  */
869 int
870 dmu_tx_assign(dmu_tx_t *tx, uint64_t txg_how)
871 {
872 	dmu_tx_hold_t *last_dth;
873 	int err;
874 
875 	ASSERT(tx->tx_txg == 0);
876 	ASSERT(txg_how != 0);
877 	ASSERT(!dsl_pool_sync_context(tx->tx_pool));
878 
879 	while ((err = dmu_tx_try_assign(tx, txg_how, &last_dth)) != 0) {
880 		uint64_t txg = dmu_tx_unassign(tx, last_dth);
881 
882 		if (err != ERESTART || txg_how != TXG_WAIT)
883 			return (err);
884 
885 		txg_wait_open(tx->tx_pool, txg + 1);
886 	}
887 
888 	txg_rele_to_quiesce(&tx->tx_txgh);
889 
890 	return (0);
891 }
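/*
 * Illustrative sketch only (not part of the original file): the
 * TXG_NOWAIT pattern described above, as a caller might use it.  The
 * object/off/len/buf values and the use of dmu_objset_pool() to reach
 * the dsl_pool for txg_wait_open() are assumptions for the example.
 *
 *	top:
 *		tx = dmu_tx_create(os);
 *		dmu_tx_hold_write(tx, object, off, len);
 *		err = dmu_tx_assign(tx, TXG_NOWAIT);
 *		if (err) {
 *			dmu_tx_abort(tx);
 *			if (err == ERESTART) {
 *				... drop locks ...
 *				txg_wait_open(dmu_objset_pool(os), 0);
 *				goto top;
 *			}
 *			return (err);
 *		}
 *		dmu_write(os, object, off, len, buf, tx);
 *		dmu_tx_commit(tx);
 */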
892 
893 void
894 dmu_tx_willuse_space(dmu_tx_t *tx, int64_t delta)
895 {
896 	if (tx->tx_dir == NULL || delta == 0)
897 		return;
898 
899 	if (delta > 0) {
900 		ASSERT3U(refcount_count(&tx->tx_space_written) + delta, <=,
901 		    tx->tx_space_towrite);
902 		(void) refcount_add_many(&tx->tx_space_written, delta, NULL);
903 	} else {
904 		(void) refcount_add_many(&tx->tx_space_freed, -delta, NULL);
905 	}
906 }
907 
908 void
909 dmu_tx_commit(dmu_tx_t *tx)
910 {
911 	dmu_tx_hold_t *dth;
912 
913 	ASSERT(tx->tx_txg != 0);
914 
915 	while (dth = list_head(&tx->tx_holds)) {
916 		dnode_t *dn = dth->dth_dnode;
917 
918 		list_remove(&tx->tx_holds, dth);
919 		kmem_free(dth, sizeof (dmu_tx_hold_t));
920 		if (dn == NULL)
921 			continue;
922 		mutex_enter(&dn->dn_mtx);
923 		ASSERT3U(dn->dn_assigned_txg, ==, tx->tx_txg);
924 
925 		if (refcount_remove(&dn->dn_tx_holds, tx) == 0) {
926 			dn->dn_assigned_txg = 0;
927 			dn->dn_assigned_tx = NULL;
928 			cv_broadcast(&dn->dn_notxholds);
929 		}
930 		mutex_exit(&dn->dn_mtx);
931 		dnode_rele(dn, tx);
932 	}
933 
934 	if (tx->tx_dir && tx->tx_space_towrite > 0) {
935 		dsl_dir_tempreserve_clear(tx->tx_tempreserve_cookie, tx);
936 	}
937 
938 	if (tx->tx_anyobj == FALSE)
939 		txg_rele_to_sync(&tx->tx_txgh);
940 	dprintf("towrite=%llu written=%llu tofree=%llu freed=%llu\n",
941 	    tx->tx_space_towrite, refcount_count(&tx->tx_space_written),
942 	    tx->tx_space_tofree, refcount_count(&tx->tx_space_freed));
943 	refcount_destroy_many(&tx->tx_space_written,
944 	    refcount_count(&tx->tx_space_written));
945 	refcount_destroy_many(&tx->tx_space_freed,
946 	    refcount_count(&tx->tx_space_freed));
947 #ifdef ZFS_DEBUG
948 	if (tx->tx_debug_buf)
949 		kmem_free(tx->tx_debug_buf, 4096);
950 #endif
951 	kmem_free(tx, sizeof (dmu_tx_t));
952 }
953 
954 void
955 dmu_tx_abort(dmu_tx_t *tx)
956 {
957 	dmu_tx_hold_t *dth;
958 
959 	ASSERT(tx->tx_txg == 0);
960 
961 	while (dth = list_head(&tx->tx_holds)) {
962 		dnode_t *dn = dth->dth_dnode;
963 
964 		list_remove(&tx->tx_holds, dth);
965 		kmem_free(dth, sizeof (dmu_tx_hold_t));
966 		if (dn != NULL)
967 			dnode_rele(dn, tx);
968 	}
969 	refcount_destroy_many(&tx->tx_space_written,
970 	    refcount_count(&tx->tx_space_written));
971 	refcount_destroy_many(&tx->tx_space_freed,
972 	    refcount_count(&tx->tx_space_freed));
973 #ifdef ZFS_DEBUG
974 	if (tx->tx_debug_buf)
975 		kmem_free(tx->tx_debug_buf, 4096);
976 #endif
977 	kmem_free(tx, sizeof (dmu_tx_t));
978 }
979 
980 uint64_t
981 dmu_tx_get_txg(dmu_tx_t *tx)
982 {
983 	ASSERT(tx->tx_txg != 0);
984 	return (tx->tx_txg);
985 }
986