xref: /titanic_51/usr/src/uts/common/fs/zfs/dsl_dataset.c (revision d326b23bcecd3c0d693a54003343ec3de73e58d0)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2006 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 
26 #pragma ident	"%Z%%M%	%I%	%E% SMI"
27 
28 #include <sys/dmu_objset.h>
29 #include <sys/dsl_dataset.h>
30 #include <sys/dsl_dir.h>
31 #include <sys/dsl_prop.h>
32 #include <sys/dmu_traverse.h>
33 #include <sys/dmu_tx.h>
34 #include <sys/arc.h>
35 #include <sys/zio.h>
36 #include <sys/zap.h>
37 #include <sys/unique.h>
38 #include <sys/zfs_context.h>
39 
/*
 * Forward declaration of the sync task that flags a head dataset as
 * inconsistent before its objects are freed; dsl_dataset_destroy() uses
 * it ahead of its definition further down in this file.
 */
static int dsl_dataset_destroy_begin_sync(dsl_dir_t *dd,
    void *arg, dmu_tx_t *tx);

/* Ceiling for the sum of the open-mode weights held on one dataset. */
#define	DOS_REF_MAX	(1ULL << 62)

/* Block size handed to bplist_create() whenever a deadlist is created. */
#define	DSL_DEADLIST_BLOCKSIZE	SPA_MAXBLOCKSIZE

/*
 * We use weighted reference counts to express the various forms of exclusion
 * between different open modes.  A STANDARD open is 1 point, an EXCLUSIVE open
 * is DOS_REF_MAX, and a PRIMARY open is a little more than half of an
 * EXCLUSIVE.  This makes the exclusion logic simple: the total refcnt for all
 * opens cannot exceed DOS_REF_MAX.  For example, EXCLUSIVE opens are exclusive
 * because their weight (DOS_REF_MAX) consumes the entire refcnt space.
 * PRIMARY opens consume just over half of the refcnt space, so there can't be
 * more than one, but it can peacefully coexist with any number of STANDARD
 * opens.
 *
 * The table is indexed by DS_MODE_LEVEL(mode).
 */
