xref: /titanic_50/usr/src/uts/common/fs/zfs/dsl_dataset.c (revision 03100a6332bd4edc7a53091fcf7c9a7131bcdaa7)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2007 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 
26 #pragma ident	"%Z%%M%	%I%	%E% SMI"
27 
28 #include <sys/dmu_objset.h>
29 #include <sys/dsl_dataset.h>
30 #include <sys/dsl_dir.h>
31 #include <sys/dsl_prop.h>
32 #include <sys/dsl_synctask.h>
33 #include <sys/dmu_traverse.h>
34 #include <sys/dmu_tx.h>
35 #include <sys/arc.h>
36 #include <sys/zio.h>
37 #include <sys/zap.h>
38 #include <sys/unique.h>
39 #include <sys/zfs_context.h>
40 #include <sys/zfs_ioctl.h>
41 #include <sys/spa.h>
42 #include <sys/sunddi.h>
43 
/*
 * Forward declarations of file-local callbacks, typed as sync-task
 * check functions (dsl_checkfunc_t) and sync functions (dsl_syncfunc_t).
 */
static dsl_checkfunc_t dsl_dataset_destroy_begin_check;
static dsl_syncfunc_t dsl_dataset_destroy_begin_sync;
static dsl_checkfunc_t dsl_dataset_rollback_check;
static dsl_syncfunc_t dsl_dataset_rollback_sync;
static dsl_syncfunc_t dsl_dataset_set_reservation_sync;

/* Upper bound on ds_open_refcount; also the weight of an EXCLUSIVE open. */
#define	DS_REF_MAX	(1ULL << 62)

/* Block size handed to bplist_create() when a dataset's deadlist is made. */
#define	DSL_DEADLIST_BLOCKSIZE	SPA_MAXBLOCKSIZE
53 
/*
 * We use weighted reference counts to express the various forms of exclusion
 * between different open modes.  A STANDARD open is 1 point, an EXCLUSIVE open
 * is DS_REF_MAX, and a PRIMARY open is a little more than half of an
 * EXCLUSIVE.  This makes the exclusion logic simple: the total refcnt for all
 * opens cannot exceed DS_REF_MAX.  For example, EXCLUSIVE opens are exclusive
 * because their weight (DS_REF_MAX) consumes the entire refcnt space.  PRIMARY
 * opens consume just over half of the refcnt space, so there can't be more
 * than one, but it can peacefully coexist with any number of STANDARD opens.
 *
 * The table is indexed by DS_MODE_LEVEL(mode).
 */
static uint64_t ds_refcnt_weight[DS_MODE_LEVELS] = {
	0,			/* DS_MODE_NONE - invalid		*/
	1,			/* DS_MODE_STANDARD - unlimited number	*/
	(DS_REF_MAX >> 1) + 1,	/* DS_MODE_PRIMARY - only one of these	*/
	DS_REF_MAX		/* DS_MODE_EXCLUSIVE - no other opens	*/
};
70 
71 /*
72  * Figure out how much of this delta should be propogated to the dsl_dir
73  * layer.  If there's a refreservation, that space has already been
74  * partially accounted for in our ancestors.
75  */
76 static int64_t
77 parent_delta(dsl_dataset_t *ds, int64_t delta)
78 {
79 	uint64_t old_bytes, new_bytes;
80 
81 	if (ds->ds_reserved == 0)
82 		return (delta);
83 
84 	old_bytes = MAX(ds->ds_phys->ds_unique_bytes, ds->ds_reserved);
85 	new_bytes = MAX(ds->ds_phys->ds_unique_bytes + delta, ds->ds_reserved);
86 
87 	ASSERT3U(ABS((int64_t)(new_bytes - old_bytes)), <=, ABS(delta));
88 	return (new_bytes - old_bytes);
89 }
90 
91 void
92 dsl_dataset_block_born(dsl_dataset_t *ds, blkptr_t *bp, dmu_tx_t *tx)
93 {
94 	int used = bp_get_dasize(tx->tx_pool->dp_spa, bp);
95 	int compressed = BP_GET_PSIZE(bp);
96 	int uncompressed = BP_GET_UCSIZE(bp);
97 	int64_t delta;
98 
99 	dprintf_bp(bp, "born, ds=%p\n", ds);
100 
101 	ASSERT(dmu_tx_is_syncing(tx));
102 	/* It could have been compressed away to nothing */
103 	if (BP_IS_HOLE(bp))
104 		return;
105 	ASSERT(BP_GET_TYPE(bp) != DMU_OT_NONE);
106 	ASSERT3U(BP_GET_TYPE(bp), <, DMU_OT_NUMTYPES);
107 	if (ds == NULL) {
108 		/*
109 		 * Account for the meta-objset space in its placeholder
110 		 * dsl_dir.
111 		 */
112 		ASSERT3U(compressed, ==, uncompressed); /* it's all metadata */
113 		dsl_dir_diduse_space(tx->tx_pool->dp_mos_dir,
114 		    used, compressed, uncompressed, tx);
115 		dsl_dir_dirty(tx->tx_pool->dp_mos_dir, tx);
116 		return;
117 	}
118 	dmu_buf_will_dirty(ds->ds_dbuf, tx);
119 	mutex_enter(&ds->ds_lock);
120 	delta = parent_delta(ds, used);
121 	ds->ds_phys->ds_used_bytes += used;
122 	ds->ds_phys->ds_compressed_bytes += compressed;
123 	ds->ds_phys->ds_uncompressed_bytes += uncompressed;
124 	ds->ds_phys->ds_unique_bytes += used;
125 	mutex_exit(&ds->ds_lock);
126 	dsl_dir_diduse_space(ds->ds_dir, delta, compressed, uncompressed, tx);
127 }
128 
129 void
130 dsl_dataset_block_kill(dsl_dataset_t *ds, blkptr_t *bp, zio_t *pio,
131     dmu_tx_t *tx)
132 {
133 	int used = bp_get_dasize(tx->tx_pool->dp_spa, bp);
134 	int compressed = BP_GET_PSIZE(bp);
135 	int uncompressed = BP_GET_UCSIZE(bp);
136 
137 	ASSERT(dmu_tx_is_syncing(tx));
138 	/* No block pointer => nothing to free */
139 	if (BP_IS_HOLE(bp))
140 		return;
141 
142 	ASSERT(used > 0);
143 	if (ds == NULL) {
144 		int err;
145 		/*
146 		 * Account for the meta-objset space in its placeholder
147 		 * dataset.
148 		 */
149 		err = arc_free(pio, tx->tx_pool->dp_spa,
150 		    tx->tx_txg, bp, NULL, NULL, pio ? ARC_NOWAIT: ARC_WAIT);
151 		ASSERT(err == 0);
152 
153 		dsl_dir_diduse_space(tx->tx_pool->dp_mos_dir,
154 		    -used, -compressed, -uncompressed, tx);
155 		dsl_dir_dirty(tx->tx_pool->dp_mos_dir, tx);
156 		return;
157 	}
158 	ASSERT3P(tx->tx_pool, ==, ds->ds_dir->dd_pool);
159 
160 	dmu_buf_will_dirty(ds->ds_dbuf, tx);
161 
162 	if (bp->blk_birth > ds->ds_phys->ds_prev_snap_txg) {
163 		int err;
164 		int64_t delta;
165 
166 		dprintf_bp(bp, "freeing: %s", "");
167 		err = arc_free(pio, tx->tx_pool->dp_spa,
168 		    tx->tx_txg, bp, NULL, NULL, pio ? ARC_NOWAIT: ARC_WAIT);
169 		ASSERT(err == 0);
170 
171 		mutex_enter(&ds->ds_lock);
172 		ASSERT(ds->ds_phys->ds_unique_bytes >= used ||
173 		    !DS_UNIQUE_IS_ACCURATE(ds));
174 		delta = parent_delta(ds, -used);
175 		ds->ds_phys->ds_unique_bytes -= used;
176 		mutex_exit(&ds->ds_lock);
177 		dsl_dir_diduse_space(ds->ds_dir,
178 		    delta, -compressed, -uncompressed, tx);
179 	} else {
180 		dprintf_bp(bp, "putting on dead list: %s", "");
181 		VERIFY(0 == bplist_enqueue(&ds->ds_deadlist, bp, tx));
182 		/* if (bp->blk_birth > prev prev snap txg) prev unique += bs */
183 		if (ds->ds_phys->ds_prev_snap_obj != 0) {
184 			ASSERT3U(ds->ds_prev->ds_object, ==,
185 			    ds->ds_phys->ds_prev_snap_obj);
186 			ASSERT(ds->ds_prev->ds_phys->ds_num_children > 0);
187 			if (ds->ds_prev->ds_phys->ds_next_snap_obj ==
188 			    ds->ds_object && bp->blk_birth >
189 			    ds->ds_prev->ds_phys->ds_prev_snap_txg) {
190 				dmu_buf_will_dirty(ds->ds_prev->ds_dbuf, tx);
191 				mutex_enter(&ds->ds_prev->ds_lock);
192 				ds->ds_prev->ds_phys->ds_unique_bytes +=
193 				    used;
194 				mutex_exit(&ds->ds_prev->ds_lock);
195 			}
196 		}
197 	}
198 	mutex_enter(&ds->ds_lock);
199 	ASSERT3U(ds->ds_phys->ds_used_bytes, >=, used);
200 	ds->ds_phys->ds_used_bytes -= used;
201 	ASSERT3U(ds->ds_phys->ds_compressed_bytes, >=, compressed);
202 	ds->ds_phys->ds_compressed_bytes -= compressed;
203 	ASSERT3U(ds->ds_phys->ds_uncompressed_bytes, >=, uncompressed);
204 	ds->ds_phys->ds_uncompressed_bytes -= uncompressed;
205 	mutex_exit(&ds->ds_lock);
206 }
207 
208 uint64_t
209 dsl_dataset_prev_snap_txg(dsl_dataset_t *ds)
210 {
211 	uint64_t trysnap = 0;
212 
213 	if (ds == NULL)
214 		return (0);
215 	/*
216 	 * The snapshot creation could fail, but that would cause an
217 	 * incorrect FALSE return, which would only result in an
218 	 * overestimation of the amount of space that an operation would
219 	 * consume, which is OK.
220 	 *
221 	 * There's also a small window where we could miss a pending
222 	 * snapshot, because we could set the sync task in the quiescing
223 	 * phase.  So this should only be used as a guess.
224 	 */
225 	if (ds->ds_trysnap_txg >
226 	    spa_last_synced_txg(ds->ds_dir->dd_pool->dp_spa))
227 		trysnap = ds->ds_trysnap_txg;
228 	return (MAX(ds->ds_phys->ds_prev_snap_txg, trysnap));
229 }
230 
231 int
232 dsl_dataset_block_freeable(dsl_dataset_t *ds, uint64_t blk_birth)
233 {
234 	return (blk_birth > dsl_dataset_prev_snap_txg(ds));
235 }
236 
237 /* ARGSUSED */
238 static void
239 dsl_dataset_evict(dmu_buf_t *db, void *dsv)
240 {
241 	dsl_dataset_t *ds = dsv;
242 
243 	/* open_refcount == DS_REF_MAX when deleting */
244 	ASSERT(ds->ds_open_refcount == 0 ||
245 	    ds->ds_open_refcount == DS_REF_MAX);
246 
247 	dprintf_ds(ds, "evicting %s\n", "");
248 
249 	unique_remove(ds->ds_fsid_guid);
250 
251 	if (ds->ds_user_ptr != NULL)
252 		ds->ds_user_evict_func(ds, ds->ds_user_ptr);
253 
254 	if (ds->ds_prev) {
255 		dsl_dataset_close(ds->ds_prev, DS_MODE_NONE, ds);
256 		ds->ds_prev = NULL;
257 	}
258 
259 	bplist_close(&ds->ds_deadlist);
260 	dsl_dir_close(ds->ds_dir, ds);
261 
262 	ASSERT(!list_link_active(&ds->ds_synced_link));
263 
264 	mutex_destroy(&ds->ds_lock);
265 	mutex_destroy(&ds->ds_opening_lock);
266 	mutex_destroy(&ds->ds_deadlist.bpl_lock);
267 
268 	kmem_free(ds, sizeof (dsl_dataset_t));
269 }
270 
271 static int
272 dsl_dataset_get_snapname(dsl_dataset_t *ds)
273 {
274 	dsl_dataset_phys_t *headphys;
275 	int err;
276 	dmu_buf_t *headdbuf;
277 	dsl_pool_t *dp = ds->ds_dir->dd_pool;
278 	objset_t *mos = dp->dp_meta_objset;
279 
280 	if (ds->ds_snapname[0])
281 		return (0);
282 	if (ds->ds_phys->ds_next_snap_obj == 0)
283 		return (0);
284 
285 	err = dmu_bonus_hold(mos, ds->ds_dir->dd_phys->dd_head_dataset_obj,
286 	    FTAG, &headdbuf);
287 	if (err)
288 		return (err);
289 	headphys = headdbuf->db_data;
290 	err = zap_value_search(dp->dp_meta_objset,
291 	    headphys->ds_snapnames_zapobj, ds->ds_object, 0, ds->ds_snapname);
292 	dmu_buf_rele(headdbuf, FTAG);
293 	return (err);
294 }
295 
296 int
297 dsl_dataset_open_obj(dsl_pool_t *dp, uint64_t dsobj, const char *snapname,
298     int mode, void *tag, dsl_dataset_t **dsp)
299 {
300 	uint64_t weight = ds_refcnt_weight[DS_MODE_LEVEL(mode)];
301 	objset_t *mos = dp->dp_meta_objset;
302 	dmu_buf_t *dbuf;
303 	dsl_dataset_t *ds;
304 	int err;
305 
306 	ASSERT(RW_LOCK_HELD(&dp->dp_config_rwlock) ||
307 	    dsl_pool_sync_context(dp));
308 
309 	err = dmu_bonus_hold(mos, dsobj, tag, &dbuf);
310 	if (err)
311 		return (err);
312 	ds = dmu_buf_get_user(dbuf);
313 	if (ds == NULL) {
314 		dsl_dataset_t *winner;
315 
316 		ds = kmem_zalloc(sizeof (dsl_dataset_t), KM_SLEEP);
317 		ds->ds_dbuf = dbuf;
318 		ds->ds_object = dsobj;
319 		ds->ds_phys = dbuf->db_data;
320 
321 		mutex_init(&ds->ds_lock, NULL, MUTEX_DEFAULT, NULL);
322 		mutex_init(&ds->ds_opening_lock, NULL, MUTEX_DEFAULT, NULL);
323 		mutex_init(&ds->ds_deadlist.bpl_lock, NULL, MUTEX_DEFAULT,
324 		    NULL);
325 
326 		err = bplist_open(&ds->ds_deadlist,
327 		    mos, ds->ds_phys->ds_deadlist_obj);
328 		if (err == 0) {
329 			err = dsl_dir_open_obj(dp,
330 			    ds->ds_phys->ds_dir_obj, NULL, ds, &ds->ds_dir);
331 		}
332 		if (err) {
333 			/*
334 			 * we don't really need to close the blist if we
335 			 * just opened it.
336 			 */
337 			mutex_destroy(&ds->ds_lock);
338 			mutex_destroy(&ds->ds_opening_lock);
339 			mutex_destroy(&ds->ds_deadlist.bpl_lock);
340 			kmem_free(ds, sizeof (dsl_dataset_t));
341 			dmu_buf_rele(dbuf, tag);
342 			return (err);
343 		}
344 
345 		if (ds->ds_dir->dd_phys->dd_head_dataset_obj == dsobj) {
346 			ds->ds_snapname[0] = '\0';
347 			if (ds->ds_phys->ds_prev_snap_obj) {
348 				err = dsl_dataset_open_obj(dp,
349 				    ds->ds_phys->ds_prev_snap_obj, NULL,
350 				    DS_MODE_NONE, ds, &ds->ds_prev);
351 			}
352 		} else {
353 			if (snapname) {
354 #ifdef ZFS_DEBUG
355 				dsl_dataset_phys_t *headphys;
356 				dmu_buf_t *headdbuf;
357 				err = dmu_bonus_hold(mos,
358 				    ds->ds_dir->dd_phys->dd_head_dataset_obj,
359 				    FTAG, &headdbuf);
360 				if (err == 0) {
361 					headphys = headdbuf->db_data;
362 					uint64_t foundobj;
363 					err = zap_lookup(dp->dp_meta_objset,
364 					    headphys->ds_snapnames_zapobj,
365 					    snapname, sizeof (foundobj), 1,
366 					    &foundobj);
367 					ASSERT3U(foundobj, ==, dsobj);
368 					dmu_buf_rele(headdbuf, FTAG);
369 				}
370 #endif
371 				(void) strcat(ds->ds_snapname, snapname);
372 			} else if (zfs_flags & ZFS_DEBUG_SNAPNAMES) {
373 				err = dsl_dataset_get_snapname(ds);
374 			}
375 		}
376 
377 		if (!dsl_dataset_is_snapshot(ds)) {
378 			/*
379 			 * In sync context, we're called with either no lock
380 			 * or with the write lock.  If we're not syncing,
381 			 * we're always called with the read lock held.
382 			 */
383 			boolean_t need_lock =
384 			    !RW_WRITE_HELD(&dp->dp_config_rwlock) &&
385 			    dsl_pool_sync_context(dp);
386 
387 			if (need_lock)
388 				rw_enter(&dp->dp_config_rwlock, RW_READER);
389 
390 			err = dsl_prop_get_ds_locked(ds->ds_dir,
391 			    "refreservation", sizeof (uint64_t), 1,
392 			    &ds->ds_reserved, NULL);
393 			if (err == 0) {
394 				err = dsl_prop_get_ds_locked(ds->ds_dir,
395 				    "refquota", sizeof (uint64_t), 1,
396 				    &ds->ds_quota, NULL);
397 			}
398 
399 			if (need_lock)
400 				rw_exit(&dp->dp_config_rwlock);
401 		} else {
402 			ds->ds_reserved = ds->ds_quota = 0;
403 		}
404 
405 		if (err == 0) {
406 			winner = dmu_buf_set_user_ie(dbuf, ds, &ds->ds_phys,
407 			    dsl_dataset_evict);
408 		}
409 		if (err || winner) {
410 			bplist_close(&ds->ds_deadlist);
411 			if (ds->ds_prev) {
412 				dsl_dataset_close(ds->ds_prev,
413 				    DS_MODE_NONE, ds);
414 			}
415 			dsl_dir_close(ds->ds_dir, ds);
416 			mutex_destroy(&ds->ds_lock);
417 			mutex_destroy(&ds->ds_opening_lock);
418 			mutex_destroy(&ds->ds_deadlist.bpl_lock);
419 			kmem_free(ds, sizeof (dsl_dataset_t));
420 			if (err) {
421 				dmu_buf_rele(dbuf, tag);
422 				return (err);
423 			}
424 			ds = winner;
425 		} else {
426 			ds->ds_fsid_guid =
427 			    unique_insert(ds->ds_phys->ds_fsid_guid);
428 		}
429 	}
430 	ASSERT3P(ds->ds_dbuf, ==, dbuf);
431 	ASSERT3P(ds->ds_phys, ==, dbuf->db_data);
432 
433 	mutex_enter(&ds->ds_lock);
434 	if ((DS_MODE_LEVEL(mode) == DS_MODE_PRIMARY &&
435 	    (ds->ds_phys->ds_flags & DS_FLAG_INCONSISTENT) &&
436 	    !DS_MODE_IS_INCONSISTENT(mode)) ||
437 	    (ds->ds_open_refcount + weight > DS_REF_MAX)) {
438 		mutex_exit(&ds->ds_lock);
439 		dsl_dataset_close(ds, DS_MODE_NONE, tag);
440 		return (EBUSY);
441 	}
442 	ds->ds_open_refcount += weight;
443 	mutex_exit(&ds->ds_lock);
444 
445 	*dsp = ds;
446 	return (0);
447 }
448 
449 int
450 dsl_dataset_open_spa(spa_t *spa, const char *name, int mode,
451     void *tag, dsl_dataset_t **dsp)
452 {
453 	dsl_dir_t *dd;
454 	dsl_pool_t *dp;
455 	const char *tail;
456 	uint64_t obj;
457 	dsl_dataset_t *ds = NULL;
458 	int err = 0;
459 
460 	err = dsl_dir_open_spa(spa, name, FTAG, &dd, &tail);
461 	if (err)
462 		return (err);
463 
464 	dp = dd->dd_pool;
465 	obj = dd->dd_phys->dd_head_dataset_obj;
466 	rw_enter(&dp->dp_config_rwlock, RW_READER);
467 	if (obj == 0) {
468 		/* A dataset with no associated objset */
469 		err = ENOENT;
470 		goto out;
471 	}
472 
473 	if (tail != NULL) {
474 		objset_t *mos = dp->dp_meta_objset;
475 
476 		err = dsl_dataset_open_obj(dp, obj, NULL,
477 		    DS_MODE_NONE, tag, &ds);
478 		if (err)
479 			goto out;
480 		obj = ds->ds_phys->ds_snapnames_zapobj;
481 		dsl_dataset_close(ds, DS_MODE_NONE, tag);
482 		ds = NULL;
483 
484 		if (tail[0] != '@') {
485 			err = ENOENT;
486 			goto out;
487 		}
488 		tail++;
489 
490 		/* Look for a snapshot */
491 		if (!DS_MODE_IS_READONLY(mode)) {
492 			err = EROFS;
493 			goto out;
494 		}
495 		dprintf("looking for snapshot '%s'\n", tail);
496 		err = zap_lookup(mos, obj, tail, 8, 1, &obj);
497 		if (err)
498 			goto out;
499 	}
500 	err = dsl_dataset_open_obj(dp, obj, tail, mode, tag, &ds);
501 
502 out:
503 	rw_exit(&dp->dp_config_rwlock);
504 	dsl_dir_close(dd, FTAG);
505 
506 	ASSERT3U((err == 0), ==, (ds != NULL));
507 	/* ASSERT(ds == NULL || strcmp(name, ds->ds_name) == 0); */
508 
509 	*dsp = ds;
510 	return (err);
511 }
512 
513 int
514 dsl_dataset_open(const char *name, int mode, void *tag, dsl_dataset_t **dsp)
515 {
516 	return (dsl_dataset_open_spa(NULL, name, mode, tag, dsp));
517 }
518 
519 void
520 dsl_dataset_name(dsl_dataset_t *ds, char *name)
521 {
522 	if (ds == NULL) {
523 		(void) strcpy(name, "mos");
524 	} else {
525 		dsl_dir_name(ds->ds_dir, name);
526 		VERIFY(0 == dsl_dataset_get_snapname(ds));
527 		if (ds->ds_snapname[0]) {
528 			(void) strcat(name, "@");
529 			if (!MUTEX_HELD(&ds->ds_lock)) {
530 				/*
531 				 * We use a "recursive" mutex so that we
532 				 * can call dprintf_ds() with ds_lock held.
533 				 */
534 				mutex_enter(&ds->ds_lock);
535 				(void) strcat(name, ds->ds_snapname);
536 				mutex_exit(&ds->ds_lock);
537 			} else {
538 				(void) strcat(name, ds->ds_snapname);
539 			}
540 		}
541 	}
542 }
543 
544 static int
545 dsl_dataset_namelen(dsl_dataset_t *ds)
546 {
547 	int result;
548 
549 	if (ds == NULL) {
550 		result = 3;	/* "mos" */
551 	} else {
552 		result = dsl_dir_namelen(ds->ds_dir);
553 		VERIFY(0 == dsl_dataset_get_snapname(ds));
554 		if (ds->ds_snapname[0]) {
555 			++result;	/* adding one for the @-sign */
556 			if (!MUTEX_HELD(&ds->ds_lock)) {
557 				/* see dsl_datset_name */
558 				mutex_enter(&ds->ds_lock);
559 				result += strlen(ds->ds_snapname);
560 				mutex_exit(&ds->ds_lock);
561 			} else {
562 				result += strlen(ds->ds_snapname);
563 			}
564 		}
565 	}
566 
567 	return (result);
568 }
569 
570 void
571 dsl_dataset_close(dsl_dataset_t *ds, int mode, void *tag)
572 {
573 	uint64_t weight = ds_refcnt_weight[DS_MODE_LEVEL(mode)];
574 	mutex_enter(&ds->ds_lock);
575 	ASSERT3U(ds->ds_open_refcount, >=, weight);
576 	ds->ds_open_refcount -= weight;
577 	dprintf_ds(ds, "closing mode %u refcount now 0x%llx\n",
578 	    mode, ds->ds_open_refcount);
579 	mutex_exit(&ds->ds_lock);
580 
581 	dmu_buf_rele(ds->ds_dbuf, tag);
582 }
583 
584 void
585 dsl_dataset_downgrade(dsl_dataset_t *ds, int oldmode, int newmode)
586 {
587 	uint64_t oldweight = ds_refcnt_weight[DS_MODE_LEVEL(oldmode)];
588 	uint64_t newweight = ds_refcnt_weight[DS_MODE_LEVEL(newmode)];
589 	mutex_enter(&ds->ds_lock);
590 	ASSERT3U(ds->ds_open_refcount, >=, oldweight);
591 	ASSERT3U(oldweight, >=, newweight);
592 	ds->ds_open_refcount -= oldweight;
593 	ds->ds_open_refcount += newweight;
594 	mutex_exit(&ds->ds_lock);
595 }
596 
597 boolean_t
598 dsl_dataset_tryupgrade(dsl_dataset_t *ds, int oldmode, int newmode)
599 {
600 	boolean_t rv;
601 	uint64_t oldweight = ds_refcnt_weight[DS_MODE_LEVEL(oldmode)];
602 	uint64_t newweight = ds_refcnt_weight[DS_MODE_LEVEL(newmode)];
603 	mutex_enter(&ds->ds_lock);
604 	ASSERT3U(ds->ds_open_refcount, >=, oldweight);
605 	ASSERT3U(newweight, >=, oldweight);
606 	if (ds->ds_open_refcount - oldweight + newweight > DS_REF_MAX) {
607 		rv = B_FALSE;
608 	} else {
609 		ds->ds_open_refcount -= oldweight;
610 		ds->ds_open_refcount += newweight;
611 		rv = B_TRUE;
612 	}
613 	mutex_exit(&ds->ds_lock);
614 	return (rv);
615 }
616 
/*
 * Create the root dsl_dir and its head dataset in the pool's meta-objset.
 * The new dsl_dir's object number is returned through ddobjp.  A ZFS-type
 * objset is then created for the new dataset.
 */
