xref: /titanic_50/usr/src/uts/common/fs/zfs/dsl_dataset.c (revision d8260c5137b0926a897a3763eca8997922ad7401)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2006 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 
26 #pragma ident	"%Z%%M%	%I%	%E% SMI"
27 
28 #include <sys/dmu_objset.h>
29 #include <sys/dsl_dataset.h>
30 #include <sys/dsl_dir.h>
31 #include <sys/dmu_traverse.h>
32 #include <sys/dmu_tx.h>
33 #include <sys/arc.h>
34 #include <sys/zio.h>
35 #include <sys/zap.h>
36 #include <sys/unique.h>
37 #include <sys/zfs_context.h>
38 
39 static int dsl_dataset_destroy_begin_sync(dsl_dir_t *dd,
40     void *arg, dmu_tx_t *tx);
41 
/*
 * Weight of an EXCLUSIVE open, and the ceiling on the sum of the weights
 * of all concurrent opens of one dataset (see ds_refcnt_weight[]).
 */
#define	DOS_REF_MAX	(1ULL << 62)

/* Block size handed to bplist_create() for every dataset deadlist. */
#define	DSL_DEADLIST_BLOCKSIZE	SPA_MAXBLOCKSIZE

/*
 * Size accounted as "uncompressed" for a block pointer: the physical size
 * for indirect blocks (level > 0) and for metadata object types, and the
 * logical size for everything else.
 *
 * The expansion intentionally carries no trailing semicolon, so the macro
 * can be used inside a larger expression as well as a full statement.
 */
#define	BP_GET_UCSIZE(bp) \
	((BP_GET_LEVEL(bp) > 0 || dmu_ot[BP_GET_TYPE(bp)].ot_metadata) ? \
	BP_GET_PSIZE(bp) : BP_GET_LSIZE(bp))