static uint64_t ds_refcnt_weight[DS_MODE_LEVELS] = {
	0,			/* NONE - carries no weight		*/
	1,			/* STANDARD - unlimited number		*/
	(DOS_REF_MAX >> 1) + 1,	/* PRIMARY - only one of these		*/
	DOS_REF_MAX		/* EXCLUSIVE - no other opens		*/
};
63 
64 
65 void
66 dsl_dataset_block_born(dsl_dataset_t *ds, blkptr_t *bp, dmu_tx_t *tx)
67 {
68 	int used = bp_get_dasize(tx->tx_pool->dp_spa, bp);
69 	int compressed = BP_GET_PSIZE(bp);
70 	int uncompressed = BP_GET_UCSIZE(bp);
71 
72 	dprintf_bp(bp, "born, ds=%p\n", ds);
73 
74 	ASSERT(dmu_tx_is_syncing(tx));
75 	/* It could have been compressed away to nothing */
76 	if (BP_IS_HOLE(bp))
77 		return;
78 	ASSERT(BP_GET_TYPE(bp) != DMU_OT_NONE);
79 	ASSERT3U(BP_GET_TYPE(bp), <, DMU_OT_NUMTYPES);
80 	if (ds == NULL) {
81 		/*
82 		 * Account for the meta-objset space in its placeholder
83 		 * dsl_dir.
84 		 */
85 		ASSERT3U(compressed, ==, uncompressed); /* it's all metadata */
86 		dsl_dir_diduse_space(tx->tx_pool->dp_mos_dir,
87 		    used, compressed, uncompressed, tx);
88 		dsl_dir_dirty(tx->tx_pool->dp_mos_dir, tx);
89 		return;
90 	}
91 	dmu_buf_will_dirty(ds->ds_dbuf, tx);
92 	mutex_enter(&ds->ds_lock);
93 	ds->ds_phys->ds_used_bytes += used;
94 	ds->ds_phys->ds_compressed_bytes += compressed;
95 	ds->ds_phys->ds_uncompressed_bytes += uncompressed;
96 	ds->ds_phys->ds_unique_bytes += used;
97 	mutex_exit(&ds->ds_lock);
98 	dsl_dir_diduse_space(ds->ds_dir,
99 	    used, compressed, uncompressed, tx);
100 }
101 
102 void
103 dsl_dataset_block_kill(dsl_dataset_t *ds, blkptr_t *bp, dmu_tx_t *tx)
104 {
105 	int used = bp_get_dasize(tx->tx_pool->dp_spa, bp);
106 	int compressed = BP_GET_PSIZE(bp);
107 	int uncompressed = BP_GET_UCSIZE(bp);
108 
109 	ASSERT(dmu_tx_is_syncing(tx));
110 	if (BP_IS_HOLE(bp))
111 		return;
112 
113 	ASSERT(used > 0);
114 	if (ds == NULL) {
115 		/*
116 		 * Account for the meta-objset space in its placeholder
117 		 * dataset.
118 		 */
119 		/* XXX this can fail, what do we do when it does? */
120 		(void) arc_free(NULL, tx->tx_pool->dp_spa,
121 		    tx->tx_txg, bp, NULL, NULL, ARC_WAIT);
122 		bzero(bp, sizeof (blkptr_t));
123 
124 		dsl_dir_diduse_space(tx->tx_pool->dp_mos_dir,
125 		    -used, -compressed, -uncompressed, tx);
126 		dsl_dir_dirty(tx->tx_pool->dp_mos_dir, tx);
127 		return;
128 	}
129 	ASSERT3P(tx->tx_pool, ==, ds->ds_dir->dd_pool);
130 
131 	dmu_buf_will_dirty(ds->ds_dbuf, tx);
132 
133 	if (bp->blk_birth > ds->ds_phys->ds_prev_snap_txg) {
134 		dprintf_bp(bp, "freeing: %s", "");
135 		/* XXX check return code? */
136 		(void) arc_free(NULL, tx->tx_pool->dp_spa,
137 		    tx->tx_txg, bp, NULL, NULL, ARC_WAIT);
138 
139 		mutex_enter(&ds->ds_lock);
140 		/* XXX unique_bytes is not accurate for head datasets */
141 		/* ASSERT3U(ds->ds_phys->ds_unique_bytes, >=, used); */
142 		ds->ds_phys->ds_unique_bytes -= used;
143 		mutex_exit(&ds->ds_lock);
144 		dsl_dir_diduse_space(ds->ds_dir,
145 		    -used, -compressed, -uncompressed, tx);
146 	} else {
147 		dprintf_bp(bp, "putting on dead list: %s", "");
148 		VERIFY(0 == bplist_enqueue(&ds->ds_deadlist, bp, tx));
149 		/* if (bp->blk_birth > prev prev snap txg) prev unique += bs */
150 		if (ds->ds_phys->ds_prev_snap_obj != 0) {
151 			ASSERT3U(ds->ds_prev->ds_object, ==,
152 			    ds->ds_phys->ds_prev_snap_obj);
153 			ASSERT(ds->ds_prev->ds_phys->ds_num_children > 0);
154 			if (ds->ds_prev->ds_phys->ds_next_snap_obj ==
155 			    ds->ds_object && bp->blk_birth >
156 			    ds->ds_prev->ds_phys->ds_prev_snap_txg) {
157 				dmu_buf_will_dirty(ds->ds_prev->ds_dbuf, tx);
158 				mutex_enter(&ds->ds_prev->ds_lock);
159 				ds->ds_prev->ds_phys->ds_unique_bytes +=
160 				    used;
161 				mutex_exit(&ds->ds_prev->ds_lock);
162 			}
163 		}
164 	}
165 	bzero(bp, sizeof (blkptr_t));
166 	mutex_enter(&ds->ds_lock);
167 	ASSERT3U(ds->ds_phys->ds_used_bytes, >=, used);
168 	ds->ds_phys->ds_used_bytes -= used;
169 	ASSERT3U(ds->ds_phys->ds_compressed_bytes, >=, compressed);
170 	ds->ds_phys->ds_compressed_bytes -= compressed;
171 	ASSERT3U(ds->ds_phys->ds_uncompressed_bytes, >=, uncompressed);
172 	ds->ds_phys->ds_uncompressed_bytes -= uncompressed;
173 	mutex_exit(&ds->ds_lock);
174 }
175 
176 uint64_t
177 dsl_dataset_prev_snap_txg(dsl_dataset_t *ds)
178 {
179 	uint64_t txg;
180 	dsl_dir_t *dd;
181 
182 	if (ds == NULL)
183 		return (0);
184 	/*
185 	 * The snapshot creation could fail, but that would cause an
186 	 * incorrect FALSE return, which would only result in an
187 	 * overestimation of the amount of space that an operation would
188 	 * consume, which is OK.
189 	 *
190 	 * There's also a small window where we could miss a pending
191 	 * snapshot, because we could set the sync task in the quiescing
192 	 * phase.  So this should only be used as a guess.
193 	 */
194 	dd = ds->ds_dir;
195 	mutex_enter(&dd->dd_lock);
196 	if (dd->dd_sync_func == dsl_dataset_snapshot_sync)
197 		txg = dd->dd_sync_txg;
198 	else
199 		txg = ds->ds_phys->ds_prev_snap_txg;
200 	mutex_exit(&dd->dd_lock);
201 
202 	return (txg);
203 }
204 
205 int
206 dsl_dataset_block_freeable(dsl_dataset_t *ds, uint64_t blk_birth)
207 {
208 	return (blk_birth > dsl_dataset_prev_snap_txg(ds));
209 }
210 
211 /* ARGSUSED */
212 static void
213 dsl_dataset_evict(dmu_buf_t *db, void *dsv)
214 {
215 	dsl_dataset_t *ds = dsv;
216 	dsl_pool_t *dp = ds->ds_dir->dd_pool;
217 
218 	/* open_refcount == DOS_REF_MAX when deleting */
219 	ASSERT(ds->ds_open_refcount == 0 ||
220 	    ds->ds_open_refcount == DOS_REF_MAX);
221 
222 	dprintf_ds(ds, "evicting %s\n", "");
223 
224 	unique_remove(ds->ds_phys->ds_fsid_guid);
225 
226 	if (ds->ds_user_ptr != NULL)
227 		ds->ds_user_evict_func(ds, ds->ds_user_ptr);
228 
229 	if (ds->ds_prev) {
230 		dsl_dataset_close(ds->ds_prev, DS_MODE_NONE, ds);
231 		ds->ds_prev = NULL;
232 	}
233 
234 	bplist_close(&ds->ds_deadlist);
235 	dsl_dir_close(ds->ds_dir, ds);
236 
237 	if (list_link_active(&ds->ds_synced_link))
238 		list_remove(&dp->dp_synced_objsets, ds);
239 
240 	kmem_free(ds, sizeof (dsl_dataset_t));
241 }
242 
243 static int
244 dsl_dataset_get_snapname(dsl_dataset_t *ds)
245 {
246 	dsl_dataset_phys_t *headphys;
247 	int err;
248 	dmu_buf_t *headdbuf;
249 	dsl_pool_t *dp = ds->ds_dir->dd_pool;
250 	objset_t *mos = dp->dp_meta_objset;
251 
252 	if (ds->ds_snapname[0])
253 		return (0);
254 	if (ds->ds_phys->ds_next_snap_obj == 0)
255 		return (0);
256 
257 	err = dmu_bonus_hold(mos, ds->ds_dir->dd_phys->dd_head_dataset_obj,
258 	    FTAG, &headdbuf);
259 	if (err)
260 		return (err);
261 	headphys = headdbuf->db_data;
262 	err = zap_value_search(dp->dp_meta_objset,
263 	    headphys->ds_snapnames_zapobj, ds->ds_object, ds->ds_snapname);
264 	dmu_buf_rele(headdbuf, FTAG);
265 	return (err);
266 }
267 
268 int
269 dsl_dataset_open_obj(dsl_pool_t *dp, uint64_t dsobj, const char *snapname,
270     int mode, void *tag, dsl_dataset_t **dsp)
271 {
272 	uint64_t weight = ds_refcnt_weight[DS_MODE_LEVEL(mode)];
273 	objset_t *mos = dp->dp_meta_objset;
274 	dmu_buf_t *dbuf;
275 	dsl_dataset_t *ds;
276 	int err;
277 
278 	ASSERT(RW_LOCK_HELD(&dp->dp_config_rwlock) ||
279 	    dsl_pool_sync_context(dp));
280 
281 	err = dmu_bonus_hold(mos, dsobj, tag, &dbuf);
282 	if (err)
283 		return (err);
284 	ds = dmu_buf_get_user(dbuf);
285 	if (ds == NULL) {
286 		dsl_dataset_t *winner;
287 
288 		ds = kmem_zalloc(sizeof (dsl_dataset_t), KM_SLEEP);
289 		ds->ds_dbuf = dbuf;
290 		ds->ds_object = dsobj;
291 		ds->ds_phys = dbuf->db_data;
292 
293 		err = bplist_open(&ds->ds_deadlist,
294 		    mos, ds->ds_phys->ds_deadlist_obj);
295 		if (err == 0) {
296 			err = dsl_dir_open_obj(dp,
297 			    ds->ds_phys->ds_dir_obj, NULL, ds, &ds->ds_dir);
298 		}
299 		if (err) {
300 			/*
301 			 * we don't really need to close the blist if we
302 			 * just opened it.
303 			 */
304 			kmem_free(ds, sizeof (dsl_dataset_t));
305 			dmu_buf_rele(dbuf, tag);
306 			return (err);
307 		}
308 
309 		if (ds->ds_dir->dd_phys->dd_head_dataset_obj == dsobj) {
310 			ds->ds_snapname[0] = '\0';
311 			if (ds->ds_phys->ds_prev_snap_obj) {
312 				err = dsl_dataset_open_obj(dp,
313 				    ds->ds_phys->ds_prev_snap_obj, NULL,
314 				    DS_MODE_NONE, ds, &ds->ds_prev);
315 			}
316 		} else {
317 			if (snapname) {
318 #ifdef ZFS_DEBUG
319 				dsl_dataset_phys_t *headphys;
320 				dmu_buf_t *headdbuf;
321 				err = dmu_bonus_hold(mos,
322 				    ds->ds_dir->dd_phys->dd_head_dataset_obj,
323 				    FTAG, &headdbuf);
324 				if (err == 0) {
325 					headphys = headdbuf->db_data;
326 					uint64_t foundobj;
327 					err = zap_lookup(dp->dp_meta_objset,
328 					    headphys->ds_snapnames_zapobj,
329 					    snapname, sizeof (foundobj), 1,
330 					    &foundobj);
331 					ASSERT3U(foundobj, ==, dsobj);
332 					dmu_buf_rele(headdbuf, FTAG);
333 				}
334 #endif
335 				(void) strcat(ds->ds_snapname, snapname);
336 			} else if (zfs_flags & ZFS_DEBUG_SNAPNAMES) {
337 				err = dsl_dataset_get_snapname(ds);
338 			}
339 		}
340 
341 		if (err == 0) {
342 			winner = dmu_buf_set_user_ie(dbuf, ds, &ds->ds_phys,
343 			    dsl_dataset_evict);
344 		}
345 		if (err || winner) {
346 			bplist_close(&ds->ds_deadlist);
347 			if (ds->ds_prev) {
348 				dsl_dataset_close(ds->ds_prev,
349 				    DS_MODE_NONE, ds);
350 			}
351 			dsl_dir_close(ds->ds_dir, ds);
352 			kmem_free(ds, sizeof (dsl_dataset_t));
353 			if (err) {
354 				dmu_buf_rele(dbuf, tag);
355 				return (err);
356 			}
357 			ds = winner;
358 		} else {
359 			uint64_t new =
360 			    unique_insert(ds->ds_phys->ds_fsid_guid);
361 			if (new != ds->ds_phys->ds_fsid_guid) {
362 				/* XXX it won't necessarily be synced... */
363 				ds->ds_phys->ds_fsid_guid = new;
364 			}
365 		}
366 	}
367 	ASSERT3P(ds->ds_dbuf, ==, dbuf);
368 	ASSERT3P(ds->ds_phys, ==, dbuf->db_data);
369 
370 	mutex_enter(&ds->ds_lock);
371 	if ((DS_MODE_LEVEL(mode) == DS_MODE_PRIMARY &&
372 	    (ds->ds_phys->ds_flags & DS_FLAG_INCONSISTENT) &&
373 	    !DS_MODE_IS_INCONSISTENT(mode)) ||
374 	    (ds->ds_open_refcount + weight > DOS_REF_MAX)) {
375 		mutex_exit(&ds->ds_lock);
376 		dsl_dataset_close(ds, DS_MODE_NONE, tag);
377 		return (EBUSY);
378 	}
379 	ds->ds_open_refcount += weight;
380 	mutex_exit(&ds->ds_lock);
381 
382 	*dsp = ds;
383 	return (0);
384 }
385 
386 int
387 dsl_dataset_open_spa(spa_t *spa, const char *name, int mode,
388     void *tag, dsl_dataset_t **dsp)
389 {
390 	dsl_dir_t *dd;
391 	dsl_pool_t *dp;
392 	const char *tail;
393 	uint64_t obj;
394 	dsl_dataset_t *ds = NULL;
395 	int err = 0;
396 
397 	err = dsl_dir_open_spa(spa, name, FTAG, &dd, &tail);
398 	if (err)
399 		return (err);
400 
401 	dp = dd->dd_pool;
402 	obj = dd->dd_phys->dd_head_dataset_obj;
403 	rw_enter(&dp->dp_config_rwlock, RW_READER);
404 	if (obj == 0) {
405 		/* A dataset with no associated objset */
406 		err = ENOENT;
407 		goto out;
408 	}
409 
410 	if (tail != NULL) {
411 		objset_t *mos = dp->dp_meta_objset;
412 
413 		err = dsl_dataset_open_obj(dp, obj, NULL,
414 		    DS_MODE_NONE, tag, &ds);
415 		if (err)
416 			goto out;
417 		obj = ds->ds_phys->ds_snapnames_zapobj;
418 		dsl_dataset_close(ds, DS_MODE_NONE, tag);
419 		ds = NULL;
420 
421 		if (tail[0] != '@') {
422 			err = ENOENT;
423 			goto out;
424 		}
425 		tail++;
426 
427 		/* Look for a snapshot */
428 		if (!DS_MODE_IS_READONLY(mode)) {
429 			err = EROFS;
430 			goto out;
431 		}
432 		dprintf("looking for snapshot '%s'\n", tail);
433 		err = zap_lookup(mos, obj, tail, 8, 1, &obj);
434 		if (err)
435 			goto out;
436 	}
437 	err = dsl_dataset_open_obj(dp, obj, tail, mode, tag, &ds);
438 
439 out:
440 	rw_exit(&dp->dp_config_rwlock);
441 	dsl_dir_close(dd, FTAG);
442 
443 	ASSERT3U((err == 0), ==, (ds != NULL));
444 	/* ASSERT(ds == NULL || strcmp(name, ds->ds_name) == 0); */
445 
446 	*dsp = ds;
447 	return (err);
448 }
449 
450 int
451 dsl_dataset_open(const char *name, int mode, void *tag, dsl_dataset_t **dsp)
452 {
453 	return (dsl_dataset_open_spa(NULL, name, mode, tag, dsp));
454 }
455 
456 void
457 dsl_dataset_name(dsl_dataset_t *ds, char *name)
458 {
459 	if (ds == NULL) {
460 		(void) strcpy(name, "mos");
461 	} else {
462 		dsl_dir_name(ds->ds_dir, name);
463 		VERIFY(0 == dsl_dataset_get_snapname(ds));
464 		if (ds->ds_snapname[0]) {
465 			(void) strcat(name, "@");
466 			if (!MUTEX_HELD(&ds->ds_lock)) {
467 				/*
468 				 * We use a "recursive" mutex so that we
469 				 * can call dprintf_ds() with ds_lock held.
470 				 */
471 				mutex_enter(&ds->ds_lock);
472 				(void) strcat(name, ds->ds_snapname);
473 				mutex_exit(&ds->ds_lock);
474 			} else {
475 				(void) strcat(name, ds->ds_snapname);
476 			}
477 		}
478 	}
479 }
480 
481 void
482 dsl_dataset_close(dsl_dataset_t *ds, int mode, void *tag)
483 {
484 	uint64_t weight = ds_refcnt_weight[DS_MODE_LEVEL(mode)];
485 	mutex_enter(&ds->ds_lock);
486 	ASSERT3U(ds->ds_open_refcount, >=, weight);
487 	ds->ds_open_refcount -= weight;
488 	dprintf_ds(ds, "closing mode %u refcount now 0x%llx\n",
489 	    mode, ds->ds_open_refcount);
490 	mutex_exit(&ds->ds_lock);
491 
492 	dmu_buf_rele(ds->ds_dbuf, tag);
493 }
494 
495 void
496 dsl_dataset_create_root(dsl_pool_t *dp, uint64_t *ddobjp, dmu_tx_t *tx)
497 {
498 	objset_t *mos = dp->dp_meta_objset;
499 	dmu_buf_t *dbuf;
500 	dsl_dataset_phys_t *dsphys;
501 	dsl_dataset_t *ds;
502 	uint64_t dsobj;
503 	dsl_dir_t *dd;
504 
505 	dsl_dir_create_root(mos, ddobjp, tx);
506 	VERIFY(0 == dsl_dir_open_obj(dp, *ddobjp, NULL, FTAG, &dd));
507 
508 	dsobj = dmu_object_alloc(mos, DMU_OT_DSL_DATASET, 0,
509 	    DMU_OT_DSL_DATASET, sizeof (dsl_dataset_phys_t), tx);
510 	VERIFY(0 == dmu_bonus_hold(mos, dsobj, FTAG, &dbuf));
511 	dmu_buf_will_dirty(dbuf, tx);
512 	dsphys = dbuf->db_data;
513 	dsphys->ds_dir_obj = dd->dd_object;
514 	dsphys->ds_fsid_guid = unique_create();
515 	unique_remove(dsphys->ds_fsid_guid); /* it isn't open yet */
516 	(void) random_get_pseudo_bytes((void*)&dsphys->ds_guid,
517 	    sizeof (dsphys->ds_guid));
518 	dsphys->ds_snapnames_zapobj =
519 	    zap_create(mos, DMU_OT_DSL_DS_SNAP_MAP, DMU_OT_NONE, 0, tx);
520 	dsphys->ds_creation_time = gethrestime_sec();
521 	dsphys->ds_creation_txg = tx->tx_txg;
522 	dsphys->ds_deadlist_obj =
523 	    bplist_create(mos, DSL_DEADLIST_BLOCKSIZE, tx);
524 	dmu_buf_rele(dbuf, FTAG);
525 
526 	dmu_buf_will_dirty(dd->dd_dbuf, tx);
527 	dd->dd_phys->dd_head_dataset_obj = dsobj;
528 	dsl_dir_close(dd, FTAG);
529 
530 	VERIFY(0 ==
531 	    dsl_dataset_open_obj(dp, dsobj, NULL, DS_MODE_NONE, FTAG, &ds));
532 	(void) dmu_objset_create_impl(dp->dp_spa, ds, DMU_OST_ZFS, tx);
533 	dsl_dataset_close(ds, DS_MODE_NONE, FTAG);
534 }
535 
536 int
537 dsl_dataset_create_sync(dsl_dir_t *pds, const char *fullname,
538     const char *lastname, dsl_dataset_t *clone_parent, dmu_tx_t *tx)
539 {
540 	int err;
541 	dsl_pool_t *dp = pds->dd_pool;
542 	dmu_buf_t *dbuf;
543 	dsl_dataset_phys_t *dsphys;
544 	uint64_t dsobj;
545 	objset_t *mos = dp->dp_meta_objset;
546 	dsl_dir_t *dd;
547 
548 	if (clone_parent != NULL) {
549 		/*
550 		 * You can't clone across pools.
551 		 */
552 		if (clone_parent->ds_dir->dd_pool != dp)
553 			return (EXDEV);
554 
555 		/*
556 		 * You can only clone snapshots, not the head datasets.
557 		 */
558 		if (clone_parent->ds_phys->ds_num_children == 0)
559 			return (EINVAL);
560 	}
561 
562 	ASSERT(lastname[0] != '@');
563 	ASSERT(dmu_tx_is_syncing(tx));
564 
565 	err = dsl_dir_create_sync(pds, lastname, tx);
566 	if (err)
567 		return (err);
568 	VERIFY(0 == dsl_dir_open_spa(dp->dp_spa, fullname, FTAG, &dd, NULL));
569 
570 	/* This is the point of no (unsuccessful) return */
571 
572 	dsobj = dmu_object_alloc(mos, DMU_OT_DSL_DATASET, 0,
573 	    DMU_OT_DSL_DATASET, sizeof (dsl_dataset_phys_t), tx);
574 	VERIFY(0 == dmu_bonus_hold(mos, dsobj, FTAG, &dbuf));
575 	dmu_buf_will_dirty(dbuf, tx);
576 	dsphys = dbuf->db_data;
577 	dsphys->ds_dir_obj = dd->dd_object;
578 	dsphys->ds_fsid_guid = unique_create();
579 	unique_remove(dsphys->ds_fsid_guid); /* it isn't open yet */
580 	(void) random_get_pseudo_bytes((void*)&dsphys->ds_guid,
581 	    sizeof (dsphys->ds_guid));
582 	dsphys->ds_snapnames_zapobj =
583 	    zap_create(mos, DMU_OT_DSL_DS_SNAP_MAP, DMU_OT_NONE, 0, tx);
584 	dsphys->ds_creation_time = gethrestime_sec();
585 	dsphys->ds_creation_txg = tx->tx_txg;
586 	dsphys->ds_deadlist_obj =
587 	    bplist_create(mos, DSL_DEADLIST_BLOCKSIZE, tx);
588 	if (clone_parent) {
589 		dsphys->ds_prev_snap_obj = clone_parent->ds_object;
590 		dsphys->ds_prev_snap_txg =
591 		    clone_parent->ds_phys->ds_creation_txg;
592 		dsphys->ds_used_bytes =
593 		    clone_parent->ds_phys->ds_used_bytes;
594 		dsphys->ds_compressed_bytes =
595 		    clone_parent->ds_phys->ds_compressed_bytes;
596 		dsphys->ds_uncompressed_bytes =
597 		    clone_parent->ds_phys->ds_uncompressed_bytes;
598 		dsphys->ds_bp = clone_parent->ds_phys->ds_bp;
599 
600 		dmu_buf_will_dirty(clone_parent->ds_dbuf, tx);
601 		clone_parent->ds_phys->ds_num_children++;
602 
603 		dmu_buf_will_dirty(dd->dd_dbuf, tx);
604 		dd->dd_phys->dd_clone_parent_obj = clone_parent->ds_object;
605 	}
606 	dmu_buf_rele(dbuf, FTAG);
607 
608 	dmu_buf_will_dirty(dd->dd_dbuf, tx);
609 	dd->dd_phys->dd_head_dataset_obj = dsobj;
610 	dsl_dir_close(dd, FTAG);
611 
612 	return (0);
613 }
614 
615 int
616 dsl_dataset_destroy(const char *name)
617 {
618 	int err;
619 	dsl_pool_t *dp;
620 	dsl_dir_t *dd;
621 	const char *tail;
622 
623 	err = dsl_dir_open(name, FTAG, &dd, &tail);
624 	if (err)
625 		return (err);
626 
627 	dp = dd->dd_pool;
628 	if (tail != NULL) {
629 		if (tail[0] != '@') {
630 			dsl_dir_close(dd, FTAG);
631 			return (ENOENT);
632 		}
633 		tail++;
634 		/* Just blow away the snapshot */
635 		do {
636 			txg_wait_synced(dp, 0);
637 			err = dsl_dir_sync_task(dd,
638 			    dsl_dataset_destroy_sync, (void*)tail, 0);
639 		} while (err == EAGAIN);
640 		dsl_dir_close(dd, FTAG);
641 	} else {
642 		char buf[MAXNAMELEN];
643 		char *cp;
644 		objset_t *os;
645 		uint64_t obj;
646 		dsl_dir_t *pds;
647 
648 		if (dd->dd_phys->dd_parent_obj == 0) {
649 			dsl_dir_close(dd, FTAG);
650 			return (EINVAL);
651 		}
652 
653 		err = dmu_objset_open(name, DMU_OST_ANY,
654 		    DS_MODE_PRIMARY | DS_MODE_INCONSISTENT, &os);
655 		if (err) {
656 			dsl_dir_close(dd, FTAG);
657 			return (err);
658 		}
659 
660 		/*
661 		 * Check for errors and mark this ds as inconsistent, in
662 		 * case we crash while freeing the objects.
663 		 */
664 		err = dsl_dir_sync_task(os->os->os_dsl_dataset->ds_dir,
665 		    dsl_dataset_destroy_begin_sync, os->os->os_dsl_dataset, 0);
666 		if (err) {
667 			dmu_objset_close(os);
668 			dsl_dir_close(dd, FTAG);
669 			return (err);
670 		}
671 
672 		/*
673 		 * remove the objects in open context, so that we won't
674 		 * have too much to do in syncing context.
675 		 */
676 		for (obj = 0; err == 0;
677 		    err = dmu_object_next(os, &obj, FALSE)) {
678 			dmu_tx_t *tx = dmu_tx_create(os);
679 			dmu_tx_hold_free(tx, obj, 0, DMU_OBJECT_END);
680 			dmu_tx_hold_bonus(tx, obj);
681 			err = dmu_tx_assign(tx, TXG_WAIT);
682 			if (err) {
683 				/*
684 				 * Perhaps there is not enough disk
685 				 * space.  Just deal with it from
686 				 * dsl_dataset_destroy_sync().
687 				 */
688 				dmu_tx_abort(tx);
689 				continue;
690 			}
691 			VERIFY(0 == dmu_object_free(os, obj, tx));
692 			dmu_tx_commit(tx);
693 		}
694 		/* Make sure it's not dirty before we finish destroying it. */
695 		txg_wait_synced(dd->dd_pool, 0);
696 
697 		dmu_objset_close(os);
698 		if (err != ESRCH) {
699 			dsl_dir_close(dd, FTAG);
700 			return (err);
701 		}
702 
703 		/*
704 		 * Blow away the dsl_dir + head dataset.
705 		 * dsl_dir_destroy_sync() will call
706 		 * dsl_dataset_destroy_sync() to destroy the head dataset.
707 		 */
708 		rw_enter(&dp->dp_config_rwlock, RW_READER);
709 		err = dsl_dir_open_obj(dd->dd_pool,
710 		    dd->dd_phys->dd_parent_obj, NULL, FTAG, &pds);
711 		dsl_dir_close(dd, FTAG);
712 		rw_exit(&dp->dp_config_rwlock);
713 		if (err)
714 			return (err);
715 
716 		(void) strcpy(buf, name);
717 		cp = strrchr(buf, '/') + 1;
718 		ASSERT(cp[0] != '\0');
719 		do {
720 			txg_wait_synced(dp, 0);
721 			err = dsl_dir_sync_task(pds,
722 			    dsl_dir_destroy_sync, cp, 0);
723 		} while (err == EAGAIN);
724 		dsl_dir_close(pds, FTAG);
725 	}
726 
727 	return (err);
728 }
729 
730 int
731 dsl_dataset_rollback(const char *name)
732 {
733 	int err;
734 	dsl_dir_t *dd;
735 	const char *tail;
736 
737 	err = dsl_dir_open(name, FTAG, &dd, &tail);
738 	if (err)
739 		return (err);
740 
741 	if (tail != NULL) {
742 		dsl_dir_close(dd, FTAG);
743 		return (EINVAL);
744 	}
745 	do {
746 		txg_wait_synced(dd->dd_pool, 0);
747 		err = dsl_dir_sync_task(dd,
748 		    dsl_dataset_rollback_sync, NULL, 0);
749 	} while (err == EAGAIN);
750 	dsl_dir_close(dd, FTAG);
751 
752 	return (err);
753 }
754 
755 void *
756 dsl_dataset_set_user_ptr(dsl_dataset_t *ds,
757     void *p, dsl_dataset_evict_func_t func)
758 {
759 	void *old;
760 
761 	mutex_enter(&ds->ds_lock);
762 	old = ds->ds_user_ptr;
763 	if (old == NULL) {
764 		ds->ds_user_ptr = p;
765 		ds->ds_user_evict_func = func;
766 	}
767 	mutex_exit(&ds->ds_lock);
768 	return (old);
769 }
770 
771 void *
772 dsl_dataset_get_user_ptr(dsl_dataset_t *ds)
773 {
774 	return (ds->ds_user_ptr);
775 }
776 
777 
778 void
779 dsl_dataset_get_blkptr(dsl_dataset_t *ds, blkptr_t *bp)
780 {
781 	*bp = ds->ds_phys->ds_bp;
782 }
783 
784 void
785 dsl_dataset_set_blkptr(dsl_dataset_t *ds, blkptr_t *bp, dmu_tx_t *tx)
786 {
787 	ASSERT(dmu_tx_is_syncing(tx));
788 	/* If it's the meta-objset, set dp_meta_rootbp */
789 	if (ds == NULL) {
790 		tx->tx_pool->dp_meta_rootbp = *bp;
791 	} else {
792 		dmu_buf_will_dirty(ds->ds_dbuf, tx);
793 		ds->ds_phys->ds_bp = *bp;
794 	}
795 }
796 
797 spa_t *
798 dsl_dataset_get_spa(dsl_dataset_t *ds)
799 {
800 	return (ds->ds_dir->dd_pool->dp_spa);
801 }
802 
803 void
804 dsl_dataset_dirty(dsl_dataset_t *ds, dmu_tx_t *tx)
805 {
806 	dsl_pool_t *dp;
807 
808 	if (ds == NULL) /* this is the meta-objset */
809 		return;
810 
811 	ASSERT(ds->ds_user_ptr != NULL);
812 	ASSERT(ds->ds_phys->ds_next_snap_obj == 0);
813 
814 	dp = ds->ds_dir->dd_pool;
815 
816 	if (txg_list_add(&dp->dp_dirty_datasets, ds, tx->tx_txg) == 0) {
817 		/* up the hold count until we can be written out */
818 		dmu_buf_add_ref(ds->ds_dbuf, ds);
819 	}
820 }
821 
/*
 * Argument bundle handed to kill_blkptr() through traverse_dsl_dataset().
 * The three counters point at totals owned by the caller, which
 * kill_blkptr() increments for every block it frees.
 */