void
dsl_dataset_create_root(dsl_pool_t *dp, uint64_t *ddobjp, dmu_tx_t *tx)
{
	objset_t *mos = dp->dp_meta_objset;
	dmu_buf_t *dbuf;
	dsl_dataset_phys_t *dsphys;
	dsl_dataset_t *ds;
	uint64_t dsobj;
	dsl_dir_t *dd;

	dsl_dir_create_root(mos, ddobjp, tx);
	VERIFY(0 == dsl_dir_open_obj(dp, *ddobjp, NULL, FTAG, &dd));

	/* Allocate the dataset object and fill in its bonus buffer. */
	dsobj = dmu_object_alloc(mos, DMU_OT_DSL_DATASET, 0,
	    DMU_OT_DSL_DATASET, sizeof (dsl_dataset_phys_t), tx);
	VERIFY(0 == dmu_bonus_hold(mos, dsobj, FTAG, &dbuf));
	dmu_buf_will_dirty(dbuf, tx);
	dsphys = dbuf->db_data;
	dsphys->ds_dir_obj = dd->dd_object;
	dsphys->ds_fsid_guid = unique_create();
	(void) random_get_pseudo_bytes((void*)&dsphys->ds_guid,
	    sizeof (dsphys->ds_guid));
	dsphys->ds_snapnames_zapobj =
	    zap_create(mos, DMU_OT_DSL_DS_SNAP_MAP, DMU_OT_NONE, 0, tx);
	dsphys->ds_creation_time = gethrestime_sec();
	dsphys->ds_creation_txg = tx->tx_txg;
	dsphys->ds_deadlist_obj =
	    bplist_create(mos, DSL_DEADLIST_BLOCKSIZE, tx);
	/* Only pools at a new enough version track unique bytes accurately. */
	if (spa_version(dp->dp_spa) >= SPA_VERSION_UNIQUE_ACCURATE)
		dsphys->ds_flags |= DS_FLAG_UNIQUE_ACCURATE;
	dmu_buf_rele(dbuf, FTAG);

	/* Point the dsl_dir at its new head dataset. */
	dmu_buf_will_dirty(dd->dd_dbuf, tx);
	dd->dd_phys->dd_head_dataset_obj = dsobj;
	dsl_dir_close(dd, FTAG);

	/* Open the dataset just long enough to create its objset. */
	VERIFY(0 ==
	    dsl_dataset_open_obj(dp, dsobj, NULL, DS_MODE_NONE, FTAG, &ds));
	(void) dmu_objset_create_impl(dp->dp_spa, ds,
	    &ds->ds_phys->ds_bp, DMU_OST_ZFS, tx);
	dsl_dataset_close(ds, DS_MODE_NONE, FTAG);
}
659 
/*
 * Create the head dataset object for dsl_dir 'dd', which must not have
 * one yet.  Must be called in syncing context.  If 'origin' is non-NULL
 * the new dataset starts out as a clone of it: it inherits the origin's
 * block pointer and space counters, and the origin's child count is
 * incremented.  Returns the new dataset's object number.
 */
