xref: /titanic_51/usr/src/uts/common/fs/zfs/dsl_dataset.c (revision 2401904da3a3a741ce3aa7e27493641b2257523f)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2006 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 
26 #pragma ident	"%Z%%M%	%I%	%E% SMI"
27 
28 #include <sys/dmu_objset.h>
29 #include <sys/dsl_dataset.h>
30 #include <sys/dsl_dir.h>
31 #include <sys/dsl_prop.h>
32 #include <sys/dsl_synctask.h>
33 #include <sys/dmu_traverse.h>
34 #include <sys/dmu_tx.h>
35 #include <sys/arc.h>
36 #include <sys/zio.h>
37 #include <sys/zap.h>
38 #include <sys/unique.h>
39 #include <sys/zfs_context.h>
40 
41 static dsl_checkfunc_t dsl_dataset_destroy_begin_check;
42 static dsl_syncfunc_t dsl_dataset_destroy_begin_sync;
43 static dsl_checkfunc_t dsl_dataset_rollback_check;
44 static dsl_syncfunc_t dsl_dataset_rollback_sync;
45 static dsl_checkfunc_t dsl_dataset_destroy_check;
46 static dsl_syncfunc_t dsl_dataset_destroy_sync;
47 
48 #define	DOS_REF_MAX	(1ULL << 62)
49 
50 #define	DSL_DEADLIST_BLOCKSIZE	SPA_MAXBLOCKSIZE
51 
52 /*
53  * We use weighted reference counts to express the various forms of exclusion
54  * between different open modes.  A STANDARD open is 1 point, an EXCLUSIVE open
55  * is DOS_REF_MAX, and a PRIMARY open is little more than half of an EXCLUSIVE.
56  * This makes the exclusion logic simple: the total refcnt for all opens cannot
57  * exceed DOS_REF_MAX.  For example, EXCLUSIVE opens are exclusive because their
58  * weight (DOS_REF_MAX) consumes the entire refcnt space.  PRIMARY opens consume
59  * just over half of the refcnt space, so there can't be more than one, but a
60  * PRIMARY open can peacefully coexist with any number of STANDARD opens.
61  */
62 static uint64_t ds_refcnt_weight[DS_MODE_LEVELS] = {
63 	0,			/* DS_MODE_NONE - invalid		*/
64 	1,			/* DS_MODE_STANDARD - unlimited number	*/
65 	(DOS_REF_MAX >> 1) + 1,	/* DS_MODE_PRIMARY - only one of these	*/
66 	DOS_REF_MAX		/* DS_MODE_EXCLUSIVE - no other opens	*/
67 };
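
/*
 * Worked example (DOS_REF_MAX = 2^62): an EXCLUSIVE open contributes 2^62,
 * so adding any further open (even a STANDARD open of weight 1) would push
 * the total past DOS_REF_MAX and is refused.  A PRIMARY open contributes
 * 2^61 + 1; a second PRIMARY would bring the total to 2^62 + 2 > DOS_REF_MAX,
 * but up to 2^61 - 1 STANDARD opens can still be added alongside the first.
 */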
68 
69 
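/*
 * Called in syncing context when a new block is written on behalf of a
 * dataset (or the meta-objset, in which case ds is NULL).  Charges the
 * block's space to the dataset's own accounting and to its dsl_dir.
 */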
70 void
71 dsl_dataset_block_born(dsl_dataset_t *ds, blkptr_t *bp, dmu_tx_t *tx)
72 {
73 	int used = bp_get_dasize(tx->tx_pool->dp_spa, bp);
74 	int compressed = BP_GET_PSIZE(bp);
75 	int uncompressed = BP_GET_UCSIZE(bp);
76 
77 	dprintf_bp(bp, "born, ds=%p\n", ds);
78 
79 	ASSERT(dmu_tx_is_syncing(tx));
80 	/* It could have been compressed away to nothing */
81 	if (BP_IS_HOLE(bp))
82 		return;
83 	ASSERT(BP_GET_TYPE(bp) != DMU_OT_NONE);
84 	ASSERT3U(BP_GET_TYPE(bp), <, DMU_OT_NUMTYPES);
85 	if (ds == NULL) {
86 		/*
87 		 * Account for the meta-objset space in its placeholder
88 		 * dsl_dir.
89 		 */
90 		ASSERT3U(compressed, ==, uncompressed); /* it's all metadata */
91 		dsl_dir_diduse_space(tx->tx_pool->dp_mos_dir,
92 		    used, compressed, uncompressed, tx);
93 		dsl_dir_dirty(tx->tx_pool->dp_mos_dir, tx);
94 		return;
95 	}
96 	dmu_buf_will_dirty(ds->ds_dbuf, tx);
97 	mutex_enter(&ds->ds_lock);
98 	ds->ds_phys->ds_used_bytes += used;
99 	ds->ds_phys->ds_compressed_bytes += compressed;
100 	ds->ds_phys->ds_uncompressed_bytes += uncompressed;
101 	ds->ds_phys->ds_unique_bytes += used;
102 	mutex_exit(&ds->ds_lock);
103 	dsl_dir_diduse_space(ds->ds_dir,
104 	    used, compressed, uncompressed, tx);
105 }
106 
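/*
 * Called in syncing context when a block belonging to a dataset (or the
 * meta-objset, if ds is NULL) is freed.  A block born after the most recent
 * snapshot can be freed immediately; an older block is still referenced by
 * a snapshot, so it goes on the dataset's deadlist instead.  Either way the
 * space accounting is adjusted.
 */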
107 void
108 dsl_dataset_block_kill(dsl_dataset_t *ds, blkptr_t *bp, dmu_tx_t *tx)
109 {
110 	int used = bp_get_dasize(tx->tx_pool->dp_spa, bp);
111 	int compressed = BP_GET_PSIZE(bp);
112 	int uncompressed = BP_GET_UCSIZE(bp);
113 
114 	ASSERT(dmu_tx_is_syncing(tx));
115 	if (BP_IS_HOLE(bp))
116 		return;
117 
118 	ASSERT(used > 0);
119 	if (ds == NULL) {
120 		/*
121 		 * Account for the meta-objset space in its placeholder
122 		 * dsl_dir.
123 		 */
124 		/* XXX this can fail, what do we do when it does? */
125 		(void) arc_free(NULL, tx->tx_pool->dp_spa,
126 		    tx->tx_txg, bp, NULL, NULL, ARC_WAIT);
127 		bzero(bp, sizeof (blkptr_t));
128 
129 		dsl_dir_diduse_space(tx->tx_pool->dp_mos_dir,
130 		    -used, -compressed, -uncompressed, tx);
131 		dsl_dir_dirty(tx->tx_pool->dp_mos_dir, tx);
132 		return;
133 	}
134 	ASSERT3P(tx->tx_pool, ==, ds->ds_dir->dd_pool);
135 
136 	dmu_buf_will_dirty(ds->ds_dbuf, tx);
137 
138 	if (bp->blk_birth > ds->ds_phys->ds_prev_snap_txg) {
139 		dprintf_bp(bp, "freeing: %s", "");
140 		/* XXX check return code? */
141 		(void) arc_free(NULL, tx->tx_pool->dp_spa,
142 		    tx->tx_txg, bp, NULL, NULL, ARC_WAIT);
143 
144 		mutex_enter(&ds->ds_lock);
145 		/* XXX unique_bytes is not accurate for head datasets */
146 		/* ASSERT3U(ds->ds_phys->ds_unique_bytes, >=, used); */
147 		ds->ds_phys->ds_unique_bytes -= used;
148 		mutex_exit(&ds->ds_lock);
149 		dsl_dir_diduse_space(ds->ds_dir,
150 		    -used, -compressed, -uncompressed, tx);
151 	} else {
152 		dprintf_bp(bp, "putting on dead list: %s", "");
153 		VERIFY(0 == bplist_enqueue(&ds->ds_deadlist, bp, tx));
154 		/* if born after the prev-prev snap, it's now unique to prev */
155 		if (ds->ds_phys->ds_prev_snap_obj != 0) {
156 			ASSERT3U(ds->ds_prev->ds_object, ==,
157 			    ds->ds_phys->ds_prev_snap_obj);
158 			ASSERT(ds->ds_prev->ds_phys->ds_num_children > 0);
159 			if (ds->ds_prev->ds_phys->ds_next_snap_obj ==
160 			    ds->ds_object && bp->blk_birth >
161 			    ds->ds_prev->ds_phys->ds_prev_snap_txg) {
162 				dmu_buf_will_dirty(ds->ds_prev->ds_dbuf, tx);
163 				mutex_enter(&ds->ds_prev->ds_lock);
164 				ds->ds_prev->ds_phys->ds_unique_bytes +=
165 				    used;
166 				mutex_exit(&ds->ds_prev->ds_lock);
167 			}
168 		}
169 	}
170 	bzero(bp, sizeof (blkptr_t));
171 	mutex_enter(&ds->ds_lock);
172 	ASSERT3U(ds->ds_phys->ds_used_bytes, >=, used);
173 	ds->ds_phys->ds_used_bytes -= used;
174 	ASSERT3U(ds->ds_phys->ds_compressed_bytes, >=, compressed);
175 	ds->ds_phys->ds_compressed_bytes -= compressed;
176 	ASSERT3U(ds->ds_phys->ds_uncompressed_bytes, >=, uncompressed);
177 	ds->ds_phys->ds_uncompressed_bytes -= uncompressed;
178 	mutex_exit(&ds->ds_lock);
179 }
180 
181 uint64_t
182 dsl_dataset_prev_snap_txg(dsl_dataset_t *ds)
183 {
184 	if (ds == NULL)
185 		return (0);
186 	/*
187 	 * The snapshot creation could fail, but that would cause
188 	 * dsl_dataset_block_freeable() to incorrectly return FALSE, which
189 	 * would only result in an overestimation of the amount of space
190 	 * that an operation would consume, which is OK.
191 	 *
192 	 * There's also a small window where we could miss a pending
193 	 * snapshot, because we could set the sync task in the quiescing
194 	 * phase.  So this should only be used as a guess.
195 	 */
196 	return (MAX(ds->ds_phys->ds_prev_snap_txg, ds->ds_trysnap_txg));
197 }
198 
199 int
200 dsl_dataset_block_freeable(dsl_dataset_t *ds, uint64_t blk_birth)
201 {
202 	return (blk_birth > dsl_dataset_prev_snap_txg(ds));
203 }
204 
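/*
 * dbuf user-eviction callback: tears down the in-core dsl_dataset_t when
 * the user data on its bonus buffer is evicted (see dmu_buf_set_user_ie()
 * in dsl_dataset_open_obj()).
 */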
205 /* ARGSUSED */
206 static void
207 dsl_dataset_evict(dmu_buf_t *db, void *dsv)
208 {
209 	dsl_dataset_t *ds = dsv;
210 	dsl_pool_t *dp = ds->ds_dir->dd_pool;
211 
212 	/* open_refcount == DOS_REF_MAX when deleting */
213 	ASSERT(ds->ds_open_refcount == 0 ||
214 	    ds->ds_open_refcount == DOS_REF_MAX);
215 
216 	dprintf_ds(ds, "evicting %s\n", "");
217 
218 	unique_remove(ds->ds_phys->ds_fsid_guid);
219 
220 	if (ds->ds_user_ptr != NULL)
221 		ds->ds_user_evict_func(ds, ds->ds_user_ptr);
222 
223 	if (ds->ds_prev) {
224 		dsl_dataset_close(ds->ds_prev, DS_MODE_NONE, ds);
225 		ds->ds_prev = NULL;
226 	}
227 
228 	bplist_close(&ds->ds_deadlist);
229 	dsl_dir_close(ds->ds_dir, ds);
230 
231 	if (list_link_active(&ds->ds_synced_link))
232 		list_remove(&dp->dp_synced_objsets, ds);
233 
234 	mutex_destroy(&ds->ds_lock);
235 	mutex_destroy(&ds->ds_deadlist.bpl_lock);
236 
237 	kmem_free(ds, sizeof (dsl_dataset_t));
238 }
239 
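/*
 * For a snapshot, fill in ds_snapname by searching the head dataset's
 * snapnames zap for this object.  A no-op if the name is already known or
 * this is a head dataset.
 */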
240 static int
241 dsl_dataset_get_snapname(dsl_dataset_t *ds)
242 {
243 	dsl_dataset_phys_t *headphys;
244 	int err;
245 	dmu_buf_t *headdbuf;
246 	dsl_pool_t *dp = ds->ds_dir->dd_pool;
247 	objset_t *mos = dp->dp_meta_objset;
248 
249 	if (ds->ds_snapname[0])
250 		return (0);
251 	if (ds->ds_phys->ds_next_snap_obj == 0)
252 		return (0);
253 
254 	err = dmu_bonus_hold(mos, ds->ds_dir->dd_phys->dd_head_dataset_obj,
255 	    FTAG, &headdbuf);
256 	if (err)
257 		return (err);
258 	headphys = headdbuf->db_data;
259 	err = zap_value_search(dp->dp_meta_objset,
260 	    headphys->ds_snapnames_zapobj, ds->ds_object, ds->ds_snapname);
261 	dmu_buf_rele(headdbuf, FTAG);
262 	return (err);
263 }
264 
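/*
 * Open the dataset with the given object number, constructing the in-core
 * dsl_dataset_t on first open (there is at most one per object, arbitrated
 * via dmu_buf_set_user_ie()).  The requested mode is checked against the
 * weighted open refcount and the INCONSISTENT flag; EBUSY is returned if it
 * cannot be granted.
 */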
265 int
266 dsl_dataset_open_obj(dsl_pool_t *dp, uint64_t dsobj, const char *snapname,
267     int mode, void *tag, dsl_dataset_t **dsp)
268 {
269 	uint64_t weight = ds_refcnt_weight[DS_MODE_LEVEL(mode)];
270 	objset_t *mos = dp->dp_meta_objset;
271 	dmu_buf_t *dbuf;
272 	dsl_dataset_t *ds;
273 	int err;
274 
275 	ASSERT(RW_LOCK_HELD(&dp->dp_config_rwlock) ||
276 	    dsl_pool_sync_context(dp));
277 
278 	err = dmu_bonus_hold(mos, dsobj, tag, &dbuf);
279 	if (err)
280 		return (err);
281 	ds = dmu_buf_get_user(dbuf);
282 	if (ds == NULL) {
283 		dsl_dataset_t *winner;
284 
285 		ds = kmem_zalloc(sizeof (dsl_dataset_t), KM_SLEEP);
286 		ds->ds_dbuf = dbuf;
287 		ds->ds_object = dsobj;
288 		ds->ds_phys = dbuf->db_data;
289 
290 		mutex_init(&ds->ds_lock, NULL, MUTEX_DEFAULT, NULL);
291 		mutex_init(&ds->ds_deadlist.bpl_lock, NULL, MUTEX_DEFAULT,
292 		    NULL);
293 
294 		err = bplist_open(&ds->ds_deadlist,
295 		    mos, ds->ds_phys->ds_deadlist_obj);
296 		if (err == 0) {
297 			err = dsl_dir_open_obj(dp,
298 			    ds->ds_phys->ds_dir_obj, NULL, ds, &ds->ds_dir);
299 		}
300 		if (err) {
301 			/*
302 			 * we don't really need to close the bplist if we
303 			 * just opened it.
304 			 */
305 			mutex_destroy(&ds->ds_lock);
306 			mutex_destroy(&ds->ds_deadlist.bpl_lock);
307 			kmem_free(ds, sizeof (dsl_dataset_t));
308 			dmu_buf_rele(dbuf, tag);
309 			return (err);
310 		}
311 
312 		if (ds->ds_dir->dd_phys->dd_head_dataset_obj == dsobj) {
313 			ds->ds_snapname[0] = '\0';
314 			if (ds->ds_phys->ds_prev_snap_obj) {
315 				err = dsl_dataset_open_obj(dp,
316 				    ds->ds_phys->ds_prev_snap_obj, NULL,
317 				    DS_MODE_NONE, ds, &ds->ds_prev);
318 			}
319 		} else {
320 			if (snapname) {
321 #ifdef ZFS_DEBUG
322 				dsl_dataset_phys_t *headphys;
323 				dmu_buf_t *headdbuf;
324 				err = dmu_bonus_hold(mos,
325 				    ds->ds_dir->dd_phys->dd_head_dataset_obj,
326 				    FTAG, &headdbuf);
327 				if (err == 0) {
328 					uint64_t foundobj;
329 					headphys = headdbuf->db_data;
330 					err = zap_lookup(dp->dp_meta_objset,
331 					    headphys->ds_snapnames_zapobj,
332 					    snapname, sizeof (foundobj), 1,
333 					    &foundobj);
334 					ASSERT3U(foundobj, ==, dsobj);
335 					dmu_buf_rele(headdbuf, FTAG);
336 				}
337 #endif
338 				(void) strcat(ds->ds_snapname, snapname);
339 			} else if (zfs_flags & ZFS_DEBUG_SNAPNAMES) {
340 				err = dsl_dataset_get_snapname(ds);
341 			}
342 		}
343 
344 		if (err == 0) {
345 			winner = dmu_buf_set_user_ie(dbuf, ds, &ds->ds_phys,
346 			    dsl_dataset_evict);
347 		}
348 		if (err || winner) {
349 			bplist_close(&ds->ds_deadlist);
350 			if (ds->ds_prev) {
351 				dsl_dataset_close(ds->ds_prev,
352 				    DS_MODE_NONE, ds);
353 			}
354 			dsl_dir_close(ds->ds_dir, ds);
355 			mutex_destroy(&ds->ds_lock);
356 			mutex_destroy(&ds->ds_deadlist.bpl_lock);
357 			kmem_free(ds, sizeof (dsl_dataset_t));
358 			if (err) {
359 				dmu_buf_rele(dbuf, tag);
360 				return (err);
361 			}
362 			ds = winner;
363 		} else {
364 			uint64_t new =
365 			    unique_insert(ds->ds_phys->ds_fsid_guid);
366 			if (new != ds->ds_phys->ds_fsid_guid) {
367 				/* XXX it won't necessarily be synced... */
368 				ds->ds_phys->ds_fsid_guid = new;
369 			}
370 		}
371 	}
372 	ASSERT3P(ds->ds_dbuf, ==, dbuf);
373 	ASSERT3P(ds->ds_phys, ==, dbuf->db_data);
374 
375 	mutex_enter(&ds->ds_lock);
376 	if ((DS_MODE_LEVEL(mode) == DS_MODE_PRIMARY &&
377 	    (ds->ds_phys->ds_flags & DS_FLAG_INCONSISTENT) &&
378 	    !DS_MODE_IS_INCONSISTENT(mode)) ||
379 	    (ds->ds_open_refcount + weight > DOS_REF_MAX)) {
380 		mutex_exit(&ds->ds_lock);
381 		dsl_dataset_close(ds, DS_MODE_NONE, tag);
382 		return (EBUSY);
383 	}
384 	ds->ds_open_refcount += weight;
385 	mutex_exit(&ds->ds_lock);
386 
387 	*dsp = ds;
388 	return (0);
389 }
390 
391 int
392 dsl_dataset_open_spa(spa_t *spa, const char *name, int mode,
393     void *tag, dsl_dataset_t **dsp)
394 {
395 	dsl_dir_t *dd;
396 	dsl_pool_t *dp;
397 	const char *tail;
398 	uint64_t obj;
399 	dsl_dataset_t *ds = NULL;
400 	int err = 0;
401 
402 	err = dsl_dir_open_spa(spa, name, FTAG, &dd, &tail);
403 	if (err)
404 		return (err);
405 
406 	dp = dd->dd_pool;
407 	obj = dd->dd_phys->dd_head_dataset_obj;
408 	rw_enter(&dp->dp_config_rwlock, RW_READER);
409 	if (obj == 0) {
410 		/* A dataset with no associated objset */
411 		err = ENOENT;
412 		goto out;
413 	}
414 
415 	if (tail != NULL) {
416 		objset_t *mos = dp->dp_meta_objset;
417 
418 		err = dsl_dataset_open_obj(dp, obj, NULL,
419 		    DS_MODE_NONE, tag, &ds);
420 		if (err)
421 			goto out;
422 		obj = ds->ds_phys->ds_snapnames_zapobj;
423 		dsl_dataset_close(ds, DS_MODE_NONE, tag);
424 		ds = NULL;
425 
426 		if (tail[0] != '@') {
427 			err = ENOENT;
428 			goto out;
429 		}
430 		tail++;
431 
432 		/* Look for a snapshot */
433 		if (!DS_MODE_IS_READONLY(mode)) {
434 			err = EROFS;
435 			goto out;
436 		}
437 		dprintf("looking for snapshot '%s'\n", tail);
438 		err = zap_lookup(mos, obj, tail, 8, 1, &obj);
439 		if (err)
440 			goto out;
441 	}
442 	err = dsl_dataset_open_obj(dp, obj, tail, mode, tag, &ds);
443 
444 out:
445 	rw_exit(&dp->dp_config_rwlock);
446 	dsl_dir_close(dd, FTAG);
447 
448 	ASSERT3U((err == 0), ==, (ds != NULL));
449 	/* ASSERT(ds == NULL || strcmp(name, ds->ds_name) == 0); */
450 
451 	*dsp = ds;
452 	return (err);
453 }
454 
455 int
456 dsl_dataset_open(const char *name, int mode, void *tag, dsl_dataset_t **dsp)
457 {
458 	return (dsl_dataset_open_spa(NULL, name, mode, tag, dsp));
459 }
460 
461 void
462 dsl_dataset_name(dsl_dataset_t *ds, char *name)
463 {
464 	if (ds == NULL) {
465 		(void) strcpy(name, "mos");
466 	} else {
467 		dsl_dir_name(ds->ds_dir, name);
468 		VERIFY(0 == dsl_dataset_get_snapname(ds));
469 		if (ds->ds_snapname[0]) {
470 			(void) strcat(name, "@");
471 			if (!MUTEX_HELD(&ds->ds_lock)) {
472 				/*
473 				 * We use a "recursive" mutex so that we
474 				 * can call dprintf_ds() with ds_lock held.
475 				 */
476 				mutex_enter(&ds->ds_lock);
477 				(void) strcat(name, ds->ds_snapname);
478 				mutex_exit(&ds->ds_lock);
479 			} else {
480 				(void) strcat(name, ds->ds_snapname);
481 			}
482 		}
483 	}
484 }
485 
486 void
487 dsl_dataset_close(dsl_dataset_t *ds, int mode, void *tag)
488 {
489 	uint64_t weight = ds_refcnt_weight[DS_MODE_LEVEL(mode)];
490 	mutex_enter(&ds->ds_lock);
491 	ASSERT3U(ds->ds_open_refcount, >=, weight);
492 	ds->ds_open_refcount -= weight;
493 	dprintf_ds(ds, "closing mode %u refcount now 0x%llx\n",
494 	    mode, ds->ds_open_refcount);
495 	mutex_exit(&ds->ds_lock);
496 
497 	dmu_buf_rele(ds->ds_dbuf, tag);
498 }
499 
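/*
 * Create the pool's root dsl_dir and its head dataset, along with an
 * initial ZFS objset for it.
 */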
500 void
501 dsl_dataset_create_root(dsl_pool_t *dp, uint64_t *ddobjp, dmu_tx_t *tx)
502 {
503 	objset_t *mos = dp->dp_meta_objset;
504 	dmu_buf_t *dbuf;
505 	dsl_dataset_phys_t *dsphys;
506 	dsl_dataset_t *ds;
507 	uint64_t dsobj;
508 	dsl_dir_t *dd;
509 
510 	dsl_dir_create_root(mos, ddobjp, tx);
511 	VERIFY(0 == dsl_dir_open_obj(dp, *ddobjp, NULL, FTAG, &dd));
512 
513 	dsobj = dmu_object_alloc(mos, DMU_OT_DSL_DATASET, 0,
514 	    DMU_OT_DSL_DATASET, sizeof (dsl_dataset_phys_t), tx);
515 	VERIFY(0 == dmu_bonus_hold(mos, dsobj, FTAG, &dbuf));
516 	dmu_buf_will_dirty(dbuf, tx);
517 	dsphys = dbuf->db_data;
518 	dsphys->ds_dir_obj = dd->dd_object;
519 	dsphys->ds_fsid_guid = unique_create();
520 	unique_remove(dsphys->ds_fsid_guid); /* it isn't open yet */
521 	(void) random_get_pseudo_bytes((void*)&dsphys->ds_guid,
522 	    sizeof (dsphys->ds_guid));
523 	dsphys->ds_snapnames_zapobj =
524 	    zap_create(mos, DMU_OT_DSL_DS_SNAP_MAP, DMU_OT_NONE, 0, tx);
525 	dsphys->ds_creation_time = gethrestime_sec();
526 	dsphys->ds_creation_txg = tx->tx_txg;
527 	dsphys->ds_deadlist_obj =
528 	    bplist_create(mos, DSL_DEADLIST_BLOCKSIZE, tx);
529 	dmu_buf_rele(dbuf, FTAG);
530 
531 	dmu_buf_will_dirty(dd->dd_dbuf, tx);
532 	dd->dd_phys->dd_head_dataset_obj = dsobj;
533 	dsl_dir_close(dd, FTAG);
534 
535 	VERIFY(0 ==
536 	    dsl_dataset_open_obj(dp, dsobj, NULL, DS_MODE_NONE, FTAG, &ds));
537 	(void) dmu_objset_create_impl(dp->dp_spa, ds, DMU_OST_ZFS, tx);
538 	dsl_dataset_close(ds, DS_MODE_NONE, FTAG);
539 }
540 
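/*
 * Syncing-context creation of a new dataset named 'lastname' under 'pdd'.
 * If clone_parent is non-NULL the new dataset is a clone: it starts out
 * sharing clone_parent's block pointer and space accounting, and
 * clone_parent's child count is bumped.  Returns the new dataset object.
 */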
541 uint64_t
542 dsl_dataset_create_sync(dsl_dir_t *pdd,
543     const char *lastname, dsl_dataset_t *clone_parent, dmu_tx_t *tx)
544 {
545 	dsl_pool_t *dp = pdd->dd_pool;
546 	dmu_buf_t *dbuf;
547 	dsl_dataset_phys_t *dsphys;
548 	uint64_t dsobj, ddobj;
549 	objset_t *mos = dp->dp_meta_objset;
550 	dsl_dir_t *dd;
551 
552 	ASSERT(clone_parent == NULL || clone_parent->ds_dir->dd_pool == dp);
553 	ASSERT(clone_parent == NULL ||
554 	    clone_parent->ds_phys->ds_num_children > 0);
555 	ASSERT(lastname[0] != '@');
556 	ASSERT(dmu_tx_is_syncing(tx));
557 
558 	ddobj = dsl_dir_create_sync(pdd, lastname, tx);
559 	VERIFY(0 == dsl_dir_open_obj(dp, ddobj, lastname, FTAG, &dd));
560 
561 	dsobj = dmu_object_alloc(mos, DMU_OT_DSL_DATASET, 0,
562 	    DMU_OT_DSL_DATASET, sizeof (dsl_dataset_phys_t), tx);
563 	VERIFY(0 == dmu_bonus_hold(mos, dsobj, FTAG, &dbuf));
564 	dmu_buf_will_dirty(dbuf, tx);
565 	dsphys = dbuf->db_data;
566 	dsphys->ds_dir_obj = dd->dd_object;
567 	dsphys->ds_fsid_guid = unique_create();
568 	unique_remove(dsphys->ds_fsid_guid); /* it isn't open yet */
569 	(void) random_get_pseudo_bytes((void*)&dsphys->ds_guid,
570 	    sizeof (dsphys->ds_guid));
571 	dsphys->ds_snapnames_zapobj =
572 	    zap_create(mos, DMU_OT_DSL_DS_SNAP_MAP, DMU_OT_NONE, 0, tx);
573 	dsphys->ds_creation_time = gethrestime_sec();
574 	dsphys->ds_creation_txg = tx->tx_txg;
575 	dsphys->ds_deadlist_obj =
576 	    bplist_create(mos, DSL_DEADLIST_BLOCKSIZE, tx);
577 	if (clone_parent) {
578 		dsphys->ds_prev_snap_obj = clone_parent->ds_object;
579 		dsphys->ds_prev_snap_txg =
580 		    clone_parent->ds_phys->ds_creation_txg;
581 		dsphys->ds_used_bytes =
582 		    clone_parent->ds_phys->ds_used_bytes;
583 		dsphys->ds_compressed_bytes =
584 		    clone_parent->ds_phys->ds_compressed_bytes;
585 		dsphys->ds_uncompressed_bytes =
586 		    clone_parent->ds_phys->ds_uncompressed_bytes;
587 		dsphys->ds_bp = clone_parent->ds_phys->ds_bp;
588 
589 		dmu_buf_will_dirty(clone_parent->ds_dbuf, tx);
590 		clone_parent->ds_phys->ds_num_children++;
591 
592 		dmu_buf_will_dirty(dd->dd_dbuf, tx);
593 		dd->dd_phys->dd_clone_parent_obj = clone_parent->ds_object;
594 	}
595 	dmu_buf_rele(dbuf, FTAG);
596 
597 	dmu_buf_will_dirty(dd->dd_dbuf, tx);
598 	dd->dd_phys->dd_head_dataset_obj = dsobj;
599 	dsl_dir_close(dd, FTAG);
600 
601 	return (dsobj);
602 }
603 
604 struct destroyarg {
605 	dsl_sync_task_group_t *dstg;
606 	char *snapname;
607 	void *tag;
608 	char *failed;
609 };
610 
611 static int
612 dsl_snapshot_destroy_one(char *name, void *arg)
613 {
614 	struct destroyarg *da = arg;
615 	dsl_dataset_t *ds;
616 	char *cp;
617 	int err;
618 
619 	(void) strcat(name, "@");
620 	(void) strcat(name, da->snapname);
621 	err = dsl_dataset_open(name,
622 	    DS_MODE_EXCLUSIVE | DS_MODE_READONLY | DS_MODE_INCONSISTENT,
623 	    da->tag, &ds);
624 	cp = strchr(name, '@');
625 	*cp = '\0';
626 	if (err == ENOENT)
627 		return (0);
628 	if (err) {
629 		(void) strcpy(da->failed, name);
630 		return (err);
631 	}
632 
633 	dsl_sync_task_create(da->dstg, dsl_dataset_destroy_check,
634 	    dsl_dataset_destroy_sync, ds, da->tag, 0);
635 	return (0);
636 }
637 
638 /*
639  * Destroy 'snapname' in all descendants of 'fsname'.
640  */
641 #pragma weak dmu_snapshots_destroy = dsl_snapshots_destroy
642 int
643 dsl_snapshots_destroy(char *fsname, char *snapname)
644 {
645 	int err;
646 	struct destroyarg da;
647 	dsl_sync_task_t *dst;
648 	spa_t *spa;
649 	char *cp;
650 
651 	cp = strchr(fsname, '/');
652 	if (cp) {
653 		*cp = '\0';
654 		err = spa_open(fsname, &spa, FTAG);
655 		*cp = '/';
656 	} else {
657 		err = spa_open(fsname, &spa, FTAG);
658 	}
659 	if (err)
660 		return (err);
661 	da.dstg = dsl_sync_task_group_create(spa_get_dsl(spa));
662 	da.snapname = snapname;
663 	da.tag = FTAG;
664 	da.failed = fsname;
665 
666 	err = dmu_objset_find(fsname,
667 	    dsl_snapshot_destroy_one, &da, DS_FIND_CHILDREN);
668 
669 	if (err == 0)
670 		err = dsl_sync_task_group_wait(da.dstg);
671 
672 	for (dst = list_head(&da.dstg->dstg_tasks); dst;
673 	    dst = list_next(&da.dstg->dstg_tasks, dst)) {
674 		dsl_dataset_t *ds = dst->dst_arg1;
675 		if (dst->dst_err) {
676 			dsl_dataset_name(ds, fsname);
677 			cp = strchr(fsname, '@');
678 			*cp = '\0';
679 		}
680 		/*
681 		 * If it was successful, destroy_sync would have
682 		 * closed the ds
683 		 */
684 		if (err)
685 			dsl_dataset_close(ds, DS_MODE_EXCLUSIVE, FTAG);
686 	}
687 
688 	dsl_sync_task_group_destroy(da.dstg);
689 	spa_close(spa, FTAG);
690 	return (err);
691 }
692 
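/*
 * Destroy the named dataset.  A snapshot is destroyed with a single sync
 * task.  A head dataset is first marked inconsistent, then its objects are
 * freed in open context (so there is less to do in syncing context), and
 * finally the dataset and its dsl_dir are destroyed together.
 */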
693 int
694 dsl_dataset_destroy(const char *name)
695 {
696 	int err;
697 	dsl_sync_task_group_t *dstg;
698 	objset_t *os;
699 	dsl_dataset_t *ds;
700 	dsl_dir_t *dd;
701 	uint64_t obj;
702 
703 	if (strchr(name, '@')) {
704 		/* Destroying a snapshot is simpler */
705 		err = dsl_dataset_open(name,
706 		    DS_MODE_EXCLUSIVE | DS_MODE_READONLY | DS_MODE_INCONSISTENT,
707 		    FTAG, &ds);
708 		if (err)
709 			return (err);
710 		err = dsl_sync_task_do(ds->ds_dir->dd_pool,
711 		    dsl_dataset_destroy_check, dsl_dataset_destroy_sync,
712 		    ds, FTAG, 0);
713 		if (err)
714 			dsl_dataset_close(ds, DS_MODE_EXCLUSIVE, FTAG);
715 		return (err);
716 	}
717 
718 	err = dmu_objset_open(name, DMU_OST_ANY,
719 	    DS_MODE_EXCLUSIVE | DS_MODE_INCONSISTENT, &os);
720 	if (err)
721 		return (err);
722 	ds = os->os->os_dsl_dataset;
723 	dd = ds->ds_dir;
724 
725 	/*
726 	 * Check for errors and mark this ds as inconsistent, in
727 	 * case we crash while freeing the objects.
728 	 */
729 	err = dsl_sync_task_do(dd->dd_pool, dsl_dataset_destroy_begin_check,
730 	    dsl_dataset_destroy_begin_sync, ds, NULL, 0);
731 	if (err) {
732 		dmu_objset_close(os);
733 		return (err);
734 	}
735 
736 	/*
737 	 * remove the objects in open context, so that we won't
738 	 * have too much to do in syncing context.
739 	 */
740 	for (obj = 0; err == 0;
741 	    err = dmu_object_next(os, &obj, FALSE)) {
742 		dmu_tx_t *tx = dmu_tx_create(os);
743 		dmu_tx_hold_free(tx, obj, 0, DMU_OBJECT_END);
744 		dmu_tx_hold_bonus(tx, obj);
745 		err = dmu_tx_assign(tx, TXG_WAIT);
746 		if (err) {
747 			/*
748 			 * Perhaps there is not enough disk
749 			 * space.  Just deal with it from
750 			 * dsl_dataset_destroy_sync().
751 			 */
752 			dmu_tx_abort(tx);
753 			continue;
754 		}
755 		VERIFY(0 == dmu_object_free(os, obj, tx));
756 		dmu_tx_commit(tx);
757 	}
758 	/* Make sure it's not dirty before we finish destroying it. */
759 	txg_wait_synced(dd->dd_pool, 0);
760 
761 	dmu_objset_close(os);
762 	if (err != ESRCH)
763 		return (err);
764 
765 	err = dsl_dataset_open(name,
766 	    DS_MODE_EXCLUSIVE | DS_MODE_READONLY | DS_MODE_INCONSISTENT,
767 	    FTAG, &ds);
768 	if (err)
769 		return (err);
770 
771 	err = dsl_dir_open(name, FTAG, &dd, NULL);
772 	if (err) {
773 		dsl_dataset_close(ds, DS_MODE_EXCLUSIVE, FTAG);
774 		return (err);
775 	}
776 
777 	/*
778 	 * Blow away the dsl_dir + head dataset.
779 	 */
780 	dstg = dsl_sync_task_group_create(ds->ds_dir->dd_pool);
781 	dsl_sync_task_create(dstg, dsl_dataset_destroy_check,
782 	    dsl_dataset_destroy_sync, ds, FTAG, 0);
783 	dsl_sync_task_create(dstg, dsl_dir_destroy_check,
784 	    dsl_dir_destroy_sync, dd, FTAG, 0);
785 	err = dsl_sync_task_group_wait(dstg);
786 	dsl_sync_task_group_destroy(dstg);
787 	/* if it is successful, *destroy_sync will close the ds+dd */
788 	if (err) {
789 		dsl_dataset_close(ds, DS_MODE_EXCLUSIVE, FTAG);
790 		dsl_dir_close(dd, FTAG);
791 	}
792 	return (err);
793 }
794 
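/*
 * Discard all changes made since the most recent snapshot, returning the
 * dataset to that snapshot's contents.  The dataset must be held
 * exclusively (ds_open_refcount == DOS_REF_MAX).
 */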
795 int
796 dsl_dataset_rollback(dsl_dataset_t *ds)
797 {
798 	ASSERT3U(ds->ds_open_refcount, ==, DOS_REF_MAX);
799 	return (dsl_sync_task_do(ds->ds_dir->dd_pool,
800 	    dsl_dataset_rollback_check, dsl_dataset_rollback_sync,
801 	    ds, NULL, 0));
802 }
803 
804 void *
805 dsl_dataset_set_user_ptr(dsl_dataset_t *ds,
806     void *p, dsl_dataset_evict_func_t func)
807 {
808 	void *old;
809 
810 	mutex_enter(&ds->ds_lock);
811 	old = ds->ds_user_ptr;
812 	if (old == NULL) {
813 		ds->ds_user_ptr = p;
814 		ds->ds_user_evict_func = func;
815 	}
816 	mutex_exit(&ds->ds_lock);
817 	return (old);
818 }
819 
820 void *
821 dsl_dataset_get_user_ptr(dsl_dataset_t *ds)
822 {
823 	return (ds->ds_user_ptr);
824 }
825 
826 
827 void
828 dsl_dataset_get_blkptr(dsl_dataset_t *ds, blkptr_t *bp)
829 {
830 	*bp = ds->ds_phys->ds_bp;
831 }
832 
833 void
834 dsl_dataset_set_blkptr(dsl_dataset_t *ds, blkptr_t *bp, dmu_tx_t *tx)
835 {
836 	ASSERT(dmu_tx_is_syncing(tx));
837 	/* If it's the meta-objset, set dp_meta_rootbp */
838 	if (ds == NULL) {
839 		tx->tx_pool->dp_meta_rootbp = *bp;
840 	} else {
841 		dmu_buf_will_dirty(ds->ds_dbuf, tx);
842 		ds->ds_phys->ds_bp = *bp;
843 	}
844 }
845 
846 spa_t *
847 dsl_dataset_get_spa(dsl_dataset_t *ds)
848 {
849 	return (ds->ds_dir->dd_pool->dp_spa);
850 }
851 
852 void
853 dsl_dataset_dirty(dsl_dataset_t *ds, dmu_tx_t *tx)
854 {
855 	dsl_pool_t *dp;
856 
857 	if (ds == NULL) /* this is the meta-objset */
858 		return;
859 
860 	ASSERT(ds->ds_user_ptr != NULL);
861 	ASSERT(ds->ds_phys->ds_next_snap_obj == 0);
862 
863 	dp = ds->ds_dir->dd_pool;
864 
865 	if (txg_list_add(&dp->dp_dirty_datasets, ds, tx->tx_txg) == 0) {
866 		/* up the hold count until we can be written out */
867 		dmu_buf_add_ref(ds->ds_dbuf, ds);
868 	}
869 }
870 
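/*
 * Context for kill_blkptr(): accumulates the space of the blocks freed
 * while traversing a dataset, along with the zio and tx used to free them.
 */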
871 struct killarg {
872 	uint64_t *usedp;
873 	uint64_t *compressedp;
874 	uint64_t *uncompressedp;
875 	zio_t *zio;
876 	dmu_tx_t *tx;
877 };
878 
879 static int
880 kill_blkptr(traverse_blk_cache_t *bc, spa_t *spa, void *arg)
881 {
882 	struct killarg *ka = arg;
883 	blkptr_t *bp = &bc->bc_blkptr;
884 
885 	ASSERT3U(bc->bc_errno, ==, 0);
886 
887 	/*
888 	 * Since this callback is not called concurrently, no lock is
889 	 * needed on the accounting values.
890 	 */
891 	*ka->usedp += bp_get_dasize(spa, bp);
892 	*ka->compressedp += BP_GET_PSIZE(bp);
893 	*ka->uncompressedp += BP_GET_UCSIZE(bp);
894 	/* XXX check for EIO? */
895 	(void) arc_free(ka->zio, spa, ka->tx->tx_txg, bp, NULL, NULL,
896 	    ARC_NOWAIT);
897 	return (0);
898 }
899 
900 /* ARGSUSED */
901 static int
902 dsl_dataset_rollback_check(void *arg1, void *arg2, dmu_tx_t *tx)
903 {
904 	dsl_dataset_t *ds = arg1;
905 
906 	/*
907 	 * There must be a previous snapshot.  I suppose we could roll
908 	 * it back to being empty (and re-initialize the upper (ZPL)
909 	 * layer).  But for now there's no way to do this via the user
910 	 * interface.
911 	 */
912 	if (ds->ds_phys->ds_prev_snap_txg == 0)
913 		return (EINVAL);
914 
915 	/*
916 	 * This must not be a snapshot.
917 	 */
918 	if (ds->ds_phys->ds_next_snap_obj != 0)
919 		return (EINVAL);
920 
921 	/*
922 	 * If we made changes this txg, traverse_dsl_dataset won't find
923 	 * them.  Try again.
924 	 */
925 	if (ds->ds_phys->ds_bp.blk_birth >= tx->tx_txg)
926 		return (EAGAIN);
927 
928 	return (0);
929 }
930 
931 /* ARGSUSED */
932 static void
933 dsl_dataset_rollback_sync(void *arg1, void *arg2, dmu_tx_t *tx)
934 {
935 	dsl_dataset_t *ds = arg1;
936 	objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
937 
938 	dmu_buf_will_dirty(ds->ds_dbuf, tx);
939 
940 	/* Zero out the deadlist. */
941 	bplist_close(&ds->ds_deadlist);
942 	bplist_destroy(mos, ds->ds_phys->ds_deadlist_obj, tx);
943 	ds->ds_phys->ds_deadlist_obj =
944 	    bplist_create(mos, DSL_DEADLIST_BLOCKSIZE, tx);
945 	VERIFY(0 == bplist_open(&ds->ds_deadlist, mos,
946 	    ds->ds_phys->ds_deadlist_obj));
947 
948 	{
949 		/* Free blkptrs that we gave birth to */
950 		zio_t *zio;
951 		uint64_t used = 0, compressed = 0, uncompressed = 0;
952 		struct killarg ka;
953 
954 		zio = zio_root(tx->tx_pool->dp_spa, NULL, NULL,
955 		    ZIO_FLAG_MUSTSUCCEED);
956 		ka.usedp = &used;
957 		ka.compressedp = &compressed;
958 		ka.uncompressedp = &uncompressed;
959 		ka.zio = zio;
960 		ka.tx = tx;
961 		(void) traverse_dsl_dataset(ds, ds->ds_phys->ds_prev_snap_txg,
962 		    ADVANCE_POST, kill_blkptr, &ka);
963 		(void) zio_wait(zio);
964 
965 		dsl_dir_diduse_space(ds->ds_dir,
966 		    -used, -compressed, -uncompressed, tx);
967 	}
968 
969 	/* Change our contents to that of the prev snapshot */
970 	ASSERT3U(ds->ds_prev->ds_object, ==, ds->ds_phys->ds_prev_snap_obj);
971 	ds->ds_phys->ds_bp = ds->ds_prev->ds_phys->ds_bp;
972 	ds->ds_phys->ds_used_bytes = ds->ds_prev->ds_phys->ds_used_bytes;
973 	ds->ds_phys->ds_compressed_bytes =
974 	    ds->ds_prev->ds_phys->ds_compressed_bytes;
975 	ds->ds_phys->ds_uncompressed_bytes =
976 	    ds->ds_prev->ds_phys->ds_uncompressed_bytes;
977 	ds->ds_phys->ds_flags = ds->ds_prev->ds_phys->ds_flags;
978 	ds->ds_phys->ds_unique_bytes = 0;
979 
980 	if (ds->ds_prev->ds_phys->ds_next_snap_obj == ds->ds_object) {
981 		dmu_buf_will_dirty(ds->ds_prev->ds_dbuf, tx);
982 		ds->ds_prev->ds_phys->ds_unique_bytes = 0;
983 	}
984 }
985 
986 /* ARGSUSED */
987 static int
988 dsl_dataset_destroy_begin_check(void *arg1, void *arg2, dmu_tx_t *tx)
989 {
990 	dsl_dataset_t *ds = arg1;
991 
992 	/*
993 	 * Can't delete a head dataset if there are snapshots of it.
994 	 * (Except if the only snapshots are from the branch we cloned
995 	 * from.)
996 	 */
997 	if (ds->ds_prev != NULL &&
998 	    ds->ds_prev->ds_phys->ds_next_snap_obj == ds->ds_object)
999 		return (EINVAL);
1000 
1001 	return (0);
1002 }
1003 
1004 /* ARGSUSED */
1005 static void
1006 dsl_dataset_destroy_begin_sync(void *arg1, void *arg2, dmu_tx_t *tx)
1007 {
1008 	dsl_dataset_t *ds = arg1;
1009 
1010 	/* Mark it as inconsistent on-disk, in case we crash */
1011 	dmu_buf_will_dirty(ds->ds_dbuf, tx);
1012 	ds->ds_phys->ds_flags |= DS_FLAG_INCONSISTENT;
1013 }
1014 
1015 /* ARGSUSED */
1016 static int
1017 dsl_dataset_destroy_check(void *arg1, void *arg2, dmu_tx_t *tx)
1018 {
1019 	dsl_dataset_t *ds = arg1;
1020 
1021 	/* Can't delete a branch point. */
1022 	if (ds->ds_phys->ds_num_children > 1)
1023 		return (EEXIST);
1024 
1025 	/*
1026 	 * Can't delete a head dataset if there are snapshots of it.
1027 	 * (Except if the only snapshots are from the branch we cloned
1028 	 * from.)
1029 	 */
1030 	if (ds->ds_prev != NULL &&
1031 	    ds->ds_prev->ds_phys->ds_next_snap_obj == ds->ds_object)
1032 		return (EINVAL);
1033 
1034 	/*
1035 	 * If we made changes this txg, traverse_dsl_dataset won't find
1036 	 * them.  Try again.
1037 	 */
1038 	if (ds->ds_phys->ds_bp.blk_birth >= tx->tx_txg)
1039 		return (EAGAIN);
1040 
1041 	/* XXX we should do some i/o error checking... */
1042 	return (0);
1043 }
1044 
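/*
 * Syncing-context half of dataset destruction: unlink this dataset from
 * the snapshot chain, fix up the neighboring snapshots' deadlists and
 * unique byte counts, free the blocks that only this dataset referenced,
 * remove its snapshot-name entry (if any), and free the dataset object
 * itself.
 */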
1045 static void
1046 dsl_dataset_destroy_sync(void *arg1, void *tag, dmu_tx_t *tx)
1047 {
1048 	dsl_dataset_t *ds = arg1;
1049 	uint64_t used = 0, compressed = 0, uncompressed = 0;
1050 	zio_t *zio;
1051 	int err;
1052 	int after_branch_point = FALSE;
1053 	dsl_pool_t *dp = ds->ds_dir->dd_pool;
1054 	objset_t *mos = dp->dp_meta_objset;
1055 	dsl_dataset_t *ds_prev = NULL;
1056 	uint64_t obj;
1057 
1058 	ASSERT3U(ds->ds_open_refcount, ==, DOS_REF_MAX);
1059 	ASSERT3U(ds->ds_phys->ds_num_children, <=, 1);
1060 	ASSERT(ds->ds_prev == NULL ||
1061 	    ds->ds_prev->ds_phys->ds_next_snap_obj != ds->ds_object);
1062 	ASSERT3U(ds->ds_phys->ds_bp.blk_birth, <=, tx->tx_txg);
1063 
1064 	ASSERT(RW_WRITE_HELD(&dp->dp_config_rwlock));
1065 
1066 	obj = ds->ds_object;
1067 
1068 	if (ds->ds_phys->ds_prev_snap_obj != 0) {
1069 		if (ds->ds_prev) {
1070 			ds_prev = ds->ds_prev;
1071 		} else {
1072 			VERIFY(0 == dsl_dataset_open_obj(dp,
1073 			    ds->ds_phys->ds_prev_snap_obj, NULL,
1074 			    DS_MODE_NONE, FTAG, &ds_prev));
1075 		}
1076 		after_branch_point =
1077 		    (ds_prev->ds_phys->ds_next_snap_obj != obj);
1078 
1079 		dmu_buf_will_dirty(ds_prev->ds_dbuf, tx);
1080 		if (after_branch_point &&
1081 		    ds->ds_phys->ds_next_snap_obj == 0) {
1082 			/* This clone is toast. */
1083 			ASSERT(ds_prev->ds_phys->ds_num_children > 1);
1084 			ds_prev->ds_phys->ds_num_children--;
1085 		} else if (!after_branch_point) {
1086 			ds_prev->ds_phys->ds_next_snap_obj =
1087 			    ds->ds_phys->ds_next_snap_obj;
1088 		}
1089 	}
1090 
1091 	zio = zio_root(dp->dp_spa, NULL, NULL, ZIO_FLAG_MUSTSUCCEED);
1092 
1093 	if (ds->ds_phys->ds_next_snap_obj != 0) {
1094 		blkptr_t bp;
1095 		dsl_dataset_t *ds_next;
1096 		uint64_t itor = 0;
1097 
1098 		spa_scrub_restart(dp->dp_spa, tx->tx_txg);
1099 
1100 		VERIFY(0 == dsl_dataset_open_obj(dp,
1101 		    ds->ds_phys->ds_next_snap_obj, NULL,
1102 		    DS_MODE_NONE, FTAG, &ds_next));
1103 		ASSERT3U(ds_next->ds_phys->ds_prev_snap_obj, ==, obj);
1104 
1105 		dmu_buf_will_dirty(ds_next->ds_dbuf, tx);
1106 		ds_next->ds_phys->ds_prev_snap_obj =
1107 		    ds->ds_phys->ds_prev_snap_obj;
1108 		ds_next->ds_phys->ds_prev_snap_txg =
1109 		    ds->ds_phys->ds_prev_snap_txg;
1110 		ASSERT3U(ds->ds_phys->ds_prev_snap_txg, ==,
1111 		    ds_prev ? ds_prev->ds_phys->ds_creation_txg : 0);
1112 
1113 		/*
1114 		 * Transfer to our deadlist (which will become next's
1115 		 * new deadlist) any entries from next's current
1116 		 * deadlist which were born before prev, and free the
1117 		 * other entries.
1118 		 *
1119 		 * XXX we're doing this long task with the config lock held
1120 		 */
1121 		while (bplist_iterate(&ds_next->ds_deadlist, &itor,
1122 		    &bp) == 0) {
1123 			if (bp.blk_birth <= ds->ds_phys->ds_prev_snap_txg) {
1124 				VERIFY(0 == bplist_enqueue(&ds->ds_deadlist,
1125 				    &bp, tx));
1126 				if (ds_prev && !after_branch_point &&
1127 				    bp.blk_birth >
1128 				    ds_prev->ds_phys->ds_prev_snap_txg) {
1129 					ds_prev->ds_phys->ds_unique_bytes +=
1130 					    bp_get_dasize(dp->dp_spa, &bp);
1131 				}
1132 			} else {
1133 				used += bp_get_dasize(dp->dp_spa, &bp);
1134 				compressed += BP_GET_PSIZE(&bp);
1135 				uncompressed += BP_GET_UCSIZE(&bp);
1136 				/* XXX check return value? */
1137 				(void) arc_free(zio, dp->dp_spa, tx->tx_txg,
1138 				    &bp, NULL, NULL, ARC_NOWAIT);
1139 			}
1140 		}
1141 
1142 		/* free next's deadlist */
1143 		bplist_close(&ds_next->ds_deadlist);
1144 		bplist_destroy(mos, ds_next->ds_phys->ds_deadlist_obj, tx);
1145 
1146 		/* set next's deadlist to our deadlist */
1147 		ds_next->ds_phys->ds_deadlist_obj =
1148 		    ds->ds_phys->ds_deadlist_obj;
1149 		VERIFY(0 == bplist_open(&ds_next->ds_deadlist, mos,
1150 		    ds_next->ds_phys->ds_deadlist_obj));
1151 		ds->ds_phys->ds_deadlist_obj = 0;
1152 
1153 		if (ds_next->ds_phys->ds_next_snap_obj != 0) {
1154 			/*
1155 			 * Update next's unique to include blocks which
1156 			 * were previously shared by only this snapshot
1157 			 * and it.  Those blocks will be born after the
1158 			 * prev snap and before this snap, and will have
1159 			 * died after the next snap and before the one
1160 			 * after that (i.e. be on the snap after next's
1161 			 * deadlist).
1162 			 *
1163 			 * XXX we're doing this long task with the
1164 			 * config lock held
1165 			 */
1166 			dsl_dataset_t *ds_after_next;
1167 
1168 			VERIFY(0 == dsl_dataset_open_obj(dp,
1169 			    ds_next->ds_phys->ds_next_snap_obj, NULL,
1170 			    DS_MODE_NONE, FTAG, &ds_after_next));
1171 			itor = 0;
1172 			while (bplist_iterate(&ds_after_next->ds_deadlist,
1173 			    &itor, &bp) == 0) {
1174 				if (bp.blk_birth >
1175 				    ds->ds_phys->ds_prev_snap_txg &&
1176 				    bp.blk_birth <=
1177 				    ds->ds_phys->ds_creation_txg) {
1178 					ds_next->ds_phys->ds_unique_bytes +=
1179 					    bp_get_dasize(dp->dp_spa, &bp);
1180 				}
1181 			}
1182 
1183 			dsl_dataset_close(ds_after_next, DS_MODE_NONE, FTAG);
1184 			ASSERT3P(ds_next->ds_prev, ==, NULL);
1185 		} else {
1186 			/*
1187 			 * It would be nice to update the head dataset's
1188 			 * unique.  To do so we would have to traverse
1189 			 * it for blocks born after ds_prev, which is
1190 			 * pretty expensive just to maintain something
1191 			 * for debugging purposes.
1192 			 */
1193 			ASSERT3P(ds_next->ds_prev, ==, ds);
1194 			dsl_dataset_close(ds_next->ds_prev, DS_MODE_NONE,
1195 			    ds_next);
1196 			if (ds_prev) {
1197 				VERIFY(0 == dsl_dataset_open_obj(dp,
1198 				    ds->ds_phys->ds_prev_snap_obj, NULL,
1199 				    DS_MODE_NONE, ds_next, &ds_next->ds_prev));
1200 			} else {
1201 				ds_next->ds_prev = NULL;
1202 			}
1203 		}
1204 		dsl_dataset_close(ds_next, DS_MODE_NONE, FTAG);
1205 
1206 		/*
1207 		 * NB: unique_bytes is not accurate for head objsets
1208 		 * because we don't update it when we delete the most
1209 		 * recent snapshot -- see above comment.
1210 		 */
1211 		ASSERT3U(used, ==, ds->ds_phys->ds_unique_bytes);
1212 	} else {
1213 		/*
1214 		 * There's no next snapshot, so this is a head dataset.
1215 		 * Destroy the deadlist.  Unless it's a clone, the
1216 		 * deadlist should be empty.  (If it's a clone, it's
1217 		 * safe to ignore the deadlist contents.)
1218 		 */
1219 		struct killarg ka;
1220 
1221 		ASSERT(after_branch_point || bplist_empty(&ds->ds_deadlist));
1222 		bplist_close(&ds->ds_deadlist);
1223 		bplist_destroy(mos, ds->ds_phys->ds_deadlist_obj, tx);
1224 		ds->ds_phys->ds_deadlist_obj = 0;
1225 
1226 		/*
1227 		 * Free everything that we point to (that's born after
1228 		 * the previous snapshot, if we are a clone)
1229 		 *
1230 		 * XXX we're doing this long task with the config lock held
1231 		 */
1232 		ka.usedp = &used;
1233 		ka.compressedp = &compressed;
1234 		ka.uncompressedp = &uncompressed;
1235 		ka.zio = zio;
1236 		ka.tx = tx;
1237 		err = traverse_dsl_dataset(ds, ds->ds_phys->ds_prev_snap_txg,
1238 		    ADVANCE_POST, kill_blkptr, &ka);
1239 		ASSERT3U(err, ==, 0);
1240 	}
1241 
1242 	err = zio_wait(zio);
1243 	ASSERT3U(err, ==, 0);
1244 
1245 	dsl_dir_diduse_space(ds->ds_dir, -used, -compressed, -uncompressed, tx);
1246 
1247 	if (ds->ds_phys->ds_snapnames_zapobj) {
1248 		err = zap_destroy(mos, ds->ds_phys->ds_snapnames_zapobj, tx);
1249 		ASSERT(err == 0);
1250 	}
1251 
1252 	if (ds->ds_dir->dd_phys->dd_head_dataset_obj == ds->ds_object) {
1253 		/* Erase the link in the dataset */
1254 		dmu_buf_will_dirty(ds->ds_dir->dd_dbuf, tx);
1255 		ds->ds_dir->dd_phys->dd_head_dataset_obj = 0;
1256 		/*
1257 		 * dsl_dir_sync_destroy() called us, they'll destroy
1258 		 * the dataset.
1259 		 */
1260 	} else {
1261 		/* remove from snapshot namespace */
1262 		dsl_dataset_t *ds_head;
1263 		VERIFY(0 == dsl_dataset_open_obj(dp,
1264 		    ds->ds_dir->dd_phys->dd_head_dataset_obj, NULL,
1265 		    DS_MODE_NONE, FTAG, &ds_head));
1266 		VERIFY(0 == dsl_dataset_get_snapname(ds));
1267 #ifdef ZFS_DEBUG
1268 		{
1269 			uint64_t val;
1270 			err = zap_lookup(mos,
1271 			    ds_head->ds_phys->ds_snapnames_zapobj,
1272 			    ds->ds_snapname, 8, 1, &val);
1273 			ASSERT3U(err, ==, 0);
1274 			ASSERT3U(val, ==, obj);
1275 		}
1276 #endif
1277 		err = zap_remove(mos, ds_head->ds_phys->ds_snapnames_zapobj,
1278 		    ds->ds_snapname, tx);
1279 		ASSERT(err == 0);
1280 		dsl_dataset_close(ds_head, DS_MODE_NONE, FTAG);
1281 	}
1282 
1283 	if (ds_prev && ds->ds_prev != ds_prev)
1284 		dsl_dataset_close(ds_prev, DS_MODE_NONE, FTAG);
1285 
1286 	dsl_dataset_close(ds, DS_MODE_EXCLUSIVE, tag);
1287 	VERIFY(0 == dmu_object_free(mos, obj, tx));
1288 }
1289 
1290 /* ARGSUSED */
1291 int
1292 dsl_dataset_snapshot_check(void *arg1, void *arg2, dmu_tx_t *tx)
1293 {
1294 	objset_t *os = arg1;
1295 	dsl_dataset_t *ds = os->os->os_dsl_dataset;
1296 	const char *snapname = arg2;
1297 	objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
1298 	int err;
1299 	uint64_t value;
1300 
1301 	/*
1302 	 * We don't allow multiple snapshots of the same txg.  If there
1303 	 * is already one, try again.
1304 	 */
1305 	if (ds->ds_phys->ds_prev_snap_txg >= tx->tx_txg)
1306 		return (EAGAIN);
1307 
1308 	/*
1309 	 * Check for a conflicting snapshot name.
1310 	 */
1311 	err = zap_lookup(mos, ds->ds_phys->ds_snapnames_zapobj,
1312 	    snapname, 8, 1, &value);
1313 	if (err == 0)
1314 		return (EEXIST);
1315 	if (err != ENOENT)
1316 		return (err);
1317 
1318 	ds->ds_trysnap_txg = tx->tx_txg;
1319 	return (0);
1320 }
1321 
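/*
 * Syncing-context half of snapshot creation: allocate a new dataset object
 * that captures the head's current block pointer, space accounting and
 * deadlist, link it into the snapshot chain, give the head a fresh empty
 * deadlist, and enter the snapshot's name in the snapnames zap.
 */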
1322 void
1323 dsl_dataset_snapshot_sync(void *arg1, void *arg2, dmu_tx_t *tx)
1324 {
1325 	objset_t *os = arg1;
1326 	dsl_dataset_t *ds = os->os->os_dsl_dataset;
1327 	const char *snapname = arg2;
1328 	dsl_pool_t *dp = ds->ds_dir->dd_pool;
1329 	dmu_buf_t *dbuf;
1330 	dsl_dataset_phys_t *dsphys;
1331 	uint64_t dsobj;
1332 	objset_t *mos = dp->dp_meta_objset;
1333 	int err;
1334 
1335 	spa_scrub_restart(dp->dp_spa, tx->tx_txg);
1336 	ASSERT(RW_WRITE_HELD(&dp->dp_config_rwlock));
1337 
1338 	dsobj = dmu_object_alloc(mos, DMU_OT_DSL_DATASET, 0,
1339 	    DMU_OT_DSL_DATASET, sizeof (dsl_dataset_phys_t), tx);
1340 	VERIFY(0 == dmu_bonus_hold(mos, dsobj, FTAG, &dbuf));
1341 	dmu_buf_will_dirty(dbuf, tx);
1342 	dsphys = dbuf->db_data;
1343 	dsphys->ds_dir_obj = ds->ds_dir->dd_object;
1344 	dsphys->ds_fsid_guid = unique_create();
1345 	unique_remove(dsphys->ds_fsid_guid); /* it isn't open yet */
1346 	(void) random_get_pseudo_bytes((void*)&dsphys->ds_guid,
1347 	    sizeof (dsphys->ds_guid));
1348 	dsphys->ds_prev_snap_obj = ds->ds_phys->ds_prev_snap_obj;
1349 	dsphys->ds_prev_snap_txg = ds->ds_phys->ds_prev_snap_txg;
1350 	dsphys->ds_next_snap_obj = ds->ds_object;
1351 	dsphys->ds_num_children = 1;
1352 	dsphys->ds_creation_time = gethrestime_sec();
1353 	dsphys->ds_creation_txg = tx->tx_txg;
1354 	dsphys->ds_deadlist_obj = ds->ds_phys->ds_deadlist_obj;
1355 	dsphys->ds_used_bytes = ds->ds_phys->ds_used_bytes;
1356 	dsphys->ds_compressed_bytes = ds->ds_phys->ds_compressed_bytes;
1357 	dsphys->ds_uncompressed_bytes = ds->ds_phys->ds_uncompressed_bytes;
1358 	dsphys->ds_flags = ds->ds_phys->ds_flags;
1359 	dsphys->ds_bp = ds->ds_phys->ds_bp;
1360 	dmu_buf_rele(dbuf, FTAG);
1361 
1362 	ASSERT3U(ds->ds_prev != 0, ==, ds->ds_phys->ds_prev_snap_obj != 0);
1363 	if (ds->ds_prev) {
1364 		ASSERT(ds->ds_prev->ds_phys->ds_next_snap_obj ==
1365 		    ds->ds_object ||
1366 		    ds->ds_prev->ds_phys->ds_num_children > 1);
1367 		if (ds->ds_prev->ds_phys->ds_next_snap_obj == ds->ds_object) {
1368 			dmu_buf_will_dirty(ds->ds_prev->ds_dbuf, tx);
1369 			ASSERT3U(ds->ds_phys->ds_prev_snap_txg, ==,
1370 			    ds->ds_prev->ds_phys->ds_creation_txg);
1371 			ds->ds_prev->ds_phys->ds_next_snap_obj = dsobj;
1372 		}
1373 	}
1374 
1375 	bplist_close(&ds->ds_deadlist);
1376 	dmu_buf_will_dirty(ds->ds_dbuf, tx);
1377 	ASSERT3U(ds->ds_phys->ds_prev_snap_txg, <, dsphys->ds_creation_txg);
1378 	ds->ds_phys->ds_prev_snap_obj = dsobj;
1379 	ds->ds_phys->ds_prev_snap_txg = dsphys->ds_creation_txg;
1380 	ds->ds_phys->ds_unique_bytes = 0;
1381 	ds->ds_phys->ds_deadlist_obj =
1382 	    bplist_create(mos, DSL_DEADLIST_BLOCKSIZE, tx);
1383 	VERIFY(0 == bplist_open(&ds->ds_deadlist, mos,
1384 	    ds->ds_phys->ds_deadlist_obj));
1385 
1386 	dprintf("snap '%s' -> obj %llu\n", snapname, dsobj);
1387 	err = zap_add(mos, ds->ds_phys->ds_snapnames_zapobj,
1388 	    snapname, 8, 1, &dsobj, tx);
1389 	ASSERT(err == 0);
1390 
1391 	if (ds->ds_prev)
1392 		dsl_dataset_close(ds->ds_prev, DS_MODE_NONE, ds);
1393 	VERIFY(0 == dsl_dataset_open_obj(dp,
1394 	    ds->ds_phys->ds_prev_snap_obj, snapname,
1395 	    DS_MODE_NONE, ds, &ds->ds_prev));
1396 }
1397 
1398 void
1399 dsl_dataset_sync(dsl_dataset_t *ds, dmu_tx_t *tx)
1400 {
1401 	ASSERT(dmu_tx_is_syncing(tx));
1402 	ASSERT(ds->ds_user_ptr != NULL);
1403 	ASSERT(ds->ds_phys->ds_next_snap_obj == 0);
1404 
1405 	dmu_objset_sync(ds->ds_user_ptr, tx);
1406 	dsl_dir_dirty(ds->ds_dir, tx);
1407 	bplist_close(&ds->ds_deadlist);
1408 
1409 	dmu_buf_rele(ds->ds_dbuf, ds);
1410 }
1411 
1412 void
1413 dsl_dataset_stats(dsl_dataset_t *ds, dmu_objset_stats_t *dds)
1414 {
1415 	/* fill in properties crap */
1416 	dsl_dir_stats(ds->ds_dir, dds);
1417 
1418 	if (ds->ds_phys->ds_num_children != 0) {
1419 		dds->dds_is_snapshot = TRUE;
1420 		dds->dds_num_clones = ds->ds_phys->ds_num_children - 1;
1421 	}
1422 
1423 	dds->dds_inconsistent = ds->ds_phys->ds_flags & DS_FLAG_INCONSISTENT;
1424 	dds->dds_last_txg = ds->ds_phys->ds_bp.blk_birth;
1425 
1426 	dds->dds_objects_used = ds->ds_phys->ds_bp.blk_fill;
1427 	dds->dds_objects_avail = DN_MAX_OBJECT - dds->dds_objects_used;
1428 
1429 	/* We override the dataset's creation time... they should be the same */
1430 	dds->dds_creation_time = ds->ds_phys->ds_creation_time;
1431 	dds->dds_creation_txg = ds->ds_phys->ds_creation_txg;
1432 	dds->dds_space_refd = ds->ds_phys->ds_used_bytes;
1433 	dds->dds_fsid_guid = ds->ds_phys->ds_fsid_guid;
1434 
1435 	if (ds->ds_phys->ds_next_snap_obj) {
1436 		/*
1437 		 * This is a snapshot; override the dd's space used with
1438 		 * our unique space
1439 		 */
1440 		dds->dds_space_used = ds->ds_phys->ds_unique_bytes;
1441 		dds->dds_compressed_bytes =
1442 		    ds->ds_phys->ds_compressed_bytes;
1443 		dds->dds_uncompressed_bytes =
1444 		    ds->ds_phys->ds_uncompressed_bytes;
1445 	}
1446 }
1447 
1448 dsl_pool_t *
1449 dsl_dataset_pool(dsl_dataset_t *ds)
1450 {
1451 	return (ds->ds_dir->dd_pool);
1452 }
1453 
1454 /* ARGSUSED */
1455 static int
1456 dsl_dataset_snapshot_rename_check(void *arg1, void *arg2, dmu_tx_t *tx)
1457 {
1458 	dsl_dataset_t *ds = arg1;
1459 	char *newsnapname = arg2;
1460 	dsl_dir_t *dd = ds->ds_dir;
1461 	objset_t *mos = dd->dd_pool->dp_meta_objset;
1462 	dsl_dataset_t *hds;
1463 	uint64_t val;
1464 	int err;
1465 
1466 	err = dsl_dataset_open_obj(dd->dd_pool,
1467 	    dd->dd_phys->dd_head_dataset_obj, NULL, DS_MODE_NONE, FTAG, &hds);
1468 	if (err)
1469 		return (err);
1470 
1471 	/* new name better not be in use */
1472 	err = zap_lookup(mos, hds->ds_phys->ds_snapnames_zapobj,
1473 	    newsnapname, 8, 1, &val);
1474 	dsl_dataset_close(hds, DS_MODE_NONE, FTAG);
1475 
1476 	if (err == 0)
1477 		err = EEXIST;
1478 	else if (err == ENOENT)
1479 		err = 0;
1480 	return (err);
1481 }
1482 
1483 static void
1484 dsl_dataset_snapshot_rename_sync(void *arg1, void *arg2, dmu_tx_t *tx)
1485 {
1486 	dsl_dataset_t *ds = arg1;
1487 	char *newsnapname = arg2;
1488 	dsl_dir_t *dd = ds->ds_dir;
1489 	objset_t *mos = dd->dd_pool->dp_meta_objset;
1490 	dsl_dataset_t *hds;
1491 	int err;
1492 
1493 	ASSERT(ds->ds_phys->ds_next_snap_obj != 0);
1494 
1495 	VERIFY(0 == dsl_dataset_open_obj(dd->dd_pool,
1496 	    dd->dd_phys->dd_head_dataset_obj, NULL, DS_MODE_NONE, FTAG, &hds));
1497 
1498 	VERIFY(0 == dsl_dataset_get_snapname(ds));
1499 	err = zap_remove(mos, hds->ds_phys->ds_snapnames_zapobj,
1500 	    ds->ds_snapname, tx);
1501 	ASSERT3U(err, ==, 0);
1502 	mutex_enter(&ds->ds_lock);
1503 	(void) strcpy(ds->ds_snapname, newsnapname);
1504 	mutex_exit(&ds->ds_lock);
1505 	err = zap_add(mos, hds->ds_phys->ds_snapnames_zapobj,
1506 	    ds->ds_snapname, 8, 1, &ds->ds_object, tx);
1507 	ASSERT3U(err, ==, 0);
1508 
1509 	dsl_dataset_close(hds, DS_MODE_NONE, FTAG);
1510 }
1511 
1512 #pragma weak dmu_objset_rename = dsl_dataset_rename
1513 int
1514 dsl_dataset_rename(const char *oldname, const char *newname)
1515 {
1516 	dsl_dir_t *dd;
1517 	dsl_dataset_t *ds;
1518 	const char *tail;
1519 	int err;
1520 
1521 	err = dsl_dir_open(oldname, FTAG, &dd, &tail);
1522 	if (err)
1523 		return (err);
1524 	if (tail == NULL) {
1525 		err = dsl_dir_rename(dd, newname);
1526 		dsl_dir_close(dd, FTAG);
1527 		return (err);
1528 	}
1529 	if (tail[0] != '@') {
1530 		/* the name ended in a nonexistent component */
1531 		dsl_dir_close(dd, FTAG);
1532 		return (ENOENT);
1533 	}
1534 
1535 	dsl_dir_close(dd, FTAG);
1536 
1537 	/* the new name must be a snapshot in the same filesystem */
1538 	tail = strchr(newname, '@');
1539 	if (tail == NULL)
1540 		return (EINVAL);
1541 	tail++;
1542 	if (strncmp(oldname, newname, tail - newname) != 0)
1543 		return (EXDEV);
1544 
1545 	err = dsl_dataset_open(oldname,
1546 	    DS_MODE_READONLY | DS_MODE_STANDARD, FTAG, &ds);
1547 	if (err)
1548 		return (err);
1549 
1550 	err = dsl_sync_task_do(ds->ds_dir->dd_pool,
1551 	    dsl_dataset_snapshot_rename_check,
1552 	    dsl_dataset_snapshot_rename_sync, ds, (char *)tail, 1);
1553 
1554 	dsl_dataset_close(ds, DS_MODE_STANDARD, FTAG);
1555 
1556 	return (err);
1557 }
1558 
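/*
 * Clone promotion moves the clone's origin snapshot (the "pivot point")
 * and its ancestors, back to any earlier branch point, out of the origin
 * filesystem's dsl_dir and into the clone's, and transfers the space they
 * account for along with them.  struct promotearg carries the totals that
 * dsl_dataset_promote_check() computes for dsl_dataset_promote_sync().
 */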
1559 struct promotearg {
1560 	uint64_t used, comp, uncomp, unique;
1561 	uint64_t newnext_obj, snapnames_obj;
1562 };
1563 
1564 static int
1565 dsl_dataset_promote_check(void *arg1, void *arg2, dmu_tx_t *tx)
1566 {
1567 	dsl_dataset_t *hds = arg1;
1568 	struct promotearg *pa = arg2;
1569 	dsl_dir_t *dd = hds->ds_dir;
1570 	dsl_pool_t *dp = hds->ds_dir->dd_pool;
1571 	dsl_dir_t *pdd = NULL;
1572 	dsl_dataset_t *ds = NULL;
1573 	dsl_dataset_t *pivot_ds = NULL;
1574 	dsl_dataset_t *newnext_ds = NULL;
1575 	int err;
1576 	char *name = NULL;
1577 	uint64_t itor = 0;
1578 	blkptr_t bp;
1579 
1580 	bzero(pa, sizeof (*pa));
1581 
1582 	/* Check that it is a clone */
1583 	if (dd->dd_phys->dd_clone_parent_obj == 0)
1584 		return (EINVAL);
1585 
1586 	/* Since the rest of this check is so expensive, only do it when syncing */
1587 	if (!dmu_tx_is_syncing(tx))
1588 		return (0);
1589 
1590 	if (err = dsl_dataset_open_obj(dp,
1591 	    dd->dd_phys->dd_clone_parent_obj,
1592 	    NULL, DS_MODE_EXCLUSIVE, FTAG, &pivot_ds))
1593 		goto out;
1594 	pdd = pivot_ds->ds_dir;
1595 
1596 	{
1597 		dsl_dataset_t *phds;
1598 		if (err = dsl_dataset_open_obj(dd->dd_pool,
1599 		    pdd->dd_phys->dd_head_dataset_obj,
1600 		    NULL, DS_MODE_NONE, FTAG, &phds))
1601 			goto out;
1602 		pa->snapnames_obj = phds->ds_phys->ds_snapnames_zapobj;
1603 		dsl_dataset_close(phds, DS_MODE_NONE, FTAG);
1604 	}
1605 
1606 	if (hds->ds_phys->ds_flags & DS_FLAG_NOPROMOTE) {
1607 		err = EXDEV;
1608 		goto out;
1609 	}
1610 
1611 	/* find pivot point's new next ds */
1612 	VERIFY(0 == dsl_dataset_open_obj(dd->dd_pool, hds->ds_object,
1613 	    NULL, DS_MODE_NONE, FTAG, &newnext_ds));
1614 	while (newnext_ds->ds_phys->ds_prev_snap_obj != pivot_ds->ds_object) {
1615 		dsl_dataset_t *prev;
1616 
1617 		if (err = dsl_dataset_open_obj(dd->dd_pool,
1618 		    newnext_ds->ds_phys->ds_prev_snap_obj,
1619 		    NULL, DS_MODE_NONE, FTAG, &prev))
1620 			goto out;
1621 		dsl_dataset_close(newnext_ds, DS_MODE_NONE, FTAG);
1622 		newnext_ds = prev;
1623 	}
1624 	pa->newnext_obj = newnext_ds->ds_object;
1625 
1626 	/* compute pivot point's new unique space */
1627 	while ((err = bplist_iterate(&newnext_ds->ds_deadlist,
1628 	    &itor, &bp)) == 0) {
1629 		if (bp.blk_birth > pivot_ds->ds_phys->ds_prev_snap_txg)
1630 			pa->unique += bp_get_dasize(dd->dd_pool->dp_spa, &bp);
1631 	}
1632 	if (err != ENOENT)
1633 		goto out;
1634 
1635 	/* Walk the snapshots that we are moving */
1636 	name = kmem_alloc(MAXPATHLEN, KM_SLEEP);
1637 	ds = pivot_ds;
1638 	/* CONSTCOND */
1639 	while (TRUE) {
1640 		uint64_t val, dlused, dlcomp, dluncomp;
1641 		dsl_dataset_t *prev;
1642 
1643 		/* Check that the snapshot name does not conflict */
1644 		dsl_dataset_name(ds, name);
1645 		err = zap_lookup(dd->dd_pool->dp_meta_objset,
1646 		    hds->ds_phys->ds_snapnames_zapobj, ds->ds_snapname,
1647 		    8, 1, &val);
1648 		if (err != ENOENT) {
1649 			if (err == 0)
1650 				err = EEXIST;
1651 			goto out;
1652 		}
1653 
1654 		/*
1655 		 * compute space to transfer.  Each snapshot gave birth to:
1656 		 * (my used) - (prev's used) + (deadlist's used)
1657 		 */
1658 		pa->used += ds->ds_phys->ds_used_bytes;
1659 		pa->comp += ds->ds_phys->ds_compressed_bytes;
1660 		pa->uncomp += ds->ds_phys->ds_uncompressed_bytes;
1661 
1662 		/* If we reach the first snapshot, we're done. */
1663 		if (ds->ds_phys->ds_prev_snap_obj == 0)
1664 			break;
1665 
1666 		if (err = bplist_space(&ds->ds_deadlist,
1667 		    &dlused, &dlcomp, &dluncomp))
1668 			goto out;
1669 		if (err = dsl_dataset_open_obj(dd->dd_pool,
1670 		    ds->ds_phys->ds_prev_snap_obj, NULL, DS_MODE_EXCLUSIVE,
1671 		    FTAG, &prev))
1672 			goto out;
1673 		pa->used += dlused - prev->ds_phys->ds_used_bytes;
1674 		pa->comp += dlcomp - prev->ds_phys->ds_compressed_bytes;
1675 		pa->uncomp += dluncomp - prev->ds_phys->ds_uncompressed_bytes;
1676 
1677 		/*
1678 		 * We could be a clone of a clone.  If we reach our
1679 		 * parent's branch point, we're done.
1680 		 */
1681 		if (prev->ds_phys->ds_next_snap_obj != ds->ds_object) {
1682 			dsl_dataset_close(prev, DS_MODE_EXCLUSIVE, FTAG);
1683 			break;
1684 		}
1685 		if (ds != pivot_ds)
1686 			dsl_dataset_close(ds, DS_MODE_EXCLUSIVE, FTAG);
1687 		ds = prev;
1688 	}
1689 
1690 	/* Check that there is enough space here */
1691 	err = dsl_dir_transfer_possible(pdd, dd, pa->used);
1692 
1693 out:
1694 	if (ds && ds != pivot_ds)
1695 		dsl_dataset_close(ds, DS_MODE_EXCLUSIVE, FTAG);
1696 	if (pivot_ds)
1697 		dsl_dataset_close(pivot_ds, DS_MODE_EXCLUSIVE, FTAG);
1698 	if (newnext_ds)
1699 		dsl_dataset_close(newnext_ds, DS_MODE_NONE, FTAG);
1700 	if (name)
1701 		kmem_free(name, MAXPATHLEN);
1702 	return (err);
1703 }
1704 
1705 static void
1706 dsl_dataset_promote_sync(void *arg1, void *arg2, dmu_tx_t *tx)
1707 {
1708 	dsl_dataset_t *hds = arg1;
1709 	struct promotearg *pa = arg2;
1710 	dsl_dir_t *dd = hds->ds_dir;
1711 	dsl_pool_t *dp = hds->ds_dir->dd_pool;
1712 	dsl_dir_t *pdd = NULL;
1713 	dsl_dataset_t *ds, *pivot_ds;
1714 	char *name;
1715 
1716 	ASSERT(dd->dd_phys->dd_clone_parent_obj != 0);
1717 	ASSERT(0 == (hds->ds_phys->ds_flags & DS_FLAG_NOPROMOTE));
1718 
1719 	VERIFY(0 == dsl_dataset_open_obj(dp,
1720 	    dd->dd_phys->dd_clone_parent_obj,
1721 	    NULL, DS_MODE_EXCLUSIVE, FTAG, &pivot_ds));
1722 	/*
1723 	 * We need to explicitly open pdd, since pivot_ds's pdd will be
1724 	 * changing.
1725 	 */
1726 	VERIFY(0 == dsl_dir_open_obj(dp, pivot_ds->ds_dir->dd_object,
1727 	    NULL, FTAG, &pdd));
1728 
1729 	/* move snapshots to this dir */
1730 	name = kmem_alloc(MAXPATHLEN, KM_SLEEP);
1731 	ds = pivot_ds;
1732 	/* CONSTCOND */
1733 	while (TRUE) {
1734 		dsl_dataset_t *prev;
1735 
1736 		/* move snap name entry */
1737 		dsl_dataset_name(ds, name);
1738 		VERIFY(0 == zap_remove(dp->dp_meta_objset,
1739 		    pa->snapnames_obj, ds->ds_snapname, tx));
1740 		VERIFY(0 == zap_add(dp->dp_meta_objset,
1741 		    hds->ds_phys->ds_snapnames_zapobj, ds->ds_snapname,
1742 		    8, 1, &ds->ds_object, tx));
1743 
1744 		/* change containing dsl_dir */
1745 		dmu_buf_will_dirty(ds->ds_dbuf, tx);
1746 		ASSERT3U(ds->ds_phys->ds_dir_obj, ==, pdd->dd_object);
1747 		ds->ds_phys->ds_dir_obj = dd->dd_object;
1748 		ASSERT3P(ds->ds_dir, ==, pdd);
1749 		dsl_dir_close(ds->ds_dir, ds);
1750 		VERIFY(0 == dsl_dir_open_obj(dp, dd->dd_object,
1751 		    NULL, ds, &ds->ds_dir));
1752 
1753 		ASSERT3U(dsl_prop_numcb(ds), ==, 0);
1754 
1755 		if (ds->ds_phys->ds_prev_snap_obj == 0)
1756 			break;
1757 
1758 		VERIFY(0 == dsl_dataset_open_obj(dp,
1759 		    ds->ds_phys->ds_prev_snap_obj, NULL, DS_MODE_EXCLUSIVE,
1760 		    FTAG, &prev));
1761 
1762 		if (prev->ds_phys->ds_next_snap_obj != ds->ds_object) {
1763 			dsl_dataset_close(prev, DS_MODE_EXCLUSIVE, FTAG);
1764 			break;
1765 		}
1766 		if (ds != pivot_ds)
1767 			dsl_dataset_close(ds, DS_MODE_EXCLUSIVE, FTAG);
1768 		ds = prev;
1769 	}
1770 	if (ds != pivot_ds)
1771 		dsl_dataset_close(ds, DS_MODE_EXCLUSIVE, FTAG);
1772 
1773 	/* change pivot point's next snap */
1774 	dmu_buf_will_dirty(pivot_ds->ds_dbuf, tx);
1775 	pivot_ds->ds_phys->ds_next_snap_obj = pa->newnext_obj;
1776 
1777 	/* change clone_parent-age */
1778 	dmu_buf_will_dirty(dd->dd_dbuf, tx);
1779 	ASSERT3U(dd->dd_phys->dd_clone_parent_obj, ==, pivot_ds->ds_object);
1780 	dd->dd_phys->dd_clone_parent_obj = pdd->dd_phys->dd_clone_parent_obj;
1781 	dmu_buf_will_dirty(pdd->dd_dbuf, tx);
1782 	pdd->dd_phys->dd_clone_parent_obj = pivot_ds->ds_object;
1783 
1784 	/* change space accounting */
1785 	dsl_dir_diduse_space(pdd, -pa->used, -pa->comp, -pa->uncomp, tx);
1786 	dsl_dir_diduse_space(dd, pa->used, pa->comp, pa->uncomp, tx);
1787 	pivot_ds->ds_phys->ds_unique_bytes = pa->unique;
1788 
1789 	dsl_dir_close(pdd, FTAG);
1790 	dsl_dataset_close(pivot_ds, DS_MODE_EXCLUSIVE, FTAG);
1791 	kmem_free(name, MAXPATHLEN);
1792 }
1793 
1794 int
1795 dsl_dataset_promote(const char *name)
1796 {
1797 	dsl_dataset_t *ds;
1798 	int err;
1799 	dmu_object_info_t doi;
1800 	struct promotearg pa;
1801 
1802 	err = dsl_dataset_open(name, DS_MODE_NONE, FTAG, &ds);
1803 	if (err)
1804 		return (err);
1805 
1806 	err = dmu_object_info(ds->ds_dir->dd_pool->dp_meta_objset,
1807 	    ds->ds_phys->ds_snapnames_zapobj, &doi);
1808 	if (err) {
1809 		dsl_dataset_close(ds, DS_MODE_NONE, FTAG);
1810 		return (err);
1811 	}
1812 
1813 	/*
1814 	 * Add in 128x the snapnames zapobj size, since we will be moving
1815 	 * a bunch of snapnames to the promoted ds, and dirtying their
1816 	 * bonus buffers.
1817 	 */
1818 	err = dsl_sync_task_do(ds->ds_dir->dd_pool,
1819 	    dsl_dataset_promote_check,
1820 	    dsl_dataset_promote_sync, ds, &pa, 2 + 2 * doi.doi_physical_blks);
1821 	dsl_dataset_close(ds, DS_MODE_NONE, FTAG);
1822 	return (err);
1823 }
1824