struct killarg {
	uint64_t *usedp;		/* running sum of bp_get_dasize() */
	uint64_t *compressedp;		/* running sum of BP_GET_PSIZE() */
	uint64_t *uncompressedp;	/* running sum of BP_GET_UCSIZE() */
	zio_t *zio;			/* zio passed to each arc_free() */
	dmu_tx_t *tx;			/* supplies the txg for arc_free() */
};
829 
830 static int
831 kill_blkptr(traverse_blk_cache_t *bc, spa_t *spa, void *arg)
832 {
833 	struct killarg *ka = arg;
834 	blkptr_t *bp = &bc->bc_blkptr;
835 
836 	ASSERT3U(bc->bc_errno, ==, 0);
837 
838 	/*
839 	 * Since this callback is not called concurrently, no lock is
840 	 * needed on the accounting values.
841 	 */
842 	*ka->usedp += bp_get_dasize(spa, bp);
843 	*ka->compressedp += BP_GET_PSIZE(bp);
844 	*ka->uncompressedp += BP_GET_UCSIZE(bp);
845 	/* XXX check for EIO? */
846 	(void) arc_free(ka->zio, spa, ka->tx->tx_txg, bp, NULL, NULL,
847 	    ARC_NOWAIT);
848 	return (0);
849 }
850 
851 /* ARGSUSED */
852 int
853 dsl_dataset_rollback_sync(dsl_dir_t *dd, void *arg, dmu_tx_t *tx)
854 {
855 	objset_t *mos = dd->dd_pool->dp_meta_objset;
856 	dsl_dataset_t *ds;
857 	int err;
858 
859 	if (dd->dd_phys->dd_head_dataset_obj == 0)
860 		return (EINVAL);
861 	err = dsl_dataset_open_obj(dd->dd_pool,
862 	    dd->dd_phys->dd_head_dataset_obj, NULL, DS_MODE_NONE, FTAG, &ds);
863 	if (err)
864 		return (err);
865 
866 	if (ds->ds_phys->ds_prev_snap_txg == 0) {
867 		/*
868 		 * There's no previous snapshot.  I suppose we could
869 		 * roll it back to being empty (and re-initialize the
870 		 * upper (ZPL) layer).  But for now there's no way to do
871 		 * this via the user interface.
872 		 */
873 		dsl_dataset_close(ds, DS_MODE_NONE, FTAG);
874 		return (EINVAL);
875 	}
876 
877 	mutex_enter(&ds->ds_lock);
878 	if (ds->ds_open_refcount > 0) {
879 		mutex_exit(&ds->ds_lock);
880 		dsl_dataset_close(ds, DS_MODE_NONE, FTAG);
881 		return (EBUSY);
882 	}
883 
884 	/*
885 	 * If we made changes this txg, traverse_dsl_dataset won't find
886 	 * them.  Try again.
887 	 */
888 	if (ds->ds_phys->ds_bp.blk_birth >= tx->tx_txg) {
889 		mutex_exit(&ds->ds_lock);
890 		dsl_dataset_close(ds, DS_MODE_NONE, FTAG);
891 		return (EAGAIN);
892 	}
893 
894 	/* THE POINT OF NO (unsuccessful) RETURN */
895 	ds->ds_open_refcount = DOS_REF_MAX;
896 	mutex_exit(&ds->ds_lock);
897 
898 	dmu_buf_will_dirty(ds->ds_dbuf, tx);
899 
900 	/* Zero out the deadlist. */
901 	dprintf("old deadlist obj = %llx\n", ds->ds_phys->ds_deadlist_obj);
902 	bplist_close(&ds->ds_deadlist);
903 	bplist_destroy(mos, ds->ds_phys->ds_deadlist_obj, tx);
904 	ds->ds_phys->ds_deadlist_obj =
905 	    bplist_create(mos, DSL_DEADLIST_BLOCKSIZE, tx);
906 	VERIFY(0 == bplist_open(&ds->ds_deadlist, mos,
907 	    ds->ds_phys->ds_deadlist_obj));
908 	dprintf("new deadlist obj = %llx\n", ds->ds_phys->ds_deadlist_obj);
909 
910 	{
911 		/* Free blkptrs that we gave birth to */
912 		zio_t *zio;
913 		uint64_t used = 0, compressed = 0, uncompressed = 0;
914 		struct killarg ka;
915 
916 		zio = zio_root(tx->tx_pool->dp_spa, NULL, NULL,
917 		    ZIO_FLAG_MUSTSUCCEED);
918 		ka.usedp = &used;
919 		ka.compressedp = &compressed;
920 		ka.uncompressedp = &uncompressed;
921 		ka.zio = zio;
922 		ka.tx = tx;
923 		(void) traverse_dsl_dataset(ds, ds->ds_phys->ds_prev_snap_txg,
924 		    ADVANCE_POST, kill_blkptr, &ka);
925 		(void) zio_wait(zio);
926 
927 		dsl_dir_diduse_space(dd,
928 		    -used, -compressed, -uncompressed, tx);
929 	}
930 
931 	/* Change our contents to that of the prev snapshot (finally!) */
932 	ASSERT3U(ds->ds_prev->ds_object, ==, ds->ds_phys->ds_prev_snap_obj);
933 	ds->ds_phys->ds_bp = ds->ds_prev->ds_phys->ds_bp;
934 	ds->ds_phys->ds_used_bytes = ds->ds_prev->ds_phys->ds_used_bytes;
935 	ds->ds_phys->ds_compressed_bytes =
936 	    ds->ds_prev->ds_phys->ds_compressed_bytes;
937 	ds->ds_phys->ds_uncompressed_bytes =
938 	    ds->ds_prev->ds_phys->ds_uncompressed_bytes;
939 	ds->ds_phys->ds_flags = ds->ds_prev->ds_phys->ds_flags;
940 	ds->ds_phys->ds_unique_bytes = 0;
941 
942 	dmu_buf_will_dirty(ds->ds_prev->ds_dbuf, tx);
943 	ds->ds_prev->ds_phys->ds_unique_bytes = 0;
944 
945 	dprintf("new deadlist obj = %llx\n", ds->ds_phys->ds_deadlist_obj);
946 	ds->ds_open_refcount = 0;
947 	dsl_dataset_close(ds, DS_MODE_NONE, FTAG);
948 
949 	return (0);
950 }
951 
952 /* ARGSUSED */
953 static int
954 dsl_dataset_destroy_begin_sync(dsl_dir_t *dd, void *arg, dmu_tx_t *tx)
955 {
956 	dsl_dataset_t *ds = arg;
957 
958 	/*
959 	 * Can't delete a head dataset if there are snapshots of it.
960 	 * (Except if the only snapshots are from the branch we cloned
961 	 * from.)
962 	 */
963 	if (ds->ds_prev != NULL &&
964 	    ds->ds_prev->ds_phys->ds_next_snap_obj == ds->ds_object)
965 		return (EINVAL);
966 
967 	/* Mark it as inconsistent on-disk, in case we crash */
968 	dmu_buf_will_dirty(ds->ds_dbuf, tx);
969 	ds->ds_phys->ds_flags |= DS_FLAG_INCONSISTENT;
970 
971 	return (0);
972 }
973 
974 int
975 dsl_dataset_destroy_sync(dsl_dir_t *dd, void *arg, dmu_tx_t *tx)
976 {
977 	const char *snapname = arg;
978 	uint64_t used = 0, compressed = 0, uncompressed = 0;
979 	blkptr_t bp;
980 	zio_t *zio;
981 	int err;
982 	int after_branch_point = FALSE;
983 	int drop_lock = FALSE;
984 	dsl_pool_t *dp = dd->dd_pool;
985 	objset_t *mos = dp->dp_meta_objset;
986 	dsl_dataset_t *ds, *ds_prev = NULL;
987 	uint64_t obj;
988 
989 	if (dd->dd_phys->dd_head_dataset_obj == 0)
990 		return (EINVAL);
991 
992 	if (!RW_WRITE_HELD(&dp->dp_config_rwlock)) {
993 		rw_enter(&dp->dp_config_rwlock, RW_WRITER);
994 		drop_lock = TRUE;
995 	}
996 
997 	err = dsl_dataset_open_obj(dd->dd_pool,
998 	    dd->dd_phys->dd_head_dataset_obj, NULL,
999 	    snapname ? DS_MODE_NONE : DS_MODE_EXCLUSIVE, FTAG, &ds);
1000 
1001 	if (err == 0 && snapname) {
1002 		err = zap_lookup(mos, ds->ds_phys->ds_snapnames_zapobj,
1003 		    snapname, 8, 1, &obj);
1004 		dsl_dataset_close(ds, DS_MODE_NONE, FTAG);
1005 		if (err == 0) {
1006 			err = dsl_dataset_open_obj(dd->dd_pool, obj, NULL,
1007 			    DS_MODE_EXCLUSIVE, FTAG, &ds);
1008 		}
1009 	}
1010 	if (err) {
1011 		if (drop_lock)
1012 			rw_exit(&dp->dp_config_rwlock);
1013 		return (err);
1014 	}
1015 
1016 	obj = ds->ds_object;
1017 
1018 	/* Can't delete a branch point. */
1019 	if (ds->ds_phys->ds_num_children > 1) {
1020 		dsl_dataset_close(ds, DS_MODE_EXCLUSIVE, FTAG);
1021 		if (drop_lock)
1022 			rw_exit(&dp->dp_config_rwlock);
1023 		return (EINVAL);
1024 	}
1025 
1026 	/*
1027 	 * Can't delete a head dataset if there are snapshots of it.
1028 	 * (Except if the only snapshots are from the branch we cloned
1029 	 * from.)
1030 	 */
1031 	if (ds->ds_prev != NULL &&
1032 	    ds->ds_prev->ds_phys->ds_next_snap_obj == obj) {
1033 		dsl_dataset_close(ds, DS_MODE_EXCLUSIVE, FTAG);
1034 		if (drop_lock)
1035 			rw_exit(&dp->dp_config_rwlock);
1036 		return (EINVAL);
1037 	}
1038 
1039 	/*
1040 	 * If we made changes this txg, traverse_dsl_dataset won't find
1041 	 * them.  Try again.
1042 	 */
1043 	if (ds->ds_phys->ds_bp.blk_birth >= tx->tx_txg) {
1044 		dsl_dataset_close(ds, DS_MODE_NONE, FTAG);
1045 		if (drop_lock)
1046 			rw_exit(&dp->dp_config_rwlock);
1047 		return (EAGAIN);
1048 	}
1049 
1050 	if (ds->ds_phys->ds_prev_snap_obj != 0) {
1051 		if (ds->ds_prev) {
1052 			ds_prev = ds->ds_prev;
1053 		} else {
1054 			err = dsl_dataset_open_obj(dd->dd_pool,
1055 			    ds->ds_phys->ds_prev_snap_obj, NULL,
1056 			    DS_MODE_NONE, FTAG, &ds_prev);
1057 			if (err) {
1058 				dsl_dataset_close(ds, DS_MODE_NONE, FTAG);
1059 				if (drop_lock)
1060 					rw_exit(&dp->dp_config_rwlock);
1061 				return (err);
1062 			}
1063 		}
1064 		after_branch_point =
1065 		    (ds_prev->ds_phys->ds_next_snap_obj != obj);
1066 
1067 		dmu_buf_will_dirty(ds_prev->ds_dbuf, tx);
1068 		if (after_branch_point &&
1069 		    ds->ds_phys->ds_next_snap_obj == 0) {
1070 			/* This clone is toast. */
1071 			ASSERT(ds_prev->ds_phys->ds_num_children > 1);
1072 			ds_prev->ds_phys->ds_num_children--;
1073 		} else if (!after_branch_point) {
1074 			ds_prev->ds_phys->ds_next_snap_obj =
1075 			    ds->ds_phys->ds_next_snap_obj;
1076 		}
1077 	}
1078 
1079 	/* THE POINT OF NO (unsuccessful) RETURN */
1080 
1081 	ASSERT3P(tx->tx_pool, ==, dd->dd_pool);
1082 	zio = zio_root(dp->dp_spa, NULL, NULL, ZIO_FLAG_MUSTSUCCEED);
1083 
1084 	if (ds->ds_phys->ds_next_snap_obj != 0) {
1085 		dsl_dataset_t *ds_next;
1086 		uint64_t itor = 0;
1087 
1088 		spa_scrub_restart(dp->dp_spa, tx->tx_txg);
1089 
1090 		VERIFY(0 == dsl_dataset_open_obj(dd->dd_pool,
1091 		    ds->ds_phys->ds_next_snap_obj, NULL,
1092 		    DS_MODE_NONE, FTAG, &ds_next));
1093 		ASSERT3U(ds_next->ds_phys->ds_prev_snap_obj, ==, obj);
1094 
1095 		dmu_buf_will_dirty(ds_next->ds_dbuf, tx);
1096 		ds_next->ds_phys->ds_prev_snap_obj =
1097 		    ds->ds_phys->ds_prev_snap_obj;
1098 		ds_next->ds_phys->ds_prev_snap_txg =
1099 		    ds->ds_phys->ds_prev_snap_txg;
1100 		ASSERT3U(ds->ds_phys->ds_prev_snap_txg, ==,
1101 		    ds_prev ? ds_prev->ds_phys->ds_creation_txg : 0);
1102 
1103 		/*
1104 		 * Transfer to our deadlist (which will become next's
1105 		 * new deadlist) any entries from next's current
1106 		 * deadlist which were born before prev, and free the
1107 		 * other entries.
1108 		 *
1109 		 * XXX we're doing this long task with the config lock held
1110 		 */
1111 		while (bplist_iterate(&ds_next->ds_deadlist, &itor,
1112 		    &bp) == 0) {
1113 			if (bp.blk_birth <= ds->ds_phys->ds_prev_snap_txg) {
1114 				VERIFY(0 == bplist_enqueue(&ds->ds_deadlist,
1115 				    &bp, tx));
1116 				if (ds_prev && !after_branch_point &&
1117 				    bp.blk_birth >
1118 				    ds_prev->ds_phys->ds_prev_snap_txg) {
1119 					ds_prev->ds_phys->ds_unique_bytes +=
1120 					    bp_get_dasize(dp->dp_spa, &bp);
1121 				}
1122 			} else {
1123 				used += bp_get_dasize(dp->dp_spa, &bp);
1124 				compressed += BP_GET_PSIZE(&bp);
1125 				uncompressed += BP_GET_UCSIZE(&bp);
1126 				/* XXX check return value? */
1127 				(void) arc_free(zio, dp->dp_spa, tx->tx_txg,
1128 				    &bp, NULL, NULL, ARC_NOWAIT);
1129 			}
1130 		}
1131 
1132 		/* free next's deadlist */
1133 		bplist_close(&ds_next->ds_deadlist);
1134 		bplist_destroy(mos, ds_next->ds_phys->ds_deadlist_obj, tx);
1135 
1136 		/* set next's deadlist to our deadlist */
1137 		ds_next->ds_phys->ds_deadlist_obj =
1138 		    ds->ds_phys->ds_deadlist_obj;
1139 		VERIFY(0 == bplist_open(&ds_next->ds_deadlist, mos,
1140 		    ds_next->ds_phys->ds_deadlist_obj));
1141 		ds->ds_phys->ds_deadlist_obj = 0;
1142 
1143 		if (ds_next->ds_phys->ds_next_snap_obj != 0) {
1144 			/*
1145 			 * Update next's unique to include blocks which
1146 			 * were previously shared by only this snapshot
1147 			 * and it.  Those blocks will be born after the
1148 			 * prev snap and before this snap, and will have
1149 			 * died after the next snap and before the one
1150 			 * after that (ie. be on the snap after next's
1151 			 * deadlist).
1152 			 *
1153 			 * XXX we're doing this long task with the
1154 			 * config lock held
1155 			 */
1156 			dsl_dataset_t *ds_after_next;
1157 
1158 			VERIFY(0 == dsl_dataset_open_obj(dd->dd_pool,
1159 			    ds_next->ds_phys->ds_next_snap_obj, NULL,
1160 			    DS_MODE_NONE, FTAG, &ds_after_next));
1161 			itor = 0;
1162 			while (bplist_iterate(&ds_after_next->ds_deadlist,
1163 			    &itor, &bp) == 0) {
1164 				if (bp.blk_birth >
1165 				    ds->ds_phys->ds_prev_snap_txg &&
1166 				    bp.blk_birth <=
1167 				    ds->ds_phys->ds_creation_txg) {
1168 					ds_next->ds_phys->ds_unique_bytes +=
1169 					    bp_get_dasize(dp->dp_spa, &bp);
1170 				}
1171 			}
1172 
1173 			dsl_dataset_close(ds_after_next, DS_MODE_NONE, FTAG);
1174 			ASSERT3P(ds_next->ds_prev, ==, NULL);
1175 		} else {
1176 			/*
1177 			 * It would be nice to update the head dataset's
1178 			 * unique.  To do so we would have to traverse
1179 			 * it for blocks born after ds_prev, which is
1180 			 * pretty expensive just to maintain something
1181 			 * for debugging purposes.
1182 			 */
1183 			ASSERT3P(ds_next->ds_prev, ==, ds);
1184 			dsl_dataset_close(ds_next->ds_prev, DS_MODE_NONE,
1185 			    ds_next);
1186 			if (ds_prev) {
1187 				VERIFY(0 == dsl_dataset_open_obj(dd->dd_pool,
1188 				    ds->ds_phys->ds_prev_snap_obj, NULL,
1189 				    DS_MODE_NONE, ds_next, &ds_next->ds_prev));
1190 			} else {
1191 				ds_next->ds_prev = NULL;
1192 			}
1193 		}
1194 		dsl_dataset_close(ds_next, DS_MODE_NONE, FTAG);
1195 
1196 		/*
1197 		 * NB: unique_bytes is not accurate for head objsets
1198 		 * because we don't update it when we delete the most
1199 		 * recent snapshot -- see above comment.
1200 		 */
1201 		ASSERT3U(used, ==, ds->ds_phys->ds_unique_bytes);
1202 	} else {
1203 		/*
1204 		 * There's no next snapshot, so this is a head dataset.
1205 		 * Destroy the deadlist.  Unless it's a clone, the
1206 		 * deadlist should be empty.  (If it's a clone, it's
1207 		 * safe to ignore the deadlist contents.)
1208 		 */
1209 		struct killarg ka;
1210 
1211 		ASSERT(after_branch_point || bplist_empty(&ds->ds_deadlist));
1212 		bplist_close(&ds->ds_deadlist);
1213 		bplist_destroy(mos, ds->ds_phys->ds_deadlist_obj, tx);
1214 		ds->ds_phys->ds_deadlist_obj = 0;
1215 
1216 		/*
1217 		 * Free everything that we point to (that's born after
1218 		 * the previous snapshot, if we are a clone)
1219 		 *
1220 		 * XXX we're doing this long task with the config lock held
1221 		 */
1222 		ka.usedp = &used;
1223 		ka.compressedp = &compressed;
1224 		ka.uncompressedp = &uncompressed;
1225 		ka.zio = zio;
1226 		ka.tx = tx;
1227 		err = traverse_dsl_dataset(ds, ds->ds_phys->ds_prev_snap_txg,
1228 		    ADVANCE_POST, kill_blkptr, &ka);
1229 		ASSERT3U(err, ==, 0);
1230 	}
1231 
1232 	err = zio_wait(zio);
1233 	ASSERT3U(err, ==, 0);
1234 
1235 	dsl_dir_diduse_space(dd, -used, -compressed, -uncompressed, tx);
1236 
1237 	if (ds->ds_phys->ds_snapnames_zapobj) {
1238 		err = zap_destroy(mos, ds->ds_phys->ds_snapnames_zapobj, tx);
1239 		ASSERT(err == 0);
1240 	}
1241 
1242 	if (dd->dd_phys->dd_head_dataset_obj == ds->ds_object) {
1243 		/* Erase the link in the dataset */
1244 		dmu_buf_will_dirty(dd->dd_dbuf, tx);
1245 		dd->dd_phys->dd_head_dataset_obj = 0;
1246 		/*
1247 		 * dsl_dir_sync_destroy() called us, they'll destroy
1248 		 * the dataset.
1249 		 */
1250 	} else {
1251 		/* remove from snapshot namespace */
1252 		dsl_dataset_t *ds_head;
1253 		VERIFY(0 == dsl_dataset_open_obj(dd->dd_pool,
1254 		    dd->dd_phys->dd_head_dataset_obj, NULL,
1255 		    DS_MODE_NONE, FTAG, &ds_head));
1256 #ifdef ZFS_DEBUG
1257 		{
1258 			uint64_t val;
1259 			err = zap_lookup(mos,
1260 			    ds_head->ds_phys->ds_snapnames_zapobj,
1261 			    snapname, 8, 1, &val);
1262 			ASSERT3U(err, ==, 0);
1263 			ASSERT3U(val, ==, obj);
1264 		}
1265 #endif
1266 		err = zap_remove(mos, ds_head->ds_phys->ds_snapnames_zapobj,
1267 		    snapname, tx);
1268 		ASSERT(err == 0);
1269 		dsl_dataset_close(ds_head, DS_MODE_NONE, FTAG);
1270 	}
1271 
1272 	if (ds_prev && ds->ds_prev != ds_prev)
1273 		dsl_dataset_close(ds_prev, DS_MODE_NONE, FTAG);
1274 
1275 	err = dmu_object_free(mos, obj, tx);
1276 	ASSERT(err == 0);
1277 
1278 	/*
1279 	 * Close the objset with mode NONE, thus leaving it with
1280 	 * DOS_REF_MAX set, so that noone can access it.
1281 	 */
1282 	dsl_dataset_close(ds, DS_MODE_NONE, FTAG);
1283 
1284 	if (drop_lock)
1285 		rw_exit(&dp->dp_config_rwlock);
1286 	return (0);
1287 }
1288 
1289 int
1290 dsl_dataset_snapshot_sync(dsl_dir_t *dd, void *arg, dmu_tx_t *tx)
1291 {
1292 	const char *snapname = arg;
1293 	dsl_pool_t *dp = dd->dd_pool;
1294 	dmu_buf_t *dbuf;
1295 	dsl_dataset_phys_t *dsphys;
1296 	uint64_t dsobj, value;
1297 	objset_t *mos = dp->dp_meta_objset;
1298 	dsl_dataset_t *ds;
1299 	int err;
1300 
1301 	ASSERT(dmu_tx_is_syncing(tx));
1302 
1303 	if (dd->dd_phys->dd_head_dataset_obj == 0)
1304 		return (EINVAL);
1305 	err = dsl_dataset_open_obj(dp, dd->dd_phys->dd_head_dataset_obj, NULL,
1306 	    DS_MODE_NONE, FTAG, &ds);
1307 	if (err)
1308 		return (err);
1309 
1310 	err = zap_lookup(mos, ds->ds_phys->ds_snapnames_zapobj,
1311 	    snapname, 8, 1, &value);
1312 	if (err == 0) {
1313 		dsl_dataset_close(ds, DS_MODE_NONE, FTAG);
1314 		return (EEXIST);
1315 	}
1316 	ASSERT(err == ENOENT);
1317 
1318 	/* The point of no (unsuccessful) return */
1319 
1320 	dprintf_dd(dd, "taking snapshot %s in txg %llu\n",
1321 	    snapname, tx->tx_txg);
1322 
1323 	spa_scrub_restart(dp->dp_spa, tx->tx_txg);
1324 
1325 	rw_enter(&dp->dp_config_rwlock, RW_WRITER);
1326 
1327 	dsobj = dmu_object_alloc(mos, DMU_OT_DSL_DATASET, 0,
1328 	    DMU_OT_DSL_DATASET, sizeof (dsl_dataset_phys_t), tx);
1329 	VERIFY(0 == dmu_bonus_hold(mos, dsobj, FTAG, &dbuf));
1330 	dmu_buf_will_dirty(dbuf, tx);
1331 	dsphys = dbuf->db_data;
1332 	dsphys->ds_dir_obj = dd->dd_object;
1333 	dsphys->ds_fsid_guid = unique_create();
1334 	unique_remove(dsphys->ds_fsid_guid); /* it isn't open yet */
1335 	(void) random_get_pseudo_bytes((void*)&dsphys->ds_guid,
1336 	    sizeof (dsphys->ds_guid));
1337 	dsphys->ds_prev_snap_obj = ds->ds_phys->ds_prev_snap_obj;
1338 	dsphys->ds_prev_snap_txg = ds->ds_phys->ds_prev_snap_txg;
1339 	dsphys->ds_next_snap_obj = ds->ds_object;
1340 	dsphys->ds_num_children = 1;
1341 	dsphys->ds_creation_time = gethrestime_sec();
1342 	dsphys->ds_creation_txg = tx->tx_txg;
1343 	dsphys->ds_deadlist_obj = ds->ds_phys->ds_deadlist_obj;
1344 	dsphys->ds_used_bytes = ds->ds_phys->ds_used_bytes;
1345 	dsphys->ds_compressed_bytes = ds->ds_phys->ds_compressed_bytes;
1346 	dsphys->ds_uncompressed_bytes = ds->ds_phys->ds_uncompressed_bytes;
1347 	dsphys->ds_flags = ds->ds_phys->ds_flags;
1348 	dsphys->ds_bp = ds->ds_phys->ds_bp;
1349 	dmu_buf_rele(dbuf, FTAG);
1350 
1351 	if (ds->ds_phys->ds_prev_snap_obj != 0) {
1352 		dsl_dataset_t *ds_prev;
1353 
1354 		VERIFY(0 == dsl_dataset_open_obj(dp,
1355 		    ds->ds_phys->ds_prev_snap_obj, NULL,
1356 		    DS_MODE_NONE, FTAG, &ds_prev));
1357 		ASSERT(ds_prev->ds_phys->ds_next_snap_obj ==
1358 		    ds->ds_object ||
1359 		    ds_prev->ds_phys->ds_num_children > 1);
1360 		if (ds_prev->ds_phys->ds_next_snap_obj == ds->ds_object) {
1361 			dmu_buf_will_dirty(ds_prev->ds_dbuf, tx);
1362 			ASSERT3U(ds->ds_phys->ds_prev_snap_txg, ==,
1363 			    ds_prev->ds_phys->ds_creation_txg);
1364 			ds_prev->ds_phys->ds_next_snap_obj = dsobj;
1365 		}
1366 		dsl_dataset_close(ds_prev, DS_MODE_NONE, FTAG);
1367 	} else {
1368 		ASSERT3U(ds->ds_phys->ds_prev_snap_txg, ==, 0);
1369 	}
1370 
1371 	bplist_close(&ds->ds_deadlist);
1372 	dmu_buf_will_dirty(ds->ds_dbuf, tx);
1373 	ASSERT3U(ds->ds_phys->ds_prev_snap_txg, <, dsphys->ds_creation_txg);
1374 	ds->ds_phys->ds_prev_snap_obj = dsobj;
1375 	ds->ds_phys->ds_prev_snap_txg = dsphys->ds_creation_txg;
1376 	ds->ds_phys->ds_unique_bytes = 0;
1377 	ds->ds_phys->ds_deadlist_obj =
1378 	    bplist_create(mos, DSL_DEADLIST_BLOCKSIZE, tx);
1379 	VERIFY(0 == bplist_open(&ds->ds_deadlist, mos,
1380 	    ds->ds_phys->ds_deadlist_obj));
1381 
1382 	dprintf("snap '%s' -> obj %llu\n", snapname, dsobj);
1383 	err = zap_add(mos, ds->ds_phys->ds_snapnames_zapobj,
1384 	    snapname, 8, 1, &dsobj, tx);
1385 	ASSERT(err == 0);
1386 
1387 	if (ds->ds_prev)
1388 		dsl_dataset_close(ds->ds_prev, DS_MODE_NONE, ds);
1389 	VERIFY(0 == dsl_dataset_open_obj(dp,
1390 	    ds->ds_phys->ds_prev_snap_obj, snapname,
1391 	    DS_MODE_NONE, ds, &ds->ds_prev));
1392 
1393 	rw_exit(&dp->dp_config_rwlock);
1394 	dsl_dataset_close(ds, DS_MODE_NONE, FTAG);
1395 
1396 	return (0);
1397 }
1398 
/*
 * Write out a dirty dataset in syncing context.
 *
 * The dataset must have an objset attached (ds_user_ptr) and must not have
 * a next snapshot (ds_next_snap_obj == 0).  The objset is synced, the
 * containing dsl_dir is marked dirty, the deadlist is closed, and a hold on
 * the dataset's bonus buffer tagged with 'ds' is released.
 */