uint64_t
dsl_dataset_create_sync_impl(dsl_dir_t *dd, dsl_dataset_t *origin, dmu_tx_t *tx)
{
	dsl_pool_t *dp = dd->dd_pool;
	dmu_buf_t *dbuf;
	dsl_dataset_phys_t *dsphys;
	uint64_t dsobj;
	objset_t *mos = dp->dp_meta_objset;

	ASSERT(origin == NULL || origin->ds_dir->dd_pool == dp);
	ASSERT(origin == NULL || origin->ds_phys->ds_num_children > 0);
	ASSERT(dmu_tx_is_syncing(tx));
	ASSERT(dd->dd_phys->dd_head_dataset_obj == 0);

	/* Allocate the dataset object and fill in its bonus buffer. */
	dsobj = dmu_object_alloc(mos, DMU_OT_DSL_DATASET, 0,
	    DMU_OT_DSL_DATASET, sizeof (dsl_dataset_phys_t), tx);
	VERIFY(0 == dmu_bonus_hold(mos, dsobj, FTAG, &dbuf));
	dmu_buf_will_dirty(dbuf, tx);
	dsphys = dbuf->db_data;
	dsphys->ds_dir_obj = dd->dd_object;
	dsphys->ds_fsid_guid = unique_create();
	(void) random_get_pseudo_bytes((void*)&dsphys->ds_guid,
	    sizeof (dsphys->ds_guid));
	dsphys->ds_snapnames_zapobj =
	    zap_create(mos, DMU_OT_DSL_DS_SNAP_MAP, DMU_OT_NONE, 0, tx);
	dsphys->ds_creation_time = gethrestime_sec();
	dsphys->ds_creation_txg = tx->tx_txg;
	dsphys->ds_deadlist_obj =
	    bplist_create(mos, DSL_DEADLIST_BLOCKSIZE, tx);
	/* Only pools at a new enough version track unique bytes accurately. */
	if (spa_version(dp->dp_spa) >= SPA_VERSION_UNIQUE_ACCURATE)
		dsphys->ds_flags |= DS_FLAG_UNIQUE_ACCURATE;

	if (origin) {
		/* Clone: start from the origin snapshot's state. */
		dsphys->ds_prev_snap_obj = origin->ds_object;
		dsphys->ds_prev_snap_txg =
		    origin->ds_phys->ds_creation_txg;
		dsphys->ds_used_bytes =
		    origin->ds_phys->ds_used_bytes;
		dsphys->ds_compressed_bytes =
		    origin->ds_phys->ds_compressed_bytes;
		dsphys->ds_uncompressed_bytes =
		    origin->ds_phys->ds_uncompressed_bytes;
		dsphys->ds_bp = origin->ds_phys->ds_bp;

		/* The origin gains one more child. */
		dmu_buf_will_dirty(origin->ds_dbuf, tx);
		origin->ds_phys->ds_num_children++;

		/* Record the origin in the new dataset's dsl_dir. */
		dmu_buf_will_dirty(dd->dd_dbuf, tx);
		dd->dd_phys->dd_origin_obj = origin->ds_object;
	}
	dmu_buf_rele(dbuf, FTAG);

	/* Point the dsl_dir at its new head dataset. */
	dmu_buf_will_dirty(dd->dd_dbuf, tx);
	dd->dd_phys->dd_head_dataset_obj = dsobj;

	return (dsobj);
}
717 
718 uint64_t
719 dsl_dataset_create_sync(dsl_dir_t *pdd,
720     const char *lastname, dsl_dataset_t *origin, cred_t *cr, dmu_tx_t *tx)
721 {
722 	dsl_pool_t *dp = pdd->dd_pool;
723 	uint64_t dsobj, ddobj;
724 	dsl_dir_t *dd;
725 
726 	ASSERT(lastname[0] != '@');
727 
728 	ddobj = dsl_dir_create_sync(pdd, lastname, tx);
729 	VERIFY(0 == dsl_dir_open_obj(dp, ddobj, lastname, FTAG, &dd));
730 
731 	dsobj = dsl_dataset_create_sync_impl(dd, origin, tx);
732 
733 	dsl_deleg_set_create_perms(dd, tx, cr);
734 
735 	dsl_dir_close(dd, FTAG);
736 
737 	return (dsobj);
738 }
739 
/*
 * Argument passed to dsl_snapshot_destroy_one() for each filesystem
 * visited by dsl_snapshots_destroy().
 */
struct destroyarg {
	dsl_sync_task_group_t *dstg;	/* group the destroy tasks join */
	char *snapname;			/* snapshot name (without the '@') */
	char *failed;			/* gets name of fs whose open failed */
};
745 
746 static int
747 dsl_snapshot_destroy_one(char *name, void *arg)
748 {
749 	struct destroyarg *da = arg;
750 	dsl_dataset_t *ds;
751 	char *cp;
752 	int err;
753 
754 	(void) strcat(name, "@");
755 	(void) strcat(name, da->snapname);
756 	err = dsl_dataset_open(name,
757 	    DS_MODE_EXCLUSIVE | DS_MODE_READONLY | DS_MODE_INCONSISTENT,
758 	    da->dstg, &ds);
759 	cp = strchr(name, '@');
760 	*cp = '\0';
761 	if (err == ENOENT)
762 		return (0);
763 	if (err) {
764 		(void) strcpy(da->failed, name);
765 		return (err);
766 	}
767 
768 	dsl_sync_task_create(da->dstg, dsl_dataset_destroy_check,
769 	    dsl_dataset_destroy_sync, ds, da->dstg, 0);
770 	return (0);
771 }
772 
/*
 * Destroy 'snapname' in all descendants of 'fsname'.
 *
 * All the destroys are queued in one sync task group and waited for
 * together.  Returns 0 on success or an error.  'fsname' doubles as an
 * output buffer: when opening a snapshot fails, or when a queued task
 * reports an error, it is overwritten with the name of the affected
 * filesystem.
 */
#pragma weak dmu_snapshots_destroy = dsl_snapshots_destroy
int
dsl_snapshots_destroy(char *fsname, char *snapname)
{
	int err;
	struct destroyarg da;
	dsl_sync_task_t *dst;
	spa_t *spa;

	err = spa_open(fsname, &spa, FTAG);
	if (err)
		return (err);
	da.dstg = dsl_sync_task_group_create(spa_get_dsl(spa));
	da.snapname = snapname;
	da.failed = fsname;

	/* Open each matching snapshot and queue a destroy task for it. */
	err = dmu_objset_find(fsname,
	    dsl_snapshot_destroy_one, &da, DS_FIND_CHILDREN);

	/* Only run the group if every snapshot could be opened. */
	if (err == 0)
		err = dsl_sync_task_group_wait(da.dstg);

	for (dst = list_head(&da.dstg->dstg_tasks); dst;
	    dst = list_next(&da.dstg->dstg_tasks, dst)) {
		dsl_dataset_t *ds = dst->dst_arg1;
		if (dst->dst_err) {
			/* Report the filesystem whose task failed. */
			dsl_dataset_name(ds, fsname);
			*strchr(fsname, '@') = '\0';
		}
		/*
		 * If it was successful, destroy_sync would have
		 * closed the ds
		 */
		if (err)
			dsl_dataset_close(ds, DS_MODE_EXCLUSIVE, da.dstg);
	}

	dsl_sync_task_group_destroy(da.dstg);
	spa_close(spa, FTAG);
	return (err);
}
817 
/*
 * ds must be opened EXCLUSIVE or PRIMARY.  on return (whether
 * successful or not), ds will be closed and caller can no longer
 * dereference it.
 *
 * A snapshot is destroyed with a single sync task.  A head dataset is
 * first marked (via the destroy_begin sync task), then its objects are
 * freed in open context, and finally the dataset and its dsl_dir are
 * destroyed together in one sync task group.
 *
 * Returns 0 on success, EBUSY if a PRIMARY open cannot be upgraded to
 * EXCLUSIVE, or the error from whichever step failed.
 */
int
dsl_dataset_destroy(dsl_dataset_t *ds, void *tag)
{
	int err;
	dsl_sync_task_group_t *dstg;
	objset_t *os;
	dsl_dir_t *dd;
	uint64_t obj;

	/* If we are not already exclusive, upgrade from PRIMARY. */
	if (ds->ds_open_refcount != DS_REF_MAX) {
		if (dsl_dataset_tryupgrade(ds, DS_MODE_PRIMARY,
		    DS_MODE_EXCLUSIVE) == 0) {
			dsl_dataset_close(ds, DS_MODE_PRIMARY, tag);
			return (EBUSY);
		}
	}

	if (dsl_dataset_is_snapshot(ds)) {
		/* Destroying a snapshot is simpler */
		err = dsl_sync_task_do(ds->ds_dir->dd_pool,
		    dsl_dataset_destroy_check, dsl_dataset_destroy_sync,
		    ds, tag, 0);
		goto out;
	}

	dd = ds->ds_dir;

	/*
	 * Check for errors and mark this ds as inconsistent, in
	 * case we crash while freeing the objects.
	 */
	err = dsl_sync_task_do(dd->dd_pool, dsl_dataset_destroy_begin_check,
	    dsl_dataset_destroy_begin_sync, ds, NULL, 0);
	if (err)
		goto out;

	err = dmu_objset_open_ds(ds, DMU_OST_ANY, &os);
	if (err)
		goto out;

	/*
	 * remove the objects in open context, so that we won't
	 * have too much to do in syncing context.
	 *
	 * The loop ends when dmu_object_next() returns nonzero; ESRCH is
	 * the only result treated as normal completion below.  Note that
	 * 'continue' still runs the loop's dmu_object_next() step, which
	 * overwrites a dmu_tx_assign() error.
	 */
	for (obj = 0; err == 0; err = dmu_object_next(os, &obj, FALSE,
	    ds->ds_phys->ds_prev_snap_txg)) {
		dmu_tx_t *tx = dmu_tx_create(os);
		dmu_tx_hold_free(tx, obj, 0, DMU_OBJECT_END);
		dmu_tx_hold_bonus(tx, obj);
		err = dmu_tx_assign(tx, TXG_WAIT);
		if (err) {
			/*
			 * Perhaps there is not enough disk
			 * space.  Just deal with it from
			 * dsl_dataset_destroy_sync().
			 */
			dmu_tx_abort(tx);
			continue;
		}
		VERIFY(0 == dmu_object_free(os, obj, tx));
		dmu_tx_commit(tx);
	}
	/* Make sure it's not dirty before we finish destroying it. */
	txg_wait_synced(dd->dd_pool, 0);

	dmu_objset_close(os);
	if (err != ESRCH)
		goto out;

	/* Let the user-pointer owner clean up before the dataset goes. */
	if (ds->ds_user_ptr) {
		ds->ds_user_evict_func(ds, ds->ds_user_ptr);
		ds->ds_user_ptr = NULL;
	}

	/* Take our own hold on the dsl_dir for the dir-destroy task. */
	rw_enter(&dd->dd_pool->dp_config_rwlock, RW_READER);
	err = dsl_dir_open_obj(dd->dd_pool, dd->dd_object, NULL, FTAG, &dd);
	rw_exit(&dd->dd_pool->dp_config_rwlock);

	if (err)
		goto out;

	/*
	 * Blow away the dsl_dir + head dataset.
	 */
	dstg = dsl_sync_task_group_create(ds->ds_dir->dd_pool);
	dsl_sync_task_create(dstg, dsl_dataset_destroy_check,
	    dsl_dataset_destroy_sync, ds, tag, 0);
	dsl_sync_task_create(dstg, dsl_dir_destroy_check,
	    dsl_dir_destroy_sync, dd, FTAG, 0);
	err = dsl_sync_task_group_wait(dstg);
	dsl_sync_task_group_destroy(dstg);
	/* if it is successful, *destroy_sync will close the ds+dd */
	if (err)
		dsl_dir_close(dd, FTAG);
out:
	/* On failure the dataset is still open (EXCLUSIVE); close it here. */
	if (err)
		dsl_dataset_close(ds, DS_MODE_EXCLUSIVE, tag);
	return (err);
}
922 
923 int
924 dsl_dataset_rollback(dsl_dataset_t *ds, dmu_objset_type_t ost)
925 {
926 	ASSERT3U(ds->ds_open_refcount, ==, DS_REF_MAX);
927 
928 	return (dsl_sync_task_do(ds->ds_dir->dd_pool,
929 	    dsl_dataset_rollback_check, dsl_dataset_rollback_sync,
930 	    ds, &ost, 0));
931 }
932 
933 void *
934 dsl_dataset_set_user_ptr(dsl_dataset_t *ds,
935     void *p, dsl_dataset_evict_func_t func)
936 {
937 	void *old;
938 
939 	mutex_enter(&ds->ds_lock);
940 	old = ds->ds_user_ptr;
941 	if (old == NULL) {
942 		ds->ds_user_ptr = p;
943 		ds->ds_user_evict_func = func;
944 	}
945 	mutex_exit(&ds->ds_lock);
946 	return (old);
947 }
948 
949 void *
950 dsl_dataset_get_user_ptr(dsl_dataset_t *ds)
951 {
952 	return (ds->ds_user_ptr);
953 }
954 
955 
956 blkptr_t *
957 dsl_dataset_get_blkptr(dsl_dataset_t *ds)
958 {
959 	return (&ds->ds_phys->ds_bp);
960 }
961 
962 void
963 dsl_dataset_set_blkptr(dsl_dataset_t *ds, blkptr_t *bp, dmu_tx_t *tx)
964 {
965 	ASSERT(dmu_tx_is_syncing(tx));
966 	/* If it's the meta-objset, set dp_meta_rootbp */
967 	if (ds == NULL) {
968 		tx->tx_pool->dp_meta_rootbp = *bp;
969 	} else {
970 		dmu_buf_will_dirty(ds->ds_dbuf, tx);
971 		ds->ds_phys->ds_bp = *bp;
972 	}
973 }
974 
975 spa_t *
976 dsl_dataset_get_spa(dsl_dataset_t *ds)
977 {
978 	return (ds->ds_dir->dd_pool->dp_spa);
979 }
980 
981 void
982 dsl_dataset_dirty(dsl_dataset_t *ds, dmu_tx_t *tx)
983 {
984 	dsl_pool_t *dp;
985 
986 	if (ds == NULL) /* this is the meta-objset */
987 		return;
988 
989 	ASSERT(ds->ds_user_ptr != NULL);
990 
991 	if (ds->ds_phys->ds_next_snap_obj != 0)
992 		panic("dirtying snapshot!");
993 
994 	dp = ds->ds_dir->dd_pool;
995 
996 	if (txg_list_add(&dp->dp_dirty_datasets, ds, tx->tx_txg) == 0) {
997 		/* up the hold count until we can be written out */
998 		dmu_buf_add_ref(ds->ds_dbuf, ds);
999 	}
1000 }
1001 
1002 /*
1003  * The unique space in the head dataset can be calculated by subtracting
1004  * the space used in the most recent snapshot, that is still being used
1005  * in this file system, from the space currently in use.  To figure out
1006  * the space in the most recent snapshot still in use, we need to take
1007  * the total space used in the snapshot and subtract out the space that
1008  * has been freed up since the snapshot was taken.
1009  */
1010 static void
1011 dsl_dataset_recalc_head_uniq(dsl_dataset_t *ds)
1012 {
1013 	uint64_t mrs_used;
1014 	uint64_t dlused, dlcomp, dluncomp;
1015 
1016 	ASSERT(ds->ds_object == ds->ds_dir->dd_phys->dd_head_dataset_obj);
1017 
1018 	if (ds->ds_phys->ds_prev_snap_obj != 0)
1019 		mrs_used = ds->ds_prev->ds_phys->ds_used_bytes;
1020 	else
1021 		mrs_used = 0;
1022 
1023 	VERIFY(0 == bplist_space(&ds->ds_deadlist, &dlused, &dlcomp,
1024 	    &dluncomp));
1025 
1026 	ASSERT3U(dlused, <=, mrs_used);
1027 	ds->ds_phys->ds_unique_bytes =
1028 	    ds->ds_phys->ds_used_bytes - (mrs_used - dlused);
1029 
1030 	if (!DS_UNIQUE_IS_ACCURATE(ds) &&
1031 	    spa_version(ds->ds_dir->dd_pool->dp_spa) >=
1032 	    SPA_VERSION_UNIQUE_ACCURATE)
1033 		ds->ds_phys->ds_flags |= DS_FLAG_UNIQUE_ACCURATE;
1034 }
1035 
1036 static uint64_t
1037 dsl_dataset_unique(dsl_dataset_t *ds)
1038 {
1039 	if (!DS_UNIQUE_IS_ACCURATE(ds) && !dsl_dataset_is_snapshot(ds))
1040 		dsl_dataset_recalc_head_uniq(ds);
1041 
1042 	return (ds->ds_phys->ds_unique_bytes);
1043 }
1044 
/*
 * Argument bundle passed to kill_blkptr() through the traversal's opaque
 * callback argument.  The three counters are owned by the caller and are
 * added to for every block the callback frees.
 */