49 
50 /*
51  * We use weighted reference counts to express the various forms of exclusion
52  * between different open modes.  A STANDARD open is 1 point, an EXCLUSIVE open
53  * is DOS_REF_MAX, and a PRIMARY open is little more than half of an EXCLUSIVE.
54  * This makes the exclusion logic simple: the total refcnt for all opens cannot
55  * exceed DOS_REF_MAX.  For example, EXCLUSIVE opens are exclusive because their
56  * weight (DOS_REF_MAX) consumes the entire refcnt space.  PRIMARY opens consume
57  * just over half of the refcnt space, so there can't be more than one, but it
58  * can peacefully coexist with any number of STANDARD opens.
59  */
60 static uint64_t ds_refcnt_weight[DS_MODE_LEVELS] = {
61 	0,			/* DOS_MODE_NONE - invalid		*/
62 	1,			/* DOS_MODE_STANDARD - unlimited number	*/
63 	(DOS_REF_MAX >> 1) + 1,	/* DOS_MODE_PRIMARY - only one of these	*/
64 	DOS_REF_MAX		/* DOS_MODE_EXCLUSIVE - no other opens	*/
65 };
66 
67 
68 void
69 dsl_dataset_block_born(dsl_dataset_t *ds, blkptr_t *bp, dmu_tx_t *tx)
70 {
71 	int used = BP_GET_ASIZE(bp);
72 	int compressed = BP_GET_PSIZE(bp);
73 	int uncompressed = BP_GET_UCSIZE(bp);
74 
75 	dprintf_bp(bp, "born, ds=%p\n", ds);
76 
77 	ASSERT(dmu_tx_is_syncing(tx));
78 	/* It could have been compressed away to nothing */
79 	if (BP_IS_HOLE(bp))
80 		return;
81 	ASSERT(BP_GET_TYPE(bp) != DMU_OT_NONE);
82 	ASSERT3U(BP_GET_TYPE(bp), <, DMU_OT_NUMTYPES);
83 	if (ds == NULL) {
84 		/*
85 		 * Account for the meta-objset space in its placeholder
86 		 * dsl_dir.
87 		 */
88 		ASSERT3U(compressed, ==, uncompressed); /* it's all metadata */
89 		dsl_dir_diduse_space(tx->tx_pool->dp_mos_dir,
90 		    used, compressed, uncompressed, tx);
91 		dsl_dir_dirty(tx->tx_pool->dp_mos_dir, tx);
92 		return;
93 	}
94 	dmu_buf_will_dirty(ds->ds_dbuf, tx);
95 	mutex_enter(&ds->ds_lock);
96 	ds->ds_phys->ds_used_bytes += used;
97 	ds->ds_phys->ds_compressed_bytes += compressed;
98 	ds->ds_phys->ds_uncompressed_bytes += uncompressed;
99 	ds->ds_phys->ds_unique_bytes += used;
100 	mutex_exit(&ds->ds_lock);
101 	dsl_dir_diduse_space(ds->ds_dir,
102 	    used, compressed, uncompressed, tx);
103 }
104 
105 void
106 dsl_dataset_block_kill(dsl_dataset_t *ds, blkptr_t *bp, dmu_tx_t *tx)
107 {
108 	int used = BP_GET_ASIZE(bp);
109 	int compressed = BP_GET_PSIZE(bp);
110 	int uncompressed = BP_GET_UCSIZE(bp);
111 
112 	ASSERT(dmu_tx_is_syncing(tx));
113 	if (BP_IS_HOLE(bp))
114 		return;
115 
116 	ASSERT(used > 0);
117 	if (ds == NULL) {
118 		/*
119 		 * Account for the meta-objset space in its placeholder
120 		 * dataset.
121 		 */
122 		/* XXX this can fail, what do we do when it does? */
123 		(void) arc_free(NULL, tx->tx_pool->dp_spa,
124 		    tx->tx_txg, bp, NULL, NULL, ARC_WAIT);
125 		bzero(bp, sizeof (blkptr_t));
126 
127 		dsl_dir_diduse_space(tx->tx_pool->dp_mos_dir,
128 		    -used, -compressed, -uncompressed, tx);
129 		dsl_dir_dirty(tx->tx_pool->dp_mos_dir, tx);
130 		return;
131 	}
132 	ASSERT3P(tx->tx_pool, ==, ds->ds_dir->dd_pool);
133 
134 	dmu_buf_will_dirty(ds->ds_dbuf, tx);
135 
136 	if (bp->blk_birth > ds->ds_phys->ds_prev_snap_txg) {
137 		dprintf_bp(bp, "freeing: %s", "");
138 		/* XXX check return code? */
139 		(void) arc_free(NULL, tx->tx_pool->dp_spa,
140 		    tx->tx_txg, bp, NULL, NULL, ARC_WAIT);
141 
142 		mutex_enter(&ds->ds_lock);
143 		/* XXX unique_bytes is not accurate for head datasets */
144 		/* ASSERT3U(ds->ds_phys->ds_unique_bytes, >=, used); */
145 		ds->ds_phys->ds_unique_bytes -= used;
146 		mutex_exit(&ds->ds_lock);
147 		dsl_dir_diduse_space(ds->ds_dir,
148 		    -used, -compressed, -uncompressed, tx);
149 	} else {
150 		dprintf_bp(bp, "putting on dead list: %s", "");
151 		VERIFY(0 == bplist_enqueue(&ds->ds_deadlist, bp, tx));
152 		/* if (bp->blk_birth > prev prev snap txg) prev unique += bs */
153 		if (ds->ds_phys->ds_prev_snap_obj != 0) {
154 			ASSERT3U(ds->ds_prev->ds_object, ==,
155 			    ds->ds_phys->ds_prev_snap_obj);
156 			ASSERT(ds->ds_prev->ds_phys->ds_num_children > 0);
157 			if (ds->ds_prev->ds_phys->ds_next_snap_obj ==
158 			    ds->ds_object &&
159 			    bp->blk_birth >
160 			    ds->ds_prev->ds_phys->ds_prev_snap_txg) {
161 				dmu_buf_will_dirty(ds->ds_prev->ds_dbuf, tx);
162 				mutex_enter(&ds->ds_prev->ds_lock);
163 				ds->ds_prev->ds_phys->ds_unique_bytes +=
164 				    used;
165 				mutex_exit(&ds->ds_prev->ds_lock);
166 			}
167 		}
168 	}
169 	bzero(bp, sizeof (blkptr_t));
170 	mutex_enter(&ds->ds_lock);
171 	ASSERT3U(ds->ds_phys->ds_used_bytes, >=, used);
172 	ds->ds_phys->ds_used_bytes -= used;
173 	ASSERT3U(ds->ds_phys->ds_compressed_bytes, >=, compressed);
174 	ds->ds_phys->ds_compressed_bytes -= compressed;
175 	ASSERT3U(ds->ds_phys->ds_uncompressed_bytes, >=, uncompressed);
176 	ds->ds_phys->ds_uncompressed_bytes -= uncompressed;
177 	mutex_exit(&ds->ds_lock);
178 }
179 
180 uint64_t
181 dsl_dataset_prev_snap_txg(dsl_dataset_t *ds)
182 {
183 	uint64_t txg;
184 	dsl_dir_t *dd;
185 
186 	if (ds == NULL)
187 		return (0);
188 	/*
189 	 * The snapshot creation could fail, but that would cause an
190 	 * incorrect FALSE return, which would only result in an
191 	 * overestimation of the amount of space that an operation would
192 	 * consume, which is OK.
193 	 *
194 	 * There's also a small window where we could miss a pending
195 	 * snapshot, because we could set the sync task in the quiescing
196 	 * phase.  So this should only be used as a guess.
197 	 */
198 	dd = ds->ds_dir;
199 	mutex_enter(&dd->dd_lock);
200 	if (dd->dd_sync_func == dsl_dataset_snapshot_sync)
201 		txg = dd->dd_sync_txg;
202 	else
203 		txg = ds->ds_phys->ds_prev_snap_txg;
204 	mutex_exit(&dd->dd_lock);
205 
206 	return (txg);
207 }
208 
209 int
210 dsl_dataset_block_freeable(dsl_dataset_t *ds, uint64_t blk_birth)
211 {
212 	return (blk_birth > dsl_dataset_prev_snap_txg(ds));
213 }
214 
215 /* ARGSUSED */
216 static void
217 dsl_dataset_evict(dmu_buf_t *db, void *dsv)
218 {
219 	dsl_dataset_t *ds = dsv;
220 	dsl_pool_t *dp = ds->ds_dir->dd_pool;
221 
222 	/* open_refcount == DOS_REF_MAX when deleting */
223 	ASSERT(ds->ds_open_refcount == 0 ||
224 	    ds->ds_open_refcount == DOS_REF_MAX);
225 
226 	dprintf_ds(ds, "evicting %s\n", "");
227 
228 	unique_remove(ds->ds_phys->ds_fsid_guid);
229 
230 	if (ds->ds_user_ptr != NULL)
231 		ds->ds_user_evict_func(ds, ds->ds_user_ptr);
232 
233 	if (ds->ds_prev) {
234 		dsl_dataset_close(ds->ds_prev, DS_MODE_NONE, ds);
235 		ds->ds_prev = NULL;
236 	}
237 
238 	bplist_close(&ds->ds_deadlist);
239 	dsl_dir_close(ds->ds_dir, ds);
240 
241 	if (list_link_active(&ds->ds_synced_link))
242 		list_remove(&dp->dp_synced_objsets, ds);
243 
244 	kmem_free(ds, sizeof (dsl_dataset_t));
245 }
246 
247 static int
248 dsl_dataset_get_snapname(dsl_dataset_t *ds)
249 {
250 	dsl_dataset_phys_t *headphys;
251 	int err;
252 	dmu_buf_t *headdbuf;
253 	dsl_pool_t *dp = ds->ds_dir->dd_pool;
254 	objset_t *mos = dp->dp_meta_objset;
255 
256 	if (ds->ds_snapname[0])
257 		return (0);
258 	if (ds->ds_phys->ds_next_snap_obj == 0)
259 		return (0);
260 
261 	err = dmu_bonus_hold(mos, ds->ds_dir->dd_phys->dd_head_dataset_obj,
262 	    FTAG, &headdbuf);
263 	if (err)
264 		return (err);
265 	headphys = headdbuf->db_data;
266 	err = zap_value_search(dp->dp_meta_objset,
267 	    headphys->ds_snapnames_zapobj, ds->ds_object, ds->ds_snapname);
268 	dmu_buf_rele(headdbuf, FTAG);
269 	return (err);
270 }
271 
272 int
273 dsl_dataset_open_obj(dsl_pool_t *dp, uint64_t dsobj, const char *snapname,
274     int mode, void *tag, dsl_dataset_t **dsp)
275 {
276 	uint64_t weight = ds_refcnt_weight[DS_MODE_LEVEL(mode)];
277 	objset_t *mos = dp->dp_meta_objset;
278 	dmu_buf_t *dbuf;
279 	dsl_dataset_t *ds;
280 	int err;
281 
282 	ASSERT(RW_LOCK_HELD(&dp->dp_config_rwlock) ||
283 	    dsl_pool_sync_context(dp));
284 
285 	err = dmu_bonus_hold(mos, dsobj, tag, &dbuf);
286 	if (err)
287 		return (err);
288 	ds = dmu_buf_get_user(dbuf);
289 	if (ds == NULL) {
290 		dsl_dataset_t *winner;
291 
292 		ds = kmem_zalloc(sizeof (dsl_dataset_t), KM_SLEEP);
293 		ds->ds_dbuf = dbuf;
294 		ds->ds_object = dsobj;
295 		ds->ds_phys = dbuf->db_data;
296 
297 		err = bplist_open(&ds->ds_deadlist,
298 		    mos, ds->ds_phys->ds_deadlist_obj);
299 		if (err == 0) {
300 			err = dsl_dir_open_obj(dp,
301 			    ds->ds_phys->ds_dir_obj, NULL, ds, &ds->ds_dir);
302 		}
303 		if (err) {
304 			/*
305 			 * we don't really need to close the blist if we
306 			 * just opened it.
307 			 */
308 			kmem_free(ds, sizeof (dsl_dataset_t));
309 			dmu_buf_rele(dbuf, tag);
310 			return (err);
311 		}
312 
313 		if (ds->ds_dir->dd_phys->dd_head_dataset_obj == dsobj) {
314 			ds->ds_snapname[0] = '\0';
315 			if (ds->ds_phys->ds_prev_snap_obj) {
316 				err = dsl_dataset_open_obj(dp,
317 				    ds->ds_phys->ds_prev_snap_obj, NULL,
318 				    DS_MODE_NONE, ds, &ds->ds_prev);
319 			}
320 		} else {
321 			if (snapname) {
322 #ifdef ZFS_DEBUG
323 				dsl_dataset_phys_t *headphys;
324 				dmu_buf_t *headdbuf;
325 				err = dmu_bonus_hold(mos,
326 				    ds->ds_dir->dd_phys->dd_head_dataset_obj,
327 				    FTAG, &headdbuf);
328 				if (err == 0) {
329 					headphys = headdbuf->db_data;
330 					uint64_t foundobj;
331 					err = zap_lookup(dp->dp_meta_objset,
332 					    headphys->ds_snapnames_zapobj,
333 					    snapname, sizeof (foundobj), 1,
334 					    &foundobj);
335 					ASSERT3U(foundobj, ==, dsobj);
336 					dmu_buf_rele(headdbuf, FTAG);
337 				}
338 #endif
339 				(void) strcat(ds->ds_snapname, snapname);
340 			} else if (zfs_flags & ZFS_DEBUG_SNAPNAMES) {
341 				err = dsl_dataset_get_snapname(ds);
342 			}
343 		}
344 
345 		if (err == 0) {
346 			winner = dmu_buf_set_user_ie(dbuf, ds, &ds->ds_phys,
347 			    dsl_dataset_evict);
348 		}
349 		if (err || winner) {
350 			bplist_close(&ds->ds_deadlist);
351 			if (ds->ds_prev) {
352 				dsl_dataset_close(ds->ds_prev,
353 				    DS_MODE_NONE, ds);
354 			}
355 			dsl_dir_close(ds->ds_dir, ds);
356 			kmem_free(ds, sizeof (dsl_dataset_t));
357 			if (err) {
358 				dmu_buf_rele(dbuf, tag);
359 				return (err);
360 			}
361 			ds = winner;
362 		} else {
363 			uint64_t new =
364 			    unique_insert(ds->ds_phys->ds_fsid_guid);
365 			if (new != ds->ds_phys->ds_fsid_guid) {
366 				/* XXX it won't necessarily be synced... */
367 				ds->ds_phys->ds_fsid_guid = new;
368 			}
369 		}
370 	}
371 	ASSERT3P(ds->ds_dbuf, ==, dbuf);
372 	ASSERT3P(ds->ds_phys, ==, dbuf->db_data);
373 
374 	mutex_enter(&ds->ds_lock);
375 	if ((DS_MODE_LEVEL(mode) == DS_MODE_PRIMARY &&
376 	    ds->ds_phys->ds_inconsistent && !DS_MODE_IS_INCONSISTENT(mode)) ||
377 	    (ds->ds_open_refcount + weight > DOS_REF_MAX)) {
378 		mutex_exit(&ds->ds_lock);
379 		dsl_dataset_close(ds, DS_MODE_NONE, tag);
380 		return (EBUSY);
381 	}
382 	ds->ds_open_refcount += weight;
383 	mutex_exit(&ds->ds_lock);
384 
385 	*dsp = ds;
386 	return (0);
387 }
388 
389 int
390 dsl_dataset_open_spa(spa_t *spa, const char *name, int mode,
391     void *tag, dsl_dataset_t **dsp)
392 {
393 	dsl_dir_t *dd;
394 	dsl_pool_t *dp;
395 	const char *tail;
396 	uint64_t obj;
397 	dsl_dataset_t *ds = NULL;
398 	int err = 0;
399 
400 	err = dsl_dir_open_spa(spa, name, FTAG, &dd, &tail);
401 	if (err)
402 		return (err);
403 
404 	dp = dd->dd_pool;
405 	obj = dd->dd_phys->dd_head_dataset_obj;
406 	rw_enter(&dp->dp_config_rwlock, RW_READER);
407 	if (obj == 0) {
408 		/* A dataset with no associated objset */
409 		err = ENOENT;
410 		goto out;
411 	}
412 
413 	if (tail != NULL) {
414 		objset_t *mos = dp->dp_meta_objset;
415 
416 		err = dsl_dataset_open_obj(dp, obj, NULL,
417 		    DS_MODE_NONE, tag, &ds);
418 		if (err)
419 			goto out;
420 		obj = ds->ds_phys->ds_snapnames_zapobj;
421 		dsl_dataset_close(ds, DS_MODE_NONE, tag);
422 		ds = NULL;
423 
424 		if (tail[0] != '@') {
425 			err = ENOENT;
426 			goto out;
427 		}
428 		tail++;
429 
430 		/* Look for a snapshot */
431 		if (!DS_MODE_IS_READONLY(mode)) {
432 			err = EROFS;
433 			goto out;
434 		}
435 		dprintf("looking for snapshot '%s'\n", tail);
436 		err = zap_lookup(mos, obj, tail, 8, 1, &obj);
437 		if (err)
438 			goto out;
439 	}
440 	err = dsl_dataset_open_obj(dp, obj, tail, mode, tag, &ds);
441 
442 out:
443 	rw_exit(&dp->dp_config_rwlock);
444 	dsl_dir_close(dd, FTAG);
445 
446 	ASSERT3U((err == 0), ==, (ds != NULL));
447 	/* ASSERT(ds == NULL || strcmp(name, ds->ds_name) == 0); */
448 
449 	*dsp = ds;
450 	return (err);
451 }
452 
453 int
454 dsl_dataset_open(const char *name, int mode, void *tag, dsl_dataset_t **dsp)
455 {
456 	return (dsl_dataset_open_spa(NULL, name, mode, tag, dsp));
457 }
458 
459 void
460 dsl_dataset_name(dsl_dataset_t *ds, char *name)
461 {
462 	if (ds == NULL) {
463 		(void) strcpy(name, "mos");
464 	} else {
465 		dsl_dir_name(ds->ds_dir, name);
466 		VERIFY(0 == dsl_dataset_get_snapname(ds));
467 		if (ds->ds_snapname[0]) {
468 			(void) strcat(name, "@");
469 			if (!MUTEX_HELD(&ds->ds_lock)) {
470 				/*
471 				 * We use a "recursive" mutex so that we
472 				 * can call dprintf_ds() with ds_lock held.
473 				 */
474 				mutex_enter(&ds->ds_lock);
475 				(void) strcat(name, ds->ds_snapname);
476 				mutex_exit(&ds->ds_lock);
477 			} else {
478 				(void) strcat(name, ds->ds_snapname);
479 			}
480 		}
481 	}
482 }
483 
484 void
485 dsl_dataset_close(dsl_dataset_t *ds, int mode, void *tag)
486 {
487 	uint64_t weight = ds_refcnt_weight[DS_MODE_LEVEL(mode)];
488 	mutex_enter(&ds->ds_lock);
489 	ASSERT3U(ds->ds_open_refcount, >=, weight);
490 	ds->ds_open_refcount -= weight;
491 	dprintf_ds(ds, "closing mode %u refcount now 0x%llx\n",
492 	    mode, ds->ds_open_refcount);
493 	mutex_exit(&ds->ds_lock);
494 
495 	dmu_buf_rele(ds->ds_dbuf, tag);
496 }
497 
498 void
499 dsl_dataset_create_root(dsl_pool_t *dp, uint64_t *ddobjp, dmu_tx_t *tx)
500 {
501 	objset_t *mos = dp->dp_meta_objset;
502 	dmu_buf_t *dbuf;
503 	dsl_dataset_phys_t *dsphys;
504 	dsl_dataset_t *ds;
505 	uint64_t dsobj;
506 	dsl_dir_t *dd;
507 
508 	dsl_dir_create_root(mos, ddobjp, tx);
509 	VERIFY(0 == dsl_dir_open_obj(dp, *ddobjp, NULL, FTAG, &dd));
510 
511 	dsobj = dmu_object_alloc(mos, DMU_OT_DSL_DATASET, 0,
512 	    DMU_OT_DSL_DATASET, sizeof (dsl_dataset_phys_t), tx);
513 	VERIFY(0 == dmu_bonus_hold(mos, dsobj, FTAG, &dbuf));
514 	dmu_buf_will_dirty(dbuf, tx);
515 	dsphys = dbuf->db_data;
516 	dsphys->ds_dir_obj = dd->dd_object;
517 	dsphys->ds_fsid_guid = unique_create();
518 	unique_remove(dsphys->ds_fsid_guid); /* it isn't open yet */
519 	(void) random_get_pseudo_bytes((void*)&dsphys->ds_guid,
520 	    sizeof (dsphys->ds_guid));
521 	dsphys->ds_snapnames_zapobj =
522 	    zap_create(mos, DMU_OT_DSL_DS_SNAP_MAP, DMU_OT_NONE, 0, tx);
523 	dsphys->ds_creation_time = gethrestime_sec();
524 	dsphys->ds_creation_txg = tx->tx_txg;
525 	dsphys->ds_deadlist_obj =
526 	    bplist_create(mos, DSL_DEADLIST_BLOCKSIZE, tx);
527 	dmu_buf_rele(dbuf, FTAG);
528 
529 	dmu_buf_will_dirty(dd->dd_dbuf, tx);
530 	dd->dd_phys->dd_head_dataset_obj = dsobj;
531 	dsl_dir_close(dd, FTAG);
532 
533 	VERIFY(0 ==
534 	    dsl_dataset_open_obj(dp, dsobj, NULL, DS_MODE_NONE, FTAG, &ds));
535 	(void) dmu_objset_create_impl(dp->dp_spa, ds, DMU_OST_ZFS, tx);
536 	dsl_dataset_close(ds, DS_MODE_NONE, FTAG);
537 }
538 
539 int
540 dsl_dataset_create_sync(dsl_dir_t *pds, const char *fullname,
541     const char *lastname, dsl_dataset_t *clone_parent, dmu_tx_t *tx)
542 {
543 	int err;
544 	dsl_pool_t *dp = pds->dd_pool;
545 	dmu_buf_t *dbuf;
546 	dsl_dataset_phys_t *dsphys;
547 	uint64_t dsobj;
548 	objset_t *mos = dp->dp_meta_objset;
549 	dsl_dir_t *dd;
550 
551 	if (clone_parent != NULL) {
552 		/*
553 		 * You can't clone across pools.
554 		 */
555 		if (clone_parent->ds_dir->dd_pool != dp)
556 			return (EXDEV);
557 
558 		/*
559 		 * You can only clone snapshots, not the head datasets.
560 		 */
561 		if (clone_parent->ds_phys->ds_num_children == 0)
562 			return (EINVAL);
563 	}
564 
565 	ASSERT(lastname[0] != '@');
566 	ASSERT(dmu_tx_is_syncing(tx));
567 
568 	err = dsl_dir_create_sync(pds, lastname, tx);
569 	if (err)
570 		return (err);
571 	VERIFY(0 == dsl_dir_open_spa(dp->dp_spa, fullname, FTAG, &dd, NULL));
572 
573 	/* This is the point of no (unsuccessful) return */
574 
575 	dsobj = dmu_object_alloc(mos, DMU_OT_DSL_DATASET, 0,
576 	    DMU_OT_DSL_DATASET, sizeof (dsl_dataset_phys_t), tx);
577 	VERIFY(0 == dmu_bonus_hold(mos, dsobj, FTAG, &dbuf));
578 	dmu_buf_will_dirty(dbuf, tx);
579 	dsphys = dbuf->db_data;
580 	dsphys->ds_dir_obj = dd->dd_object;
581 	dsphys->ds_fsid_guid = unique_create();
582 	unique_remove(dsphys->ds_fsid_guid); /* it isn't open yet */
583 	(void) random_get_pseudo_bytes((void*)&dsphys->ds_guid,
584 	    sizeof (dsphys->ds_guid));
585 	dsphys->ds_snapnames_zapobj =
586 	    zap_create(mos, DMU_OT_DSL_DS_SNAP_MAP, DMU_OT_NONE, 0, tx);
587 	dsphys->ds_creation_time = gethrestime_sec();
588 	dsphys->ds_creation_txg = tx->tx_txg;
589 	dsphys->ds_deadlist_obj =
590 	    bplist_create(mos, DSL_DEADLIST_BLOCKSIZE, tx);
591 	if (clone_parent) {
592 		dsphys->ds_prev_snap_obj = clone_parent->ds_object;
593 		dsphys->ds_prev_snap_txg =
594 		    clone_parent->ds_phys->ds_creation_txg;
595 		dsphys->ds_used_bytes =
596 		    clone_parent->ds_phys->ds_used_bytes;
597 		dsphys->ds_compressed_bytes =
598 		    clone_parent->ds_phys->ds_compressed_bytes;
599 		dsphys->ds_uncompressed_bytes =
600 		    clone_parent->ds_phys->ds_uncompressed_bytes;
601 		dsphys->ds_bp = clone_parent->ds_phys->ds_bp;
602 
603 		dmu_buf_will_dirty(clone_parent->ds_dbuf, tx);
604 		clone_parent->ds_phys->ds_num_children++;
605 
606 		dmu_buf_will_dirty(dd->dd_dbuf, tx);
607 		dd->dd_phys->dd_clone_parent_obj = clone_parent->ds_object;
608 	}
609 	dmu_buf_rele(dbuf, FTAG);
610 
611 	dmu_buf_will_dirty(dd->dd_dbuf, tx);
612 	dd->dd_phys->dd_head_dataset_obj = dsobj;
613 	dsl_dir_close(dd, FTAG);
614 
615 	return (0);
616 }
617 
618 int
619 dsl_dataset_destroy(const char *name)
620 {
621 	int err;
622 	dsl_pool_t *dp;
623 	dsl_dir_t *dd;
624 	const char *tail;
625 
626 	err = dsl_dir_open(name, FTAG, &dd, &tail);
627 	if (err)
628 		return (err);
629 
630 	dp = dd->dd_pool;
631 	if (tail != NULL) {
632 		if (tail[0] != '@') {
633 			dsl_dir_close(dd, FTAG);
634 			return (ENOENT);
635 		}
636 		tail++;
637 		/* Just blow away the snapshot */
638 		do {
639 			txg_wait_synced(dp, 0);
640 			err = dsl_dir_sync_task(dd,
641 			    dsl_dataset_destroy_sync, (void*)tail, 0);
642 		} while (err == EAGAIN);
643 		dsl_dir_close(dd, FTAG);
644 	} else {
645 		char buf[MAXNAMELEN];
646 		char *cp;
647 		objset_t *os;
648 		uint64_t obj;
649 		dsl_dir_t *pds;
650 
651 		if (dd->dd_phys->dd_parent_obj == 0) {
652 			dsl_dir_close(dd, FTAG);
653 			return (EINVAL);
654 		}
655 
656 		err = dmu_objset_open(name, DMU_OST_ANY,
657 		    DS_MODE_PRIMARY | DS_MODE_INCONSISTENT, &os);
658 		if (err) {
659 			dsl_dir_close(dd, FTAG);
660 			return (err);
661 		}
662 
663 		/*
664 		 * Check for errors and mark this ds as inconsistent, in
665 		 * case we crash while freeing the objects.
666 		 */
667 		err = dsl_dir_sync_task(os->os->os_dsl_dataset->ds_dir,
668 		    dsl_dataset_destroy_begin_sync, os->os->os_dsl_dataset, 0);
669 		if (err) {
670 			dmu_objset_close(os);
671 			dsl_dir_close(dd, FTAG);
672 			return (err);
673 		}
674 
675 		/*
676 		 * remove the objects in open context, so that we won't
677 		 * have too much to do in syncing context.
678 		 */
679 		for (obj = 0; err == 0;
680 		    err = dmu_object_next(os, &obj, FALSE)) {
681 			dmu_tx_t *tx = dmu_tx_create(os);
682 			dmu_tx_hold_free(tx, obj, 0, DMU_OBJECT_END);
683 			dmu_tx_hold_bonus(tx, obj);
684 			err = dmu_tx_assign(tx, TXG_WAIT);
685 			if (err) {
686 				/*
687 				 * Perhaps there is not enough disk
688 				 * space.  Just deal with it from
689 				 * dsl_dataset_destroy_sync().
690 				 */
691 				dmu_tx_abort(tx);
692 				continue;
693 			}
694 			VERIFY(0 == dmu_object_free(os, obj, tx));
695 			dmu_tx_commit(tx);
696 		}
697 		/* Make sure it's not dirty before we finish destroying it. */
698 		txg_wait_synced(dd->dd_pool, 0);
699 
700 		dmu_objset_close(os);
701 		if (err != ESRCH) {
702 			dsl_dir_close(dd, FTAG);
703 			return (err);
704 		}
705 
706 		/*
707 		 * Blow away the dsl_dir + head dataset.
708 		 * dsl_dir_destroy_sync() will call
709 		 * dsl_dataset_destroy_sync() to destroy the head dataset.
710 		 */
711 		rw_enter(&dp->dp_config_rwlock, RW_READER);
712 		err = dsl_dir_open_obj(dd->dd_pool,
713 		    dd->dd_phys->dd_parent_obj, NULL, FTAG, &pds);
714 		dsl_dir_close(dd, FTAG);
715 		rw_exit(&dp->dp_config_rwlock);
716 		if (err)
717 			return (err);
718 
719 		(void) strcpy(buf, name);
720 		cp = strrchr(buf, '/') + 1;
721 		ASSERT(cp[0] != '\0');
722 		do {
723 			txg_wait_synced(dp, 0);
724 			err = dsl_dir_sync_task(pds,
725 			    dsl_dir_destroy_sync, cp, 0);
726 		} while (err == EAGAIN);
727 		dsl_dir_close(pds, FTAG);
728 	}
729 
730 	return (err);
731 }
732 
733 int
734 dsl_dataset_rollback(const char *name)
735 {
736 	int err;
737 	dsl_dir_t *dd;
738 	const char *tail;
739 
740 	err = dsl_dir_open(name, FTAG, &dd, &tail);
741 	if (err)
742 		return (err);
743 
744 	if (tail != NULL) {
745 		dsl_dir_close(dd, FTAG);
746 		return (EINVAL);
747 	}
748 	do {
749 		txg_wait_synced(dd->dd_pool, 0);
750 		err = dsl_dir_sync_task(dd,
751 		    dsl_dataset_rollback_sync, NULL, 0);
752 	} while (err == EAGAIN);
753 	dsl_dir_close(dd, FTAG);
754 
755 	return (err);
756 }
757 
758 void *
759 dsl_dataset_set_user_ptr(dsl_dataset_t *ds,
760     void *p, dsl_dataset_evict_func_t func)
761 {
762 	void *old;
763 
764 	mutex_enter(&ds->ds_lock);
765 	old = ds->ds_user_ptr;
766 	if (old == NULL) {
767 		ds->ds_user_ptr = p;
768 		ds->ds_user_evict_func = func;
769 	}
770 	mutex_exit(&ds->ds_lock);
771 	return (old);
772 }
773 
774 void *
775 dsl_dataset_get_user_ptr(dsl_dataset_t *ds)
776 {
777 	return (ds->ds_user_ptr);
778 }
779 
780 
781 void
782 dsl_dataset_get_blkptr(dsl_dataset_t *ds, blkptr_t *bp)
783 {
784 	*bp = ds->ds_phys->ds_bp;
785 }
786 
787 void
788 dsl_dataset_set_blkptr(dsl_dataset_t *ds, blkptr_t *bp, dmu_tx_t *tx)
789 {
790 	ASSERT(dmu_tx_is_syncing(tx));
791 	/* If it's the meta-objset, set dp_meta_rootbp */
792 	if (ds == NULL) {
793 		tx->tx_pool->dp_meta_rootbp = *bp;
794 	} else {
795 		dmu_buf_will_dirty(ds->ds_dbuf, tx);
796 		ds->ds_phys->ds_bp = *bp;
797 	}
798 }
799 
800 spa_t *
801 dsl_dataset_get_spa(dsl_dataset_t *ds)
802 {
803 	return (ds->ds_dir->dd_pool->dp_spa);
804 }
805 
806 void
807 dsl_dataset_dirty(dsl_dataset_t *ds, dmu_tx_t *tx)
808 {
809 	dsl_pool_t *dp;
810 
811 	if (ds == NULL) /* this is the meta-objset */
812 		return;
813 
814 	ASSERT(ds->ds_user_ptr != NULL);
815 	ASSERT(ds->ds_phys->ds_next_snap_obj == 0);
816 
817 	dp = ds->ds_dir->dd_pool;
818 
819 	if (txg_list_add(&dp->dp_dirty_datasets, ds, tx->tx_txg) == 0) {
820 		/* up the hold count until we can be written out */
821 		dmu_buf_add_ref(ds->ds_dbuf, ds);
822 	}
823 }
824 
825 struct killarg {
826 	uint64_t *usedp;
827 	uint64_t *compressedp;
828 	uint64_t *uncompressedp;
829 	zio_t *zio;
830 	dmu_tx_t *tx;
831 };
832 
833 static int
834 kill_blkptr(traverse_blk_cache_t *bc, spa_t *spa, void *arg)
835 {
836 	struct killarg *ka = arg;
837 	blkptr_t *bp = &bc->bc_blkptr;
838 
839 	ASSERT3U(bc->bc_errno, ==, 0);
840 
841 	/*
842 	 * Since this callback is not called concurrently, no lock is
843 	 * needed on the accounting values.
844 	 */
845 	*ka->usedp += BP_GET_ASIZE(bp);
846 	*ka->compressedp += BP_GET_PSIZE(bp);
847 	*ka->uncompressedp += BP_GET_UCSIZE(bp);
848 	/* XXX check for EIO? */
849 	(void) arc_free(ka->zio, spa, ka->tx->tx_txg, bp, NULL, NULL,
850 	    ARC_NOWAIT);
851 	return (0);
852 }
853 
854 /* ARGSUSED */
855 int
856 dsl_dataset_rollback_sync(dsl_dir_t *dd, void *arg, dmu_tx_t *tx)
857 {
858 	objset_t *mos = dd->dd_pool->dp_meta_objset;
859 	dsl_dataset_t *ds;
860 	int err;
861 
862 	if (dd->dd_phys->dd_head_dataset_obj == 0)
863 		return (EINVAL);
864 	err = dsl_dataset_open_obj(dd->dd_pool,
865 	    dd->dd_phys->dd_head_dataset_obj, NULL, DS_MODE_NONE, FTAG, &ds);
866 	if (err)
867 		return (err);
868 
869 	if (ds->ds_phys->ds_prev_snap_txg == 0) {
870 		/*
871 		 * There's no previous snapshot.  I suppose we could
872 		 * roll it back to being empty (and re-initialize the
873 		 * upper (ZPL) layer).  But for now there's no way to do
874 		 * this via the user interface.
875 		 */
876 		dsl_dataset_close(ds, DS_MODE_NONE, FTAG);
877 		return (EINVAL);
878 	}
879 
880 	mutex_enter(&ds->ds_lock);
881 	if (ds->ds_open_refcount > 0) {
882 		mutex_exit(&ds->ds_lock);
883 		dsl_dataset_close(ds, DS_MODE_NONE, FTAG);
884 		return (EBUSY);
885 	}
886 
887 	/*
888 	 * If we made changes this txg, traverse_dsl_dataset won't find
889 	 * them.  Try again.
890 	 */
891 	if (ds->ds_phys->ds_bp.blk_birth >= tx->tx_txg) {
892 		mutex_exit(&ds->ds_lock);
893 		dsl_dataset_close(ds, DS_MODE_NONE, FTAG);
894 		return (EAGAIN);
895 	}
896 
897 	/* THE POINT OF NO (unsuccessful) RETURN */
898 	ds->ds_open_refcount = DOS_REF_MAX;
899 	mutex_exit(&ds->ds_lock);
900 
901 	dmu_buf_will_dirty(ds->ds_dbuf, tx);
902 
903 	/* Zero out the deadlist. */
904 	dprintf("old deadlist obj = %llx\n", ds->ds_phys->ds_deadlist_obj);
905 	bplist_close(&ds->ds_deadlist);
906 	bplist_destroy(mos, ds->ds_phys->ds_deadlist_obj, tx);
907 	ds->ds_phys->ds_deadlist_obj =
908 	    bplist_create(mos, DSL_DEADLIST_BLOCKSIZE, tx);
909 	VERIFY(0 == bplist_open(&ds->ds_deadlist, mos,
910 	    ds->ds_phys->ds_deadlist_obj));
911 	dprintf("new deadlist obj = %llx\n", ds->ds_phys->ds_deadlist_obj);
912 
913 	{
914 		/* Free blkptrs that we gave birth to */
915 		zio_t *zio;
916 		uint64_t used = 0, compressed = 0, uncompressed = 0;
917 		struct killarg ka;
918 
919 		zio = zio_root(tx->tx_pool->dp_spa, NULL, NULL,
920 		    ZIO_FLAG_MUSTSUCCEED);
921 		ka.usedp = &used;
922 		ka.compressedp = &compressed;
923 		ka.uncompressedp = &uncompressed;
924 		ka.zio = zio;
925 		ka.tx = tx;
926 		(void) traverse_dsl_dataset(ds, ds->ds_phys->ds_prev_snap_txg,
927 		    ADVANCE_POST, kill_blkptr, &ka);
928 		(void) zio_wait(zio);
929 
930 		dsl_dir_diduse_space(dd,
931 		    -used, -compressed, -uncompressed, tx);
932 	}
933 
934 	/* Change our contents to that of the prev snapshot (finally!) */
935 	ASSERT3U(ds->ds_prev->ds_object, ==, ds->ds_phys->ds_prev_snap_obj);
936 	ds->ds_phys->ds_bp = ds->ds_prev->ds_phys->ds_bp;
937 	ds->ds_phys->ds_used_bytes = ds->ds_prev->ds_phys->ds_used_bytes;
938 	ds->ds_phys->ds_compressed_bytes =
939 	    ds->ds_prev->ds_phys->ds_compressed_bytes;
940 	ds->ds_phys->ds_uncompressed_bytes =
941 	    ds->ds_prev->ds_phys->ds_uncompressed_bytes;
942 	ds->ds_phys->ds_inconsistent = ds->ds_prev->ds_phys->ds_inconsistent;
943 	ds->ds_phys->ds_unique_bytes = 0;
944 
945 	dmu_buf_will_dirty(ds->ds_prev->ds_dbuf, tx);
946 	ds->ds_prev->ds_phys->ds_unique_bytes = 0;
947 
948 	dprintf("new deadlist obj = %llx\n", ds->ds_phys->ds_deadlist_obj);
949 	ds->ds_open_refcount = 0;
950 	dsl_dataset_close(ds, DS_MODE_NONE, FTAG);
951 
952 	return (0);
953 }
954 
955 /* ARGSUSED */
956 static int
957 dsl_dataset_destroy_begin_sync(dsl_dir_t *dd, void *arg, dmu_tx_t *tx)
958 {
959 	dsl_dataset_t *ds = arg;
960 
961 	/*
962 	 * Can't delete a head dataset if there are snapshots of it.
963 	 * (Except if the only snapshots are from the branch we cloned
964 	 * from.)
965 	 */
966 	if (ds->ds_prev != NULL &&
967 	    ds->ds_prev->ds_phys->ds_next_snap_obj == ds->ds_object)
968 		return (EINVAL);
969 
970 	/* Mark it as inconsistent on-disk, in case we crash */
971 	dmu_buf_will_dirty(ds->ds_dbuf, tx);
972 	ds->ds_phys->ds_inconsistent = TRUE;
973 
974 	return (0);
975 }
976 
977 int
978 dsl_dataset_destroy_sync(dsl_dir_t *dd, void *arg, dmu_tx_t *tx)
979 {
980 	const char *snapname = arg;
981 	uint64_t used = 0, compressed = 0, uncompressed = 0;
982 	blkptr_t bp;
983 	zio_t *zio;
984 	int err;
985 	int after_branch_point = FALSE;
986 	int drop_lock = FALSE;
987 	dsl_pool_t *dp = dd->dd_pool;
988 	objset_t *mos = dp->dp_meta_objset;
989 	dsl_dataset_t *ds, *ds_prev = NULL;
990 	uint64_t obj;
991 
992 	if (dd->dd_phys->dd_head_dataset_obj == 0)
993 		return (EINVAL);
994 
995 	if (!RW_WRITE_HELD(&dp->dp_config_rwlock)) {
996 		rw_enter(&dp->dp_config_rwlock, RW_WRITER);
997 		drop_lock = TRUE;
998 	}
999 
1000 	err = dsl_dataset_open_obj(dd->dd_pool,
1001 	    dd->dd_phys->dd_head_dataset_obj, NULL,
1002 	    snapname ? DS_MODE_NONE : DS_MODE_EXCLUSIVE, FTAG, &ds);
1003 
1004 	if (err == 0 && snapname) {
1005 		err = zap_lookup(mos, ds->ds_phys->ds_snapnames_zapobj,
1006 		    snapname, 8, 1, &obj);
1007 		dsl_dataset_close(ds, DS_MODE_NONE, FTAG);
1008 		if (err == 0) {
1009 			err = dsl_dataset_open_obj(dd->dd_pool, obj, NULL,
1010 			    DS_MODE_EXCLUSIVE, FTAG, &ds);
1011 		}
1012 	}
1013 	if (err) {
1014 		if (drop_lock)
1015 			rw_exit(&dp->dp_config_rwlock);
1016 		return (err);
1017 	}
1018 
1019 	obj = ds->ds_object;
1020 
1021 	/* Can't delete a branch point. */
1022 	if (ds->ds_phys->ds_num_children > 1) {
1023 		dsl_dataset_close(ds, DS_MODE_EXCLUSIVE, FTAG);
1024 		if (drop_lock)
1025 			rw_exit(&dp->dp_config_rwlock);
1026 		return (EINVAL);
1027 	}
1028 
1029 	/*
1030 	 * Can't delete a head dataset if there are snapshots of it.
1031 	 * (Except if the only snapshots are from the branch we cloned
1032 	 * from.)
1033 	 */
1034 	if (ds->ds_prev != NULL &&
1035 	    ds->ds_prev->ds_phys->ds_next_snap_obj == obj) {
1036 		dsl_dataset_close(ds, DS_MODE_EXCLUSIVE, FTAG);
1037 		if (drop_lock)
1038 			rw_exit(&dp->dp_config_rwlock);
1039 		return (EINVAL);
1040 	}
1041 
1042 	/*
1043 	 * If we made changes this txg, traverse_dsl_dataset won't find
1044 	 * them.  Try again.
1045 	 */
1046 	if (ds->ds_phys->ds_bp.blk_birth >= tx->tx_txg) {
1047 		dsl_dataset_close(ds, DS_MODE_NONE, FTAG);
1048 		if (drop_lock)
1049 			rw_exit(&dp->dp_config_rwlock);
1050 		return (EAGAIN);
1051 	}
1052 
1053 	if (ds->ds_phys->ds_prev_snap_obj != 0) {
1054 		if (ds->ds_prev) {
1055 			ds_prev = ds->ds_prev;
1056 		} else {
1057 			err = dsl_dataset_open_obj(dd->dd_pool,
1058 			    ds->ds_phys->ds_prev_snap_obj, NULL,
1059 			    DS_MODE_NONE, FTAG, &ds_prev);
1060 			if (err) {
1061 				dsl_dataset_close(ds, DS_MODE_NONE, FTAG);
1062 				if (drop_lock)
1063 					rw_exit(&dp->dp_config_rwlock);
1064 				return (err);
1065 			}
1066 		}
1067 		after_branch_point =
1068 		    (ds_prev->ds_phys->ds_next_snap_obj != obj);
1069 
1070 		dmu_buf_will_dirty(ds_prev->ds_dbuf, tx);
1071 		if (after_branch_point &&
1072 		    ds->ds_phys->ds_next_snap_obj == 0) {
1073 			/* This clone is toast. */
1074 			ASSERT(ds_prev->ds_phys->ds_num_children > 1);
1075 			ds_prev->ds_phys->ds_num_children--;
1076 		} else if (!after_branch_point) {
1077 			ds_prev->ds_phys->ds_next_snap_obj =
1078 			    ds->ds_phys->ds_next_snap_obj;
1079 		}
1080 	}
1081 
1082 	/* THE POINT OF NO (unsuccessful) RETURN */
1083 
1084 	ASSERT3P(tx->tx_pool, ==, dd->dd_pool);
1085 	zio = zio_root(dp->dp_spa, NULL, NULL, ZIO_FLAG_MUSTSUCCEED);
1086 
1087 	if (ds->ds_phys->ds_next_snap_obj != 0) {
1088 		dsl_dataset_t *ds_next;
1089 		uint64_t itor = 0;
1090 
1091 		spa_scrub_restart(dp->dp_spa, tx->tx_txg);
1092 
1093 		VERIFY(0 == dsl_dataset_open_obj(dd->dd_pool,
1094 		    ds->ds_phys->ds_next_snap_obj, NULL,
1095 		    DS_MODE_NONE, FTAG, &ds_next));
1096 		ASSERT3U(ds_next->ds_phys->ds_prev_snap_obj, ==, obj);
1097 
1098 		dmu_buf_will_dirty(ds_next->ds_dbuf, tx);
1099 		ds_next->ds_phys->ds_prev_snap_obj =
1100 		    ds->ds_phys->ds_prev_snap_obj;
1101 		ds_next->ds_phys->ds_prev_snap_txg =
1102 		    ds->ds_phys->ds_prev_snap_txg;
1103 		ASSERT3U(ds->ds_phys->ds_prev_snap_txg, ==,
1104 		    ds_prev ? ds_prev->ds_phys->ds_creation_txg : 0);
1105 
1106 		/*
1107 		 * Transfer to our deadlist (which will become next's
1108 		 * new deadlist) any entries from next's current
1109 		 * deadlist which were born before prev, and free the
1110 		 * other entries.
1111 		 *
1112 		 * XXX we're doing this long task with the config lock held
1113 		 */
1114 		while (bplist_iterate(&ds_next->ds_deadlist, &itor,
1115 		    &bp) == 0) {
1116 			if (bp.blk_birth <= ds->ds_phys->ds_prev_snap_txg) {
1117 				VERIFY(0 == bplist_enqueue(&ds->ds_deadlist,
1118 				    &bp, tx));
1119 				if (ds_prev && !after_branch_point &&
1120 				    bp.blk_birth >
1121 				    ds_prev->ds_phys->ds_prev_snap_txg) {
1122 					ds_prev->ds_phys->ds_unique_bytes +=
1123 					    BP_GET_ASIZE(&bp);
1124 				}
1125 			} else {
1126 				used += BP_GET_ASIZE(&bp);
1127 				compressed += BP_GET_PSIZE(&bp);
1128 				uncompressed += BP_GET_UCSIZE(&bp);
1129 				/* XXX check return value? */
1130 				(void) arc_free(zio, dp->dp_spa, tx->tx_txg,
1131 				    &bp, NULL, NULL, ARC_NOWAIT);
1132 			}
1133 		}
1134 
1135 		/* free next's deadlist */
1136 		bplist_close(&ds_next->ds_deadlist);
1137 		bplist_destroy(mos, ds_next->ds_phys->ds_deadlist_obj, tx);
1138 
1139 		/* set next's deadlist to our deadlist */
1140 		ds_next->ds_phys->ds_deadlist_obj =
1141 		    ds->ds_phys->ds_deadlist_obj;
1142 		VERIFY(0 == bplist_open(&ds_next->ds_deadlist, mos,
1143 		    ds_next->ds_phys->ds_deadlist_obj));
1144 		ds->ds_phys->ds_deadlist_obj = 0;
1145 
1146 		if (ds_next->ds_phys->ds_next_snap_obj != 0) {
1147 			/*
1148 			 * Update next's unique to include blocks which
1149 			 * were previously shared by only this snapshot
1150 			 * and it.  Those blocks will be born after the
1151 			 * prev snap and before this snap, and will have
1152 			 * died after the next snap and before the one
1153 			 * after that (ie. be on the snap after next's
1154 			 * deadlist).
1155 			 *
1156 			 * XXX we're doing this long task with the
1157 			 * config lock held
1158 			 */
1159 			dsl_dataset_t *ds_after_next;
1160 
1161 			VERIFY(0 == dsl_dataset_open_obj(dd->dd_pool,
1162 			    ds_next->ds_phys->ds_next_snap_obj, NULL,
1163 			    DS_MODE_NONE, FTAG, &ds_after_next));
1164 			itor = 0;
1165 			while (bplist_iterate(&ds_after_next->ds_deadlist,
1166 			    &itor, &bp) == 0) {
1167 				if (bp.blk_birth >
1168 				    ds->ds_phys->ds_prev_snap_txg &&
1169 				    bp.blk_birth <=
1170 				    ds->ds_phys->ds_creation_txg) {
1171 					ds_next->ds_phys->ds_unique_bytes +=
1172 					    BP_GET_ASIZE(&bp);
1173 				}
1174 			}
1175 
1176 			dsl_dataset_close(ds_after_next, DS_MODE_NONE, FTAG);
1177 			ASSERT3P(ds_next->ds_prev, ==, NULL);
1178 		} else {
1179 			/*
1180 			 * It would be nice to update the head dataset's
1181 			 * unique.  To do so we would have to traverse
1182 			 * it for blocks born after ds_prev, which is
1183 			 * pretty expensive just to maintain something
1184 			 * for debugging purposes.
1185 			 */
1186 			ASSERT3P(ds_next->ds_prev, ==, ds);
1187 			dsl_dataset_close(ds_next->ds_prev, DS_MODE_NONE,
1188 			    ds_next);
1189 			if (ds_prev) {
1190 				VERIFY(0 == dsl_dataset_open_obj(dd->dd_pool,
1191 				    ds->ds_phys->ds_prev_snap_obj, NULL,
1192 				    DS_MODE_NONE, ds_next, &ds_next->ds_prev));
1193 			} else {
1194 				ds_next->ds_prev = NULL;
1195 			}
1196 		}
1197 		dsl_dataset_close(ds_next, DS_MODE_NONE, FTAG);
1198 
1199 		/*
1200 		 * NB: unique_bytes is not accurate for head objsets
1201 		 * because we don't update it when we delete the most
1202 		 * recent snapshot -- see above comment.
1203 		 */
1204 		ASSERT3U(used, ==, ds->ds_phys->ds_unique_bytes);
1205 	} else {
1206 		/*
1207 		 * There's no next snapshot, so this is a head dataset.
1208 		 * Destroy the deadlist.  Unless it's a clone, the
1209 		 * deadlist should be empty.  (If it's a clone, it's
1210 		 * safe to ignore the deadlist contents.)
1211 		 */
1212 		struct killarg ka;
1213 
1214 		ASSERT(after_branch_point || bplist_empty(&ds->ds_deadlist));
1215 		bplist_close(&ds->ds_deadlist);
1216 		bplist_destroy(mos, ds->ds_phys->ds_deadlist_obj, tx);
1217 		ds->ds_phys->ds_deadlist_obj = 0;
1218 
1219 		/*
1220 		 * Free everything that we point to (that's born after
1221 		 * the previous snapshot, if we are a clone)
1222 		 *
1223 		 * XXX we're doing this long task with the config lock held
1224 		 */
1225 		ka.usedp = &used;
1226 		ka.compressedp = &compressed;
1227 		ka.uncompressedp = &uncompressed;
1228 		ka.zio = zio;
1229 		ka.tx = tx;
1230 		err = traverse_dsl_dataset(ds, ds->ds_phys->ds_prev_snap_txg,
1231 		    ADVANCE_POST, kill_blkptr, &ka);
1232 		ASSERT3U(err, ==, 0);
1233 	}
1234 
1235 	err = zio_wait(zio);
1236 	ASSERT3U(err, ==, 0);
1237 
1238 	dsl_dir_diduse_space(dd, -used, -compressed, -uncompressed, tx);
1239 
1240 	if (ds->ds_phys->ds_snapnames_zapobj) {
1241 		err = zap_destroy(mos, ds->ds_phys->ds_snapnames_zapobj, tx);
1242 		ASSERT(err == 0);
1243 	}
1244 
1245 	if (dd->dd_phys->dd_head_dataset_obj == ds->ds_object) {
1246 		/* Erase the link in the dataset */
1247 		dmu_buf_will_dirty(dd->dd_dbuf, tx);
1248 		dd->dd_phys->dd_head_dataset_obj = 0;
1249 		/*
1250 		 * dsl_dir_sync_destroy() called us, they'll destroy
1251 		 * the dataset.
1252 		 */
1253 	} else {
1254 		/* remove from snapshot namespace */
1255 		dsl_dataset_t *ds_head;
1256 		VERIFY(0 == dsl_dataset_open_obj(dd->dd_pool,
1257 		    dd->dd_phys->dd_head_dataset_obj, NULL,
1258 		    DS_MODE_NONE, FTAG, &ds_head));
1259 #ifdef ZFS_DEBUG
1260 		{
1261 			uint64_t val;
1262 			err = zap_lookup(mos,
1263 			    ds_head->ds_phys->ds_snapnames_zapobj,
1264 			    snapname, 8, 1, &val);
1265 			ASSERT3U(err, ==, 0);
1266 			ASSERT3U(val, ==, obj);
1267 		}
1268 #endif
1269 		err = zap_remove(mos, ds_head->ds_phys->ds_snapnames_zapobj,
1270 		    snapname, tx);
1271 		ASSERT(err == 0);
1272 		dsl_dataset_close(ds_head, DS_MODE_NONE, FTAG);
1273 	}
1274 
1275 	if (ds_prev && ds->ds_prev != ds_prev)
1276 		dsl_dataset_close(ds_prev, DS_MODE_NONE, FTAG);
1277 
1278 	err = dmu_object_free(mos, obj, tx);
1279 	ASSERT(err == 0);
1280 
1281 	/*
1282 	 * Close the objset with mode NONE, thus leaving it with
1283 	 * DOS_REF_MAX set, so that no one can access it.
1284 	 */
1285 	dsl_dataset_close(ds, DS_MODE_NONE, FTAG);
1286 
1287 	if (drop_lock)
1288 		rw_exit(&dp->dp_config_rwlock);
1289 	return (0);
1290 }
1291 
1292 int
1293 dsl_dataset_snapshot_sync(dsl_dir_t *dd, void *arg, dmu_tx_t *tx)
1294 {
1295 	const char *snapname = arg;
1296 	dsl_pool_t *dp = dd->dd_pool;
1297 	dmu_buf_t *dbuf;
1298 	dsl_dataset_phys_t *dsphys;
1299 	uint64_t dsobj, value;
1300 	objset_t *mos = dp->dp_meta_objset;
1301 	dsl_dataset_t *ds;
1302 	int err;
1303 
1304 	ASSERT(dmu_tx_is_syncing(tx));
1305 
1306 	if (dd->dd_phys->dd_head_dataset_obj == 0)
1307 		return (EINVAL);
1308 	err = dsl_dataset_open_obj(dp, dd->dd_phys->dd_head_dataset_obj, NULL,
1309 	    DS_MODE_NONE, FTAG, &ds);
1310 	if (err)
1311 		return (err);
1312 
1313 	err = zap_lookup(mos, ds->ds_phys->ds_snapnames_zapobj,
1314 	    snapname, 8, 1, &value);
1315 	if (err == 0) {
1316 		dsl_dataset_close(ds, DS_MODE_NONE, FTAG);
1317 		return (EEXIST);
1318 	}
1319 	ASSERT(err == ENOENT);
1320 
1321 	/* The point of no (unsuccessful) return */
1322 
1323 	dprintf_dd(dd, "taking snapshot %s in txg %llu\n",
1324 	    snapname, tx->tx_txg);
1325 
1326 	spa_scrub_restart(dp->dp_spa, tx->tx_txg);
1327 
1328 	rw_enter(&dp->dp_config_rwlock, RW_WRITER);
1329 
1330 	dsobj = dmu_object_alloc(mos, DMU_OT_DSL_DATASET, 0,
1331 	    DMU_OT_DSL_DATASET, sizeof (dsl_dataset_phys_t), tx);
1332 	VERIFY(0 == dmu_bonus_hold(mos, dsobj, FTAG, &dbuf));
1333 	dmu_buf_will_dirty(dbuf, tx);
1334 	dsphys = dbuf->db_data;
1335 	dsphys->ds_dir_obj = dd->dd_object;
1336 	dsphys->ds_fsid_guid = unique_create();
1337 	unique_remove(dsphys->ds_fsid_guid); /* it isn't open yet */
1338 	(void) random_get_pseudo_bytes((void*)&dsphys->ds_guid,
1339 	    sizeof (dsphys->ds_guid));
1340 	dsphys->ds_prev_snap_obj = ds->ds_phys->ds_prev_snap_obj;
1341 	dsphys->ds_prev_snap_txg = ds->ds_phys->ds_prev_snap_txg;
1342 	dsphys->ds_next_snap_obj = ds->ds_object;
1343 	dsphys->ds_num_children = 1;
1344 	dsphys->ds_creation_time = gethrestime_sec();
1345 	dsphys->ds_creation_txg = tx->tx_txg;
1346 	dsphys->ds_deadlist_obj = ds->ds_phys->ds_deadlist_obj;
1347 	dsphys->ds_used_bytes = ds->ds_phys->ds_used_bytes;
1348 	dsphys->ds_compressed_bytes = ds->ds_phys->ds_compressed_bytes;
1349 	dsphys->ds_uncompressed_bytes = ds->ds_phys->ds_uncompressed_bytes;
1350 	dsphys->ds_inconsistent = ds->ds_phys->ds_inconsistent;
1351 	dsphys->ds_bp = ds->ds_phys->ds_bp;
1352 	dmu_buf_rele(dbuf, FTAG);
1353 
1354 	if (ds->ds_phys->ds_prev_snap_obj != 0) {
1355 		dsl_dataset_t *ds_prev;
1356 
1357 		VERIFY(0 == dsl_dataset_open_obj(dp,
1358 		    ds->ds_phys->ds_prev_snap_obj, NULL,
1359 		    DS_MODE_NONE, FTAG, &ds_prev));
1360 		ASSERT(ds_prev->ds_phys->ds_next_snap_obj ==
1361 		    ds->ds_object ||
1362 		    ds_prev->ds_phys->ds_num_children > 1);
1363 		if (ds_prev->ds_phys->ds_next_snap_obj == ds->ds_object) {
1364 			dmu_buf_will_dirty(ds_prev->ds_dbuf, tx);
1365 			ASSERT3U(ds->ds_phys->ds_prev_snap_txg, ==,
1366 			    ds_prev->ds_phys->ds_creation_txg);
1367 			ds_prev->ds_phys->ds_next_snap_obj = dsobj;
1368 		}
1369 		dsl_dataset_close(ds_prev, DS_MODE_NONE, FTAG);
1370 	} else {
1371 		ASSERT3U(ds->ds_phys->ds_prev_snap_txg, ==, 0);
1372 	}
1373 
1374 	bplist_close(&ds->ds_deadlist);
1375 	dmu_buf_will_dirty(ds->ds_dbuf, tx);
1376 	ASSERT3U(ds->ds_phys->ds_prev_snap_txg, <, dsphys->ds_creation_txg);
1377 	ds->ds_phys->ds_prev_snap_obj = dsobj;
1378 	ds->ds_phys->ds_prev_snap_txg = dsphys->ds_creation_txg;
1379 	ds->ds_phys->ds_unique_bytes = 0;
1380 	ds->ds_phys->ds_deadlist_obj =
1381 	    bplist_create(mos, DSL_DEADLIST_BLOCKSIZE, tx);
1382 	VERIFY(0 == bplist_open(&ds->ds_deadlist, mos,
1383 	    ds->ds_phys->ds_deadlist_obj));
1384 
1385 	dprintf("snap '%s' -> obj %llu\n", snapname, dsobj);
1386 	err = zap_add(mos, ds->ds_phys->ds_snapnames_zapobj,
1387 	    snapname, 8, 1, &dsobj, tx);
1388 	ASSERT(err == 0);
1389 
1390 	if (ds->ds_prev)
1391 		dsl_dataset_close(ds->ds_prev, DS_MODE_NONE, ds);
1392 	VERIFY(0 == dsl_dataset_open_obj(dp,
1393 	    ds->ds_phys->ds_prev_snap_obj, snapname,
1394 	    DS_MODE_NONE, ds, &ds->ds_prev));
1395 
1396 	rw_exit(&dp->dp_config_rwlock);
1397 	dsl_dataset_close(ds, DS_MODE_NONE, FTAG);
1398 
1399 	return (0);
1400 }
1401 
1402 void
1403 dsl_dataset_sync(dsl_dataset_t *ds, dmu_tx_t *tx)
1404 {
1405 	ASSERT(dmu_tx_is_syncing(tx));
1406 	ASSERT(ds->ds_user_ptr != NULL);
1407 	ASSERT(ds->ds_phys->ds_next_snap_obj == 0);
1408 
1409 	dmu_objset_sync(ds->ds_user_ptr, tx);
1410 	dsl_dir_dirty(ds->ds_dir, tx);
1411 	bplist_close(&ds->ds_deadlist);
1412 
1413 	dmu_buf_rele(ds->ds_dbuf, ds);
1414 }
1415 
1416 void
1417 dsl_dataset_stats(dsl_dataset_t *ds, dmu_objset_stats_t *dds)
1418 {
1419 	/* fill in properties crap */
1420 	dsl_dir_stats(ds->ds_dir, dds);
1421 
1422 	if (ds->ds_phys->ds_num_children != 0) {
1423 		dds->dds_is_snapshot = TRUE;
1424 		dds->dds_num_clones = ds->ds_phys->ds_num_children - 1;
1425 	}
1426 
1427 	dds->dds_inconsistent = ds->ds_phys->ds_inconsistent;
1428 	dds->dds_last_txg = ds->ds_phys->ds_bp.blk_birth;
1429 
1430 	dds->dds_objects_used = ds->ds_phys->ds_bp.blk_fill;
1431 	dds->dds_objects_avail = DN_MAX_OBJECT - dds->dds_objects_used;
1432 
1433 	/* We override the dataset's creation time... they should be the same */
1434 	dds->dds_creation_time = ds->ds_phys->ds_creation_time;
1435 	dds->dds_creation_txg = ds->ds_phys->ds_creation_txg;
1436 	dds->dds_space_refd = ds->ds_phys->ds_used_bytes;
1437 	dds->dds_fsid_guid = ds->ds_phys->ds_fsid_guid;
1438 
1439 	if (ds->ds_phys->ds_next_snap_obj) {
1440 		/*
1441 		 * This is a snapshot; override the dd's space used with
1442 		 * our unique space
1443 		 */
1444 		dds->dds_space_used = ds->ds_phys->ds_unique_bytes;
1445 		dds->dds_compressed_bytes =
1446 		    ds->ds_phys->ds_compressed_bytes;
1447 		dds->dds_uncompressed_bytes =
1448 		    ds->ds_phys->ds_uncompressed_bytes;
1449 	}
1450 }
1451 
1452 dsl_pool_t *
1453 dsl_dataset_pool(dsl_dataset_t *ds)
1454 {
1455 	return (ds->ds_dir->dd_pool);
1456 }
1457 
/*
 * Argument bundle passed (as the void *arg) to
 * dsl_dataset_snapshot_rename_sync().
 */