void
dsl_dataset_sync(dsl_dataset_t *ds, dmu_tx_t *tx)
{
	ASSERT(dmu_tx_is_syncing(tx));
	ASSERT(ds->ds_user_ptr != NULL);
	ASSERT(ds->ds_phys->ds_next_snap_obj == 0);

	dmu_objset_sync(ds->ds_user_ptr, tx);
	dsl_dir_dirty(ds->ds_dir, tx);
	bplist_close(&ds->ds_deadlist);

	/*
	 * NOTE(review): this drops a hold tagged 'ds' that is not taken in
	 * this function -- presumably acquired when the dataset was marked
	 * dirty; confirm against the dirtying code path.
	 */
	dmu_buf_rele(ds->ds_dbuf, ds);
}
1412 
1413 void
1414 dsl_dataset_stats(dsl_dataset_t *ds, dmu_objset_stats_t *dds)
1415 {
1416 	/* fill in properties crap */
1417 	dsl_dir_stats(ds->ds_dir, dds);
1418 
1419 	if (ds->ds_phys->ds_num_children != 0) {
1420 		dds->dds_is_snapshot = TRUE;
1421 		dds->dds_num_clones = ds->ds_phys->ds_num_children - 1;
1422 	}
1423 
1424 	dds->dds_inconsistent = ds->ds_phys->ds_flags & DS_FLAG_INCONSISTENT;
1425 	dds->dds_last_txg = ds->ds_phys->ds_bp.blk_birth;
1426 
1427 	dds->dds_objects_used = ds->ds_phys->ds_bp.blk_fill;
1428 	dds->dds_objects_avail = DN_MAX_OBJECT - dds->dds_objects_used;
1429 
1430 	/* We override the dataset's creation time... they should be the same */
1431 	dds->dds_creation_time = ds->ds_phys->ds_creation_time;
1432 	dds->dds_creation_txg = ds->ds_phys->ds_creation_txg;
1433 	dds->dds_space_refd = ds->ds_phys->ds_used_bytes;
1434 	dds->dds_fsid_guid = ds->ds_phys->ds_fsid_guid;
1435 
1436 	if (ds->ds_phys->ds_next_snap_obj) {
1437 		/*
1438 		 * This is a snapshot; override the dd's space used with
1439 		 * our unique space
1440 		 */
1441 		dds->dds_space_used = ds->ds_phys->ds_unique_bytes;
1442 		dds->dds_compressed_bytes =
1443 		    ds->ds_phys->ds_compressed_bytes;
1444 		dds->dds_uncompressed_bytes =
1445 		    ds->ds_phys->ds_uncompressed_bytes;
1446 	}
1447 }
1448 
1449 dsl_pool_t *
1450 dsl_dataset_pool(dsl_dataset_t *ds)
1451 {
1452 	return (ds->ds_dir->dd_pool);
1453 }
1454 
/*
 * Argument bundle handed from dsl_dataset_rename() to
 * dsl_dataset_snapshot_rename_sync() through dsl_dir_sync_task().
 */