struct killarg {
	int64_t *usedp;		/* running sum of bp_get_dasize() */
	int64_t *compressedp;	/* running sum of BP_GET_PSIZE() */
	int64_t *uncompressedp;	/* running sum of BP_GET_UCSIZE() */
	zio_t *zio;		/* zio handed to arc_free() for each block */
	dmu_tx_t *tx;		/* supplies the txg passed to arc_free() */
};
1052 
1053 static int
1054 kill_blkptr(traverse_blk_cache_t *bc, spa_t *spa, void *arg)
1055 {
1056 	struct killarg *ka = arg;
1057 	blkptr_t *bp = &bc->bc_blkptr;
1058 
1059 	ASSERT3U(bc->bc_errno, ==, 0);
1060 
1061 	/*
1062 	 * Since this callback is not called concurrently, no lock is
1063 	 * needed on the accounting values.
1064 	 */
1065 	*ka->usedp += bp_get_dasize(spa, bp);
1066 	*ka->compressedp += BP_GET_PSIZE(bp);
1067 	*ka->uncompressedp += BP_GET_UCSIZE(bp);
1068 	/* XXX check for EIO? */
1069 	(void) arc_free(ka->zio, spa, ka->tx->tx_txg, bp, NULL, NULL,
1070 	    ARC_NOWAIT);
1071 	return (0);
1072 }
1073 
1074 /* ARGSUSED */
1075 static int
1076 dsl_dataset_rollback_check(void *arg1, void *arg2, dmu_tx_t *tx)
1077 {
1078 	dsl_dataset_t *ds = arg1;
1079 	dmu_objset_type_t *ost = arg2;
1080 
1081 	/*
1082 	 * We can only roll back to emptyness if it is a ZPL objset.
1083 	 */
1084 	if (*ost != DMU_OST_ZFS && ds->ds_phys->ds_prev_snap_txg == 0)
1085 		return (EINVAL);
1086 
1087 	/*
1088 	 * This must not be a snapshot.
1089 	 */
1090 	if (ds->ds_phys->ds_next_snap_obj != 0)
1091 		return (EINVAL);
1092 
1093 	/*
1094 	 * If we made changes this txg, traverse_dsl_dataset won't find
1095 	 * them.  Try again.
1096 	 */
1097 	if (ds->ds_phys->ds_bp.blk_birth >= tx->tx_txg)
1098 		return (EAGAIN);
1099 
1100 	return (0);
1101 }
1102 
/*
 * Sync half of the rollback sync task.
 *
 * arg1 is the dataset being rolled back and arg2 points to its objset
 * type (only consulted when there is no previous snapshot and the objset
 * has to be recreated).  In order, this:
 *   - destroys the ZIL and evicts the user (objset) state, if present;
 *   - replaces the deadlist with a fresh, empty one;
 *   - traverses the dataset, bounded by ds_prev_snap_txg, freeing block
 *     pointers and deducting their space from the dsl_dir;
 *   - either copies the block pointer and space accounting from the
 *     previous snapshot, or zeroes them and creates a new objset;
 *   - records the rollback in the pool history.
 */
/* ARGSUSED */
static void
dsl_dataset_rollback_sync(void *arg1, void *arg2, cred_t *cr, dmu_tx_t *tx)
{
	dsl_dataset_t *ds = arg1;
	dmu_objset_type_t *ost = arg2;
	objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;

	dmu_buf_will_dirty(ds->ds_dbuf, tx);

	/*
	 * Before the roll back destroy the zil.
	 */
	if (ds->ds_user_ptr != NULL) {
		zil_rollback_destroy(
		    ((objset_impl_t *)ds->ds_user_ptr)->os_zil, tx);

		/*
		 * We need to make sure that the objset_impl_t is reopened after
		 * we do the rollback, otherwise it will have the wrong
		 * objset_phys_t.  Normally this would happen when this
		 * DS_MODE_EXCLUSIVE dataset-open is closed, thus causing the
		 * dataset to be immediately evicted.  But when doing "zfs recv
		 * -F", we reopen the objset before that, so that there is no
		 * window where the dataset is closed and inconsistent.
		 */
		ds->ds_user_evict_func(ds, ds->ds_user_ptr);
		ds->ds_user_ptr = NULL;
	}

	/* Zero out the deadlist. */
	bplist_close(&ds->ds_deadlist);
	bplist_destroy(mos, ds->ds_phys->ds_deadlist_obj, tx);
	ds->ds_phys->ds_deadlist_obj =
	    bplist_create(mos, DSL_DEADLIST_BLOCKSIZE, tx);
	VERIFY(0 == bplist_open(&ds->ds_deadlist, mos,
	    ds->ds_phys->ds_deadlist_obj));

	{
		/* Free blkptrs that we gave birth to */
		zio_t *zio;
		int64_t used = 0, compressed = 0, uncompressed = 0;
		struct killarg ka;
		int64_t delta;

		zio = zio_root(tx->tx_pool->dp_spa, NULL, NULL,
		    ZIO_FLAG_MUSTSUCCEED);
		ka.usedp = &used;
		ka.compressedp = &compressed;
		ka.uncompressedp = &uncompressed;
		ka.zio = zio;
		ka.tx = tx;
		/* Return values of the traversal and the wait are ignored. */
		(void) traverse_dsl_dataset(ds, ds->ds_phys->ds_prev_snap_txg,
		    ADVANCE_POST, kill_blkptr, &ka);
		(void) zio_wait(zio);

		/* only deduct space beyond any refreservation */
		delta = parent_delta(ds, -used);
		dsl_dir_diduse_space(ds->ds_dir,
		    delta, -compressed, -uncompressed, tx);
	}

	if (ds->ds_prev) {
		/* Change our contents to that of the prev snapshot */
		ASSERT3U(ds->ds_prev->ds_object, ==,
		    ds->ds_phys->ds_prev_snap_obj);
		ds->ds_phys->ds_bp = ds->ds_prev->ds_phys->ds_bp;
		ds->ds_phys->ds_used_bytes =
		    ds->ds_prev->ds_phys->ds_used_bytes;
		ds->ds_phys->ds_compressed_bytes =
		    ds->ds_prev->ds_phys->ds_compressed_bytes;
		ds->ds_phys->ds_uncompressed_bytes =
		    ds->ds_prev->ds_phys->ds_uncompressed_bytes;
		ds->ds_phys->ds_flags = ds->ds_prev->ds_phys->ds_flags;
		ds->ds_phys->ds_unique_bytes = 0;

		/*
		 * Only reset the previous snapshot's unique count when we
		 * are its direct successor.
		 */
		if (ds->ds_prev->ds_phys->ds_next_snap_obj == ds->ds_object) {
			dmu_buf_will_dirty(ds->ds_prev->ds_dbuf, tx);
			ds->ds_prev->ds_phys->ds_unique_bytes = 0;
		}
	} else {
		/* Zero out our contents, recreate objset */
		bzero(&ds->ds_phys->ds_bp, sizeof (blkptr_t));
		ds->ds_phys->ds_used_bytes = 0;
		ds->ds_phys->ds_compressed_bytes = 0;
		ds->ds_phys->ds_uncompressed_bytes = 0;
		ds->ds_phys->ds_flags = 0;
		ds->ds_phys->ds_unique_bytes = 0;
		(void) dmu_objset_create_impl(ds->ds_dir->dd_pool->dp_spa, ds,
		    &ds->ds_phys->ds_bp, *ost, tx);
	}

	spa_history_internal_log(LOG_DS_ROLLBACK, ds->ds_dir->dd_pool->dp_spa,
	    tx, cr, "dataset = %llu", ds->ds_object);
}
1198 
1199 /* ARGSUSED */
1200 static int
1201 dsl_dataset_destroy_begin_check(void *arg1, void *arg2, dmu_tx_t *tx)
1202 {
1203 	dsl_dataset_t *ds = arg1;
1204 	objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
1205 	uint64_t count;
1206 	int err;
1207 
1208 	/*
1209 	 * Can't delete a head dataset if there are snapshots of it.
1210 	 * (Except if the only snapshots are from the branch we cloned
1211 	 * from.)
1212 	 */
1213 	if (ds->ds_prev != NULL &&
1214 	    ds->ds_prev->ds_phys->ds_next_snap_obj == ds->ds_object)
1215 		return (EINVAL);
1216 
1217 	/*
1218 	 * This is really a dsl_dir thing, but check it here so that
1219 	 * we'll be less likely to leave this dataset inconsistent &
1220 	 * nearly destroyed.
1221 	 */
1222 	err = zap_count(mos, ds->ds_dir->dd_phys->dd_child_dir_zapobj, &count);
1223 	if (err)
1224 		return (err);
1225 	if (count != 0)
1226 		return (EEXIST);
1227 
1228 	return (0);
1229 }
1230 
1231 /* ARGSUSED */
1232 static void
1233 dsl_dataset_destroy_begin_sync(void *arg1, void *arg2, cred_t *cr, dmu_tx_t *tx)
1234 {
1235 	dsl_dataset_t *ds = arg1;
1236 	dsl_pool_t *dp = ds->ds_dir->dd_pool;
1237 
1238 	/* Mark it as inconsistent on-disk, in case we crash */
1239 	dmu_buf_will_dirty(ds->ds_dbuf, tx);
1240 	ds->ds_phys->ds_flags |= DS_FLAG_INCONSISTENT;
1241 
1242 	spa_history_internal_log(LOG_DS_DESTROY_BEGIN, dp->dp_spa, tx,
1243 	    cr, "dataset = %llu", ds->ds_object);
1244 }
1245 
1246 /* ARGSUSED */
1247 int
1248 dsl_dataset_destroy_check(void *arg1, void *arg2, dmu_tx_t *tx)
1249 {
1250 	dsl_dataset_t *ds = arg1;
1251 
1252 	/* Can't delete a branch point. */
1253 	if (ds->ds_phys->ds_num_children > 1)
1254 		return (EEXIST);
1255 
1256 	/*
1257 	 * Can't delete a head dataset if there are snapshots of it.
1258 	 * (Except if the only snapshots are from the branch we cloned
1259 	 * from.)
1260 	 */
1261 	if (ds->ds_prev != NULL &&
1262 	    ds->ds_prev->ds_phys->ds_next_snap_obj == ds->ds_object)
1263 		return (EINVAL);
1264 
1265 	/*
1266 	 * If we made changes this txg, traverse_dsl_dataset won't find
1267 	 * them.  Try again.
1268 	 */
1269 	if (ds->ds_phys->ds_bp.blk_birth >= tx->tx_txg)
1270 		return (EAGAIN);
1271 
1272 	/* XXX we should do some i/o error checking... */
1273 	return (0);
1274 }
1275 
/*
 * Sync half of the destroy sync task.
 *
 * arg1 is the dataset to destroy; 'tag' is the tag used for the final
 * DS_MODE_EXCLUSIVE dsl_dataset_close() of that dataset.  The dataset
 * must be held with ds_open_refcount == DS_REF_MAX and the pool config
 * lock must be write-held (both are asserted).
 *
 * Outline:
 *   - drop any reservation held by the dataset;
 *   - unlink the dataset from the previous snapshot (or decrement that
 *     snapshot's child count when destroying a clone);
 *   - if there is a next snapshot (we are a snapshot): merge deadlists,
 *     free blocks only we referenced, and fix up the unique-byte counts
 *     of the neighbouring datasets;
 *   - otherwise (we are a head): destroy the deadlist and free every
 *     block found by traversing the dataset;
 *   - return the freed space to the dsl_dir, remove the snapshot name
 *     (or the head link), log the event, close the dataset and free
 *     its object in the meta-objset.
 */
