xref: /titanic_44/usr/src/uts/common/fs/zfs/dsl_dataset.c (revision d485aa23b5e424dd136afdf657683389f93f72d6)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License, Version 1.0 only
6  * (the "License").  You may not use this file except in compliance
7  * with the License.
8  *
9  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
10  * or http://www.opensolaris.org/os/licensing.
11  * See the License for the specific language governing permissions
12  * and limitations under the License.
13  *
14  * When distributing Covered Code, include this CDDL HEADER in each
15  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
16  * If applicable, add the following below this CDDL HEADER, with the
17  * fields enclosed by brackets "[]" replaced with your own identifying
18  * information: Portions Copyright [yyyy] [name of copyright owner]
19  *
20  * CDDL HEADER END
21  */
22 /*
23  * Copyright 2005 Sun Microsystems, Inc.  All rights reserved.
24  * Use is subject to license terms.
25  */
26 
27 #pragma ident	"%Z%%M%	%I%	%E% SMI"
28 
29 #include <sys/dmu_objset.h>
30 #include <sys/dsl_dataset.h>
31 #include <sys/dsl_dir.h>
32 #include <sys/dmu_traverse.h>
33 #include <sys/dmu_tx.h>
34 #include <sys/arc.h>
35 #include <sys/zio.h>
36 #include <sys/zap.h>
37 #include <sys/unique.h>
38 #include <sys/zfs_context.h>
39 
40 #define	DOS_REF_MAX	(1ULL << 62)
41 
42 #define	DSL_DEADLIST_BLOCKSIZE	SPA_MAXBLOCKSIZE
43 
44 #define	BP_GET_UCSIZE(bp) \
45 	((BP_GET_LEVEL(bp) > 0 || dmu_ot[BP_GET_TYPE(bp)].ot_metadata) ? \
46 	BP_GET_PSIZE(bp) : BP_GET_LSIZE(bp));
47 
48 /*
49  * We use weighted reference counts to express the various forms of exclusion
50  * between different open modes.  A STANDARD open is 1 point, an EXCLUSIVE open
51  * is DOS_REF_MAX, and a PRIMARY open is little more than half of an EXCLUSIVE.
52  * This makes the exclusion logic simple: the total refcnt for all opens cannot
53  * exceed DOS_REF_MAX.  For example, EXCLUSIVE opens are exclusive because their
54  * weight (DOS_REF_MAX) consumes the entire refcnt space.  PRIMARY opens consume
55  * just over half of the refcnt space, so there can't be more than one, but it
56  * can peacefully coexist with any number of STANDARD opens.
57  */
58 static uint64_t ds_refcnt_weight[DS_MODE_LEVELS] = {
59 	0,			/* DOS_MODE_NONE - invalid		*/
60 	1,			/* DOS_MODE_STANDARD - unlimited number	*/
61 	(DOS_REF_MAX >> 1) + 1,	/* DOS_MODE_PRIMARY - only one of these	*/
62 	DOS_REF_MAX		/* DOS_MODE_EXCLUSIVE - no other opens	*/
63 };
64 
65 
66 void
67 dsl_dataset_block_born(dsl_dataset_t *ds, blkptr_t *bp, dmu_tx_t *tx)
68 {
69 	int used = BP_GET_ASIZE(bp);
70 	int compressed = BP_GET_PSIZE(bp);
71 	int uncompressed = BP_GET_UCSIZE(bp);
72 
73 	dprintf_bp(bp, "born, ds=%p\n", ds);
74 
75 	ASSERT(dmu_tx_is_syncing(tx));
76 	/* It could have been compressed away to nothing */
77 	if (BP_IS_HOLE(bp))
78 		return;
79 	ASSERT(BP_GET_TYPE(bp) != DMU_OT_NONE);
80 	ASSERT3U(BP_GET_TYPE(bp), <, DMU_OT_NUMTYPES);
81 	if (ds == NULL) {
82 		/*
83 		 * Account for the meta-objset space in its placeholder
84 		 * dsl_dir.
85 		 */
86 		ASSERT3U(compressed, ==, uncompressed); /* it's all metadata */
87 		dsl_dir_diduse_space(tx->tx_pool->dp_mos_dir,
88 		    used, compressed, uncompressed, tx);
89 		dsl_dir_dirty(tx->tx_pool->dp_mos_dir, tx);
90 		return;
91 	}
92 	dmu_buf_will_dirty(ds->ds_dbuf, tx);
93 	mutex_enter(&ds->ds_lock);
94 	ds->ds_phys->ds_used_bytes += used;
95 	ds->ds_phys->ds_compressed_bytes += compressed;
96 	ds->ds_phys->ds_uncompressed_bytes += uncompressed;
97 	ds->ds_phys->ds_unique_bytes += used;
98 	mutex_exit(&ds->ds_lock);
99 	dsl_dir_diduse_space(ds->ds_dir,
100 	    used, compressed, uncompressed, tx);
101 }
102 
103 void
104 dsl_dataset_block_kill(dsl_dataset_t *ds, blkptr_t *bp, dmu_tx_t *tx)
105 {
106 	int used = BP_GET_ASIZE(bp);
107 	int compressed = BP_GET_PSIZE(bp);
108 	int uncompressed = BP_GET_UCSIZE(bp);
109 
110 	ASSERT(dmu_tx_is_syncing(tx));
111 	if (BP_IS_HOLE(bp))
112 		return;
113 
114 	ASSERT(used > 0);
115 	if (ds == NULL) {
116 		/*
117 		 * Account for the meta-objset space in its placeholder
118 		 * dataset.
119 		 */
120 		/* XXX this can fail, what do we do when it does? */
121 		(void) arc_free(NULL, tx->tx_pool->dp_spa,
122 		    tx->tx_txg, bp, NULL, NULL, ARC_WAIT);
123 		bzero(bp, sizeof (blkptr_t));
124 
125 		dsl_dir_diduse_space(tx->tx_pool->dp_mos_dir,
126 		    -used, -compressed, -uncompressed, tx);
127 		dsl_dir_dirty(tx->tx_pool->dp_mos_dir, tx);
128 		return;
129 	}
130 	ASSERT3P(tx->tx_pool, ==, ds->ds_dir->dd_pool);
131 
132 	dmu_buf_will_dirty(ds->ds_dbuf, tx);
133 
134 	if (bp->blk_birth > ds->ds_phys->ds_prev_snap_txg) {
135 		dprintf_bp(bp, "freeing: %s", "");
136 		/* XXX check return code? */
137 		(void) arc_free(NULL, tx->tx_pool->dp_spa,
138 		    tx->tx_txg, bp, NULL, NULL, ARC_WAIT);
139 
140 		mutex_enter(&ds->ds_lock);
141 		/* XXX unique_bytes is not accurate for head datasets */
142 		/* ASSERT3U(ds->ds_phys->ds_unique_bytes, >=, used); */
143 		ds->ds_phys->ds_unique_bytes -= used;
144 		mutex_exit(&ds->ds_lock);
145 		dsl_dir_diduse_space(ds->ds_dir,
146 		    -used, -compressed, -uncompressed, tx);
147 	} else {
148 		dprintf_bp(bp, "putting on dead list: %s", "");
149 		bplist_enqueue(&ds->ds_deadlist, bp, tx);
150 		/* if (bp->blk_birth > prev prev snap txg) prev unique += bs */
151 		if (ds->ds_phys->ds_prev_snap_obj != 0) {
152 			ASSERT3U(ds->ds_prev->ds_object, ==,
153 			    ds->ds_phys->ds_prev_snap_obj);
154 			ASSERT(ds->ds_prev->ds_phys->ds_num_children > 0);
155 			if (ds->ds_prev->ds_phys->ds_next_snap_obj ==
156 			    ds->ds_object &&
157 			    bp->blk_birth >
158 			    ds->ds_prev->ds_phys->ds_prev_snap_txg) {
159 				dmu_buf_will_dirty(ds->ds_prev->ds_dbuf, tx);
160 				mutex_enter(&ds->ds_prev->ds_lock);
161 				ds->ds_prev->ds_phys->ds_unique_bytes +=
162 				    used;
163 				mutex_exit(&ds->ds_prev->ds_lock);
164 			}
165 		}
166 	}
167 	bzero(bp, sizeof (blkptr_t));
168 	mutex_enter(&ds->ds_lock);
169 	ASSERT3U(ds->ds_phys->ds_used_bytes, >=, used);
170 	ds->ds_phys->ds_used_bytes -= used;
171 	ASSERT3U(ds->ds_phys->ds_compressed_bytes, >=, compressed);
172 	ds->ds_phys->ds_compressed_bytes -= compressed;
173 	ASSERT3U(ds->ds_phys->ds_uncompressed_bytes, >=, uncompressed);
174 	ds->ds_phys->ds_uncompressed_bytes -= uncompressed;
175 	mutex_exit(&ds->ds_lock);
176 }
177 
178 int
179 dsl_dataset_block_freeable(dsl_dataset_t *ds, uint64_t blk_birth, dmu_tx_t *tx)
180 {
181 	uint64_t prev_snap_txg;
182 	dsl_dir_t *dd;
183 	/* ASSERT that it is not a snapshot */
184 	if (ds == NULL)
185 		return (TRUE);
186 	/*
187 	 * The snapshot creation could fail, but that would cause an
188 	 * incorrect FALSE return, which would only result in an
189 	 * overestimation of the amount of space that an operation would
190 	 * consume, which is OK.
191 	 *
192 	 * There's also a small window where we could miss a pending
193 	 * snapshot, because we could set the sync task in the quiescing
194 	 * phase.  So this should only be used as a guess.
195 	 */
196 	dd = ds->ds_dir;
197 	mutex_enter(&dd->dd_lock);
198 	if (dd->dd_sync_func == dsl_dataset_snapshot_sync &&
199 	    dd->dd_sync_txg < tx->tx_txg)
200 		prev_snap_txg = dd->dd_sync_txg;
201 	else
202 		prev_snap_txg = ds->ds_phys->ds_prev_snap_txg;
203 	mutex_exit(&dd->dd_lock);
204 	return (blk_birth > prev_snap_txg);
205 }
206 
207 /* ARGSUSED */
208 static void
209 dsl_dataset_evict(dmu_buf_t *db, void *dsv)
210 {
211 	dsl_dataset_t *ds = dsv;
212 	dsl_pool_t *dp = ds->ds_dir->dd_pool;
213 
214 	/* open_refcount == DOS_REF_MAX when deleting */
215 	ASSERT(ds->ds_open_refcount == 0 ||
216 	    ds->ds_open_refcount == DOS_REF_MAX);
217 
218 	dprintf_ds(ds, "evicting %s\n", "");
219 
220 	unique_remove(ds->ds_phys->ds_fsid_guid);
221 
222 	if (ds->ds_user_ptr != NULL)
223 		ds->ds_user_evict_func(ds, ds->ds_user_ptr);
224 
225 	if (ds->ds_prev) {
226 		dsl_dataset_close(ds->ds_prev, DS_MODE_NONE, ds);
227 		ds->ds_prev = NULL;
228 	}
229 
230 	bplist_close(&ds->ds_deadlist);
231 	dsl_dir_close(ds->ds_dir, ds);
232 
233 	if (list_link_active(&ds->ds_synced_link))
234 		list_remove(&dp->dp_synced_objsets, ds);
235 
236 	kmem_free(ds, sizeof (dsl_dataset_t));
237 }
238 
239 static void
240 dsl_dataset_get_snapname(dsl_dataset_t *ds)
241 {
242 	dsl_dataset_phys_t *headphys;
243 	int err;
244 	dmu_buf_t *headdbuf;
245 	dsl_pool_t *dp = ds->ds_dir->dd_pool;
246 	objset_t *mos = dp->dp_meta_objset;
247 
248 	if (ds->ds_snapname[0])
249 		return;
250 	if (ds->ds_phys->ds_next_snap_obj == 0)
251 		return;
252 
253 	headdbuf = dmu_bonus_hold_tag(mos,
254 	    ds->ds_dir->dd_phys->dd_head_dataset_obj, FTAG);
255 	dmu_buf_read(headdbuf);
256 	headphys = headdbuf->db_data;
257 	err = zap_value_search(dp->dp_meta_objset,
258 	    headphys->ds_snapnames_zapobj, ds->ds_object, ds->ds_snapname);
259 	ASSERT(err == 0);
260 	dmu_buf_rele_tag(headdbuf, FTAG);
261 }
262 
263 dsl_dataset_t *
264 dsl_dataset_open_obj(dsl_pool_t *dp, uint64_t dsobj, const char *snapname,
265     int mode, void *tag)
266 {
267 	uint64_t weight = ds_refcnt_weight[DS_MODE_LEVEL(mode)];
268 	objset_t *mos = dp->dp_meta_objset;
269 	dmu_buf_t *dbuf;
270 	dsl_dataset_t *ds;
271 
272 	ASSERT(RW_LOCK_HELD(&dp->dp_config_rwlock) ||
273 	    dsl_pool_sync_context(dp));
274 
275 	dbuf = dmu_bonus_hold_tag(mos, dsobj, tag);
276 	dmu_buf_read(dbuf);
277 	ds = dmu_buf_get_user(dbuf);
278 	if (ds == NULL) {
279 		dsl_dataset_t *winner;
280 
281 		ds = kmem_zalloc(sizeof (dsl_dataset_t), KM_SLEEP);
282 		ds->ds_dbuf = dbuf;
283 		ds->ds_object = dsobj;
284 		ds->ds_phys = dbuf->db_data;
285 		ds->ds_dir = dsl_dir_open_obj(dp,
286 		    ds->ds_phys->ds_dir_obj, NULL, ds);
287 
288 		bplist_open(&ds->ds_deadlist,
289 		    mos, ds->ds_phys->ds_deadlist_obj);
290 
291 		if (ds->ds_dir->dd_phys->dd_head_dataset_obj == dsobj) {
292 			ds->ds_snapname[0] = '\0';
293 			if (ds->ds_phys->ds_prev_snap_obj) {
294 				ds->ds_prev =
295 				    dsl_dataset_open_obj(dp,
296 				    ds->ds_phys->ds_prev_snap_obj, NULL,
297 				    DS_MODE_NONE, ds);
298 			}
299 		} else {
300 			if (snapname) {
301 #ifdef ZFS_DEBUG
302 				dsl_dataset_phys_t *headphys;
303 				int err;
304 				dmu_buf_t *headdbuf = dmu_bonus_hold_tag(mos,
305 				    ds->ds_dir->dd_phys->
306 				    dd_head_dataset_obj, FTAG);
307 				dmu_buf_read(headdbuf);
308 				headphys = headdbuf->db_data;
309 				uint64_t foundobj;
310 				err = zap_lookup(dp->dp_meta_objset,
311 				    headphys->ds_snapnames_zapobj,
312 				    snapname, sizeof (foundobj), 1, &foundobj);
313 				ASSERT3U(err, ==, 0);
314 				ASSERT3U(foundobj, ==, dsobj);
315 				dmu_buf_rele_tag(headdbuf, FTAG);
316 #endif
317 				(void) strcat(ds->ds_snapname, snapname);
318 			} else if (zfs_flags & ZFS_DEBUG_SNAPNAMES) {
319 				dsl_dataset_get_snapname(ds);
320 			}
321 		}
322 
323 		winner = dmu_buf_set_user_ie(dbuf, ds, &ds->ds_phys,
324 		    dsl_dataset_evict);
325 		if (winner) {
326 			bplist_close(&ds->ds_deadlist);
327 			if (ds->ds_prev) {
328 				dsl_dataset_close(ds->ds_prev,
329 				    DS_MODE_NONE, ds);
330 			}
331 			dsl_dir_close(ds->ds_dir, ds);
332 			kmem_free(ds, sizeof (dsl_dataset_t));
333 			ds = winner;
334 		} else {
335 			uint64_t new =
336 			    unique_insert(ds->ds_phys->ds_fsid_guid);
337 			if (new != ds->ds_phys->ds_fsid_guid) {
338 				/* XXX it won't necessarily be synced... */
339 				ds->ds_phys->ds_fsid_guid = new;
340 			}
341 		}
342 	}
343 	ASSERT3P(ds->ds_dbuf, ==, dbuf);
344 	ASSERT3P(ds->ds_phys, ==, dbuf->db_data);
345 
346 	mutex_enter(&ds->ds_lock);
347 	if ((DS_MODE_LEVEL(mode) == DS_MODE_PRIMARY &&
348 	    ds->ds_phys->ds_restoring && !DS_MODE_IS_RESTORE(mode)) ||
349 	    (ds->ds_open_refcount + weight > DOS_REF_MAX)) {
350 		mutex_exit(&ds->ds_lock);
351 		dsl_dataset_close(ds, DS_MODE_NONE, tag);
352 		return (NULL);
353 	}
354 	ds->ds_open_refcount += weight;
355 	mutex_exit(&ds->ds_lock);
356 
357 	return (ds);
358 }
359 
360 int
361 dsl_dataset_open_spa(spa_t *spa, const char *name, int mode,
362     void *tag, dsl_dataset_t **dsp)
363 {
364 	dsl_dir_t *dd;
365 	dsl_pool_t *dp;
366 	const char *tail;
367 	uint64_t obj;
368 	dsl_dataset_t *ds = NULL;
369 	int err = 0;
370 
371 	dd = dsl_dir_open_spa(spa, name, FTAG, &tail);
372 	if (dd == NULL)
373 		return (ENOENT);
374 
375 	dp = dd->dd_pool;
376 	obj = dd->dd_phys->dd_head_dataset_obj;
377 	rw_enter(&dp->dp_config_rwlock, RW_READER);
378 	if (obj == 0) {
379 		/* A dataset with no associated objset */
380 		err = ENOENT;
381 		goto out;
382 	}
383 
384 	if (tail != NULL) {
385 		objset_t *mos = dp->dp_meta_objset;
386 
387 		ds = dsl_dataset_open_obj(dp, obj, NULL, DS_MODE_NONE, tag);
388 		obj = ds->ds_phys->ds_snapnames_zapobj;
389 		dsl_dataset_close(ds, DS_MODE_NONE, tag);
390 		ds = NULL;
391 
392 		if (tail[0] != '@') {
393 			err = ENOENT;
394 			goto out;
395 		}
396 		tail++;
397 
398 		/* Look for a snapshot */
399 		if (!DS_MODE_IS_READONLY(mode)) {
400 			err = EROFS;
401 			goto out;
402 		}
403 		dprintf("looking for snapshot '%s'\n", tail);
404 		err = zap_lookup(mos, obj, tail, 8, 1, &obj);
405 		if (err)
406 			goto out;
407 	}
408 	ds = dsl_dataset_open_obj(dp, obj, tail, mode, tag);
409 	if (ds == NULL)
410 		err = EBUSY;
411 
412 out:
413 	rw_exit(&dp->dp_config_rwlock);
414 	dsl_dir_close(dd, FTAG);
415 
416 	ASSERT3U((err == 0), ==, (ds != NULL));
417 	/* ASSERT(ds == NULL || strcmp(name, ds->ds_name) == 0); */
418 
419 	*dsp = ds;
420 	return (err);
421 }
422 
423 int
424 dsl_dataset_open(const char *name, int mode, void *tag, dsl_dataset_t **dsp)
425 {
426 	return (dsl_dataset_open_spa(NULL, name, mode, tag, dsp));
427 }
428 
429 void
430 dsl_dataset_name(dsl_dataset_t *ds, char *name)
431 {
432 	if (ds == NULL) {
433 		(void) strcpy(name, "mos");
434 	} else {
435 		dsl_dir_name(ds->ds_dir, name);
436 		dsl_dataset_get_snapname(ds);
437 		if (ds->ds_snapname[0]) {
438 			(void) strcat(name, "@");
439 			if (!MUTEX_HELD(&ds->ds_lock)) {
440 				/*
441 				 * We use a "recursive" mutex so that we
442 				 * can call dprintf_ds() with ds_lock held.
443 				 */
444 				mutex_enter(&ds->ds_lock);
445 				(void) strcat(name, ds->ds_snapname);
446 				mutex_exit(&ds->ds_lock);
447 			} else {
448 				(void) strcat(name, ds->ds_snapname);
449 			}
450 		}
451 	}
452 }
453 
454 void
455 dsl_dataset_close(dsl_dataset_t *ds, int mode, void *tag)
456 {
457 	uint64_t weight = ds_refcnt_weight[DS_MODE_LEVEL(mode)];
458 	mutex_enter(&ds->ds_lock);
459 	ASSERT3U(ds->ds_open_refcount, >=, weight);
460 	ds->ds_open_refcount -= weight;
461 	dprintf_ds(ds, "closing mode %u refcount now 0x%llx\n",
462 	    mode, ds->ds_open_refcount);
463 	mutex_exit(&ds->ds_lock);
464 
465 	dmu_buf_rele_tag(ds->ds_dbuf, tag);
466 }
467 
468 void
469 dsl_dataset_create_root(dsl_pool_t *dp, uint64_t *ddobjp, dmu_tx_t *tx)
470 {
471 	objset_t *mos = dp->dp_meta_objset;
472 	dmu_buf_t *dbuf;
473 	dsl_dataset_phys_t *dsphys;
474 	dsl_dataset_t *ds;
475 	uint64_t dsobj;
476 	dsl_dir_t *dd;
477 
478 	dsl_dir_create_root(mos, ddobjp, tx);
479 	dd = dsl_dir_open_obj(dp, *ddobjp, NULL, FTAG);
480 	ASSERT(dd != NULL);
481 
482 	dsobj = dmu_object_alloc(mos, DMU_OT_DSL_DATASET, 0,
483 	    DMU_OT_DSL_DATASET, sizeof (dsl_dataset_phys_t), tx);
484 	dbuf = dmu_bonus_hold(mos, dsobj);
485 	dmu_buf_will_dirty(dbuf, tx);
486 	dsphys = dbuf->db_data;
487 	dsphys->ds_dir_obj = dd->dd_object;
488 	dsphys->ds_fsid_guid = unique_create();
489 	(void) random_get_pseudo_bytes((void*)&dsphys->ds_guid,
490 	    sizeof (dsphys->ds_guid));
491 	dsphys->ds_snapnames_zapobj =
492 	    zap_create(mos, DMU_OT_DSL_DS_SNAP_MAP, DMU_OT_NONE, 0, tx);
493 	dsphys->ds_creation_time = gethrestime_sec();
494 	dsphys->ds_creation_txg = tx->tx_txg;
495 	dsphys->ds_deadlist_obj =
496 	    bplist_create(mos, DSL_DEADLIST_BLOCKSIZE, tx);
497 	dmu_buf_rele(dbuf);
498 
499 	dmu_buf_will_dirty(dd->dd_dbuf, tx);
500 	dd->dd_phys->dd_head_dataset_obj = dsobj;
501 	dsl_dir_close(dd, FTAG);
502 
503 	ds = dsl_dataset_open_obj(dp, dsobj, NULL, DS_MODE_NONE, FTAG);
504 	(void) dmu_objset_create_impl(dp->dp_spa, ds, DMU_OST_ZFS, tx);
505 	dsl_dataset_close(ds, DS_MODE_NONE, FTAG);
506 }
507 
508 int
509 dsl_dataset_create_sync(dsl_dir_t *pds, const char *fullname,
510     const char *lastname, dsl_dataset_t *clone_parent, dmu_tx_t *tx)
511 {
512 	int err;
513 	dsl_pool_t *dp = pds->dd_pool;
514 	dmu_buf_t *dbuf;
515 	dsl_dataset_phys_t *dsphys;
516 	uint64_t dsobj;
517 	objset_t *mos = dp->dp_meta_objset;
518 	dsl_dir_t *dd;
519 
520 	if (clone_parent != NULL) {
521 		/*
522 		 * You can't clone across pools.
523 		 */
524 		if (clone_parent->ds_dir->dd_pool != dp)
525 			return (EXDEV);
526 
527 		/*
528 		 * You can only clone snapshots, not the head datasets.
529 		 */
530 		if (clone_parent->ds_phys->ds_num_children == 0)
531 			return (EINVAL);
532 	}
533 
534 	ASSERT(lastname[0] != '@');
535 	ASSERT(dmu_tx_is_syncing(tx));
536 
537 	err = dsl_dir_create_sync(pds, lastname, tx);
538 	if (err)
539 		return (err);
540 	dd = dsl_dir_open_spa(dp->dp_spa, fullname, FTAG, NULL);
541 	ASSERT(dd != NULL);
542 
543 	/* This is the point of no (unsuccessful) return */
544 
545 	dsobj = dmu_object_alloc(mos, DMU_OT_DSL_DATASET, 0,
546 	    DMU_OT_DSL_DATASET, sizeof (dsl_dataset_phys_t), tx);
547 	dbuf = dmu_bonus_hold(mos, dsobj);
548 	dmu_buf_will_dirty(dbuf, tx);
549 	dsphys = dbuf->db_data;
550 	dsphys->ds_dir_obj = dd->dd_object;
551 	dsphys->ds_fsid_guid = unique_create();
552 	unique_remove(dsphys->ds_fsid_guid); /* it isn't open yet */
553 	(void) random_get_pseudo_bytes((void*)&dsphys->ds_guid,
554 	    sizeof (dsphys->ds_guid));
555 	dsphys->ds_snapnames_zapobj =
556 	    zap_create(mos, DMU_OT_DSL_DS_SNAP_MAP, DMU_OT_NONE, 0, tx);
557 	dsphys->ds_creation_time = gethrestime_sec();
558 	dsphys->ds_creation_txg = tx->tx_txg;
559 	dsphys->ds_deadlist_obj =
560 	    bplist_create(mos, DSL_DEADLIST_BLOCKSIZE, tx);
561 	if (clone_parent) {
562 		dsphys->ds_prev_snap_obj = clone_parent->ds_object;
563 		dsphys->ds_prev_snap_txg =
564 		    clone_parent->ds_phys->ds_creation_txg;
565 		dsphys->ds_used_bytes =
566 		    clone_parent->ds_phys->ds_used_bytes;
567 		dsphys->ds_compressed_bytes =
568 		    clone_parent->ds_phys->ds_compressed_bytes;
569 		dsphys->ds_uncompressed_bytes =
570 		    clone_parent->ds_phys->ds_uncompressed_bytes;
571 		dsphys->ds_bp = clone_parent->ds_phys->ds_bp;
572 
573 		dmu_buf_will_dirty(clone_parent->ds_dbuf, tx);
574 		clone_parent->ds_phys->ds_num_children++;
575 
576 		dmu_buf_will_dirty(dd->dd_dbuf, tx);
577 		dd->dd_phys->dd_clone_parent_obj = clone_parent->ds_object;
578 	}
579 	dmu_buf_rele(dbuf);
580 
581 	dmu_buf_will_dirty(dd->dd_dbuf, tx);
582 	dd->dd_phys->dd_head_dataset_obj = dsobj;
583 	dsl_dir_close(dd, FTAG);
584 
585 	return (0);
586 }
587 
588 
589 int
590 dsl_dataset_destroy(const char *name)
591 {
592 	int err;
593 	dsl_pool_t *dp;
594 	dsl_dir_t *dd;
595 	const char *tail;
596 
597 	dd = dsl_dir_open(name, FTAG, &tail);
598 	if (dd == NULL)
599 		return (ENOENT);
600 
601 	dp = dd->dd_pool;
602 	if (tail != NULL) {
603 		if (tail[0] != '@') {
604 			dsl_dir_close(dd, FTAG);
605 			return (ENOENT);
606 		}
607 		tail++;
608 		/* Just blow away the snapshot */
609 		do {
610 			txg_wait_synced(dp, 0);
611 			err = dsl_dir_sync_task(dd,
612 			    dsl_dataset_destroy_sync, (void*)tail, 0);
613 		} while (err == EAGAIN);
614 		dsl_dir_close(dd, FTAG);
615 	} else {
616 		char buf[MAXNAMELEN];
617 		char *cp;
618 
619 		dsl_dir_t *pds;
620 		if (dd->dd_phys->dd_parent_obj == 0) {
621 			dsl_dir_close(dd, FTAG);
622 			return (EINVAL);
623 		}
624 		/*
625 		 * Make sure it's not dirty before we destroy it.
626 		 */
627 		txg_wait_synced(dd->dd_pool, 0);
628 		/*
629 		 * Blow away the dsl_dir + head dataset.
630 		 * dsl_dir_destroy_sync() will call
631 		 * dsl_dataset_destroy_sync() to destroy the head dataset.
632 		 */
633 		rw_enter(&dp->dp_config_rwlock, RW_READER);
634 		pds = dsl_dir_open_obj(dd->dd_pool,
635 		    dd->dd_phys->dd_parent_obj, NULL, FTAG);
636 		dsl_dir_close(dd, FTAG);
637 		rw_exit(&dp->dp_config_rwlock);
638 
639 		(void) strcpy(buf, name);
640 		cp = strrchr(buf, '/') + 1;
641 		ASSERT(cp[0] != '\0');
642 		do {
643 			txg_wait_synced(dp, 0);
644 			err = dsl_dir_sync_task(pds,
645 			    dsl_dir_destroy_sync, cp, 0);
646 		} while (err == EAGAIN);
647 		dsl_dir_close(pds, FTAG);
648 	}
649 
650 	return (err);
651 }
652 
653 int
654 dsl_dataset_rollback(const char *name)
655 {
656 	int err;
657 	dsl_dir_t *dd;
658 	const char *tail;
659 
660 	dd = dsl_dir_open(name, FTAG, &tail);
661 	if (dd == NULL)
662 		return (ENOENT);
663 
664 	if (tail != NULL) {
665 		dsl_dir_close(dd, FTAG);
666 		return (EINVAL);
667 	}
668 	do {
669 		txg_wait_synced(dd->dd_pool, 0);
670 		err = dsl_dir_sync_task(dd,
671 		    dsl_dataset_rollback_sync, NULL, 0);
672 	} while (err == EAGAIN);
673 	dsl_dir_close(dd, FTAG);
674 
675 	return (err);
676 }
677 
678 void *
679 dsl_dataset_set_user_ptr(dsl_dataset_t *ds,
680     void *p, dsl_dataset_evict_func_t func)
681 {
682 	void *old;
683 
684 	mutex_enter(&ds->ds_lock);
685 	old = ds->ds_user_ptr;
686 	if (old == NULL) {
687 		ds->ds_user_ptr = p;
688 		ds->ds_user_evict_func = func;
689 	}
690 	mutex_exit(&ds->ds_lock);
691 	return (old);
692 }
693 
694 void *
695 dsl_dataset_get_user_ptr(dsl_dataset_t *ds)
696 {
697 	return (ds->ds_user_ptr);
698 }
699 
700 
701 void
702 dsl_dataset_get_blkptr(dsl_dataset_t *ds, blkptr_t *bp)
703 {
704 	*bp = ds->ds_phys->ds_bp;
705 }
706 
707 void
708 dsl_dataset_set_blkptr(dsl_dataset_t *ds, blkptr_t *bp, dmu_tx_t *tx)
709 {
710 	ASSERT(dmu_tx_is_syncing(tx));
711 	/* If it's the meta-objset, set dp_meta_rootbp */
712 	if (ds == NULL) {
713 		tx->tx_pool->dp_meta_rootbp = *bp;
714 	} else {
715 		dmu_buf_will_dirty(ds->ds_dbuf, tx);
716 		ds->ds_phys->ds_bp = *bp;
717 	}
718 }
719 
720 spa_t *
721 dsl_dataset_get_spa(dsl_dataset_t *ds)
722 {
723 	return (ds->ds_dir->dd_pool->dp_spa);
724 }
725 
726 void
727 dsl_dataset_dirty(dsl_dataset_t *ds, dmu_tx_t *tx)
728 {
729 	dsl_pool_t *dp;
730 
731 	if (ds == NULL) /* this is the meta-objset */
732 		return;
733 
734 	ASSERT(ds->ds_user_ptr != NULL);
735 	ASSERT(ds->ds_phys->ds_next_snap_obj == 0);
736 
737 	dp = ds->ds_dir->dd_pool;
738 
739 	if (txg_list_add(&dp->dp_dirty_datasets, ds, tx->tx_txg) == 0) {
740 		/* up the hold count until we can be written out */
741 		dmu_buf_add_ref(ds->ds_dbuf, ds);
742 	}
743 }
744 
745 struct killarg {
746 	uint64_t *usedp;
747 	uint64_t *compressedp;
748 	uint64_t *uncompressedp;
749 	zio_t *zio;
750 	dmu_tx_t *tx;
751 };
752 
753 static int
754 kill_blkptr(traverse_blk_cache_t *bc, spa_t *spa, void *arg)
755 {
756 	struct killarg *ka = arg;
757 	blkptr_t *bp = &bc->bc_blkptr;
758 
759 	ASSERT3U(bc->bc_errno, ==, 0);
760 
761 	/*
762 	 * Since this callback is not called concurrently, no lock is
763 	 * needed on the accounting values.
764 	 */
765 	*ka->usedp += BP_GET_ASIZE(bp);
766 	*ka->compressedp += BP_GET_PSIZE(bp);
767 	*ka->uncompressedp += BP_GET_UCSIZE(bp);
768 	/* XXX check for EIO? */
769 	(void) arc_free(ka->zio, spa, ka->tx->tx_txg, bp, NULL, NULL,
770 	    ARC_NOWAIT);
771 	return (0);
772 }
773 
774 /* ARGSUSED */
775 int
776 dsl_dataset_rollback_sync(dsl_dir_t *dd, void *arg, dmu_tx_t *tx)
777 {
778 	objset_t *mos = dd->dd_pool->dp_meta_objset;
779 	dsl_dataset_t *ds;
780 
781 	if (dd->dd_phys->dd_head_dataset_obj == 0)
782 		return (EINVAL);
783 	ds = dsl_dataset_open_obj(dd->dd_pool,
784 	    dd->dd_phys->dd_head_dataset_obj, NULL, DS_MODE_NONE, FTAG);
785 
786 	if (ds->ds_phys->ds_prev_snap_txg == 0) {
787 		/*
788 		 * There's no previous snapshot.  I suppose we could
789 		 * roll it back to being empty (and re-initialize the
790 		 * upper (ZPL) layer).  But for now there's no way to do
791 		 * this via the user interface.
792 		 */
793 		dsl_dataset_close(ds, DS_MODE_NONE, FTAG);
794 		return (EINVAL);
795 	}
796 
797 	mutex_enter(&ds->ds_lock);
798 	if (ds->ds_open_refcount > 0) {
799 		mutex_exit(&ds->ds_lock);
800 		dsl_dataset_close(ds, DS_MODE_NONE, FTAG);
801 		return (EBUSY);
802 	}
803 
804 	/*
805 	 * If we made changes this txg, traverse_dsl_dataset won't find
806 	 * them.  Try again.
807 	 */
808 	if (ds->ds_phys->ds_bp.blk_birth >= tx->tx_txg) {
809 		mutex_exit(&ds->ds_lock);
810 		dsl_dataset_close(ds, DS_MODE_NONE, FTAG);
811 		return (EAGAIN);
812 	}
813 
814 	/* THE POINT OF NO (unsuccessful) RETURN */
815 	ds->ds_open_refcount = DOS_REF_MAX;
816 	mutex_exit(&ds->ds_lock);
817 
818 	dmu_buf_will_dirty(ds->ds_dbuf, tx);
819 
820 	/* Zero out the deadlist. */
821 	dprintf("old deadlist obj = %llx\n", ds->ds_phys->ds_deadlist_obj);
822 	bplist_close(&ds->ds_deadlist);
823 	bplist_destroy(mos, ds->ds_phys->ds_deadlist_obj, tx);
824 	ds->ds_phys->ds_deadlist_obj =
825 	    bplist_create(mos, DSL_DEADLIST_BLOCKSIZE, tx);
826 	bplist_open(&ds->ds_deadlist, mos, ds->ds_phys->ds_deadlist_obj);
827 	dprintf("new deadlist obj = %llx\n", ds->ds_phys->ds_deadlist_obj);
828 
829 	{
830 		/* Free blkptrs that we gave birth to */
831 		zio_t *zio;
832 		uint64_t used = 0, compressed = 0, uncompressed = 0;
833 		struct killarg ka;
834 
835 		zio = zio_root(tx->tx_pool->dp_spa, NULL, NULL,
836 		    ZIO_FLAG_MUSTSUCCEED);
837 		ka.usedp = &used;
838 		ka.compressedp = &compressed;
839 		ka.uncompressedp = &uncompressed;
840 		ka.zio = zio;
841 		ka.tx = tx;
842 		(void) traverse_dsl_dataset(ds, ds->ds_phys->ds_prev_snap_txg,
843 		    ADVANCE_POST, kill_blkptr, &ka);
844 		(void) zio_wait(zio);
845 
846 		dsl_dir_diduse_space(dd,
847 		    -used, -compressed, -uncompressed, tx);
848 	}
849 
850 	/* Change our contents to that of the prev snapshot (finally!) */
851 	ASSERT3U(ds->ds_prev->ds_object, ==, ds->ds_phys->ds_prev_snap_obj);
852 	ds->ds_phys->ds_bp = ds->ds_prev->ds_phys->ds_bp;
853 	ds->ds_phys->ds_used_bytes = ds->ds_prev->ds_phys->ds_used_bytes;
854 	ds->ds_phys->ds_compressed_bytes =
855 	    ds->ds_prev->ds_phys->ds_compressed_bytes;
856 	ds->ds_phys->ds_uncompressed_bytes =
857 	    ds->ds_prev->ds_phys->ds_uncompressed_bytes;
858 	ds->ds_phys->ds_restoring = ds->ds_prev->ds_phys->ds_restoring;
859 	ds->ds_phys->ds_unique_bytes = 0;
860 
861 	dmu_buf_will_dirty(ds->ds_prev->ds_dbuf, tx);
862 	ds->ds_prev->ds_phys->ds_unique_bytes = 0;
863 
864 	dprintf("new deadlist obj = %llx\n", ds->ds_phys->ds_deadlist_obj);
865 	ds->ds_open_refcount = 0;
866 	dsl_dataset_close(ds, DS_MODE_NONE, FTAG);
867 
868 	return (0);
869 }
870 
871 int
872 dsl_dataset_destroy_sync(dsl_dir_t *dd, void *arg, dmu_tx_t *tx)
873 {
874 	const char *snapname = arg;
875 	uint64_t used = 0, compressed = 0, uncompressed = 0;
876 	blkptr_t bp;
877 	zio_t *zio;
878 	int err;
879 	int after_branch_point = FALSE;
880 	int drop_lock = FALSE;
881 	dsl_pool_t *dp = dd->dd_pool;
882 	objset_t *mos = dp->dp_meta_objset;
883 	dsl_dataset_t *ds, *ds_prev = NULL;
884 	uint64_t obj;
885 
886 	if (dd->dd_phys->dd_head_dataset_obj == 0)
887 		return (EINVAL);
888 
889 	if (!RW_WRITE_HELD(&dp->dp_config_rwlock)) {
890 		rw_enter(&dp->dp_config_rwlock, RW_WRITER);
891 		drop_lock = TRUE;
892 	}
893 
894 	ds = dsl_dataset_open_obj(dd->dd_pool,
895 	    dd->dd_phys->dd_head_dataset_obj, NULL,
896 	    snapname ? DS_MODE_NONE : DS_MODE_EXCLUSIVE, FTAG);
897 
898 	if (snapname) {
899 		err = zap_lookup(mos, ds->ds_phys->ds_snapnames_zapobj,
900 		    snapname, 8, 1, &obj);
901 		dsl_dataset_close(ds, DS_MODE_NONE, FTAG);
902 		if (err) {
903 			if (drop_lock)
904 				rw_exit(&dp->dp_config_rwlock);
905 			return (err);
906 		}
907 
908 		ds = dsl_dataset_open_obj(dd->dd_pool, obj, NULL,
909 		    DS_MODE_EXCLUSIVE, FTAG);
910 	}
911 	if (ds == NULL) {
912 		if (drop_lock)
913 			rw_exit(&dp->dp_config_rwlock);
914 		return (EBUSY);
915 	}
916 
917 	obj = ds->ds_object;
918 
919 	/* Can't delete a branch point. */
920 	if (ds->ds_phys->ds_num_children > 1) {
921 		dsl_dataset_close(ds, DS_MODE_EXCLUSIVE, FTAG);
922 		if (drop_lock)
923 			rw_exit(&dp->dp_config_rwlock);
924 		return (EINVAL);
925 	}
926 
927 	/*
928 	 * Can't delete a head dataset if there are snapshots of it.
929 	 * (Except if the only snapshots are from the branch we cloned
930 	 * from.)
931 	 */
932 	if (ds->ds_prev != NULL &&
933 	    ds->ds_prev->ds_phys->ds_next_snap_obj == obj) {
934 		dsl_dataset_close(ds, DS_MODE_EXCLUSIVE, FTAG);
935 		if (drop_lock)
936 			rw_exit(&dp->dp_config_rwlock);
937 		return (EINVAL);
938 	}
939 
940 	/*
941 	 * If we made changes this txg, traverse_dsl_dataset won't find
942 	 * them.  Try again.
943 	 */
944 	if (ds->ds_phys->ds_bp.blk_birth >= tx->tx_txg) {
945 		mutex_exit(&ds->ds_lock);
946 		dsl_dataset_close(ds, DS_MODE_NONE, FTAG);
947 		if (drop_lock)
948 			rw_exit(&dp->dp_config_rwlock);
949 		return (EAGAIN);
950 	}
951 
952 	/* THE POINT OF NO (unsuccessful) RETURN */
953 
954 	if (ds->ds_phys->ds_prev_snap_obj != 0) {
955 		if (ds->ds_prev) {
956 			ds_prev = ds->ds_prev;
957 		} else {
958 			ds_prev = dsl_dataset_open_obj(dd->dd_pool,
959 			    ds->ds_phys->ds_prev_snap_obj, NULL,
960 			    DS_MODE_NONE, FTAG);
961 		}
962 		after_branch_point =
963 		    (ds_prev->ds_phys->ds_next_snap_obj != obj);
964 
965 		dmu_buf_will_dirty(ds_prev->ds_dbuf, tx);
966 		if (after_branch_point &&
967 		    ds->ds_phys->ds_next_snap_obj == 0) {
968 			/* This clone is toast. */
969 			ASSERT(ds_prev->ds_phys->ds_num_children > 1);
970 			ds_prev->ds_phys->ds_num_children--;
971 		} else if (!after_branch_point) {
972 			ds_prev->ds_phys->ds_next_snap_obj =
973 			    ds->ds_phys->ds_next_snap_obj;
974 		}
975 	}
976 
977 	ASSERT3P(tx->tx_pool, ==, dd->dd_pool);
978 	zio = zio_root(dp->dp_spa, NULL, NULL, ZIO_FLAG_MUSTSUCCEED);
979 
980 	if (ds->ds_phys->ds_next_snap_obj != 0) {
981 		dsl_dataset_t *ds_next;
982 		uint64_t itor = 0;
983 
984 		spa_scrub_restart(dp->dp_spa, tx->tx_txg);
985 
986 		ds_next = dsl_dataset_open_obj(dd->dd_pool,
987 		    ds->ds_phys->ds_next_snap_obj, NULL, DS_MODE_NONE, FTAG);
988 		ASSERT3U(ds_next->ds_phys->ds_prev_snap_obj, ==, obj);
989 
990 		dmu_buf_will_dirty(ds_next->ds_dbuf, tx);
991 		ds_next->ds_phys->ds_prev_snap_obj =
992 		    ds->ds_phys->ds_prev_snap_obj;
993 		ds_next->ds_phys->ds_prev_snap_txg =
994 		    ds->ds_phys->ds_prev_snap_txg;
995 		ASSERT3U(ds->ds_phys->ds_prev_snap_txg, ==,
996 		    ds_prev ? ds_prev->ds_phys->ds_creation_txg : 0);
997 
998 		/*
999 		 * Transfer to our deadlist (which will become next's
1000 		 * new deadlist) any entries from next's current
1001 		 * deadlist which were born before prev, and free the
1002 		 * other entries.
1003 		 *
1004 		 * XXX we're doing this long task with the config lock held
1005 		 */
1006 		while (bplist_iterate(&ds_next->ds_deadlist, &itor,
1007 		    &bp) == 0) {
1008 			if (bp.blk_birth <= ds->ds_phys->ds_prev_snap_txg) {
1009 				bplist_enqueue(&ds->ds_deadlist, &bp, tx);
1010 				if (ds_prev && !after_branch_point &&
1011 				    bp.blk_birth >
1012 				    ds_prev->ds_phys->ds_prev_snap_txg) {
1013 					ds_prev->ds_phys->ds_unique_bytes +=
1014 					    BP_GET_ASIZE(&bp);
1015 				}
1016 			} else {
1017 				used += BP_GET_ASIZE(&bp);
1018 				compressed += BP_GET_PSIZE(&bp);
1019 				uncompressed += BP_GET_UCSIZE(&bp);
1020 				/* XXX check return value? */
1021 				(void) arc_free(zio, dp->dp_spa, tx->tx_txg,
1022 				    &bp, NULL, NULL, ARC_NOWAIT);
1023 			}
1024 		}
1025 
1026 		/* free next's deadlist */
1027 		bplist_close(&ds_next->ds_deadlist);
1028 		bplist_destroy(mos, ds_next->ds_phys->ds_deadlist_obj, tx);
1029 
1030 		/* set next's deadlist to our deadlist */
1031 		ds_next->ds_phys->ds_deadlist_obj =
1032 		    ds->ds_phys->ds_deadlist_obj;
1033 		bplist_open(&ds_next->ds_deadlist, mos,
1034 		    ds_next->ds_phys->ds_deadlist_obj);
1035 		ds->ds_phys->ds_deadlist_obj = 0;
1036 
1037 		if (ds_next->ds_phys->ds_next_snap_obj != 0) {
1038 			/*
1039 			 * Update next's unique to include blocks which
1040 			 * were previously shared by only this snapshot
1041 			 * and it.  Those blocks will be born after the
1042 			 * prev snap and before this snap, and will have
1043 			 * died after the next snap and before the one
1044 			 * after that (ie. be on the snap after next's
1045 			 * deadlist).
1046 			 *
1047 			 * XXX we're doing this long task with the
1048 			 * config lock held
1049 			 */
1050 			dsl_dataset_t *ds_after_next;
1051 
1052 			ds_after_next = dsl_dataset_open_obj(dd->dd_pool,
1053 			    ds_next->ds_phys->ds_next_snap_obj, NULL,
1054 			    DS_MODE_NONE, FTAG);
1055 			itor = 0;
1056 			while (bplist_iterate(&ds_after_next->ds_deadlist,
1057 			    &itor, &bp) == 0) {
1058 				if (bp.blk_birth >
1059 				    ds->ds_phys->ds_prev_snap_txg &&
1060 				    bp.blk_birth <=
1061 				    ds->ds_phys->ds_creation_txg) {
1062 					ds_next->ds_phys->ds_unique_bytes +=
1063 					    BP_GET_ASIZE(&bp);
1064 				}
1065 			}
1066 
1067 			dsl_dataset_close(ds_after_next, DS_MODE_NONE, FTAG);
1068 			ASSERT3P(ds_next->ds_prev, ==, NULL);
1069 		} else {
1070 			/*
1071 			 * It would be nice to update the head dataset's
1072 			 * unique.  To do so we would have to traverse
1073 			 * it for blocks born after ds_prev, which is
1074 			 * pretty expensive just to maintain something
1075 			 * for debugging purposes.
1076 			 */
1077 			ASSERT3P(ds_next->ds_prev, ==, ds);
1078 			dsl_dataset_close(ds_next->ds_prev, DS_MODE_NONE,
1079 			    ds_next);
1080 			if (ds_prev) {
1081 				ds_next->ds_prev = dsl_dataset_open_obj(
1082 				    dd->dd_pool, ds->ds_phys->ds_prev_snap_obj,
1083 				    NULL, DS_MODE_NONE, ds_next);
1084 			} else {
1085 				ds_next->ds_prev = NULL;
1086 			}
1087 		}
1088 		dsl_dataset_close(ds_next, DS_MODE_NONE, FTAG);
1089 
1090 		/*
1091 		 * NB: unique_bytes is not accurate for head objsets
1092 		 * because we don't update it when we delete the most
1093 		 * recent snapshot -- see above comment.
1094 		 */
1095 		ASSERT3U(used, ==, ds->ds_phys->ds_unique_bytes);
1096 	} else {
1097 		/*
1098 		 * There's no next snapshot, so this is a head dataset.
1099 		 * Destroy the deadlist.  Unless it's a clone, the
1100 		 * deadlist should be empty.  (If it's a clone, it's
1101 		 * safe to ignore the deadlist contents.)
1102 		 */
1103 		struct killarg ka;
1104 
1105 		ASSERT(after_branch_point || bplist_empty(&ds->ds_deadlist));
1106 		bplist_close(&ds->ds_deadlist);
1107 		bplist_destroy(mos, ds->ds_phys->ds_deadlist_obj, tx);
1108 		ds->ds_phys->ds_deadlist_obj = 0;
1109 
1110 		/*
1111 		 * Free everything that we point to (that's born after
1112 		 * the previous snapshot, if we are a clone)
1113 		 *
1114 		 * XXX we're doing this long task with the config lock held
1115 		 */
1116 		ka.usedp = &used;
1117 		ka.compressedp = &compressed;
1118 		ka.uncompressedp = &uncompressed;
1119 		ka.zio = zio;
1120 		ka.tx = tx;
1121 		err = traverse_dsl_dataset(ds, ds->ds_phys->ds_prev_snap_txg,
1122 		    ADVANCE_POST, kill_blkptr, &ka);
1123 		ASSERT3U(err, ==, 0);
1124 	}
1125 
1126 	err = zio_wait(zio);
1127 	ASSERT3U(err, ==, 0);
1128 
1129 	dsl_dir_diduse_space(dd, -used, -compressed, -uncompressed, tx);
1130 
1131 	if (ds->ds_phys->ds_snapnames_zapobj) {
1132 		err = zap_destroy(mos, ds->ds_phys->ds_snapnames_zapobj, tx);
1133 		ASSERT(err == 0);
1134 	}
1135 
1136 	if (dd->dd_phys->dd_head_dataset_obj == ds->ds_object) {
1137 		/* Erase the link in the dataset */
1138 		dmu_buf_will_dirty(dd->dd_dbuf, tx);
1139 		dd->dd_phys->dd_head_dataset_obj = 0;
1140 		/*
1141 		 * dsl_dir_sync_destroy() called us, they'll destroy
1142 		 * the dataset.
1143 		 */
1144 	} else {
1145 		/* remove from snapshot namespace */
1146 		dsl_dataset_t *ds_head;
1147 		ds_head = dsl_dataset_open_obj(dd->dd_pool,
1148 		    dd->dd_phys->dd_head_dataset_obj, NULL, DS_MODE_NONE, FTAG);
1149 #ifdef ZFS_DEBUG
1150 		{
1151 			uint64_t val;
1152 			err = zap_lookup(mos,
1153 			    ds_head->ds_phys->ds_snapnames_zapobj,
1154 			    snapname, 8, 1, &val);
1155 			ASSERT3U(err, ==, 0);
1156 			ASSERT3U(val, ==, obj);
1157 		}
1158 #endif
1159 		err = zap_remove(mos, ds_head->ds_phys->ds_snapnames_zapobj,
1160 		    snapname, tx);
1161 		ASSERT(err == 0);
1162 		dsl_dataset_close(ds_head, DS_MODE_NONE, FTAG);
1163 	}
1164 
1165 	if (ds_prev && ds->ds_prev != ds_prev)
1166 		dsl_dataset_close(ds_prev, DS_MODE_NONE, FTAG);
1167 
1168 	err = dmu_object_free(mos, obj, tx);
1169 	ASSERT(err == 0);
1170 
1171 	/*
1172 	 * Close the objset with mode NONE, thus leaving it with
1173 	 * DOS_REF_MAX set, so that noone can access it.
1174 	 */
1175 	dsl_dataset_close(ds, DS_MODE_NONE, FTAG);
1176 
1177 	if (drop_lock)
1178 		rw_exit(&dp->dp_config_rwlock);
1179 	return (0);
1180 }
1181 
1182 int
1183 dsl_dataset_snapshot_sync(dsl_dir_t *dd, void *arg, dmu_tx_t *tx)
1184 {
1185 	const char *snapname = arg;
1186 	dsl_pool_t *dp = dd->dd_pool;
1187 	dmu_buf_t *dbuf;
1188 	dsl_dataset_phys_t *dsphys;
1189 	uint64_t dsobj, value;
1190 	objset_t *mos = dp->dp_meta_objset;
1191 	dsl_dataset_t *ds;
1192 	int err;
1193 
1194 	ASSERT(dmu_tx_is_syncing(tx));
1195 
1196 	if (dd->dd_phys->dd_head_dataset_obj == 0)
1197 		return (EINVAL);
1198 	ds = dsl_dataset_open_obj(dp, dd->dd_phys->dd_head_dataset_obj, NULL,
1199 	    DS_MODE_NONE, FTAG);
1200 
1201 	err = zap_lookup(mos, ds->ds_phys->ds_snapnames_zapobj,
1202 	    snapname, 8, 1, &value);
1203 	if (err == 0) {
1204 		dsl_dataset_close(ds, DS_MODE_NONE, FTAG);
1205 		return (EEXIST);
1206 	}
1207 	ASSERT(err == ENOENT);
1208 
1209 	/* The point of no (unsuccessful) return */
1210 
1211 	dprintf_dd(dd, "taking snapshot %s in txg %llu\n",
1212 	    snapname, tx->tx_txg);
1213 
1214 	spa_scrub_restart(dp->dp_spa, tx->tx_txg);
1215 
1216 	rw_enter(&dp->dp_config_rwlock, RW_WRITER);
1217 
1218 	dsobj = dmu_object_alloc(mos, DMU_OT_DSL_DATASET, 0,
1219 	    DMU_OT_DSL_DATASET, sizeof (dsl_dataset_phys_t), tx);
1220 	dbuf = dmu_bonus_hold(mos, dsobj);
1221 	dmu_buf_will_dirty(dbuf, tx);
1222 	dsphys = dbuf->db_data;
1223 	dsphys->ds_dir_obj = dd->dd_object;
1224 	dsphys->ds_fsid_guid = unique_create();
1225 	unique_remove(dsphys->ds_fsid_guid); /* it isn't open yet */
1226 	(void) random_get_pseudo_bytes((void*)&dsphys->ds_guid,
1227 	    sizeof (dsphys->ds_guid));
1228 	dsphys->ds_prev_snap_obj = ds->ds_phys->ds_prev_snap_obj;
1229 	dsphys->ds_prev_snap_txg = ds->ds_phys->ds_prev_snap_txg;
1230 	dsphys->ds_next_snap_obj = ds->ds_object;
1231 	dsphys->ds_num_children = 1;
1232 	dsphys->ds_creation_time = gethrestime_sec();
1233 	dsphys->ds_creation_txg = tx->tx_txg;
1234 	dsphys->ds_deadlist_obj = ds->ds_phys->ds_deadlist_obj;
1235 	dsphys->ds_used_bytes = ds->ds_phys->ds_used_bytes;
1236 	dsphys->ds_compressed_bytes = ds->ds_phys->ds_compressed_bytes;
1237 	dsphys->ds_uncompressed_bytes = ds->ds_phys->ds_uncompressed_bytes;
1238 	dsphys->ds_restoring = ds->ds_phys->ds_restoring;
1239 	dsphys->ds_bp = ds->ds_phys->ds_bp;
1240 	dmu_buf_rele(dbuf);
1241 
1242 	if (ds->ds_phys->ds_prev_snap_obj != 0) {
1243 		dsl_dataset_t *ds_prev;
1244 
1245 		ds_prev = dsl_dataset_open_obj(dp,
1246 		    ds->ds_phys->ds_prev_snap_obj, NULL, DS_MODE_NONE, FTAG);
1247 		ASSERT(ds_prev->ds_phys->ds_next_snap_obj ==
1248 		    ds->ds_object ||
1249 		    ds_prev->ds_phys->ds_num_children > 1);
1250 		if (ds_prev->ds_phys->ds_next_snap_obj == ds->ds_object) {
1251 			dmu_buf_will_dirty(ds_prev->ds_dbuf, tx);
1252 			ASSERT3U(ds->ds_phys->ds_prev_snap_txg, ==,
1253 			    ds_prev->ds_phys->ds_creation_txg);
1254 			ds_prev->ds_phys->ds_next_snap_obj = dsobj;
1255 		}
1256 		dsl_dataset_close(ds_prev, DS_MODE_NONE, FTAG);
1257 	} else {
1258 		ASSERT3U(ds->ds_phys->ds_prev_snap_txg, ==, 0);
1259 	}
1260 
1261 	bplist_close(&ds->ds_deadlist);
1262 	dmu_buf_will_dirty(ds->ds_dbuf, tx);
1263 	ASSERT3U(ds->ds_phys->ds_prev_snap_txg, <, dsphys->ds_creation_txg);
1264 	ds->ds_phys->ds_prev_snap_obj = dsobj;
1265 	ds->ds_phys->ds_prev_snap_txg = dsphys->ds_creation_txg;
1266 	ds->ds_phys->ds_unique_bytes = 0;
1267 	ds->ds_phys->ds_deadlist_obj =
1268 	    bplist_create(mos, DSL_DEADLIST_BLOCKSIZE, tx);
1269 	bplist_open(&ds->ds_deadlist, mos, ds->ds_phys->ds_deadlist_obj);
1270 
1271 	dprintf("snap '%s' -> obj %llu\n", snapname, dsobj);
1272 	err = zap_add(mos, ds->ds_phys->ds_snapnames_zapobj,
1273 	    snapname, 8, 1, &dsobj, tx);
1274 	ASSERT(err == 0);
1275 
1276 	if (ds->ds_prev)
1277 		dsl_dataset_close(ds->ds_prev, DS_MODE_NONE, ds);
1278 	ds->ds_prev = dsl_dataset_open_obj(dp,
1279 	    ds->ds_phys->ds_prev_snap_obj, snapname, DS_MODE_NONE, ds);
1280 
1281 	rw_exit(&dp->dp_config_rwlock);
1282 	dsl_dataset_close(ds, DS_MODE_NONE, FTAG);
1283 
1284 	return (0);
1285 }
1286 
1287 void
1288 dsl_dataset_sync(dsl_dataset_t *ds, dmu_tx_t *tx)
1289 {
1290 	ASSERT(dmu_tx_is_syncing(tx));
1291 	ASSERT(ds->ds_user_ptr != NULL);
1292 	ASSERT(ds->ds_phys->ds_next_snap_obj == 0);
1293 
1294 	dmu_objset_sync(ds->ds_user_ptr, tx);
1295 	dsl_dir_dirty(ds->ds_dir, tx);
1296 	bplist_close(&ds->ds_deadlist);
1297 
1298 	dmu_buf_remove_ref(ds->ds_dbuf, ds);
1299 }
1300 
1301 void
1302 dsl_dataset_stats(dsl_dataset_t *ds, dmu_objset_stats_t *dds)
1303 {
1304 	/* fill in properties crap */
1305 	dsl_dir_stats(ds->ds_dir, dds);
1306 
1307 	if (ds->ds_phys->ds_num_children != 0) {
1308 		dds->dds_is_snapshot = TRUE;
1309 		dds->dds_num_clones = ds->ds_phys->ds_num_children - 1;
1310 	}
1311 
1312 	dds->dds_last_txg = ds->ds_phys->ds_bp.blk_birth;
1313 
1314 	dds->dds_objects_used = ds->ds_phys->ds_bp.blk_fill;
1315 	dds->dds_objects_avail = DN_MAX_OBJECT - dds->dds_objects_used;
1316 
1317 	/* We override the dataset's creation time... they should be the same */
1318 	dds->dds_creation_time = ds->ds_phys->ds_creation_time;
1319 	dds->dds_creation_txg = ds->ds_phys->ds_creation_txg;
1320 	dds->dds_space_refd = ds->ds_phys->ds_used_bytes;
1321 	dds->dds_fsid_guid = ds->ds_phys->ds_fsid_guid;
1322 	dds->dds_guid = ds->ds_phys->ds_guid;
1323 
1324 	if (ds->ds_phys->ds_next_snap_obj) {
1325 		/*
1326 		 * This is a snapshot; override the dd's space used with
1327 		 * our unique space
1328 		 */
1329 		dds->dds_space_used = ds->ds_phys->ds_unique_bytes;
1330 		dds->dds_compressed_bytes =
1331 		    ds->ds_phys->ds_compressed_bytes;
1332 		dds->dds_uncompressed_bytes =
1333 		    ds->ds_phys->ds_uncompressed_bytes;
1334 	}
1335 
1336 	dds->dds_objset_obj = ds->ds_object;
1337 }
1338 
1339 dsl_pool_t *
1340 dsl_dataset_pool(dsl_dataset_t *ds)
1341 {
1342 	return (ds->ds_dir->dd_pool);
1343 }
1344 
1345 struct osrenamearg {
1346 	const char *oldname;
1347 	const char *newname;
1348 };
1349 
1350 static int
1351 dsl_dataset_snapshot_rename_sync(dsl_dir_t *dd, void *arg, dmu_tx_t *tx)
1352 {
1353 	struct osrenamearg *ora = arg;
1354 	objset_t *mos = dd->dd_pool->dp_meta_objset;
1355 	dsl_dir_t *nds;
1356 	const char *tail;
1357 	int err;
1358 	dsl_dataset_t *snds, *fsds;
1359 	uint64_t val;
1360 
1361 	err = dsl_dataset_open_spa(dd->dd_pool->dp_spa, ora->oldname,
1362 	    DS_MODE_READONLY | DS_MODE_STANDARD, FTAG, &snds);
1363 	if (err)
1364 		return (err);
1365 
1366 	if (snds->ds_dir != dd) {
1367 		dsl_dataset_close(snds, DS_MODE_STANDARD, FTAG);
1368 		return (EINVAL);
1369 	}
1370 
1371 	/* better be changing a snapshot */
1372 	if (snds->ds_phys->ds_next_snap_obj == 0) {
1373 		dsl_dataset_close(snds, DS_MODE_STANDARD, FTAG);
1374 		return (EINVAL);
1375 	}
1376 
1377 	/* new fs better exist */
1378 	nds = dsl_dir_open_spa(dd->dd_pool->dp_spa, ora->newname, FTAG, &tail);
1379 	if (nds == NULL) {
1380 		dsl_dataset_close(snds, DS_MODE_STANDARD, FTAG);
1381 		return (ENOENT);
1382 	}
1383 
1384 	dsl_dir_close(nds, FTAG);
1385 
1386 	/* new name better be in same fs */
1387 	if (nds != dd) {
1388 		dsl_dataset_close(snds, DS_MODE_STANDARD, FTAG);
1389 		return (EINVAL);
1390 	}
1391 
1392 	/* new name better be a snapshot */
1393 	if (tail == NULL || tail[0] != '@') {
1394 		dsl_dataset_close(snds, DS_MODE_STANDARD, FTAG);
1395 		return (EINVAL);
1396 	}
1397 
1398 	tail++;
1399 
1400 	fsds = dsl_dataset_open_obj(dd->dd_pool,
1401 	    dd->dd_phys->dd_head_dataset_obj, NULL, DS_MODE_NONE, FTAG);
1402 
1403 	/* new name better not be in use */
1404 	err = zap_lookup(mos, fsds->ds_phys->ds_snapnames_zapobj,
1405 	    tail, 8, 1, &val);
1406 	if (err != ENOENT) {
1407 		if (err == 0)
1408 			err = EEXIST;
1409 		dsl_dataset_close(fsds, DS_MODE_NONE, FTAG);
1410 		dsl_dataset_close(snds, DS_MODE_STANDARD, FTAG);
1411 		return (EEXIST);
1412 	}
1413 
1414 	/* The point of no (unsuccessful) return */
1415 
1416 	rw_enter(&dd->dd_pool->dp_config_rwlock, RW_WRITER);
1417 	dsl_dataset_get_snapname(snds);
1418 	err = zap_remove(mos, fsds->ds_phys->ds_snapnames_zapobj,
1419 	    snds->ds_snapname, tx);
1420 	ASSERT3U(err, ==, 0);
1421 	mutex_enter(&snds->ds_lock);
1422 	(void) strcpy(snds->ds_snapname, tail);
1423 	mutex_exit(&snds->ds_lock);
1424 	err = zap_add(mos, fsds->ds_phys->ds_snapnames_zapobj,
1425 	    snds->ds_snapname, 8, 1, &snds->ds_object, tx);
1426 	ASSERT3U(err, ==, 0);
1427 	rw_exit(&dd->dd_pool->dp_config_rwlock);
1428 
1429 	dsl_dataset_close(fsds, DS_MODE_NONE, FTAG);
1430 	dsl_dataset_close(snds, DS_MODE_STANDARD, FTAG);
1431 	return (0);
1432 }
1433 
1434 #pragma weak dmu_objset_rename = dsl_dataset_rename
1435 int
1436 dsl_dataset_rename(const char *osname, const char *newname)
1437 {
1438 	dsl_dir_t *dd;
1439 	const char *tail;
1440 	struct osrenamearg ora;
1441 	int err;
1442 
1443 	dd = dsl_dir_open(osname, FTAG, &tail);
1444 	if (dd == NULL)
1445 		return (ENOENT);
1446 	if (tail == NULL) {
1447 		err = dsl_dir_sync_task(dd,
1448 		    dsl_dir_rename_sync, (void*)newname, 1<<12);
1449 		dsl_dir_close(dd, FTAG);
1450 		return (err);
1451 	}
1452 	if (tail[0] != '@') {
1453 		/* the name ended in a nonexistant component */
1454 		dsl_dir_close(dd, FTAG);
1455 		return (ENOENT);
1456 	}
1457 
1458 	ora.oldname = osname;
1459 	ora.newname = newname;
1460 
1461 	err = dsl_dir_sync_task(dd,
1462 	    dsl_dataset_snapshot_rename_sync, &ora, 1<<12);
1463 	dsl_dir_close(dd, FTAG);
1464 	return (err);
1465 }
1466