struct osrenamearg {
	const char *oldname;	/* full name of the existing snapshot */
	const char *newname;	/* full new name; must contain "@snap" */
};
1459 
1460 static int
1461 dsl_dataset_snapshot_rename_sync(dsl_dir_t *dd, void *arg, dmu_tx_t *tx)
1462 {
1463 	struct osrenamearg *ora = arg;
1464 	objset_t *mos = dd->dd_pool->dp_meta_objset;
1465 	dsl_dir_t *nds;
1466 	const char *tail;
1467 	int err;
1468 	dsl_dataset_t *snds, *fsds;
1469 	uint64_t val;
1470 
1471 	err = dsl_dataset_open_spa(dd->dd_pool->dp_spa, ora->oldname,
1472 	    DS_MODE_READONLY | DS_MODE_STANDARD, FTAG, &snds);
1473 	if (err)
1474 		return (err);
1475 
1476 	if (snds->ds_dir != dd) {
1477 		dsl_dataset_close(snds, DS_MODE_STANDARD, FTAG);
1478 		return (EINVAL);
1479 	}
1480 
1481 	/* better be changing a snapshot */
1482 	if (snds->ds_phys->ds_next_snap_obj == 0) {
1483 		dsl_dataset_close(snds, DS_MODE_STANDARD, FTAG);
1484 		return (EINVAL);
1485 	}
1486 
1487 	/* new fs better exist */
1488 	err = dsl_dir_open_spa(dd->dd_pool->dp_spa, ora->newname,
1489 	    FTAG, &nds, &tail);
1490 	if (err) {
1491 		dsl_dataset_close(snds, DS_MODE_STANDARD, FTAG);
1492 		return (err);
1493 	}
1494 
1495 	dsl_dir_close(nds, FTAG);
1496 
1497 	/* new name better be in same fs */
1498 	if (nds != dd) {
1499 		dsl_dataset_close(snds, DS_MODE_STANDARD, FTAG);
1500 		return (EINVAL);
1501 	}
1502 
1503 	/* new name better be a snapshot */
1504 	if (tail == NULL || tail[0] != '@') {
1505 		dsl_dataset_close(snds, DS_MODE_STANDARD, FTAG);
1506 		return (EINVAL);
1507 	}
1508 
1509 	tail++;
1510 
1511 	err = dsl_dataset_open_obj(dd->dd_pool,
1512 	    dd->dd_phys->dd_head_dataset_obj, NULL, DS_MODE_NONE, FTAG, &fsds);
1513 	if (err) {
1514 		dsl_dataset_close(snds, DS_MODE_STANDARD, FTAG);
1515 		return (err);
1516 	}
1517 
1518 	/* new name better not be in use */
1519 	err = zap_lookup(mos, fsds->ds_phys->ds_snapnames_zapobj,
1520 	    tail, 8, 1, &val);
1521 	if (err != ENOENT) {
1522 		if (err == 0)
1523 			err = EEXIST;
1524 		dsl_dataset_close(fsds, DS_MODE_NONE, FTAG);
1525 		dsl_dataset_close(snds, DS_MODE_STANDARD, FTAG);
1526 		return (EEXIST);
1527 	}
1528 
1529 	/* The point of no (unsuccessful) return */
1530 
1531 	rw_enter(&dd->dd_pool->dp_config_rwlock, RW_WRITER);
1532 	VERIFY(0 == dsl_dataset_get_snapname(snds));
1533 	err = zap_remove(mos, fsds->ds_phys->ds_snapnames_zapobj,
1534 	    snds->ds_snapname, tx);
1535 	ASSERT3U(err, ==, 0);
1536 	mutex_enter(&snds->ds_lock);
1537 	(void) strcpy(snds->ds_snapname, tail);
1538 	mutex_exit(&snds->ds_lock);
1539 	err = zap_add(mos, fsds->ds_phys->ds_snapnames_zapobj,
1540 	    snds->ds_snapname, 8, 1, &snds->ds_object, tx);
1541 	ASSERT3U(err, ==, 0);
1542 	rw_exit(&dd->dd_pool->dp_config_rwlock);
1543 
1544 	dsl_dataset_close(fsds, DS_MODE_NONE, FTAG);
1545 	dsl_dataset_close(snds, DS_MODE_STANDARD, FTAG);
1546 	return (0);
1547 }
1548 
1549 #pragma weak dmu_objset_rename = dsl_dataset_rename
1550 int
1551 dsl_dataset_rename(const char *osname, const char *newname)
1552 {
1553 	dsl_dir_t *dd;
1554 	const char *tail;
1555 	struct osrenamearg ora;
1556 	int err;
1557 
1558 	err = dsl_dir_open(osname, FTAG, &dd, &tail);
1559 	if (err)
1560 		return (err);
1561 	if (tail == NULL) {
1562 		err = dsl_dir_sync_task(dd,
1563 		    dsl_dir_rename_sync, (void*)newname, 1<<12);
1564 		dsl_dir_close(dd, FTAG);
1565 		return (err);
1566 	}
1567 	if (tail[0] != '@') {
1568 		/* the name ended in a nonexistant component */
1569 		dsl_dir_close(dd, FTAG);
1570 		return (ENOENT);
1571 	}
1572 
1573 	ora.oldname = osname;
1574 	ora.newname = newname;
1575 
1576 	err = dsl_dir_sync_task(dd,
1577 	    dsl_dataset_snapshot_rename_sync, &ora, 1<<12);
1578 	dsl_dir_close(dd, FTAG);
1579 	return (err);
1580 }
1581 
/*
 * Sync task: promote the clone whose dsl_dir is 'dd'.
 *
 * The "pivot" is the snapshot this clone was created from
 * (dd_clone_parent_obj).  The pivot and the chain of snapshots before it
 * (up to the first snapshot, or up to the parent's own branch point) are
 * moved from the parent's dsl_dir (pdd) into 'dd': their snapshot-name
 * entries move from the parent head's snapnames zap to this head's, their
 * ds_dir_obj is rewritten, and the corresponding space is transferred
 * between the two dsl_dirs.  Finally the clone-parent links of 'dd' and
 * 'pdd' are swapped so that the old parent becomes a clone of the pivot.
 *
 * Returns EINVAL if 'dd' is not a clone, EXDEV if the head dataset carries
 * DS_FLAG_NOPROMOTE, EEXIST if a snapshot being moved has a name already
 * present in this head's snapnames zap, or any error from the opens,
 * deadlist walks, or dsl_dir_transfer_possible().  Nothing is modified
 * unless 0 is returned.
 *
 * All locals start out NULL so that the common 'out' path can tell which
 * references were actually taken.
 */