void
dsl_dataset_destroy_sync(void *arg1, void *tag, cred_t *cr, dmu_tx_t *tx)
{
	dsl_dataset_t *ds = arg1;
	int64_t used = 0, compressed = 0, uncompressed = 0;
	zio_t *zio;
	int err;
	int after_branch_point = FALSE;
	dsl_pool_t *dp = ds->ds_dir->dd_pool;
	objset_t *mos = dp->dp_meta_objset;
	dsl_dataset_t *ds_prev = NULL;
	uint64_t obj;

	ASSERT3U(ds->ds_open_refcount, ==, DS_REF_MAX);
	ASSERT3U(ds->ds_phys->ds_num_children, <=, 1);
	ASSERT(ds->ds_prev == NULL ||
	    ds->ds_prev->ds_phys->ds_next_snap_obj != ds->ds_object);
	ASSERT3U(ds->ds_phys->ds_bp.blk_birth, <=, tx->tx_txg);

	/* Remove our reservation */
	if (ds->ds_reserved != 0) {
		uint64_t val = 0;
		dsl_dataset_set_reservation_sync(ds, &val, cr, tx);
		ASSERT3U(ds->ds_reserved, ==, 0);
	}

	ASSERT(RW_WRITE_HELD(&dp->dp_config_rwlock));

	/* Remember the object number; it is needed after ds is closed. */
	obj = ds->ds_object;

	if (ds->ds_phys->ds_prev_snap_obj != 0) {
		/* Use the already-open prev if there is one, else open it. */
		if (ds->ds_prev) {
			ds_prev = ds->ds_prev;
		} else {
			VERIFY(0 == dsl_dataset_open_obj(dp,
			    ds->ds_phys->ds_prev_snap_obj, NULL,
			    DS_MODE_NONE, FTAG, &ds_prev));
		}
		/* True when prev's next-snapshot link does not point at us. */
		after_branch_point =
		    (ds_prev->ds_phys->ds_next_snap_obj != obj);

		dmu_buf_will_dirty(ds_prev->ds_dbuf, tx);
		if (after_branch_point &&
		    ds->ds_phys->ds_next_snap_obj == 0) {
			/* This clone is toast. */
			ASSERT(ds_prev->ds_phys->ds_num_children > 1);
			ds_prev->ds_phys->ds_num_children--;
		} else if (!after_branch_point) {
			ds_prev->ds_phys->ds_next_snap_obj =
			    ds->ds_phys->ds_next_snap_obj;
		}
	}

	/* Root zio under which all the frees below are issued. */
	zio = zio_root(dp->dp_spa, NULL, NULL, ZIO_FLAG_MUSTSUCCEED);

	if (ds->ds_phys->ds_next_snap_obj != 0) {
		blkptr_t bp;
		dsl_dataset_t *ds_next;
		uint64_t itor = 0;
		uint64_t old_unique;

		spa_scrub_restart(dp->dp_spa, tx->tx_txg);

		VERIFY(0 == dsl_dataset_open_obj(dp,
		    ds->ds_phys->ds_next_snap_obj, NULL,
		    DS_MODE_NONE, FTAG, &ds_next));
		ASSERT3U(ds_next->ds_phys->ds_prev_snap_obj, ==, obj);

		/* Captured before next's bookkeeping is changed below. */
		old_unique = dsl_dataset_unique(ds_next);

		/* Splice ourselves out of the snapshot chain. */
		dmu_buf_will_dirty(ds_next->ds_dbuf, tx);
		ds_next->ds_phys->ds_prev_snap_obj =
		    ds->ds_phys->ds_prev_snap_obj;
		ds_next->ds_phys->ds_prev_snap_txg =
		    ds->ds_phys->ds_prev_snap_txg;
		ASSERT3U(ds->ds_phys->ds_prev_snap_txg, ==,
		    ds_prev ? ds_prev->ds_phys->ds_creation_txg : 0);

		/*
		 * Transfer to our deadlist (which will become next's
		 * new deadlist) any entries from next's current
		 * deadlist which were born before prev, and free the
		 * other entries.
		 *
		 * XXX we're doing this long task with the config lock held
		 */
		while (bplist_iterate(&ds_next->ds_deadlist, &itor,
		    &bp) == 0) {
			if (bp.blk_birth <= ds->ds_phys->ds_prev_snap_txg) {
				VERIFY(0 == bplist_enqueue(&ds->ds_deadlist,
				    &bp, tx));
				if (ds_prev && !after_branch_point &&
				    bp.blk_birth >
				    ds_prev->ds_phys->ds_prev_snap_txg) {
					ds_prev->ds_phys->ds_unique_bytes +=
					    bp_get_dasize(dp->dp_spa, &bp);
				}
			} else {
				used += bp_get_dasize(dp->dp_spa, &bp);
				compressed += BP_GET_PSIZE(&bp);
				uncompressed += BP_GET_UCSIZE(&bp);
				/* XXX check return value? */
				(void) arc_free(zio, dp->dp_spa, tx->tx_txg,
				    &bp, NULL, NULL, ARC_NOWAIT);
			}
		}

		/* free next's deadlist */
		bplist_close(&ds_next->ds_deadlist);
		bplist_destroy(mos, ds_next->ds_phys->ds_deadlist_obj, tx);

		/* set next's deadlist to our deadlist */
		ds_next->ds_phys->ds_deadlist_obj =
		    ds->ds_phys->ds_deadlist_obj;
		VERIFY(0 == bplist_open(&ds_next->ds_deadlist, mos,
		    ds_next->ds_phys->ds_deadlist_obj));
		ds->ds_phys->ds_deadlist_obj = 0;

		if (ds_next->ds_phys->ds_next_snap_obj != 0) {
			/*
			 * Update next's unique to include blocks which
			 * were previously shared by only this snapshot
			 * and it.  Those blocks will be born after the
			 * prev snap and before this snap, and will have
			 * died after the next snap and before the one
			 * after that (ie. be on the snap after next's
			 * deadlist).
			 *
			 * XXX we're doing this long task with the
			 * config lock held
			 */
			dsl_dataset_t *ds_after_next;

			VERIFY(0 == dsl_dataset_open_obj(dp,
			    ds_next->ds_phys->ds_next_snap_obj, NULL,
			    DS_MODE_NONE, FTAG, &ds_after_next));
			itor = 0;
			while (bplist_iterate(&ds_after_next->ds_deadlist,
			    &itor, &bp) == 0) {
				if (bp.blk_birth >
				    ds->ds_phys->ds_prev_snap_txg &&
				    bp.blk_birth <=
				    ds->ds_phys->ds_creation_txg) {
					ds_next->ds_phys->ds_unique_bytes +=
					    bp_get_dasize(dp->dp_spa, &bp);
				}
			}

			dsl_dataset_close(ds_after_next, DS_MODE_NONE, FTAG);
			ASSERT3P(ds_next->ds_prev, ==, NULL);
		} else {
			/*
			 * Next is the head: repoint its ds_prev hold from
			 * us to our previous snapshot (or to nothing).
			 */
			ASSERT3P(ds_next->ds_prev, ==, ds);
			dsl_dataset_close(ds_next->ds_prev, DS_MODE_NONE,
			    ds_next);
			if (ds_prev) {
				VERIFY(0 == dsl_dataset_open_obj(dp,
				    ds->ds_phys->ds_prev_snap_obj, NULL,
				    DS_MODE_NONE, ds_next, &ds_next->ds_prev));
			} else {
				ds_next->ds_prev = NULL;
			}

			dsl_dataset_recalc_head_uniq(ds_next);

			/*
			 * Reduce the amount of our unconsumed refreservation
			 * being charged to our parent by the amount of
			 * new unique data we have gained.
			 */
			if (old_unique < ds_next->ds_reserved) {
				int64_t mrsdelta;
				uint64_t new_unique =
				    ds_next->ds_phys->ds_unique_bytes;

				ASSERT(old_unique <= new_unique);
				mrsdelta = MIN(new_unique - old_unique,
				    ds_next->ds_reserved - old_unique);
				dsl_dir_diduse_space(ds->ds_dir, -mrsdelta,
				    0, 0, tx);
			}
		}
		dsl_dataset_close(ds_next, DS_MODE_NONE, FTAG);

		/*
		 * NB: unique_bytes might not be accurate for the head objset.
		 * Before SPA_VERSION 9, we didn't update its value when we
		 * deleted the most recent snapshot.
		 */
		ASSERT3U(used, ==, ds->ds_phys->ds_unique_bytes);
	} else {
		/*
		 * There's no next snapshot, so this is a head dataset.
		 * Destroy the deadlist.  Unless it's a clone, the
		 * deadlist should be empty.  (If it's a clone, it's
		 * safe to ignore the deadlist contents.)
		 */
		struct killarg ka;

		ASSERT(after_branch_point || bplist_empty(&ds->ds_deadlist));
		bplist_close(&ds->ds_deadlist);
		bplist_destroy(mos, ds->ds_phys->ds_deadlist_obj, tx);
		ds->ds_phys->ds_deadlist_obj = 0;

		/*
		 * Free everything that we point to (that's born after
		 * the previous snapshot, if we are a clone)
		 *
		 * XXX we're doing this long task with the config lock held
		 */
		ka.usedp = &used;
		ka.compressedp = &compressed;
		ka.uncompressedp = &uncompressed;
		ka.zio = zio;
		ka.tx = tx;
		err = traverse_dsl_dataset(ds, ds->ds_phys->ds_prev_snap_txg,
		    ADVANCE_POST, kill_blkptr, &ka);
		ASSERT3U(err, ==, 0);
		ASSERT(spa_version(dp->dp_spa) <
		    SPA_VERSION_UNIQUE_ACCURATE ||
		    used == ds->ds_phys->ds_unique_bytes);
	}

	/* Wait for every free issued above to complete. */
	err = zio_wait(zio);
	ASSERT3U(err, ==, 0);

	/* Give the freed space back to the containing dsl_dir. */
	dsl_dir_diduse_space(ds->ds_dir, -used, -compressed, -uncompressed, tx);

	if (ds->ds_phys->ds_snapnames_zapobj) {
		err = zap_destroy(mos, ds->ds_phys->ds_snapnames_zapobj, tx);
		ASSERT(err == 0);
	}

	if (ds->ds_dir->dd_phys->dd_head_dataset_obj == ds->ds_object) {
		/* Erase the link in the dataset */
		dmu_buf_will_dirty(ds->ds_dir->dd_dbuf, tx);
		ds->ds_dir->dd_phys->dd_head_dataset_obj = 0;
		/*
		 * dsl_dir_sync_destroy() called us, they'll destroy
		 * the dataset.
		 */
	} else {
		/* remove from snapshot namespace */
		dsl_dataset_t *ds_head;
		VERIFY(0 == dsl_dataset_open_obj(dp,
		    ds->ds_dir->dd_phys->dd_head_dataset_obj, NULL,
		    DS_MODE_NONE, FTAG, &ds_head));
		VERIFY(0 == dsl_dataset_get_snapname(ds));
#ifdef ZFS_DEBUG
		{
			/* Debug only: the name must map to our object. */
			uint64_t val;
			err = zap_lookup(mos,
			    ds_head->ds_phys->ds_snapnames_zapobj,
			    ds->ds_snapname, 8, 1, &val);
			ASSERT3U(err, ==, 0);
			ASSERT3U(val, ==, obj);
		}
#endif
		err = zap_remove(mos, ds_head->ds_phys->ds_snapnames_zapobj,
		    ds->ds_snapname, tx);
		ASSERT(err == 0);
		dsl_dataset_close(ds_head, DS_MODE_NONE, FTAG);
	}

	/* Only close prev if we opened it ourselves above. */
	if (ds_prev && ds->ds_prev != ds_prev)
		dsl_dataset_close(ds_prev, DS_MODE_NONE, FTAG);

	spa_prop_clear_bootfs(dp->dp_spa, ds->ds_object, tx);
	spa_history_internal_log(LOG_DS_DESTROY, dp->dp_spa, tx,
	    cr, "dataset = %llu", ds->ds_object);

	/* ds must not be touched after this close; use the saved obj. */
	dsl_dataset_close(ds, DS_MODE_EXCLUSIVE, tag);
	VERIFY(0 == dmu_object_free(mos, obj, tx));

}
1550 
1551 static int
1552 dsl_dataset_snapshot_reserve_space(dsl_dataset_t *ds, dmu_tx_t *tx)
1553 {
1554 	uint64_t asize;
1555 
1556 	if (!dmu_tx_is_syncing(tx))
1557 		return (0);
1558 
1559 	/*
1560 	 * If there's an fs-only reservation, any blocks that might become
1561 	 * owned by the snapshot dataset must be accommodated by space
1562 	 * outside of the reservation.
1563 	 */
1564 	asize = MIN(dsl_dataset_unique(ds), ds->ds_reserved);
1565 	if (asize > dsl_dir_space_available(ds->ds_dir, NULL, 0, FALSE))
1566 		return (ENOSPC);
1567 
1568 	/*
1569 	 * Propogate any reserved space for this snapshot to other
1570 	 * snapshot checks in this sync group.
1571 	 */
1572 	if (asize > 0)
1573 		dsl_dir_willuse_space(ds->ds_dir, asize, tx);
1574 
1575 	return (0);
1576 }
1577 
1578 /* ARGSUSED */
1579 int
1580 dsl_dataset_snapshot_check(void *arg1, void *arg2, dmu_tx_t *tx)
1581 {
1582 	dsl_dataset_t *ds = arg1;
1583 	const char *snapname = arg2;
1584 	objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
1585 	int err;
1586 	uint64_t value;
1587 
1588 	/*
1589 	 * We don't allow multiple snapshots of the same txg.  If there
1590 	 * is already one, try again.
1591 	 */
1592 	if (ds->ds_phys->ds_prev_snap_txg >= tx->tx_txg)
1593 		return (EAGAIN);
1594 
1595 	/*
1596 	 * Check for conflicting name snapshot name.
1597 	 */
1598 	err = zap_lookup(mos, ds->ds_phys->ds_snapnames_zapobj,
1599 	    snapname, 8, 1, &value);
1600 	if (err == 0)
1601 		return (EEXIST);
1602 	if (err != ENOENT)
1603 		return (err);
1604 
1605 	/*
1606 	 * Check that the dataset's name is not too long.  Name consists
1607 	 * of the dataset's length + 1 for the @-sign + snapshot name's length
1608 	 */
1609 	if (dsl_dataset_namelen(ds) + 1 + strlen(snapname) >= MAXNAMELEN)
1610 		return (ENAMETOOLONG);
1611 
1612 	err = dsl_dataset_snapshot_reserve_space(ds, tx);
1613 	if (err)
1614 		return (err);
1615 
1616 	ds->ds_trysnap_txg = tx->tx_txg;
1617 	return (0);
1618 }
1619 
/*
 * Sync half of the snapshot sync task.
 *
 * arg1 is the dataset being snapshotted and arg2 is the snapshot name
 * (the part after the '@').  The pool config lock must be write-held
 * (asserted).
 *
 * A new dataset object is allocated in the meta-objset and filled in as
 * a copy of the head's current state: it takes over the head's block
 * pointer, space accounting, flags and deadlist object, and is linked
 * between the head's previous snapshot and the head.  The head then
 * gets a fresh deadlist, a zeroed unique count and its ds_prev hold is
 * moved to the new snapshot.  Finally the name is added to the head's
 * snapshot-name ZAP and the event is logged to the pool history.
 */