struct osrenamearg {
	const char *oldname;	/* current full name of the snapshot */
	const char *newname;	/* requested full name, "...@snap" form */
};
1462 
1463 static int
1464 dsl_dataset_snapshot_rename_sync(dsl_dir_t *dd, void *arg, dmu_tx_t *tx)
1465 {
1466 	struct osrenamearg *ora = arg;
1467 	objset_t *mos = dd->dd_pool->dp_meta_objset;
1468 	dsl_dir_t *nds;
1469 	const char *tail;
1470 	int err;
1471 	dsl_dataset_t *snds, *fsds;
1472 	uint64_t val;
1473 
1474 	err = dsl_dataset_open_spa(dd->dd_pool->dp_spa, ora->oldname,
1475 	    DS_MODE_READONLY | DS_MODE_STANDARD, FTAG, &snds);
1476 	if (err)
1477 		return (err);
1478 
1479 	if (snds->ds_dir != dd) {
1480 		dsl_dataset_close(snds, DS_MODE_STANDARD, FTAG);
1481 		return (EINVAL);
1482 	}
1483 
1484 	/* better be changing a snapshot */
1485 	if (snds->ds_phys->ds_next_snap_obj == 0) {
1486 		dsl_dataset_close(snds, DS_MODE_STANDARD, FTAG);
1487 		return (EINVAL);
1488 	}
1489 
1490 	/* new fs better exist */
1491 	err = dsl_dir_open_spa(dd->dd_pool->dp_spa, ora->newname,
1492 	    FTAG, &nds, &tail);
1493 	if (err) {
1494 		dsl_dataset_close(snds, DS_MODE_STANDARD, FTAG);
1495 		return (err);
1496 	}
1497 
1498 	dsl_dir_close(nds, FTAG);
1499 
1500 	/* new name better be in same fs */
1501 	if (nds != dd) {
1502 		dsl_dataset_close(snds, DS_MODE_STANDARD, FTAG);
1503 		return (EINVAL);
1504 	}
1505 
1506 	/* new name better be a snapshot */
1507 	if (tail == NULL || tail[0] != '@') {
1508 		dsl_dataset_close(snds, DS_MODE_STANDARD, FTAG);
1509 		return (EINVAL);
1510 	}
1511 
1512 	tail++;
1513 
1514 	err = dsl_dataset_open_obj(dd->dd_pool,
1515 	    dd->dd_phys->dd_head_dataset_obj, NULL, DS_MODE_NONE, FTAG, &fsds);
1516 	if (err) {
1517 		dsl_dataset_close(snds, DS_MODE_STANDARD, FTAG);
1518 		return (err);
1519 	}
1520 
1521 	/* new name better not be in use */
1522 	err = zap_lookup(mos, fsds->ds_phys->ds_snapnames_zapobj,
1523 	    tail, 8, 1, &val);
1524 	if (err != ENOENT) {
1525 		if (err == 0)
1526 			err = EEXIST;
1527 		dsl_dataset_close(fsds, DS_MODE_NONE, FTAG);
1528 		dsl_dataset_close(snds, DS_MODE_STANDARD, FTAG);
1529 		return (EEXIST);
1530 	}
1531 
1532 	/* The point of no (unsuccessful) return */
1533 
1534 	rw_enter(&dd->dd_pool->dp_config_rwlock, RW_WRITER);
1535 	VERIFY(0 == dsl_dataset_get_snapname(snds));
1536 	err = zap_remove(mos, fsds->ds_phys->ds_snapnames_zapobj,
1537 	    snds->ds_snapname, tx);
1538 	ASSERT3U(err, ==, 0);
1539 	mutex_enter(&snds->ds_lock);
1540 	(void) strcpy(snds->ds_snapname, tail);
1541 	mutex_exit(&snds->ds_lock);
1542 	err = zap_add(mos, fsds->ds_phys->ds_snapnames_zapobj,
1543 	    snds->ds_snapname, 8, 1, &snds->ds_object, tx);
1544 	ASSERT3U(err, ==, 0);
1545 	rw_exit(&dd->dd_pool->dp_config_rwlock);
1546 
1547 	dsl_dataset_close(fsds, DS_MODE_NONE, FTAG);
1548 	dsl_dataset_close(snds, DS_MODE_STANDARD, FTAG);
1549 	return (0);
1550 }
1551 
1552 #pragma weak dmu_objset_rename = dsl_dataset_rename
1553 int
1554 dsl_dataset_rename(const char *osname, const char *newname)
1555 {
1556 	dsl_dir_t *dd;
1557 	const char *tail;
1558 	struct osrenamearg ora;
1559 	int err;
1560 
1561 	err = dsl_dir_open(osname, FTAG, &dd, &tail);
1562 	if (err)
1563 		return (err);
1564 	if (tail == NULL) {
1565 		err = dsl_dir_sync_task(dd,
1566 		    dsl_dir_rename_sync, (void*)newname, 1<<12);
1567 		dsl_dir_close(dd, FTAG);
1568 		return (err);
1569 	}
1570 	if (tail[0] != '@') {
1571 		/* the name ended in a nonexistant component */
1572 		dsl_dir_close(dd, FTAG);
1573 		return (ENOENT);
1574 	}
1575 
1576 	ora.oldname = osname;
1577 	ora.newname = newname;
1578 
1579 	err = dsl_dir_sync_task(dd,
1580 	    dsl_dataset_snapshot_rename_sync, &ora, 1<<12);
1581 	dsl_dir_close(dd, FTAG);
1582 	return (err);
1583 }
1584