/* ARGSUSED */
static int
dsl_dataset_promote_sync(dsl_dir_t *dd, void *arg, dmu_tx_t *tx)
{
	dsl_dir_t *pdd = NULL;
	dsl_dataset_t *ds = NULL;
	dsl_dataset_t *hds = NULL;
	dsl_dataset_t *phds = NULL;
	dsl_dataset_t *pivot_ds = NULL;
	dsl_dataset_t *newnext_ds = NULL;
	int err;
	char *name = NULL;
	uint64_t used = 0, comp = 0, uncomp = 0, unique = 0, itor = 0;
	blkptr_t bp;

	/* Check that it is a clone */
	if (dd->dd_phys->dd_clone_parent_obj == 0)
		return (EINVAL);

	/*
	 * Open everyone: the pivot snapshot (exclusively), the parent's
	 * head (phds) and our own head (hds).
	 */
	if (err = dsl_dataset_open_obj(dd->dd_pool,
	    dd->dd_phys->dd_clone_parent_obj,
	    NULL, DS_MODE_EXCLUSIVE, FTAG, &pivot_ds))
		goto out;
	pdd = pivot_ds->ds_dir;
	if (err = dsl_dataset_open_obj(dd->dd_pool,
	    pdd->dd_phys->dd_head_dataset_obj, NULL, DS_MODE_NONE, FTAG, &phds))
		goto out;
	if (err = dsl_dataset_open_obj(dd->dd_pool,
	    dd->dd_phys->dd_head_dataset_obj, NULL, DS_MODE_NONE, FTAG, &hds))
		goto out;

	if (hds->ds_phys->ds_flags & DS_FLAG_NOPROMOTE) {
		err = EXDEV;
		goto out;
	}

	/*
	 * find pivot point's new next ds: walk backwards from our head
	 * until we reach the dataset whose previous snapshot is the pivot.
	 */
	VERIFY(0 == dsl_dataset_open_obj(dd->dd_pool, hds->ds_object,
	    NULL, DS_MODE_NONE, FTAG, &newnext_ds));
	while (newnext_ds->ds_phys->ds_prev_snap_obj != pivot_ds->ds_object) {
		dsl_dataset_t *prev;

		if (err = dsl_dataset_open_obj(dd->dd_pool,
		    newnext_ds->ds_phys->ds_prev_snap_obj, NULL, DS_MODE_NONE,
		    FTAG, &prev))
			goto out;
		dsl_dataset_close(newnext_ds, DS_MODE_NONE, FTAG);
		newnext_ds = prev;
	}

	/*
	 * compute pivot point's new unique space: blocks on the new next
	 * dataset's deadlist that were born after the pivot's previous
	 * snapshot.  bplist_iterate() ending with ENOENT is the normal
	 * end-of-list condition here; anything else is an error.
	 */
	while ((err = bplist_iterate(&newnext_ds->ds_deadlist,
	    &itor, &bp)) == 0) {
		if (bp.blk_birth > pivot_ds->ds_phys->ds_prev_snap_txg)
			unique += bp_get_dasize(dd->dd_pool->dp_spa, &bp);
	}
	if (err != ENOENT)
		goto out;

	/* need the config lock to ensure that the snapshots are not open */
	rw_enter(&dd->dd_pool->dp_config_rwlock, RW_WRITER);

	/*
	 * Walk the snapshots that we are moving.  This first pass only
	 * checks for name conflicts and totals up the space to transfer;
	 * it modifies nothing.  'ds' is the snapshot currently being
	 * examined; every 'ds' other than pivot_ds was opened exclusively
	 * by this loop and is closed as we step past it.
	 */
	name = kmem_alloc(MAXPATHLEN, KM_SLEEP);
	ds = pivot_ds;
	/* CONSTCOND */
	while (TRUE) {
		uint64_t val, dlused, dlcomp, dluncomp;
		dsl_dataset_t *prev;

		/* Check that the snapshot name does not conflict */
		/*
		 * NOTE(review): 'name' is not read afterwards; the call is
		 * presumably made so that ds->ds_snapname is populated
		 * before it is used below -- confirm.
		 */
		dsl_dataset_name(ds, name);
		err = zap_lookup(dd->dd_pool->dp_meta_objset,
		    hds->ds_phys->ds_snapnames_zapobj, ds->ds_snapname,
		    8, 1, &val);
		if (err != ENOENT) {
			if (err == 0)
				err = EEXIST;
			goto out;
		}

		/*
		 * compute space to transfer.  Each snapshot gave birth to:
		 * (my used) - (prev's used) + (deadlist's used)
		 */
		used += ds->ds_phys->ds_used_bytes;
		comp += ds->ds_phys->ds_compressed_bytes;
		uncomp += ds->ds_phys->ds_uncompressed_bytes;

		/* If we reach the first snapshot, we're done. */
		if (ds->ds_phys->ds_prev_snap_obj == 0)
			break;

		if (err = bplist_space(&ds->ds_deadlist,
		    &dlused, &dlcomp, &dluncomp))
			goto out;
		if (err = dsl_dataset_open_obj(dd->dd_pool,
		    ds->ds_phys->ds_prev_snap_obj, NULL, DS_MODE_EXCLUSIVE,
		    FTAG, &prev))
			goto out;
		used += dlused - prev->ds_phys->ds_used_bytes;
		comp += dlcomp - prev->ds_phys->ds_compressed_bytes;
		uncomp += dluncomp - prev->ds_phys->ds_uncompressed_bytes;

		/*
		 * We could be a clone of a clone.  If we reach our
		 * parent's branch point, we're done.
		 */
		if (prev->ds_phys->ds_next_snap_obj != ds->ds_object) {
			dsl_dataset_close(prev, DS_MODE_EXCLUSIVE, FTAG);
			break;
		}
		if (ds != pivot_ds)
			dsl_dataset_close(ds, DS_MODE_EXCLUSIVE, FTAG);
		ds = prev;
	}
	/* Drop the last snapshot visited; clear ds so 'out' won't re-close. */
	if (ds != pivot_ds)
		dsl_dataset_close(ds, DS_MODE_EXCLUSIVE, FTAG);
	ds = NULL;

	/* Check that there is enough space here */
	if (err = dsl_dir_transfer_possible(pdd, dd, used))
		goto out;

	/* The point of no (unsuccessful) return */

	/*
	 * move snapshots to this dir: second pass over the same chain,
	 * this time making the changes.  Failures from here on are fatal
	 * (VERIFY).
	 */
	ds = pivot_ds;
	/* CONSTCOND */
	while (TRUE) {
		dsl_dataset_t *prev;

		/* move snap name entry */
		dsl_dataset_name(ds, name);
		VERIFY(0 == zap_remove(dd->dd_pool->dp_meta_objset,
		    phds->ds_phys->ds_snapnames_zapobj, ds->ds_snapname, tx));
		VERIFY(0 == zap_add(dd->dd_pool->dp_meta_objset,
		    hds->ds_phys->ds_snapnames_zapobj, ds->ds_snapname,
		    8, 1, &ds->ds_object, tx));

		/*
		 * change containing dsl_dir, both on disk (ds_dir_obj) and
		 * in core (swap the ds_dir reference held with tag 'ds').
		 */
		dmu_buf_will_dirty(ds->ds_dbuf, tx);
		ASSERT3U(ds->ds_phys->ds_dir_obj, ==, pdd->dd_object);
		ds->ds_phys->ds_dir_obj = dd->dd_object;
		ASSERT3P(ds->ds_dir, ==, pdd);
		dsl_dir_close(ds->ds_dir, ds);
		VERIFY(0 == dsl_dir_open_obj(dd->dd_pool, dd->dd_object,
		    NULL, ds, &ds->ds_dir));

		ASSERT3U(dsl_prop_numcb(ds), ==, 0);

		/* Same termination rules as the first pass. */
		if (ds->ds_phys->ds_prev_snap_obj == 0)
			break;

		VERIFY(0 == dsl_dataset_open_obj(dd->dd_pool,
		    ds->ds_phys->ds_prev_snap_obj, NULL, DS_MODE_EXCLUSIVE,
		    FTAG, &prev));

		if (prev->ds_phys->ds_next_snap_obj != ds->ds_object) {
			dsl_dataset_close(prev, DS_MODE_EXCLUSIVE, FTAG);
			break;
		}
		if (ds != pivot_ds)
			dsl_dataset_close(ds, DS_MODE_EXCLUSIVE, FTAG);
		ds = prev;
	}
	/* 'ds' (if not the pivot) is still open here; 'out' closes it. */

	/* change pivot point's next snap */
	dmu_buf_will_dirty(pivot_ds->ds_dbuf, tx);
	pivot_ds->ds_phys->ds_next_snap_obj = newnext_ds->ds_object;

	/*
	 * change clone_parent-age: we inherit our old parent's clone
	 * parent, and the old parent becomes a clone of the pivot.
	 */
	dmu_buf_will_dirty(dd->dd_dbuf, tx);
	ASSERT3U(dd->dd_phys->dd_clone_parent_obj, ==, pivot_ds->ds_object);
	dd->dd_phys->dd_clone_parent_obj = pdd->dd_phys->dd_clone_parent_obj;
	dmu_buf_will_dirty(pdd->dd_dbuf, tx);
	pdd->dd_phys->dd_clone_parent_obj = pivot_ds->ds_object;

	/* change space accounting */
	dsl_dir_diduse_space(pdd, -used, -comp, -uncomp, tx);
	dsl_dir_diduse_space(dd, used, comp, uncomp, tx);
	pivot_ds->ds_phys->ds_unique_bytes = unique;

	err = 0;

out:
	/*
	 * Common exit for success and failure: drop the config lock if this
	 * thread holds it as writer, then release whichever references and
	 * buffers were actually acquired (non-NULL locals).
	 */
	if (RW_WRITE_HELD(&dd->dd_pool->dp_config_rwlock))
		rw_exit(&dd->dd_pool->dp_config_rwlock);
	if (hds)
		dsl_dataset_close(hds, DS_MODE_NONE, FTAG);
	if (phds)
		dsl_dataset_close(phds, DS_MODE_NONE, FTAG);
	if (ds && ds != pivot_ds)
		dsl_dataset_close(ds, DS_MODE_EXCLUSIVE, FTAG);
	if (pivot_ds)
		dsl_dataset_close(pivot_ds, DS_MODE_EXCLUSIVE, FTAG);
	if (newnext_ds)
		dsl_dataset_close(newnext_ds, DS_MODE_NONE, FTAG);
	if (name)
		kmem_free(name, MAXPATHLEN);
	return (err);
}
1785 
1786 int
1787 dsl_dataset_promote(const char *name)
1788 {
1789 	dsl_dataset_t *ds;
1790 	int err;
1791 	dmu_object_info_t doi;
1792 
1793 	err = dsl_dataset_open(name, DS_MODE_NONE, FTAG, &ds);
1794 	if (err)
1795 		return (err);
1796 
1797 	err = dmu_object_info(ds->ds_dir->dd_pool->dp_meta_objset,
1798 	    ds->ds_phys->ds_snapnames_zapobj, &doi);
1799 	if (err) {
1800 		dsl_dataset_close(ds, DS_MODE_NONE, FTAG);
1801 		return (err);
1802 	}
1803 
1804 	/*
1805 	 * Add in 128x the snapnames zapobj size, since we will be moving
1806 	 * a bunch of snapnames to the promoted ds, and dirtying their
1807 	 * bonus buffers.
1808 	 */
1809 	err = dsl_dir_sync_task(ds->ds_dir, dsl_dataset_promote_sync, NULL,
1810 	    (1<<20) + (doi.doi_physical_blks << (SPA_MINBLOCKSHIFT + 7)));
1811 	dsl_dataset_close(ds, DS_MODE_NONE, FTAG);
1812 	return (err);
1813 }
1814