void
dsl_dataset_snapshot_sync(void *arg1, void *arg2, cred_t *cr, dmu_tx_t *tx)
{
	dsl_dataset_t *ds = arg1;
	const char *snapname = arg2;
	dsl_pool_t *dp = ds->ds_dir->dd_pool;
	dmu_buf_t *dbuf;
	dsl_dataset_phys_t *dsphys;
	uint64_t dsobj;
	objset_t *mos = dp->dp_meta_objset;
	int err;

	spa_scrub_restart(dp->dp_spa, tx->tx_txg);
	ASSERT(RW_WRITE_HELD(&dp->dp_config_rwlock));

	/* Allocate the snapshot's dataset object and fill in its bonus. */
	dsobj = dmu_object_alloc(mos, DMU_OT_DSL_DATASET, 0,
	    DMU_OT_DSL_DATASET, sizeof (dsl_dataset_phys_t), tx);
	VERIFY(0 == dmu_bonus_hold(mos, dsobj, FTAG, &dbuf));
	dmu_buf_will_dirty(dbuf, tx);
	dsphys = dbuf->db_data;
	dsphys->ds_dir_obj = ds->ds_dir->dd_object;
	dsphys->ds_fsid_guid = unique_create();
	(void) random_get_pseudo_bytes((void*)&dsphys->ds_guid,
	    sizeof (dsphys->ds_guid));
	dsphys->ds_prev_snap_obj = ds->ds_phys->ds_prev_snap_obj;
	dsphys->ds_prev_snap_txg = ds->ds_phys->ds_prev_snap_txg;
	dsphys->ds_next_snap_obj = ds->ds_object;
	dsphys->ds_num_children = 1;
	dsphys->ds_creation_time = gethrestime_sec();
	dsphys->ds_creation_txg = tx->tx_txg;
	dsphys->ds_deadlist_obj = ds->ds_phys->ds_deadlist_obj;
	dsphys->ds_used_bytes = ds->ds_phys->ds_used_bytes;
	dsphys->ds_compressed_bytes = ds->ds_phys->ds_compressed_bytes;
	dsphys->ds_uncompressed_bytes = ds->ds_phys->ds_uncompressed_bytes;
	dsphys->ds_flags = ds->ds_phys->ds_flags;
	dsphys->ds_bp = ds->ds_phys->ds_bp;
	/*
	 * NOTE(review): dsphys is still read below (ds_creation_txg) after
	 * this release; presumably the dirtied buffer stays valid for the
	 * rest of this sync -- confirm.
	 */
	dmu_buf_rele(dbuf, FTAG);

	ASSERT3U(ds->ds_prev != 0, ==, ds->ds_phys->ds_prev_snap_obj != 0);
	if (ds->ds_prev) {
		ASSERT(ds->ds_prev->ds_phys->ds_next_snap_obj ==
		    ds->ds_object ||
		    ds->ds_prev->ds_phys->ds_num_children > 1);
		/* Only relink prev when we are its direct successor. */
		if (ds->ds_prev->ds_phys->ds_next_snap_obj == ds->ds_object) {
			dmu_buf_will_dirty(ds->ds_prev->ds_dbuf, tx);
			ASSERT3U(ds->ds_phys->ds_prev_snap_txg, ==,
			    ds->ds_prev->ds_phys->ds_creation_txg);
			ds->ds_prev->ds_phys->ds_next_snap_obj = dsobj;
		}
	}

	/*
	 * If we have a reference-reservation on this dataset, we will
	 * need to increase the amount of refreservation being charged
	 * since our unique space is going to zero.
	 */
	if (ds->ds_reserved) {
		int64_t add = MIN(dsl_dataset_unique(ds), ds->ds_reserved);
		dsl_dir_diduse_space(ds->ds_dir, add, 0, 0, tx);
	}

	/*
	 * The old deadlist object now belongs to the snapshot; point the
	 * head at the new snapshot and give it a fresh, empty deadlist.
	 */
	bplist_close(&ds->ds_deadlist);
	dmu_buf_will_dirty(ds->ds_dbuf, tx);
	ASSERT3U(ds->ds_phys->ds_prev_snap_txg, <, dsphys->ds_creation_txg);
	ds->ds_phys->ds_prev_snap_obj = dsobj;
	ds->ds_phys->ds_prev_snap_txg = dsphys->ds_creation_txg;
	ds->ds_phys->ds_unique_bytes = 0;
	if (spa_version(dp->dp_spa) >= SPA_VERSION_UNIQUE_ACCURATE)
		ds->ds_phys->ds_flags |= DS_FLAG_UNIQUE_ACCURATE;
	ds->ds_phys->ds_deadlist_obj =
	    bplist_create(mos, DSL_DEADLIST_BLOCKSIZE, tx);
	VERIFY(0 == bplist_open(&ds->ds_deadlist, mos,
	    ds->ds_phys->ds_deadlist_obj));

	dprintf("snap '%s' -> obj %llu\n", snapname, dsobj);
	err = zap_add(mos, ds->ds_phys->ds_snapnames_zapobj,
	    snapname, 8, 1, &dsobj, tx);
	ASSERT(err == 0);

	/* Move the head's ds_prev hold over to the new snapshot. */
	if (ds->ds_prev)
		dsl_dataset_close(ds->ds_prev, DS_MODE_NONE, ds);
	VERIFY(0 == dsl_dataset_open_obj(dp,
	    ds->ds_phys->ds_prev_snap_obj, snapname,
	    DS_MODE_NONE, ds, &ds->ds_prev));

	spa_history_internal_log(LOG_DS_SNAPSHOT, dp->dp_spa, tx, cr,
	    "dataset = %llu", dsobj);
}
1708 
1709 void
1710 dsl_dataset_sync(dsl_dataset_t *ds, zio_t *zio, dmu_tx_t *tx)
1711 {
1712 	ASSERT(dmu_tx_is_syncing(tx));
1713 	ASSERT(ds->ds_user_ptr != NULL);
1714 	ASSERT(ds->ds_phys->ds_next_snap_obj == 0);
1715 
1716 	/*
1717 	 * in case we had to change ds_fsid_guid when we opened it,
1718 	 * sync it out now.
1719 	 */
1720 	dmu_buf_will_dirty(ds->ds_dbuf, tx);
1721 	ds->ds_phys->ds_fsid_guid = ds->ds_fsid_guid;
1722 
1723 	dsl_dir_dirty(ds->ds_dir, tx);
1724 	dmu_objset_sync(ds->ds_user_ptr, zio, tx);
1725 }
1726 
1727 void
1728 dsl_dataset_stats(dsl_dataset_t *ds, nvlist_t *nv)
1729 {
1730 	uint64_t refd, avail, uobjs, aobjs;
1731 
1732 	dsl_dir_stats(ds->ds_dir, nv);
1733 
1734 	dsl_dataset_space(ds, &refd, &avail, &uobjs, &aobjs);
1735 	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_AVAILABLE, avail);
1736 	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_REFERENCED, refd);
1737 
1738 	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_CREATION,
1739 	    ds->ds_phys->ds_creation_time);
1740 	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_CREATETXG,
1741 	    ds->ds_phys->ds_creation_txg);
1742 	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_REFQUOTA,
1743 	    ds->ds_quota);
1744 	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_REFRESERVATION,
1745 	    ds->ds_reserved);
1746 
1747 	if (ds->ds_phys->ds_next_snap_obj) {
1748 		/*
1749 		 * This is a snapshot; override the dd's space used with
1750 		 * our unique space and compression ratio.
1751 		 */
1752 		dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_USED,
1753 		    ds->ds_phys->ds_unique_bytes);
1754 		dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_COMPRESSRATIO,
1755 		    ds->ds_phys->ds_compressed_bytes == 0 ? 100 :
1756 		    (ds->ds_phys->ds_uncompressed_bytes * 100 /
1757 		    ds->ds_phys->ds_compressed_bytes));
1758 	}
1759 }
1760 
1761 void
1762 dsl_dataset_fast_stat(dsl_dataset_t *ds, dmu_objset_stats_t *stat)
1763 {
1764 	stat->dds_creation_txg = ds->ds_phys->ds_creation_txg;
1765 	stat->dds_inconsistent = ds->ds_phys->ds_flags & DS_FLAG_INCONSISTENT;
1766 	stat->dds_guid = ds->ds_phys->ds_guid;
1767 	if (ds->ds_phys->ds_next_snap_obj) {
1768 		stat->dds_is_snapshot = B_TRUE;
1769 		stat->dds_num_clones = ds->ds_phys->ds_num_children - 1;
1770 	}
1771 
1772 	/* clone origin is really a dsl_dir thing... */
1773 	rw_enter(&ds->ds_dir->dd_pool->dp_config_rwlock, RW_READER);
1774 	if (ds->ds_dir->dd_phys->dd_origin_obj) {
1775 		dsl_dataset_t *ods;
1776 
1777 		VERIFY(0 == dsl_dataset_open_obj(ds->ds_dir->dd_pool,
1778 		    ds->ds_dir->dd_phys->dd_origin_obj,
1779 		    NULL, DS_MODE_NONE, FTAG, &ods));
1780 		dsl_dataset_name(ods, stat->dds_origin);
1781 		dsl_dataset_close(ods, DS_MODE_NONE, FTAG);
1782 	}
1783 	rw_exit(&ds->ds_dir->dd_pool->dp_config_rwlock);
1784 }
1785 
1786 uint64_t
1787 dsl_dataset_fsid_guid(dsl_dataset_t *ds)
1788 {
1789 	return (ds->ds_fsid_guid);
1790 }
1791 
1792 void
1793 dsl_dataset_space(dsl_dataset_t *ds,
1794     uint64_t *refdbytesp, uint64_t *availbytesp,
1795     uint64_t *usedobjsp, uint64_t *availobjsp)
1796 {
1797 	*refdbytesp = ds->ds_phys->ds_used_bytes;
1798 	*availbytesp = dsl_dir_space_available(ds->ds_dir, NULL, 0, TRUE);
1799 	if (ds->ds_reserved > ds->ds_phys->ds_unique_bytes)
1800 		*availbytesp += ds->ds_reserved - ds->ds_phys->ds_unique_bytes;
1801 	if (ds->ds_quota != 0) {
1802 		/*
1803 		 * Adjust available bytes according to refquota
1804 		 */
1805 		if (*refdbytesp < ds->ds_quota)
1806 			*availbytesp = MIN(*availbytesp,
1807 			    ds->ds_quota - *refdbytesp);
1808 		else
1809 			*availbytesp = 0;
1810 	}
1811 	*usedobjsp = ds->ds_phys->ds_bp.blk_fill;
1812 	*availobjsp = DN_MAX_OBJECT - *usedobjsp;
1813 }
1814 
1815 boolean_t
1816 dsl_dataset_modified_since_lastsnap(dsl_dataset_t *ds)
1817 {
1818 	dsl_pool_t *dp = ds->ds_dir->dd_pool;
1819 
1820 	ASSERT(RW_LOCK_HELD(&dp->dp_config_rwlock) ||
1821 	    dsl_pool_sync_context(dp));
1822 	if (ds->ds_prev == NULL)
1823 		return (B_FALSE);
1824 	if (ds->ds_phys->ds_bp.blk_birth >
1825 	    ds->ds_prev->ds_phys->ds_creation_txg)
1826 		return (B_TRUE);
1827 	return (B_FALSE);
1828 }
1829 
1830 /* ARGSUSED */
1831 static int
1832 dsl_dataset_snapshot_rename_check(void *arg1, void *arg2, dmu_tx_t *tx)
1833 {
1834 	dsl_dataset_t *ds = arg1;
1835 	char *newsnapname = arg2;
1836 	dsl_dir_t *dd = ds->ds_dir;
1837 	objset_t *mos = dd->dd_pool->dp_meta_objset;
1838 	dsl_dataset_t *hds;
1839 	uint64_t val;
1840 	int err;
1841 
1842 	err = dsl_dataset_open_obj(dd->dd_pool,
1843 	    dd->dd_phys->dd_head_dataset_obj, NULL, DS_MODE_NONE, FTAG, &hds);
1844 	if (err)
1845 		return (err);
1846 
1847 	/* new name better not be in use */
1848 	err = zap_lookup(mos, hds->ds_phys->ds_snapnames_zapobj,
1849 	    newsnapname, 8, 1, &val);
1850 	dsl_dataset_close(hds, DS_MODE_NONE, FTAG);
1851 
1852 	if (err == 0)
1853 		err = EEXIST;
1854 	else if (err == ENOENT)
1855 		err = 0;
1856 
1857 	/* dataset name + 1 for the "@" + the new snapshot name must fit */
1858 	if (dsl_dir_namelen(ds->ds_dir) + 1 + strlen(newsnapname) >= MAXNAMELEN)
1859 		err = ENAMETOOLONG;
1860 
1861 	return (err);
1862 }
1863 
1864 static void
1865 dsl_dataset_snapshot_rename_sync(void *arg1, void *arg2,
1866     cred_t *cr, dmu_tx_t *tx)
1867 {
1868 	dsl_dataset_t *ds = arg1;
1869 	const char *newsnapname = arg2;
1870 	dsl_dir_t *dd = ds->ds_dir;
1871 	objset_t *mos = dd->dd_pool->dp_meta_objset;
1872 	dsl_dataset_t *hds;
1873 	int err;
1874 
1875 	ASSERT(ds->ds_phys->ds_next_snap_obj != 0);
1876 
1877 	VERIFY(0 == dsl_dataset_open_obj(dd->dd_pool,
1878 	    dd->dd_phys->dd_head_dataset_obj, NULL, DS_MODE_NONE, FTAG, &hds));
1879 
1880 	VERIFY(0 == dsl_dataset_get_snapname(ds));
1881 	err = zap_remove(mos, hds->ds_phys->ds_snapnames_zapobj,
1882 	    ds->ds_snapname, tx);
1883 	ASSERT3U(err, ==, 0);
1884 	mutex_enter(&ds->ds_lock);
1885 	(void) strcpy(ds->ds_snapname, newsnapname);
1886 	mutex_exit(&ds->ds_lock);
1887 	err = zap_add(mos, hds->ds_phys->ds_snapnames_zapobj,
1888 	    ds->ds_snapname, 8, 1, &ds->ds_object, tx);
1889 	ASSERT3U(err, ==, 0);
1890 
1891 	spa_history_internal_log(LOG_DS_RENAME, dd->dd_pool->dp_spa, tx,
1892 	    cr, "dataset = %llu", ds->ds_object);
1893 	dsl_dataset_close(hds, DS_MODE_NONE, FTAG);
1894 }
1895 
/*
 * State shared between dsl_recursive_rename() and its per-filesystem
 * callback dsl_snapshot_rename_one().
 */
struct renamesnaparg {
	dsl_sync_task_group_t *dstg;	/* group collecting the rename tasks */
	char failed[MAXPATHLEN];	/* name of a snapshot that failed */
	char *oldsnap;			/* old snapshot name, after the '@' */
	char *newsnap;			/* new snapshot name, after the '@' */
};
1902 
1903 static int
1904 dsl_snapshot_rename_one(char *name, void *arg)
1905 {
1906 	struct renamesnaparg *ra = arg;
1907 	dsl_dataset_t *ds = NULL;
1908 	char *cp;
1909 	int err;
1910 
1911 	cp = name + strlen(name);
1912 	*cp = '@';
1913 	(void) strcpy(cp + 1, ra->oldsnap);
1914 
1915 	/*
1916 	 * For recursive snapshot renames the parent won't be changing
1917 	 * so we just pass name for both the to/from argument.
1918 	 */
1919 	if (err = zfs_secpolicy_rename_perms(name, name, CRED())) {
1920 		(void) strcpy(ra->failed, name);
1921 		return (err);
1922 	}
1923 
1924 	err = dsl_dataset_open(name, DS_MODE_READONLY | DS_MODE_STANDARD,
1925 	    ra->dstg, &ds);
1926 	if (err == ENOENT) {
1927 		*cp = '\0';
1928 		return (0);
1929 	}
1930 	if (err) {
1931 		(void) strcpy(ra->failed, name);
1932 		*cp = '\0';
1933 		dsl_dataset_close(ds, DS_MODE_STANDARD, ra->dstg);
1934 		return (err);
1935 	}
1936 
1937 #ifdef _KERNEL
1938 	/* for all filesystems undergoing rename, we'll need to unmount it */
1939 	(void) zfs_unmount_snap(name, NULL);
1940 #endif
1941 
1942 	*cp = '\0';
1943 
1944 	dsl_sync_task_create(ra->dstg, dsl_dataset_snapshot_rename_check,
1945 	    dsl_dataset_snapshot_rename_sync, ds, ra->newsnap, 0);
1946 
1947 	return (0);
1948 }
1949 
1950 static int
1951 dsl_recursive_rename(char *oldname, const char *newname)
1952 {
1953 	int err;
1954 	struct renamesnaparg *ra;
1955 	dsl_sync_task_t *dst;
1956 	spa_t *spa;
1957 	char *cp, *fsname = spa_strdup(oldname);
1958 	int len = strlen(oldname);
1959 
1960 	/* truncate the snapshot name to get the fsname */
1961 	cp = strchr(fsname, '@');
1962 	*cp = '\0';
1963 
1964 	err = spa_open(fsname, &spa, FTAG);
1965 	if (err) {
1966 		kmem_free(fsname, len + 1);
1967 		return (err);
1968 	}
1969 	ra = kmem_alloc(sizeof (struct renamesnaparg), KM_SLEEP);
1970 	ra->dstg = dsl_sync_task_group_create(spa_get_dsl(spa));
1971 
1972 	ra->oldsnap = strchr(oldname, '@') + 1;
1973 	ra->newsnap = strchr(newname, '@') + 1;
1974 	*ra->failed = '\0';
1975 
1976 	err = dmu_objset_find(fsname, dsl_snapshot_rename_one, ra,
1977 	    DS_FIND_CHILDREN);
1978 	kmem_free(fsname, len + 1);
1979 
1980 	if (err == 0) {
1981 		err = dsl_sync_task_group_wait(ra->dstg);
1982 	}
1983 
1984 	for (dst = list_head(&ra->dstg->dstg_tasks); dst;
1985 	    dst = list_next(&ra->dstg->dstg_tasks, dst)) {
1986 		dsl_dataset_t *ds = dst->dst_arg1;
1987 		if (dst->dst_err) {
1988 			dsl_dir_name(ds->ds_dir, ra->failed);
1989 			(void) strcat(ra->failed, "@");
1990 			(void) strcat(ra->failed, ra->newsnap);
1991 		}
1992 		dsl_dataset_close(ds, DS_MODE_STANDARD, ra->dstg);
1993 	}
1994 
1995 	if (err)
1996 		(void) strcpy(oldname, ra->failed);
1997 
1998 	dsl_sync_task_group_destroy(ra->dstg);
1999 	kmem_free(ra, sizeof (struct renamesnaparg));
2000 	spa_close(spa, FTAG);
2001 	return (err);
2002 }
2003 
2004 static int
2005 dsl_valid_rename(char *oldname, void *arg)
2006 {
2007 	int delta = *(int *)arg;
2008 
2009 	if (strlen(oldname) + delta >= MAXNAMELEN)
2010 		return (ENAMETOOLONG);
2011 
2012 	return (0);
2013 }
2014 
2015 #pragma weak dmu_objset_rename = dsl_dataset_rename
2016 int
2017 dsl_dataset_rename(char *oldname, const char *newname,
2018     boolean_t recursive)
2019 {
2020 	dsl_dir_t *dd;
2021 	dsl_dataset_t *ds;
2022 	const char *tail;
2023 	int err;
2024 
2025 	err = dsl_dir_open(oldname, FTAG, &dd, &tail);
2026 	if (err)
2027 		return (err);
2028 	if (tail == NULL) {
2029 		int delta = strlen(newname) - strlen(oldname);
2030 
2031 		/* if we're growing, validate child size lengths */
2032 		if (delta > 0)
2033 			err = dmu_objset_find(oldname, dsl_valid_rename,
2034 			    &delta, DS_FIND_CHILDREN | DS_FIND_SNAPSHOTS);
2035 
2036 		if (!err)
2037 			err = dsl_dir_rename(dd, newname);
2038 		dsl_dir_close(dd, FTAG);
2039 		return (err);
2040 	}
2041 	if (tail[0] != '@') {
2042 		/* the name ended in a nonexistant component */
2043 		dsl_dir_close(dd, FTAG);
2044 		return (ENOENT);
2045 	}
2046 
2047 	dsl_dir_close(dd, FTAG);
2048 
2049 	/* new name must be snapshot in same filesystem */
2050 	tail = strchr(newname, '@');
2051 	if (tail == NULL)
2052 		return (EINVAL);
2053 	tail++;
2054 	if (strncmp(oldname, newname, tail - newname) != 0)
2055 		return (EXDEV);
2056 
2057 	if (recursive) {
2058 		err = dsl_recursive_rename(oldname, newname);
2059 	} else {
2060 		err = dsl_dataset_open(oldname,
2061 		    DS_MODE_READONLY | DS_MODE_STANDARD, FTAG, &ds);
2062 		if (err)
2063 			return (err);
2064 
2065 		err = dsl_sync_task_do(ds->ds_dir->dd_pool,
2066 		    dsl_dataset_snapshot_rename_check,
2067 		    dsl_dataset_snapshot_rename_sync, ds, (char *)tail, 1);
2068 
2069 		dsl_dataset_close(ds, DS_MODE_STANDARD, FTAG);
2070 	}
2071 
2072 	return (err);
2073 }
2074 
/*
 * State handed from dsl_dataset_promote_check() to
 * dsl_dataset_promote_sync().  The check function zeroes it and, in
 * syncing context, fills it in.
 */
struct promotearg {
	uint64_t used;		/* bytes moved from origin's dir to ours */
	uint64_t comp;		/* same, compressed bytes */
	uint64_t uncomp;	/* same, uncompressed bytes */
	uint64_t unique;	/* origin snapshot's new ds_unique_bytes */
	uint64_t newnext_obj;	/* origin snapshot's new ds_next_snap_obj */
	uint64_t snapnames_obj;	/* snapnames zapobj of origin dir's head */
};
2079 
2080 /* ARGSUSED */
2081 static int
2082 dsl_dataset_promote_check(void *arg1, void *arg2, dmu_tx_t *tx)
2083 {
2084 	dsl_dataset_t *hds = arg1;
2085 	struct promotearg *pa = arg2;
2086 	dsl_dir_t *dd = hds->ds_dir;
2087 	dsl_pool_t *dp = hds->ds_dir->dd_pool;
2088 	dsl_dir_t *odd = NULL;
2089 	dsl_dataset_t *ds = NULL;
2090 	dsl_dataset_t *origin_ds = NULL;
2091 	dsl_dataset_t *newnext_ds = NULL;
2092 	int err;
2093 	char *name = NULL;
2094 	uint64_t itor = 0;
2095 	blkptr_t bp;
2096 
2097 	bzero(pa, sizeof (*pa));
2098 
2099 	/* Check that it is a clone */
2100 	if (dd->dd_phys->dd_origin_obj == 0)
2101 		return (EINVAL);
2102 
2103 	/* Since this is so expensive, don't do the preliminary check */
2104 	if (!dmu_tx_is_syncing(tx))
2105 		return (0);
2106 
2107 	if (err = dsl_dataset_open_obj(dp, dd->dd_phys->dd_origin_obj,
2108 	    NULL, DS_MODE_EXCLUSIVE, FTAG, &origin_ds))
2109 		goto out;
2110 	odd = origin_ds->ds_dir;
2111 
2112 	{
2113 		dsl_dataset_t *phds;
2114 		if (err = dsl_dataset_open_obj(dd->dd_pool,
2115 		    odd->dd_phys->dd_head_dataset_obj,
2116 		    NULL, DS_MODE_NONE, FTAG, &phds))
2117 			goto out;
2118 		pa->snapnames_obj = phds->ds_phys->ds_snapnames_zapobj;
2119 		dsl_dataset_close(phds, DS_MODE_NONE, FTAG);
2120 	}
2121 
2122 	if (hds->ds_phys->ds_flags & DS_FLAG_NOPROMOTE) {
2123 		err = EXDEV;
2124 		goto out;
2125 	}
2126 
2127 	/* find origin's new next ds */
2128 	VERIFY(0 == dsl_dataset_open_obj(dd->dd_pool, hds->ds_object,
2129 	    NULL, DS_MODE_NONE, FTAG, &newnext_ds));
2130 	while (newnext_ds->ds_phys->ds_prev_snap_obj != origin_ds->ds_object) {
2131 		dsl_dataset_t *prev;
2132 
2133 		if (err = dsl_dataset_open_obj(dd->dd_pool,
2134 		    newnext_ds->ds_phys->ds_prev_snap_obj,
2135 		    NULL, DS_MODE_NONE, FTAG, &prev))
2136 			goto out;
2137 		dsl_dataset_close(newnext_ds, DS_MODE_NONE, FTAG);
2138 		newnext_ds = prev;
2139 	}
2140 	pa->newnext_obj = newnext_ds->ds_object;
2141 
2142 	/* compute origin's new unique space */
2143 	while ((err = bplist_iterate(&newnext_ds->ds_deadlist,
2144 	    &itor, &bp)) == 0) {
2145 		if (bp.blk_birth > origin_ds->ds_phys->ds_prev_snap_txg)
2146 			pa->unique += bp_get_dasize(dd->dd_pool->dp_spa, &bp);
2147 	}
2148 	if (err != ENOENT)
2149 		goto out;
2150 
2151 	/* Walk the snapshots that we are moving */
2152 	name = kmem_alloc(MAXPATHLEN, KM_SLEEP);
2153 	ds = origin_ds;
2154 	/* CONSTCOND */
2155 	while (TRUE) {
2156 		uint64_t val, dlused, dlcomp, dluncomp;
2157 		dsl_dataset_t *prev;
2158 
2159 		/* Check that the snapshot name does not conflict */
2160 		dsl_dataset_name(ds, name);
2161 		err = zap_lookup(dd->dd_pool->dp_meta_objset,
2162 		    hds->ds_phys->ds_snapnames_zapobj, ds->ds_snapname,
2163 		    8, 1, &val);
2164 		if (err != ENOENT) {
2165 			if (err == 0)
2166 				err = EEXIST;
2167 			goto out;
2168 		}
2169 
2170 		/*
2171 		 * compute space to transfer.  Each snapshot gave birth to:
2172 		 * (my used) - (prev's used) + (deadlist's used)
2173 		 */
2174 		pa->used += ds->ds_phys->ds_used_bytes;
2175 		pa->comp += ds->ds_phys->ds_compressed_bytes;
2176 		pa->uncomp += ds->ds_phys->ds_uncompressed_bytes;
2177 
2178 		/* If we reach the first snapshot, we're done. */
2179 		if (ds->ds_phys->ds_prev_snap_obj == 0)
2180 			break;
2181 
2182 		if (err = bplist_space(&ds->ds_deadlist,
2183 		    &dlused, &dlcomp, &dluncomp))
2184 			goto out;
2185 		if (err = dsl_dataset_open_obj(dd->dd_pool,
2186 		    ds->ds_phys->ds_prev_snap_obj, NULL, DS_MODE_EXCLUSIVE,
2187 		    FTAG, &prev))
2188 			goto out;
2189 		pa->used += dlused - prev->ds_phys->ds_used_bytes;
2190 		pa->comp += dlcomp - prev->ds_phys->ds_compressed_bytes;
2191 		pa->uncomp += dluncomp - prev->ds_phys->ds_uncompressed_bytes;
2192 
2193 		/*
2194 		 * We could be a clone of a clone.  If we reach our
2195 		 * parent's branch point, we're done.
2196 		 */
2197 		if (prev->ds_phys->ds_next_snap_obj != ds->ds_object) {
2198 			dsl_dataset_close(prev, DS_MODE_EXCLUSIVE, FTAG);
2199 			break;
2200 		}
2201 		if (ds != origin_ds)
2202 			dsl_dataset_close(ds, DS_MODE_EXCLUSIVE, FTAG);
2203 		ds = prev;
2204 	}
2205 
2206 	/* Check that there is enough space here */
2207 	err = dsl_dir_transfer_possible(odd, dd, pa->used);
2208 
2209 out:
2210 	if (ds && ds != origin_ds)
2211 		dsl_dataset_close(ds, DS_MODE_EXCLUSIVE, FTAG);
2212 	if (origin_ds)
2213 		dsl_dataset_close(origin_ds, DS_MODE_EXCLUSIVE, FTAG);
2214 	if (newnext_ds)
2215 		dsl_dataset_close(newnext_ds, DS_MODE_NONE, FTAG);
2216 	if (name)
2217 		kmem_free(name, MAXPATHLEN);
2218 	return (err);
2219 }
2220 
2221 static void
2222 dsl_dataset_promote_sync(void *arg1, void *arg2, cred_t *cr, dmu_tx_t *tx)
2223 {
2224 	dsl_dataset_t *hds = arg1;
2225 	struct promotearg *pa = arg2;
2226 	dsl_dir_t *dd = hds->ds_dir;
2227 	dsl_pool_t *dp = hds->ds_dir->dd_pool;
2228 	dsl_dir_t *odd = NULL;
2229 	dsl_dataset_t *ds, *origin_ds;
2230 	char *name;
2231 
2232 	ASSERT(dd->dd_phys->dd_origin_obj != 0);
2233 	ASSERT(0 == (hds->ds_phys->ds_flags & DS_FLAG_NOPROMOTE));
2234 
2235 	VERIFY(0 == dsl_dataset_open_obj(dp, dd->dd_phys->dd_origin_obj,
2236 	    NULL, DS_MODE_EXCLUSIVE, FTAG, &origin_ds));
2237 	/*
2238 	 * We need to explicitly open odd, since origin_ds's dd will be
2239 	 * changing.
2240 	 */
2241 	VERIFY(0 == dsl_dir_open_obj(dp, origin_ds->ds_dir->dd_object,
2242 	    NULL, FTAG, &odd));
2243 
2244 	/* move snapshots to this dir */
2245 	name = kmem_alloc(MAXPATHLEN, KM_SLEEP);
2246 	ds = origin_ds;
2247 	/* CONSTCOND */
2248 	while (TRUE) {
2249 		dsl_dataset_t *prev;
2250 
2251 		/* move snap name entry */
2252 		dsl_dataset_name(ds, name);
2253 		VERIFY(0 == zap_remove(dp->dp_meta_objset,
2254 		    pa->snapnames_obj, ds->ds_snapname, tx));
2255 		VERIFY(0 == zap_add(dp->dp_meta_objset,
2256 		    hds->ds_phys->ds_snapnames_zapobj, ds->ds_snapname,
2257 		    8, 1, &ds->ds_object, tx));
2258 
2259 		/* change containing dsl_dir */
2260 		dmu_buf_will_dirty(ds->ds_dbuf, tx);
2261 		ASSERT3U(ds->ds_phys->ds_dir_obj, ==, odd->dd_object);
2262 		ds->ds_phys->ds_dir_obj = dd->dd_object;
2263 		ASSERT3P(ds->ds_dir, ==, odd);
2264 		dsl_dir_close(ds->ds_dir, ds);
2265 		VERIFY(0 == dsl_dir_open_obj(dp, dd->dd_object,
2266 		    NULL, ds, &ds->ds_dir));
2267 
2268 		ASSERT3U(dsl_prop_numcb(ds), ==, 0);
2269 
2270 		if (ds->ds_phys->ds_prev_snap_obj == 0)
2271 			break;
2272 
2273 		VERIFY(0 == dsl_dataset_open_obj(dp,
2274 		    ds->ds_phys->ds_prev_snap_obj, NULL, DS_MODE_EXCLUSIVE,
2275 		    FTAG, &prev));
2276 
2277 		if (prev->ds_phys->ds_next_snap_obj != ds->ds_object) {
2278 			dsl_dataset_close(prev, DS_MODE_EXCLUSIVE, FTAG);
2279 			break;
2280 		}
2281 		if (ds != origin_ds)
2282 			dsl_dataset_close(ds, DS_MODE_EXCLUSIVE, FTAG);
2283 		ds = prev;
2284 	}
2285 	if (ds != origin_ds)
2286 		dsl_dataset_close(ds, DS_MODE_EXCLUSIVE, FTAG);
2287 
2288 	/* change origin's next snap */
2289 	dmu_buf_will_dirty(origin_ds->ds_dbuf, tx);
2290 	origin_ds->ds_phys->ds_next_snap_obj = pa->newnext_obj;
2291 
2292 	/* change origin */
2293 	dmu_buf_will_dirty(dd->dd_dbuf, tx);
2294 	ASSERT3U(dd->dd_phys->dd_origin_obj, ==, origin_ds->ds_object);
2295 	dd->dd_phys->dd_origin_obj = odd->dd_phys->dd_origin_obj;
2296 	dmu_buf_will_dirty(odd->dd_dbuf, tx);
2297 	odd->dd_phys->dd_origin_obj = origin_ds->ds_object;
2298 
2299 	/* change space accounting */
2300 	dsl_dir_diduse_space(odd, -pa->used, -pa->comp, -pa->uncomp, tx);
2301 	dsl_dir_diduse_space(dd, pa->used, pa->comp, pa->uncomp, tx);
2302 	origin_ds->ds_phys->ds_unique_bytes = pa->unique;
2303 
2304 	/* log history record */
2305 	spa_history_internal_log(LOG_DS_PROMOTE, dd->dd_pool->dp_spa, tx,
2306 	    cr, "dataset = %llu", ds->ds_object);
2307 
2308 	dsl_dir_close(odd, FTAG);
2309 	dsl_dataset_close(origin_ds, DS_MODE_EXCLUSIVE, FTAG);
2310 	kmem_free(name, MAXPATHLEN);
2311 }
2312 
2313 int
2314 dsl_dataset_promote(const char *name)
2315 {
2316 	dsl_dataset_t *ds;
2317 	int err;
2318 	dmu_object_info_t doi;
2319 	struct promotearg pa;
2320 
2321 	err = dsl_dataset_open(name, DS_MODE_NONE, FTAG, &ds);
2322 	if (err)
2323 		return (err);
2324 
2325 	err = dmu_object_info(ds->ds_dir->dd_pool->dp_meta_objset,
2326 	    ds->ds_phys->ds_snapnames_zapobj, &doi);
2327 	if (err) {
2328 		dsl_dataset_close(ds, DS_MODE_NONE, FTAG);
2329 		return (err);
2330 	}
2331 
2332 	/*
2333 	 * Add in 128x the snapnames zapobj size, since we will be moving
2334 	 * a bunch of snapnames to the promoted ds, and dirtying their
2335 	 * bonus buffers.
2336 	 */
2337 	err = dsl_sync_task_do(ds->ds_dir->dd_pool,
2338 	    dsl_dataset_promote_check,
2339 	    dsl_dataset_promote_sync, ds, &pa, 2 + 2 * doi.doi_physical_blks);
2340 	dsl_dataset_close(ds, DS_MODE_NONE, FTAG);
2341 	return (err);
2342 }
2343 
2344 struct cloneswaparg {
2345 	dsl_dataset_t *cds; /* clone dataset */
2346 	dsl_dataset_t *ohds; /* origin's head dataset */
2347 	boolean_t force;
2348 	int64_t unused_refres_delta; /* change in unconsumed refreservation */
2349 };
2350 
2351 /* ARGSUSED */
2352 static int
2353 dsl_dataset_clone_swap_check(void *arg1, void *arg2, dmu_tx_t *tx)
2354 {
2355 	struct cloneswaparg *csa = arg1;
2356 
2357 	/* they should both be heads */
2358 	if (dsl_dataset_is_snapshot(csa->cds) ||
2359 	    dsl_dataset_is_snapshot(csa->ohds))
2360 		return (EINVAL);
2361 
2362 	/* the branch point should be just before them */
2363 	if (csa->cds->ds_prev != csa->ohds->ds_prev)
2364 		return (EINVAL);
2365 
2366 	/* cds should be the clone */
2367 	if (csa->cds->ds_prev->ds_phys->ds_next_snap_obj !=
2368 	    csa->ohds->ds_object)
2369 		return (EINVAL);
2370 
2371 	/* the clone should be a child of the origin */
2372 	if (csa->cds->ds_dir->dd_parent != csa->ohds->ds_dir)
2373 		return (EINVAL);
2374 
2375 	/* ohds shouldn't be modified unless 'force' */
2376 	if (!csa->force && dsl_dataset_modified_since_lastsnap(csa->ohds))
2377 		return (ETXTBSY);
2378 
2379 	/* adjust amount of any unconsumed refreservation */
2380 	csa->unused_refres_delta =
2381 	    (int64_t)MIN(csa->ohds->ds_reserved,
2382 	    csa->ohds->ds_phys->ds_unique_bytes) -
2383 	    (int64_t)MIN(csa->ohds->ds_reserved,
2384 	    csa->cds->ds_phys->ds_unique_bytes);
2385 
2386 	if (csa->unused_refres_delta > 0 &&
2387 	    csa->unused_refres_delta >
2388 	    dsl_dir_space_available(csa->ohds->ds_dir, NULL, 0, TRUE))
2389 		return (ENOSPC);
2390 
2391 	return (0);
2392 }
2393 
2394 /* ARGSUSED */
2395 static void
2396 dsl_dataset_clone_swap_sync(void *arg1, void *arg2, cred_t *cr, dmu_tx_t *tx)
2397 {
2398 	struct cloneswaparg *csa = arg1;
2399 	dsl_pool_t *dp = csa->cds->ds_dir->dd_pool;
2400 	uint64_t itor = 0;
2401 	blkptr_t bp;
2402 	uint64_t unique = 0;
2403 	int err;
2404 
2405 	ASSERT(csa->cds->ds_reserved == 0);
2406 	ASSERT(csa->cds->ds_quota == csa->ohds->ds_quota);
2407 
2408 	dmu_buf_will_dirty(csa->cds->ds_dbuf, tx);
2409 	dmu_buf_will_dirty(csa->ohds->ds_dbuf, tx);
2410 	dmu_buf_will_dirty(csa->cds->ds_prev->ds_dbuf, tx);
2411 
2412 	if (csa->cds->ds_user_ptr != NULL) {
2413 		csa->cds->ds_user_evict_func(csa->cds, csa->cds->ds_user_ptr);
2414 		csa->cds->ds_user_ptr = NULL;
2415 	}
2416 
2417 	if (csa->ohds->ds_user_ptr != NULL) {
2418 		csa->ohds->ds_user_evict_func(csa->ohds,
2419 		    csa->ohds->ds_user_ptr);
2420 		csa->ohds->ds_user_ptr = NULL;
2421 	}
2422 
2423 	/* compute unique space */
2424 	while ((err = bplist_iterate(&csa->cds->ds_deadlist,
2425 	    &itor, &bp)) == 0) {
2426 		if (bp.blk_birth > csa->cds->ds_prev->ds_phys->ds_prev_snap_txg)
2427 			unique += bp_get_dasize(dp->dp_spa, &bp);
2428 	}
2429 	VERIFY(err == ENOENT);
2430 
2431 	/* reset origin's unique bytes */
2432 	csa->cds->ds_prev->ds_phys->ds_unique_bytes = unique;
2433 
2434 	/* swap blkptrs */
2435 	{
2436 		blkptr_t tmp;
2437 		tmp = csa->ohds->ds_phys->ds_bp;
2438 		csa->ohds->ds_phys->ds_bp = csa->cds->ds_phys->ds_bp;
2439 		csa->cds->ds_phys->ds_bp = tmp;
2440 	}
2441 
2442 	/* set dd_*_bytes */
2443 	{
2444 		int64_t dused, dcomp, duncomp;
2445 		uint64_t cdl_used, cdl_comp, cdl_uncomp;
2446 		uint64_t odl_used, odl_comp, odl_uncomp;
2447 
2448 		VERIFY(0 == bplist_space(&csa->cds->ds_deadlist, &cdl_used,
2449 		    &cdl_comp, &cdl_uncomp));
2450 		VERIFY(0 == bplist_space(&csa->ohds->ds_deadlist, &odl_used,
2451 		    &odl_comp, &odl_uncomp));
2452 		dused = csa->cds->ds_phys->ds_used_bytes + cdl_used -
2453 		    (csa->ohds->ds_phys->ds_used_bytes + odl_used);
2454 		dcomp = csa->cds->ds_phys->ds_compressed_bytes + cdl_comp -
2455 		    (csa->ohds->ds_phys->ds_compressed_bytes + odl_comp);
2456 		duncomp = csa->cds->ds_phys->ds_uncompressed_bytes +
2457 		    cdl_uncomp -
2458 		    (csa->ohds->ds_phys->ds_uncompressed_bytes + odl_uncomp);
2459 
2460 		dsl_dir_diduse_space(csa->ohds->ds_dir,
2461 		    dused, dcomp, duncomp, tx);
2462 		dsl_dir_diduse_space(csa->cds->ds_dir,
2463 		    -dused, -dcomp, -duncomp, tx);
2464 	}
2465 
2466 #define	SWITCH64(x, y) \
2467 	{ \
2468 		uint64_t __tmp = (x); \
2469 		(x) = (y); \
2470 		(y) = __tmp; \
2471 	}
2472 
2473 	/* swap ds_*_bytes */
2474 	SWITCH64(csa->ohds->ds_phys->ds_used_bytes,
2475 	    csa->cds->ds_phys->ds_used_bytes);
2476 	SWITCH64(csa->ohds->ds_phys->ds_compressed_bytes,
2477 	    csa->cds->ds_phys->ds_compressed_bytes);
2478 	SWITCH64(csa->ohds->ds_phys->ds_uncompressed_bytes,
2479 	    csa->cds->ds_phys->ds_uncompressed_bytes);
2480 	SWITCH64(csa->ohds->ds_phys->ds_unique_bytes,
2481 	    csa->cds->ds_phys->ds_unique_bytes);
2482 
2483 	/* apply any parent delta for change in unconsumed refreservation */
2484 	dsl_dir_diduse_space(csa->ohds->ds_dir, csa->unused_refres_delta,
2485 	    0, 0, tx);
2486 
2487 	/* swap deadlists */
2488 	bplist_close(&csa->cds->ds_deadlist);
2489 	bplist_close(&csa->ohds->ds_deadlist);
2490 	SWITCH64(csa->ohds->ds_phys->ds_deadlist_obj,
2491 	    csa->cds->ds_phys->ds_deadlist_obj);
2492 	VERIFY(0 == bplist_open(&csa->cds->ds_deadlist, dp->dp_meta_objset,
2493 	    csa->cds->ds_phys->ds_deadlist_obj));
2494 	VERIFY(0 == bplist_open(&csa->ohds->ds_deadlist, dp->dp_meta_objset,
2495 	    csa->ohds->ds_phys->ds_deadlist_obj));
2496 }
2497 
2498 /*
2499  * Swap 'clone' with its origin head file system.
2500  */
2501 int
2502 dsl_dataset_clone_swap(dsl_dataset_t *clone, dsl_dataset_t *origin_head,
2503     boolean_t force)
2504 {
2505 	struct cloneswaparg csa;
2506 
2507 	ASSERT(clone->ds_open_refcount == DS_REF_MAX);
2508 	ASSERT(origin_head->ds_open_refcount == DS_REF_MAX);
2509 
2510 	csa.cds = clone;
2511 	csa.ohds = origin_head;
2512 	csa.force = force;
2513 	return (dsl_sync_task_do(clone->ds_dir->dd_pool,
2514 	    dsl_dataset_clone_swap_check,
2515 	    dsl_dataset_clone_swap_sync, &csa, NULL, 9));
2516 }
2517 
2518 /*
2519  * Given a pool name and a dataset object number in that pool,
2520  * return the name of that dataset.
2521  */
2522 int
2523 dsl_dsobj_to_dsname(char *pname, uint64_t obj, char *buf)
2524 {
2525 	spa_t *spa;
2526 	dsl_pool_t *dp;
2527 	dsl_dataset_t *ds = NULL;
2528 	int error;
2529 
2530 	if ((error = spa_open(pname, &spa, FTAG)) != 0)
2531 		return (error);
2532 	dp = spa_get_dsl(spa);
2533 	rw_enter(&dp->dp_config_rwlock, RW_READER);
2534 	if ((error = dsl_dataset_open_obj(dp, obj,
2535 	    NULL, DS_MODE_NONE, FTAG, &ds)) != 0) {
2536 		rw_exit(&dp->dp_config_rwlock);
2537 		spa_close(spa, FTAG);
2538 		return (error);
2539 	}
2540 	dsl_dataset_name(ds, buf);
2541 	dsl_dataset_close(ds, DS_MODE_NONE, FTAG);
2542 	rw_exit(&dp->dp_config_rwlock);
2543 	spa_close(spa, FTAG);
2544 
2545 	return (0);
2546 }
2547 
2548 int
2549 dsl_dataset_check_quota(dsl_dataset_t *ds, boolean_t check_quota,
2550     uint64_t asize, uint64_t inflight, uint64_t *used)
2551 {
2552 	int error = 0;
2553 
2554 	ASSERT3S(asize, >, 0);
2555 
2556 	mutex_enter(&ds->ds_lock);
2557 	/*
2558 	 * Make a space adjustment for reserved bytes.
2559 	 */
2560 	if (ds->ds_reserved > ds->ds_phys->ds_unique_bytes) {
2561 		ASSERT3U(*used, >=,
2562 		    ds->ds_reserved - ds->ds_phys->ds_unique_bytes);
2563 		*used -= (ds->ds_reserved - ds->ds_phys->ds_unique_bytes);
2564 	}
2565 
2566 	if (!check_quota || ds->ds_quota == 0) {
2567 		mutex_exit(&ds->ds_lock);
2568 		return (0);
2569 	}
2570 	/*
2571 	 * If they are requesting more space, and our current estimate
2572 	 * is over quota, they get to try again unless the actual
2573 	 * on-disk is over quota and there are no pending changes (which
2574 	 * may free up space for us).
2575 	 */
2576 	if (ds->ds_phys->ds_used_bytes + inflight >= ds->ds_quota) {
2577 		if (inflight > 0 || ds->ds_phys->ds_used_bytes < ds->ds_quota)
2578 			error = ERESTART;
2579 		else
2580 			error = EDQUOT;
2581 	}
2582 	mutex_exit(&ds->ds_lock);
2583 
2584 	return (error);
2585 }
2586 
2587 /* ARGSUSED */
2588 static int
2589 dsl_dataset_set_quota_check(void *arg1, void *arg2, dmu_tx_t *tx)
2590 {
2591 	dsl_dataset_t *ds = arg1;
2592 	uint64_t *quotap = arg2;
2593 	uint64_t new_quota = *quotap;
2594 
2595 	if (spa_version(ds->ds_dir->dd_pool->dp_spa) < SPA_VERSION_REFQUOTA)
2596 		return (ENOTSUP);
2597 
2598 	if (new_quota == 0)
2599 		return (0);
2600 
2601 	if (new_quota < ds->ds_phys->ds_used_bytes ||
2602 	    new_quota < ds->ds_reserved)
2603 		return (ENOSPC);
2604 
2605 	return (0);
2606 }
2607 
2608 /* ARGSUSED */
2609 void
2610 dsl_dataset_set_quota_sync(void *arg1, void *arg2, cred_t *cr, dmu_tx_t *tx)
2611 {
2612 	dsl_dataset_t *ds = arg1;
2613 	uint64_t *quotap = arg2;
2614 	uint64_t new_quota = *quotap;
2615 
2616 	dmu_buf_will_dirty(ds->ds_dbuf, tx);
2617 
2618 	mutex_enter(&ds->ds_lock);
2619 	ds->ds_quota = new_quota;
2620 	mutex_exit(&ds->ds_lock);
2621 
2622 	dsl_prop_set_uint64_sync(ds->ds_dir, "refquota", new_quota, cr, tx);
2623 
2624 	spa_history_internal_log(LOG_DS_REFQUOTA, ds->ds_dir->dd_pool->dp_spa,
2625 	    tx, cr, "%lld dataset = %llu ",
2626 	    (longlong_t)new_quota, ds->ds_dir->dd_phys->dd_head_dataset_obj);
2627 }
2628 
2629 int
2630 dsl_dataset_set_quota(const char *dsname, uint64_t quota)
2631 {
2632 	dsl_dataset_t *ds;
2633 	int err;
2634 
2635 	err = dsl_dataset_open(dsname, DS_MODE_STANDARD, FTAG, &ds);
2636 	if (err)
2637 		return (err);
2638 
2639 	if (quota != ds->ds_quota) {
2640 		/*
2641 		 * If someone removes a file, then tries to set the quota, we
2642 		 * want to make sure the file freeing takes effect.
2643 		 */
2644 		txg_wait_open(ds->ds_dir->dd_pool, 0);
2645 
2646 		err = dsl_sync_task_do(ds->ds_dir->dd_pool,
2647 		    dsl_dataset_set_quota_check, dsl_dataset_set_quota_sync,
2648 		    ds, &quota, 0);
2649 	}
2650 	dsl_dataset_close(ds, DS_MODE_STANDARD, FTAG);
2651 	return (err);
2652 }
2653 
2654 static int
2655 dsl_dataset_set_reservation_check(void *arg1, void *arg2, dmu_tx_t *tx)
2656 {
2657 	dsl_dataset_t *ds = arg1;
2658 	uint64_t *reservationp = arg2;
2659 	uint64_t new_reservation = *reservationp;
2660 	int64_t delta;
2661 	uint64_t unique;
2662 
2663 	if (new_reservation > INT64_MAX)
2664 		return (EOVERFLOW);
2665 
2666 	if (spa_version(ds->ds_dir->dd_pool->dp_spa) <
2667 	    SPA_VERSION_REFRESERVATION)
2668 		return (ENOTSUP);
2669 
2670 	if (dsl_dataset_is_snapshot(ds))
2671 		return (EINVAL);
2672 
2673 	/*
2674 	 * If we are doing the preliminary check in open context, the
2675 	 * space estimates may be inaccurate.
2676 	 */
2677 	if (!dmu_tx_is_syncing(tx))
2678 		return (0);
2679 
2680 	mutex_enter(&ds->ds_lock);
2681 	unique = dsl_dataset_unique(ds);
2682 	delta = MAX(unique, new_reservation) - MAX(unique, ds->ds_reserved);
2683 	mutex_exit(&ds->ds_lock);
2684 
2685 	if (delta > 0 &&
2686 	    delta > dsl_dir_space_available(ds->ds_dir, NULL, 0, TRUE))
2687 		return (ENOSPC);
2688 	if (delta > 0 && ds->ds_quota > 0 &&
2689 	    new_reservation > ds->ds_quota)
2690 		return (ENOSPC);
2691 
2692 	return (0);
2693 }
2694 
2695 /* ARGSUSED */
2696 static void
2697 dsl_dataset_set_reservation_sync(void *arg1, void *arg2, cred_t *cr,
2698     dmu_tx_t *tx)
2699 {
2700 	dsl_dataset_t *ds = arg1;
2701 	uint64_t *reservationp = arg2;
2702 	uint64_t new_reservation = *reservationp;
2703 	uint64_t unique;
2704 	int64_t delta;
2705 
2706 	dmu_buf_will_dirty(ds->ds_dbuf, tx);
2707 
2708 	mutex_enter(&ds->ds_lock);
2709 	unique = dsl_dataset_unique(ds);
2710 	delta = MAX(0, (int64_t)(new_reservation - unique)) -
2711 	    MAX(0, (int64_t)(ds->ds_reserved - unique));
2712 	ds->ds_reserved = new_reservation;
2713 	mutex_exit(&ds->ds_lock);
2714 
2715 	dsl_prop_set_uint64_sync(ds->ds_dir, "refreservation",
2716 	    new_reservation, cr, tx);
2717 
2718 	dsl_dir_diduse_space(ds->ds_dir, delta, 0, 0, tx);
2719 
2720 	spa_history_internal_log(LOG_DS_REFRESERV,
2721 	    ds->ds_dir->dd_pool->dp_spa, tx, cr, "%lld dataset = %llu",
2722 	    (longlong_t)new_reservation,
2723 	    ds->ds_dir->dd_phys->dd_head_dataset_obj);
2724 }
2725 
2726 int
2727 dsl_dataset_set_reservation(const char *dsname, uint64_t reservation)
2728 {
2729 	dsl_dataset_t *ds;
2730 	int err;
2731 
2732 	err = dsl_dataset_open(dsname, DS_MODE_STANDARD, FTAG, &ds);
2733 	if (err)
2734 		return (err);
2735 
2736 	err = dsl_sync_task_do(ds->ds_dir->dd_pool,
2737 	    dsl_dataset_set_reservation_check,
2738 	    dsl_dataset_set_reservation_sync, ds, &reservation, 0);
2739 	dsl_dataset_close(ds, DS_MODE_STANDARD, FTAG);
2740 	return (err);
2741 }
2742