xref: /titanic_51/usr/src/uts/common/fs/zfs/dsl_dataset.c (revision 0523b0a4ec1f5ec136a264130012c7bb0ede01f2)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2006 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 
26 #pragma ident	"%Z%%M%	%I%	%E% SMI"
27 
28 #include <sys/dmu_objset.h>
29 #include <sys/dsl_dataset.h>
30 #include <sys/dsl_dir.h>
31 #include <sys/dmu_traverse.h>
32 #include <sys/dmu_tx.h>
33 #include <sys/arc.h>
34 #include <sys/zio.h>
35 #include <sys/zap.h>
36 #include <sys/unique.h>
37 #include <sys/zfs_context.h>
38 
/*
 * Ceiling on the sum of the open-mode weights (see ds_refcnt_weight) that
 * may be outstanding against a single dataset.
 */
#define	DOS_REF_MAX	(1ULL << 62)

/* Block size handed to bplist_create() when a deadlist object is created. */
#define	DSL_DEADLIST_BLOCKSIZE	SPA_MAXBLOCKSIZE

/*
 * Size charged to the "uncompressed" accounting for a block pointer: the
 * physical size for indirect (level > 0) blocks and for metadata object
 * types, the logical size for everything else.
 *
 * The expansion is a single parenthesized expression with no trailing
 * semicolon, so the macro can be used anywhere an expression is allowed
 * without leaving a stray empty statement behind.
 */
#define	BP_GET_UCSIZE(bp) \
	((BP_GET_LEVEL(bp) > 0 || dmu_ot[BP_GET_TYPE(bp)].ot_metadata) ? \
	BP_GET_PSIZE(bp) : BP_GET_LSIZE(bp))
46 
47 /*
48  * We use weighted reference counts to express the various forms of exclusion
49  * between different open modes.  A STANDARD open is 1 point, an EXCLUSIVE open
50  * is DOS_REF_MAX, and a PRIMARY open is little more than half of an EXCLUSIVE.
51  * This makes the exclusion logic simple: the total refcnt for all opens cannot
52  * exceed DOS_REF_MAX.  For example, EXCLUSIVE opens are exclusive because their
53  * weight (DOS_REF_MAX) consumes the entire refcnt space.  PRIMARY opens consume
54  * just over half of the refcnt space, so there can't be more than one, but it
55  * can peacefully coexist with any number of STANDARD opens.
56  */
57 static uint64_t ds_refcnt_weight[DS_MODE_LEVELS] = {
58 	0,			/* DOS_MODE_NONE - invalid		*/
59 	1,			/* DOS_MODE_STANDARD - unlimited number	*/
60 	(DOS_REF_MAX >> 1) + 1,	/* DOS_MODE_PRIMARY - only one of these	*/
61 	DOS_REF_MAX		/* DOS_MODE_EXCLUSIVE - no other opens	*/
62 };
63 
64 
65 void
66 dsl_dataset_block_born(dsl_dataset_t *ds, blkptr_t *bp, dmu_tx_t *tx)
67 {
68 	int used = BP_GET_ASIZE(bp);
69 	int compressed = BP_GET_PSIZE(bp);
70 	int uncompressed = BP_GET_UCSIZE(bp);
71 
72 	dprintf_bp(bp, "born, ds=%p\n", ds);
73 
74 	ASSERT(dmu_tx_is_syncing(tx));
75 	/* It could have been compressed away to nothing */
76 	if (BP_IS_HOLE(bp))
77 		return;
78 	ASSERT(BP_GET_TYPE(bp) != DMU_OT_NONE);
79 	ASSERT3U(BP_GET_TYPE(bp), <, DMU_OT_NUMTYPES);
80 	if (ds == NULL) {
81 		/*
82 		 * Account for the meta-objset space in its placeholder
83 		 * dsl_dir.
84 		 */
85 		ASSERT3U(compressed, ==, uncompressed); /* it's all metadata */
86 		dsl_dir_diduse_space(tx->tx_pool->dp_mos_dir,
87 		    used, compressed, uncompressed, tx);
88 		dsl_dir_dirty(tx->tx_pool->dp_mos_dir, tx);
89 		return;
90 	}
91 	dmu_buf_will_dirty(ds->ds_dbuf, tx);
92 	mutex_enter(&ds->ds_lock);
93 	ds->ds_phys->ds_used_bytes += used;
94 	ds->ds_phys->ds_compressed_bytes += compressed;
95 	ds->ds_phys->ds_uncompressed_bytes += uncompressed;
96 	ds->ds_phys->ds_unique_bytes += used;
97 	mutex_exit(&ds->ds_lock);
98 	dsl_dir_diduse_space(ds->ds_dir,
99 	    used, compressed, uncompressed, tx);
100 }
101 
102 void
103 dsl_dataset_block_kill(dsl_dataset_t *ds, blkptr_t *bp, dmu_tx_t *tx)
104 {
105 	int used = BP_GET_ASIZE(bp);
106 	int compressed = BP_GET_PSIZE(bp);
107 	int uncompressed = BP_GET_UCSIZE(bp);
108 
109 	ASSERT(dmu_tx_is_syncing(tx));
110 	if (BP_IS_HOLE(bp))
111 		return;
112 
113 	ASSERT(used > 0);
114 	if (ds == NULL) {
115 		/*
116 		 * Account for the meta-objset space in its placeholder
117 		 * dataset.
118 		 */
119 		/* XXX this can fail, what do we do when it does? */
120 		(void) arc_free(NULL, tx->tx_pool->dp_spa,
121 		    tx->tx_txg, bp, NULL, NULL, ARC_WAIT);
122 		bzero(bp, sizeof (blkptr_t));
123 
124 		dsl_dir_diduse_space(tx->tx_pool->dp_mos_dir,
125 		    -used, -compressed, -uncompressed, tx);
126 		dsl_dir_dirty(tx->tx_pool->dp_mos_dir, tx);
127 		return;
128 	}
129 	ASSERT3P(tx->tx_pool, ==, ds->ds_dir->dd_pool);
130 
131 	dmu_buf_will_dirty(ds->ds_dbuf, tx);
132 
133 	if (bp->blk_birth > ds->ds_phys->ds_prev_snap_txg) {
134 		dprintf_bp(bp, "freeing: %s", "");
135 		/* XXX check return code? */
136 		(void) arc_free(NULL, tx->tx_pool->dp_spa,
137 		    tx->tx_txg, bp, NULL, NULL, ARC_WAIT);
138 
139 		mutex_enter(&ds->ds_lock);
140 		/* XXX unique_bytes is not accurate for head datasets */
141 		/* ASSERT3U(ds->ds_phys->ds_unique_bytes, >=, used); */
142 		ds->ds_phys->ds_unique_bytes -= used;
143 		mutex_exit(&ds->ds_lock);
144 		dsl_dir_diduse_space(ds->ds_dir,
145 		    -used, -compressed, -uncompressed, tx);
146 	} else {
147 		dprintf_bp(bp, "putting on dead list: %s", "");
148 		VERIFY(0 == bplist_enqueue(&ds->ds_deadlist, bp, tx));
149 		/* if (bp->blk_birth > prev prev snap txg) prev unique += bs */
150 		if (ds->ds_phys->ds_prev_snap_obj != 0) {
151 			ASSERT3U(ds->ds_prev->ds_object, ==,
152 			    ds->ds_phys->ds_prev_snap_obj);
153 			ASSERT(ds->ds_prev->ds_phys->ds_num_children > 0);
154 			if (ds->ds_prev->ds_phys->ds_next_snap_obj ==
155 			    ds->ds_object &&
156 			    bp->blk_birth >
157 			    ds->ds_prev->ds_phys->ds_prev_snap_txg) {
158 				dmu_buf_will_dirty(ds->ds_prev->ds_dbuf, tx);
159 				mutex_enter(&ds->ds_prev->ds_lock);
160 				ds->ds_prev->ds_phys->ds_unique_bytes +=
161 				    used;
162 				mutex_exit(&ds->ds_prev->ds_lock);
163 			}
164 		}
165 	}
166 	bzero(bp, sizeof (blkptr_t));
167 	mutex_enter(&ds->ds_lock);
168 	ASSERT3U(ds->ds_phys->ds_used_bytes, >=, used);
169 	ds->ds_phys->ds_used_bytes -= used;
170 	ASSERT3U(ds->ds_phys->ds_compressed_bytes, >=, compressed);
171 	ds->ds_phys->ds_compressed_bytes -= compressed;
172 	ASSERT3U(ds->ds_phys->ds_uncompressed_bytes, >=, uncompressed);
173 	ds->ds_phys->ds_uncompressed_bytes -= uncompressed;
174 	mutex_exit(&ds->ds_lock);
175 }
176 
177 uint64_t
178 dsl_dataset_prev_snap_txg(dsl_dataset_t *ds)
179 {
180 	uint64_t txg;
181 	dsl_dir_t *dd;
182 
183 	if (ds == NULL)
184 		return (0);
185 	/*
186 	 * The snapshot creation could fail, but that would cause an
187 	 * incorrect FALSE return, which would only result in an
188 	 * overestimation of the amount of space that an operation would
189 	 * consume, which is OK.
190 	 *
191 	 * There's also a small window where we could miss a pending
192 	 * snapshot, because we could set the sync task in the quiescing
193 	 * phase.  So this should only be used as a guess.
194 	 */
195 	dd = ds->ds_dir;
196 	mutex_enter(&dd->dd_lock);
197 	if (dd->dd_sync_func == dsl_dataset_snapshot_sync)
198 		txg = dd->dd_sync_txg;
199 	else
200 		txg = ds->ds_phys->ds_prev_snap_txg;
201 	mutex_exit(&dd->dd_lock);
202 
203 	return (txg);
204 }
205 
206 int
207 dsl_dataset_block_freeable(dsl_dataset_t *ds, uint64_t blk_birth)
208 {
209 	return (blk_birth > dsl_dataset_prev_snap_txg(ds));
210 }
211 
212 /* ARGSUSED */
213 static void
214 dsl_dataset_evict(dmu_buf_t *db, void *dsv)
215 {
216 	dsl_dataset_t *ds = dsv;
217 	dsl_pool_t *dp = ds->ds_dir->dd_pool;
218 
219 	/* open_refcount == DOS_REF_MAX when deleting */
220 	ASSERT(ds->ds_open_refcount == 0 ||
221 	    ds->ds_open_refcount == DOS_REF_MAX);
222 
223 	dprintf_ds(ds, "evicting %s\n", "");
224 
225 	unique_remove(ds->ds_phys->ds_fsid_guid);
226 
227 	if (ds->ds_user_ptr != NULL)
228 		ds->ds_user_evict_func(ds, ds->ds_user_ptr);
229 
230 	if (ds->ds_prev) {
231 		dsl_dataset_close(ds->ds_prev, DS_MODE_NONE, ds);
232 		ds->ds_prev = NULL;
233 	}
234 
235 	bplist_close(&ds->ds_deadlist);
236 	dsl_dir_close(ds->ds_dir, ds);
237 
238 	if (list_link_active(&ds->ds_synced_link))
239 		list_remove(&dp->dp_synced_objsets, ds);
240 
241 	kmem_free(ds, sizeof (dsl_dataset_t));
242 }
243 
244 static int
245 dsl_dataset_get_snapname(dsl_dataset_t *ds)
246 {
247 	dsl_dataset_phys_t *headphys;
248 	int err;
249 	dmu_buf_t *headdbuf;
250 	dsl_pool_t *dp = ds->ds_dir->dd_pool;
251 	objset_t *mos = dp->dp_meta_objset;
252 
253 	if (ds->ds_snapname[0])
254 		return (0);
255 	if (ds->ds_phys->ds_next_snap_obj == 0)
256 		return (0);
257 
258 	err = dmu_bonus_hold(mos, ds->ds_dir->dd_phys->dd_head_dataset_obj,
259 	    FTAG, &headdbuf);
260 	if (err)
261 		return (err);
262 	headphys = headdbuf->db_data;
263 	err = zap_value_search(dp->dp_meta_objset,
264 	    headphys->ds_snapnames_zapobj, ds->ds_object, ds->ds_snapname);
265 	dmu_buf_rele(headdbuf, FTAG);
266 	return (err);
267 }
268 
269 int
270 dsl_dataset_open_obj(dsl_pool_t *dp, uint64_t dsobj, const char *snapname,
271     int mode, void *tag, dsl_dataset_t **dsp)
272 {
273 	uint64_t weight = ds_refcnt_weight[DS_MODE_LEVEL(mode)];
274 	objset_t *mos = dp->dp_meta_objset;
275 	dmu_buf_t *dbuf;
276 	dsl_dataset_t *ds;
277 	int err;
278 
279 	ASSERT(RW_LOCK_HELD(&dp->dp_config_rwlock) ||
280 	    dsl_pool_sync_context(dp));
281 
282 	err = dmu_bonus_hold(mos, dsobj, tag, &dbuf);
283 	if (err)
284 		return (err);
285 	ds = dmu_buf_get_user(dbuf);
286 	if (ds == NULL) {
287 		dsl_dataset_t *winner;
288 
289 		ds = kmem_zalloc(sizeof (dsl_dataset_t), KM_SLEEP);
290 		ds->ds_dbuf = dbuf;
291 		ds->ds_object = dsobj;
292 		ds->ds_phys = dbuf->db_data;
293 
294 		err = bplist_open(&ds->ds_deadlist,
295 		    mos, ds->ds_phys->ds_deadlist_obj);
296 		if (err == 0) {
297 			err = dsl_dir_open_obj(dp,
298 			    ds->ds_phys->ds_dir_obj, NULL, ds, &ds->ds_dir);
299 		}
300 		if (err) {
301 			/*
302 			 * we don't really need to close the blist if we
303 			 * just opened it.
304 			 */
305 			kmem_free(ds, sizeof (dsl_dataset_t));
306 			dmu_buf_rele(dbuf, tag);
307 			return (err);
308 		}
309 
310 		if (ds->ds_dir->dd_phys->dd_head_dataset_obj == dsobj) {
311 			ds->ds_snapname[0] = '\0';
312 			if (ds->ds_phys->ds_prev_snap_obj) {
313 				err = dsl_dataset_open_obj(dp,
314 				    ds->ds_phys->ds_prev_snap_obj, NULL,
315 				    DS_MODE_NONE, ds, &ds->ds_prev);
316 			}
317 		} else {
318 			if (snapname) {
319 #ifdef ZFS_DEBUG
320 				dsl_dataset_phys_t *headphys;
321 				dmu_buf_t *headdbuf;
322 				err = dmu_bonus_hold(mos,
323 				    ds->ds_dir->dd_phys->dd_head_dataset_obj,
324 				    FTAG, &headdbuf);
325 				if (err == 0) {
326 					headphys = headdbuf->db_data;
327 					uint64_t foundobj;
328 					err = zap_lookup(dp->dp_meta_objset,
329 					    headphys->ds_snapnames_zapobj,
330 					    snapname, sizeof (foundobj), 1,
331 					    &foundobj);
332 					ASSERT3U(foundobj, ==, dsobj);
333 					dmu_buf_rele(headdbuf, FTAG);
334 				}
335 #endif
336 				(void) strcat(ds->ds_snapname, snapname);
337 			} else if (zfs_flags & ZFS_DEBUG_SNAPNAMES) {
338 				err = dsl_dataset_get_snapname(ds);
339 			}
340 		}
341 
342 		if (err == 0) {
343 			winner = dmu_buf_set_user_ie(dbuf, ds, &ds->ds_phys,
344 			    dsl_dataset_evict);
345 		}
346 		if (err || winner) {
347 			bplist_close(&ds->ds_deadlist);
348 			if (ds->ds_prev) {
349 				dsl_dataset_close(ds->ds_prev,
350 				    DS_MODE_NONE, ds);
351 			}
352 			dsl_dir_close(ds->ds_dir, ds);
353 			kmem_free(ds, sizeof (dsl_dataset_t));
354 			if (err) {
355 				dmu_buf_rele(dbuf, tag);
356 				return (err);
357 			}
358 			ds = winner;
359 		} else {
360 			uint64_t new =
361 			    unique_insert(ds->ds_phys->ds_fsid_guid);
362 			if (new != ds->ds_phys->ds_fsid_guid) {
363 				/* XXX it won't necessarily be synced... */
364 				ds->ds_phys->ds_fsid_guid = new;
365 			}
366 		}
367 	}
368 	ASSERT3P(ds->ds_dbuf, ==, dbuf);
369 	ASSERT3P(ds->ds_phys, ==, dbuf->db_data);
370 
371 	mutex_enter(&ds->ds_lock);
372 	if ((DS_MODE_LEVEL(mode) == DS_MODE_PRIMARY &&
373 	    ds->ds_phys->ds_restoring && !DS_MODE_IS_RESTORE(mode)) ||
374 	    (ds->ds_open_refcount + weight > DOS_REF_MAX)) {
375 		mutex_exit(&ds->ds_lock);
376 		dsl_dataset_close(ds, DS_MODE_NONE, tag);
377 		return (EBUSY);
378 	}
379 	ds->ds_open_refcount += weight;
380 	mutex_exit(&ds->ds_lock);
381 
382 	*dsp = ds;
383 	return (0);
384 }
385 
386 int
387 dsl_dataset_open_spa(spa_t *spa, const char *name, int mode,
388     void *tag, dsl_dataset_t **dsp)
389 {
390 	dsl_dir_t *dd;
391 	dsl_pool_t *dp;
392 	const char *tail;
393 	uint64_t obj;
394 	dsl_dataset_t *ds = NULL;
395 	int err = 0;
396 
397 	err = dsl_dir_open_spa(spa, name, FTAG, &dd, &tail);
398 	if (err)
399 		return (err);
400 
401 	dp = dd->dd_pool;
402 	obj = dd->dd_phys->dd_head_dataset_obj;
403 	rw_enter(&dp->dp_config_rwlock, RW_READER);
404 	if (obj == 0) {
405 		/* A dataset with no associated objset */
406 		err = ENOENT;
407 		goto out;
408 	}
409 
410 	if (tail != NULL) {
411 		objset_t *mos = dp->dp_meta_objset;
412 
413 		err = dsl_dataset_open_obj(dp, obj, NULL,
414 		    DS_MODE_NONE, tag, &ds);
415 		if (err)
416 			goto out;
417 		obj = ds->ds_phys->ds_snapnames_zapobj;
418 		dsl_dataset_close(ds, DS_MODE_NONE, tag);
419 		ds = NULL;
420 
421 		if (tail[0] != '@') {
422 			err = ENOENT;
423 			goto out;
424 		}
425 		tail++;
426 
427 		/* Look for a snapshot */
428 		if (!DS_MODE_IS_READONLY(mode)) {
429 			err = EROFS;
430 			goto out;
431 		}
432 		dprintf("looking for snapshot '%s'\n", tail);
433 		err = zap_lookup(mos, obj, tail, 8, 1, &obj);
434 		if (err)
435 			goto out;
436 	}
437 	err = dsl_dataset_open_obj(dp, obj, tail, mode, tag, &ds);
438 
439 out:
440 	rw_exit(&dp->dp_config_rwlock);
441 	dsl_dir_close(dd, FTAG);
442 
443 	ASSERT3U((err == 0), ==, (ds != NULL));
444 	/* ASSERT(ds == NULL || strcmp(name, ds->ds_name) == 0); */
445 
446 	*dsp = ds;
447 	return (err);
448 }
449 
450 int
451 dsl_dataset_open(const char *name, int mode, void *tag, dsl_dataset_t **dsp)
452 {
453 	return (dsl_dataset_open_spa(NULL, name, mode, tag, dsp));
454 }
455 
456 void
457 dsl_dataset_name(dsl_dataset_t *ds, char *name)
458 {
459 	if (ds == NULL) {
460 		(void) strcpy(name, "mos");
461 	} else {
462 		dsl_dir_name(ds->ds_dir, name);
463 		VERIFY(0 == dsl_dataset_get_snapname(ds));
464 		if (ds->ds_snapname[0]) {
465 			(void) strcat(name, "@");
466 			if (!MUTEX_HELD(&ds->ds_lock)) {
467 				/*
468 				 * We use a "recursive" mutex so that we
469 				 * can call dprintf_ds() with ds_lock held.
470 				 */
471 				mutex_enter(&ds->ds_lock);
472 				(void) strcat(name, ds->ds_snapname);
473 				mutex_exit(&ds->ds_lock);
474 			} else {
475 				(void) strcat(name, ds->ds_snapname);
476 			}
477 		}
478 	}
479 }
480 
481 void
482 dsl_dataset_close(dsl_dataset_t *ds, int mode, void *tag)
483 {
484 	uint64_t weight = ds_refcnt_weight[DS_MODE_LEVEL(mode)];
485 	mutex_enter(&ds->ds_lock);
486 	ASSERT3U(ds->ds_open_refcount, >=, weight);
487 	ds->ds_open_refcount -= weight;
488 	dprintf_ds(ds, "closing mode %u refcount now 0x%llx\n",
489 	    mode, ds->ds_open_refcount);
490 	mutex_exit(&ds->ds_lock);
491 
492 	dmu_buf_rele(ds->ds_dbuf, tag);
493 }
494 
495 void
496 dsl_dataset_create_root(dsl_pool_t *dp, uint64_t *ddobjp, dmu_tx_t *tx)
497 {
498 	objset_t *mos = dp->dp_meta_objset;
499 	dmu_buf_t *dbuf;
500 	dsl_dataset_phys_t *dsphys;
501 	dsl_dataset_t *ds;
502 	uint64_t dsobj;
503 	dsl_dir_t *dd;
504 
505 	dsl_dir_create_root(mos, ddobjp, tx);
506 	VERIFY(0 == dsl_dir_open_obj(dp, *ddobjp, NULL, FTAG, &dd));
507 
508 	dsobj = dmu_object_alloc(mos, DMU_OT_DSL_DATASET, 0,
509 	    DMU_OT_DSL_DATASET, sizeof (dsl_dataset_phys_t), tx);
510 	VERIFY(0 == dmu_bonus_hold(mos, dsobj, FTAG, &dbuf));
511 	dmu_buf_will_dirty(dbuf, tx);
512 	dsphys = dbuf->db_data;
513 	dsphys->ds_dir_obj = dd->dd_object;
514 	dsphys->ds_fsid_guid = unique_create();
515 	unique_remove(dsphys->ds_fsid_guid); /* it isn't open yet */
516 	(void) random_get_pseudo_bytes((void*)&dsphys->ds_guid,
517 	    sizeof (dsphys->ds_guid));
518 	dsphys->ds_snapnames_zapobj =
519 	    zap_create(mos, DMU_OT_DSL_DS_SNAP_MAP, DMU_OT_NONE, 0, tx);
520 	dsphys->ds_creation_time = gethrestime_sec();
521 	dsphys->ds_creation_txg = tx->tx_txg;
522 	dsphys->ds_deadlist_obj =
523 	    bplist_create(mos, DSL_DEADLIST_BLOCKSIZE, tx);
524 	dmu_buf_rele(dbuf, FTAG);
525 
526 	dmu_buf_will_dirty(dd->dd_dbuf, tx);
527 	dd->dd_phys->dd_head_dataset_obj = dsobj;
528 	dsl_dir_close(dd, FTAG);
529 
530 	VERIFY(0 ==
531 	    dsl_dataset_open_obj(dp, dsobj, NULL, DS_MODE_NONE, FTAG, &ds));
532 	(void) dmu_objset_create_impl(dp->dp_spa, ds, DMU_OST_ZFS, tx);
533 	dsl_dataset_close(ds, DS_MODE_NONE, FTAG);
534 }
535 
536 int
537 dsl_dataset_create_sync(dsl_dir_t *pds, const char *fullname,
538     const char *lastname, dsl_dataset_t *clone_parent, dmu_tx_t *tx)
539 {
540 	int err;
541 	dsl_pool_t *dp = pds->dd_pool;
542 	dmu_buf_t *dbuf;
543 	dsl_dataset_phys_t *dsphys;
544 	uint64_t dsobj;
545 	objset_t *mos = dp->dp_meta_objset;
546 	dsl_dir_t *dd;
547 
548 	if (clone_parent != NULL) {
549 		/*
550 		 * You can't clone across pools.
551 		 */
552 		if (clone_parent->ds_dir->dd_pool != dp)
553 			return (EXDEV);
554 
555 		/*
556 		 * You can only clone snapshots, not the head datasets.
557 		 */
558 		if (clone_parent->ds_phys->ds_num_children == 0)
559 			return (EINVAL);
560 	}
561 
562 	ASSERT(lastname[0] != '@');
563 	ASSERT(dmu_tx_is_syncing(tx));
564 
565 	err = dsl_dir_create_sync(pds, lastname, tx);
566 	if (err)
567 		return (err);
568 	VERIFY(0 == dsl_dir_open_spa(dp->dp_spa, fullname, FTAG, &dd, NULL));
569 
570 	/* This is the point of no (unsuccessful) return */
571 
572 	dsobj = dmu_object_alloc(mos, DMU_OT_DSL_DATASET, 0,
573 	    DMU_OT_DSL_DATASET, sizeof (dsl_dataset_phys_t), tx);
574 	VERIFY(0 == dmu_bonus_hold(mos, dsobj, FTAG, &dbuf));
575 	dmu_buf_will_dirty(dbuf, tx);
576 	dsphys = dbuf->db_data;
577 	dsphys->ds_dir_obj = dd->dd_object;
578 	dsphys->ds_fsid_guid = unique_create();
579 	unique_remove(dsphys->ds_fsid_guid); /* it isn't open yet */
580 	(void) random_get_pseudo_bytes((void*)&dsphys->ds_guid,
581 	    sizeof (dsphys->ds_guid));
582 	dsphys->ds_snapnames_zapobj =
583 	    zap_create(mos, DMU_OT_DSL_DS_SNAP_MAP, DMU_OT_NONE, 0, tx);
584 	dsphys->ds_creation_time = gethrestime_sec();
585 	dsphys->ds_creation_txg = tx->tx_txg;
586 	dsphys->ds_deadlist_obj =
587 	    bplist_create(mos, DSL_DEADLIST_BLOCKSIZE, tx);
588 	if (clone_parent) {
589 		dsphys->ds_prev_snap_obj = clone_parent->ds_object;
590 		dsphys->ds_prev_snap_txg =
591 		    clone_parent->ds_phys->ds_creation_txg;
592 		dsphys->ds_used_bytes =
593 		    clone_parent->ds_phys->ds_used_bytes;
594 		dsphys->ds_compressed_bytes =
595 		    clone_parent->ds_phys->ds_compressed_bytes;
596 		dsphys->ds_uncompressed_bytes =
597 		    clone_parent->ds_phys->ds_uncompressed_bytes;
598 		dsphys->ds_bp = clone_parent->ds_phys->ds_bp;
599 
600 		dmu_buf_will_dirty(clone_parent->ds_dbuf, tx);
601 		clone_parent->ds_phys->ds_num_children++;
602 
603 		dmu_buf_will_dirty(dd->dd_dbuf, tx);
604 		dd->dd_phys->dd_clone_parent_obj = clone_parent->ds_object;
605 	}
606 	dmu_buf_rele(dbuf, FTAG);
607 
608 	dmu_buf_will_dirty(dd->dd_dbuf, tx);
609 	dd->dd_phys->dd_head_dataset_obj = dsobj;
610 	dsl_dir_close(dd, FTAG);
611 
612 	return (0);
613 }
614 
615 
616 int
617 dsl_dataset_destroy(const char *name)
618 {
619 	int err;
620 	dsl_pool_t *dp;
621 	dsl_dir_t *dd;
622 	const char *tail;
623 
624 	err = dsl_dir_open(name, FTAG, &dd, &tail);
625 	if (err)
626 		return (err);
627 
628 	dp = dd->dd_pool;
629 	if (tail != NULL) {
630 		if (tail[0] != '@') {
631 			dsl_dir_close(dd, FTAG);
632 			return (ENOENT);
633 		}
634 		tail++;
635 		/* Just blow away the snapshot */
636 		do {
637 			txg_wait_synced(dp, 0);
638 			err = dsl_dir_sync_task(dd,
639 			    dsl_dataset_destroy_sync, (void*)tail, 0);
640 		} while (err == EAGAIN);
641 		dsl_dir_close(dd, FTAG);
642 	} else {
643 		char buf[MAXNAMELEN];
644 		char *cp;
645 
646 		dsl_dir_t *pds;
647 		if (dd->dd_phys->dd_parent_obj == 0) {
648 			dsl_dir_close(dd, FTAG);
649 			return (EINVAL);
650 		}
651 		/*
652 		 * Make sure it's not dirty before we destroy it.
653 		 */
654 		txg_wait_synced(dd->dd_pool, 0);
655 		/*
656 		 * Blow away the dsl_dir + head dataset.
657 		 * dsl_dir_destroy_sync() will call
658 		 * dsl_dataset_destroy_sync() to destroy the head dataset.
659 		 */
660 		rw_enter(&dp->dp_config_rwlock, RW_READER);
661 		err = dsl_dir_open_obj(dd->dd_pool,
662 		    dd->dd_phys->dd_parent_obj, NULL, FTAG, &pds);
663 		dsl_dir_close(dd, FTAG);
664 		rw_exit(&dp->dp_config_rwlock);
665 		if (err)
666 			return (err);
667 
668 		(void) strcpy(buf, name);
669 		cp = strrchr(buf, '/') + 1;
670 		ASSERT(cp[0] != '\0');
671 		do {
672 			txg_wait_synced(dp, 0);
673 			err = dsl_dir_sync_task(pds,
674 			    dsl_dir_destroy_sync, cp, 0);
675 		} while (err == EAGAIN);
676 		dsl_dir_close(pds, FTAG);
677 	}
678 
679 	return (err);
680 }
681 
682 int
683 dsl_dataset_rollback(const char *name)
684 {
685 	int err;
686 	dsl_dir_t *dd;
687 	const char *tail;
688 
689 	err = dsl_dir_open(name, FTAG, &dd, &tail);
690 	if (err)
691 		return (err);
692 
693 	if (tail != NULL) {
694 		dsl_dir_close(dd, FTAG);
695 		return (EINVAL);
696 	}
697 	do {
698 		txg_wait_synced(dd->dd_pool, 0);
699 		err = dsl_dir_sync_task(dd,
700 		    dsl_dataset_rollback_sync, NULL, 0);
701 	} while (err == EAGAIN);
702 	dsl_dir_close(dd, FTAG);
703 
704 	return (err);
705 }
706 
707 void *
708 dsl_dataset_set_user_ptr(dsl_dataset_t *ds,
709     void *p, dsl_dataset_evict_func_t func)
710 {
711 	void *old;
712 
713 	mutex_enter(&ds->ds_lock);
714 	old = ds->ds_user_ptr;
715 	if (old == NULL) {
716 		ds->ds_user_ptr = p;
717 		ds->ds_user_evict_func = func;
718 	}
719 	mutex_exit(&ds->ds_lock);
720 	return (old);
721 }
722 
723 void *
724 dsl_dataset_get_user_ptr(dsl_dataset_t *ds)
725 {
726 	return (ds->ds_user_ptr);
727 }
728 
729 
730 void
731 dsl_dataset_get_blkptr(dsl_dataset_t *ds, blkptr_t *bp)
732 {
733 	*bp = ds->ds_phys->ds_bp;
734 }
735 
736 void
737 dsl_dataset_set_blkptr(dsl_dataset_t *ds, blkptr_t *bp, dmu_tx_t *tx)
738 {
739 	ASSERT(dmu_tx_is_syncing(tx));
740 	/* If it's the meta-objset, set dp_meta_rootbp */
741 	if (ds == NULL) {
742 		tx->tx_pool->dp_meta_rootbp = *bp;
743 	} else {
744 		dmu_buf_will_dirty(ds->ds_dbuf, tx);
745 		ds->ds_phys->ds_bp = *bp;
746 	}
747 }
748 
749 spa_t *
750 dsl_dataset_get_spa(dsl_dataset_t *ds)
751 {
752 	return (ds->ds_dir->dd_pool->dp_spa);
753 }
754 
755 void
756 dsl_dataset_dirty(dsl_dataset_t *ds, dmu_tx_t *tx)
757 {
758 	dsl_pool_t *dp;
759 
760 	if (ds == NULL) /* this is the meta-objset */
761 		return;
762 
763 	ASSERT(ds->ds_user_ptr != NULL);
764 	ASSERT(ds->ds_phys->ds_next_snap_obj == 0);
765 
766 	dp = ds->ds_dir->dd_pool;
767 
768 	if (txg_list_add(&dp->dp_dirty_datasets, ds, tx->tx_txg) == 0) {
769 		/* up the hold count until we can be written out */
770 		dmu_buf_add_ref(ds->ds_dbuf, ds);
771 	}
772 }
773 
774 struct killarg {
775 	uint64_t *usedp;
776 	uint64_t *compressedp;
777 	uint64_t *uncompressedp;
778 	zio_t *zio;
779 	dmu_tx_t *tx;
780 };
781 
782 static int
783 kill_blkptr(traverse_blk_cache_t *bc, spa_t *spa, void *arg)
784 {
785 	struct killarg *ka = arg;
786 	blkptr_t *bp = &bc->bc_blkptr;
787 
788 	ASSERT3U(bc->bc_errno, ==, 0);
789 
790 	/*
791 	 * Since this callback is not called concurrently, no lock is
792 	 * needed on the accounting values.
793 	 */
794 	*ka->usedp += BP_GET_ASIZE(bp);
795 	*ka->compressedp += BP_GET_PSIZE(bp);
796 	*ka->uncompressedp += BP_GET_UCSIZE(bp);
797 	/* XXX check for EIO? */
798 	(void) arc_free(ka->zio, spa, ka->tx->tx_txg, bp, NULL, NULL,
799 	    ARC_NOWAIT);
800 	return (0);
801 }
802 
803 /* ARGSUSED */
804 int
805 dsl_dataset_rollback_sync(dsl_dir_t *dd, void *arg, dmu_tx_t *tx)
806 {
807 	objset_t *mos = dd->dd_pool->dp_meta_objset;
808 	dsl_dataset_t *ds;
809 	int err;
810 
811 	if (dd->dd_phys->dd_head_dataset_obj == 0)
812 		return (EINVAL);
813 	err = dsl_dataset_open_obj(dd->dd_pool,
814 	    dd->dd_phys->dd_head_dataset_obj, NULL, DS_MODE_NONE, FTAG, &ds);
815 	if (err)
816 		return (err);
817 
818 	if (ds->ds_phys->ds_prev_snap_txg == 0) {
819 		/*
820 		 * There's no previous snapshot.  I suppose we could
821 		 * roll it back to being empty (and re-initialize the
822 		 * upper (ZPL) layer).  But for now there's no way to do
823 		 * this via the user interface.
824 		 */
825 		dsl_dataset_close(ds, DS_MODE_NONE, FTAG);
826 		return (EINVAL);
827 	}
828 
829 	mutex_enter(&ds->ds_lock);
830 	if (ds->ds_open_refcount > 0) {
831 		mutex_exit(&ds->ds_lock);
832 		dsl_dataset_close(ds, DS_MODE_NONE, FTAG);
833 		return (EBUSY);
834 	}
835 
836 	/*
837 	 * If we made changes this txg, traverse_dsl_dataset won't find
838 	 * them.  Try again.
839 	 */
840 	if (ds->ds_phys->ds_bp.blk_birth >= tx->tx_txg) {
841 		mutex_exit(&ds->ds_lock);
842 		dsl_dataset_close(ds, DS_MODE_NONE, FTAG);
843 		return (EAGAIN);
844 	}
845 
846 	/* THE POINT OF NO (unsuccessful) RETURN */
847 	ds->ds_open_refcount = DOS_REF_MAX;
848 	mutex_exit(&ds->ds_lock);
849 
850 	dmu_buf_will_dirty(ds->ds_dbuf, tx);
851 
852 	/* Zero out the deadlist. */
853 	dprintf("old deadlist obj = %llx\n", ds->ds_phys->ds_deadlist_obj);
854 	bplist_close(&ds->ds_deadlist);
855 	bplist_destroy(mos, ds->ds_phys->ds_deadlist_obj, tx);
856 	ds->ds_phys->ds_deadlist_obj =
857 	    bplist_create(mos, DSL_DEADLIST_BLOCKSIZE, tx);
858 	VERIFY(0 == bplist_open(&ds->ds_deadlist, mos,
859 	    ds->ds_phys->ds_deadlist_obj));
860 	dprintf("new deadlist obj = %llx\n", ds->ds_phys->ds_deadlist_obj);
861 
862 	{
863 		/* Free blkptrs that we gave birth to */
864 		zio_t *zio;
865 		uint64_t used = 0, compressed = 0, uncompressed = 0;
866 		struct killarg ka;
867 
868 		zio = zio_root(tx->tx_pool->dp_spa, NULL, NULL,
869 		    ZIO_FLAG_MUSTSUCCEED);
870 		ka.usedp = &used;
871 		ka.compressedp = &compressed;
872 		ka.uncompressedp = &uncompressed;
873 		ka.zio = zio;
874 		ka.tx = tx;
875 		(void) traverse_dsl_dataset(ds, ds->ds_phys->ds_prev_snap_txg,
876 		    ADVANCE_POST, kill_blkptr, &ka);
877 		(void) zio_wait(zio);
878 
879 		dsl_dir_diduse_space(dd,
880 		    -used, -compressed, -uncompressed, tx);
881 	}
882 
883 	/* Change our contents to that of the prev snapshot (finally!) */
884 	ASSERT3U(ds->ds_prev->ds_object, ==, ds->ds_phys->ds_prev_snap_obj);
885 	ds->ds_phys->ds_bp = ds->ds_prev->ds_phys->ds_bp;
886 	ds->ds_phys->ds_used_bytes = ds->ds_prev->ds_phys->ds_used_bytes;
887 	ds->ds_phys->ds_compressed_bytes =
888 	    ds->ds_prev->ds_phys->ds_compressed_bytes;
889 	ds->ds_phys->ds_uncompressed_bytes =
890 	    ds->ds_prev->ds_phys->ds_uncompressed_bytes;
891 	ds->ds_phys->ds_restoring = ds->ds_prev->ds_phys->ds_restoring;
892 	ds->ds_phys->ds_unique_bytes = 0;
893 
894 	dmu_buf_will_dirty(ds->ds_prev->ds_dbuf, tx);
895 	ds->ds_prev->ds_phys->ds_unique_bytes = 0;
896 
897 	dprintf("new deadlist obj = %llx\n", ds->ds_phys->ds_deadlist_obj);
898 	ds->ds_open_refcount = 0;
899 	dsl_dataset_close(ds, DS_MODE_NONE, FTAG);
900 
901 	return (0);
902 }
903 
904 int
905 dsl_dataset_destroy_sync(dsl_dir_t *dd, void *arg, dmu_tx_t *tx)
906 {
907 	const char *snapname = arg;
908 	uint64_t used = 0, compressed = 0, uncompressed = 0;
909 	blkptr_t bp;
910 	zio_t *zio;
911 	int err;
912 	int after_branch_point = FALSE;
913 	int drop_lock = FALSE;
914 	dsl_pool_t *dp = dd->dd_pool;
915 	objset_t *mos = dp->dp_meta_objset;
916 	dsl_dataset_t *ds, *ds_prev = NULL;
917 	uint64_t obj;
918 
919 	if (dd->dd_phys->dd_head_dataset_obj == 0)
920 		return (EINVAL);
921 
922 	if (!RW_WRITE_HELD(&dp->dp_config_rwlock)) {
923 		rw_enter(&dp->dp_config_rwlock, RW_WRITER);
924 		drop_lock = TRUE;
925 	}
926 
927 	err = dsl_dataset_open_obj(dd->dd_pool,
928 	    dd->dd_phys->dd_head_dataset_obj, NULL,
929 	    snapname ? DS_MODE_NONE : DS_MODE_EXCLUSIVE, FTAG, &ds);
930 
931 	if (err == 0 && snapname) {
932 		err = zap_lookup(mos, ds->ds_phys->ds_snapnames_zapobj,
933 		    snapname, 8, 1, &obj);
934 		dsl_dataset_close(ds, DS_MODE_NONE, FTAG);
935 		if (err == 0) {
936 			err = dsl_dataset_open_obj(dd->dd_pool, obj, NULL,
937 			    DS_MODE_EXCLUSIVE, FTAG, &ds);
938 		}
939 	}
940 	if (err) {
941 		if (drop_lock)
942 			rw_exit(&dp->dp_config_rwlock);
943 		return (err);
944 	}
945 
946 	obj = ds->ds_object;
947 
948 	/* Can't delete a branch point. */
949 	if (ds->ds_phys->ds_num_children > 1) {
950 		dsl_dataset_close(ds, DS_MODE_EXCLUSIVE, FTAG);
951 		if (drop_lock)
952 			rw_exit(&dp->dp_config_rwlock);
953 		return (EINVAL);
954 	}
955 
956 	/*
957 	 * Can't delete a head dataset if there are snapshots of it.
958 	 * (Except if the only snapshots are from the branch we cloned
959 	 * from.)
960 	 */
961 	if (ds->ds_prev != NULL &&
962 	    ds->ds_prev->ds_phys->ds_next_snap_obj == obj) {
963 		dsl_dataset_close(ds, DS_MODE_EXCLUSIVE, FTAG);
964 		if (drop_lock)
965 			rw_exit(&dp->dp_config_rwlock);
966 		return (EINVAL);
967 	}
968 
969 	/*
970 	 * If we made changes this txg, traverse_dsl_dataset won't find
971 	 * them.  Try again.
972 	 */
973 	if (ds->ds_phys->ds_bp.blk_birth >= tx->tx_txg) {
974 		dsl_dataset_close(ds, DS_MODE_NONE, FTAG);
975 		if (drop_lock)
976 			rw_exit(&dp->dp_config_rwlock);
977 		return (EAGAIN);
978 	}
979 
980 	if (ds->ds_phys->ds_prev_snap_obj != 0) {
981 		if (ds->ds_prev) {
982 			ds_prev = ds->ds_prev;
983 		} else {
984 			err = dsl_dataset_open_obj(dd->dd_pool,
985 			    ds->ds_phys->ds_prev_snap_obj, NULL,
986 			    DS_MODE_NONE, FTAG, &ds_prev);
987 			if (err) {
988 				dsl_dataset_close(ds, DS_MODE_NONE, FTAG);
989 				if (drop_lock)
990 					rw_exit(&dp->dp_config_rwlock);
991 				return (err);
992 			}
993 		}
994 		after_branch_point =
995 		    (ds_prev->ds_phys->ds_next_snap_obj != obj);
996 
997 		dmu_buf_will_dirty(ds_prev->ds_dbuf, tx);
998 		if (after_branch_point &&
999 		    ds->ds_phys->ds_next_snap_obj == 0) {
1000 			/* This clone is toast. */
1001 			ASSERT(ds_prev->ds_phys->ds_num_children > 1);
1002 			ds_prev->ds_phys->ds_num_children--;
1003 		} else if (!after_branch_point) {
1004 			ds_prev->ds_phys->ds_next_snap_obj =
1005 			    ds->ds_phys->ds_next_snap_obj;
1006 		}
1007 	}
1008 
1009 	/* THE POINT OF NO (unsuccessful) RETURN */
1010 
1011 	ASSERT3P(tx->tx_pool, ==, dd->dd_pool);
1012 	zio = zio_root(dp->dp_spa, NULL, NULL, ZIO_FLAG_MUSTSUCCEED);
1013 
1014 	if (ds->ds_phys->ds_next_snap_obj != 0) {
1015 		dsl_dataset_t *ds_next;
1016 		uint64_t itor = 0;
1017 
1018 		spa_scrub_restart(dp->dp_spa, tx->tx_txg);
1019 
1020 		VERIFY(0 == dsl_dataset_open_obj(dd->dd_pool,
1021 		    ds->ds_phys->ds_next_snap_obj, NULL,
1022 		    DS_MODE_NONE, FTAG, &ds_next));
1023 		ASSERT3U(ds_next->ds_phys->ds_prev_snap_obj, ==, obj);
1024 
1025 		dmu_buf_will_dirty(ds_next->ds_dbuf, tx);
1026 		ds_next->ds_phys->ds_prev_snap_obj =
1027 		    ds->ds_phys->ds_prev_snap_obj;
1028 		ds_next->ds_phys->ds_prev_snap_txg =
1029 		    ds->ds_phys->ds_prev_snap_txg;
1030 		ASSERT3U(ds->ds_phys->ds_prev_snap_txg, ==,
1031 		    ds_prev ? ds_prev->ds_phys->ds_creation_txg : 0);
1032 
1033 		/*
1034 		 * Transfer to our deadlist (which will become next's
1035 		 * new deadlist) any entries from next's current
1036 		 * deadlist which were born before prev, and free the
1037 		 * other entries.
1038 		 *
1039 		 * XXX we're doing this long task with the config lock held
1040 		 */
1041 		while (bplist_iterate(&ds_next->ds_deadlist, &itor,
1042 		    &bp) == 0) {
1043 			if (bp.blk_birth <= ds->ds_phys->ds_prev_snap_txg) {
1044 				VERIFY(0 == bplist_enqueue(&ds->ds_deadlist,
1045 				    &bp, tx));
1046 				if (ds_prev && !after_branch_point &&
1047 				    bp.blk_birth >
1048 				    ds_prev->ds_phys->ds_prev_snap_txg) {
1049 					ds_prev->ds_phys->ds_unique_bytes +=
1050 					    BP_GET_ASIZE(&bp);
1051 				}
1052 			} else {
1053 				used += BP_GET_ASIZE(&bp);
1054 				compressed += BP_GET_PSIZE(&bp);
1055 				uncompressed += BP_GET_UCSIZE(&bp);
1056 				/* XXX check return value? */
1057 				(void) arc_free(zio, dp->dp_spa, tx->tx_txg,
1058 				    &bp, NULL, NULL, ARC_NOWAIT);
1059 			}
1060 		}
1061 
1062 		/* free next's deadlist */
1063 		bplist_close(&ds_next->ds_deadlist);
1064 		bplist_destroy(mos, ds_next->ds_phys->ds_deadlist_obj, tx);
1065 
1066 		/* set next's deadlist to our deadlist */
1067 		ds_next->ds_phys->ds_deadlist_obj =
1068 		    ds->ds_phys->ds_deadlist_obj;
1069 		VERIFY(0 == bplist_open(&ds_next->ds_deadlist, mos,
1070 		    ds_next->ds_phys->ds_deadlist_obj));
1071 		ds->ds_phys->ds_deadlist_obj = 0;
1072 
1073 		if (ds_next->ds_phys->ds_next_snap_obj != 0) {
1074 			/*
1075 			 * Update next's unique to include blocks which
1076 			 * were previously shared by only this snapshot
1077 			 * and it.  Those blocks will be born after the
1078 			 * prev snap and before this snap, and will have
1079 			 * died after the next snap and before the one
1080 			 * after that (ie. be on the snap after next's
1081 			 * deadlist).
1082 			 *
1083 			 * XXX we're doing this long task with the
1084 			 * config lock held
1085 			 */
1086 			dsl_dataset_t *ds_after_next;
1087 
1088 			VERIFY(0 == dsl_dataset_open_obj(dd->dd_pool,
1089 			    ds_next->ds_phys->ds_next_snap_obj, NULL,
1090 			    DS_MODE_NONE, FTAG, &ds_after_next));
1091 			itor = 0;
1092 			while (bplist_iterate(&ds_after_next->ds_deadlist,
1093 			    &itor, &bp) == 0) {
1094 				if (bp.blk_birth >
1095 				    ds->ds_phys->ds_prev_snap_txg &&
1096 				    bp.blk_birth <=
1097 				    ds->ds_phys->ds_creation_txg) {
1098 					ds_next->ds_phys->ds_unique_bytes +=
1099 					    BP_GET_ASIZE(&bp);
1100 				}
1101 			}
1102 
1103 			dsl_dataset_close(ds_after_next, DS_MODE_NONE, FTAG);
1104 			ASSERT3P(ds_next->ds_prev, ==, NULL);
1105 		} else {
1106 			/*
1107 			 * It would be nice to update the head dataset's
1108 			 * unique.  To do so we would have to traverse
1109 			 * it for blocks born after ds_prev, which is
1110 			 * pretty expensive just to maintain something
1111 			 * for debugging purposes.
1112 			 */
1113 			ASSERT3P(ds_next->ds_prev, ==, ds);
1114 			dsl_dataset_close(ds_next->ds_prev, DS_MODE_NONE,
1115 			    ds_next);
1116 			if (ds_prev) {
1117 				VERIFY(0 == dsl_dataset_open_obj(dd->dd_pool,
1118 				    ds->ds_phys->ds_prev_snap_obj, NULL,
1119 				    DS_MODE_NONE, ds_next, &ds_next->ds_prev));
1120 			} else {
1121 				ds_next->ds_prev = NULL;
1122 			}
1123 		}
1124 		dsl_dataset_close(ds_next, DS_MODE_NONE, FTAG);
1125 
1126 		/*
1127 		 * NB: unique_bytes is not accurate for head objsets
1128 		 * because we don't update it when we delete the most
1129 		 * recent snapshot -- see above comment.
1130 		 */
1131 		ASSERT3U(used, ==, ds->ds_phys->ds_unique_bytes);
1132 	} else {
1133 		/*
1134 		 * There's no next snapshot, so this is a head dataset.
1135 		 * Destroy the deadlist.  Unless it's a clone, the
1136 		 * deadlist should be empty.  (If it's a clone, it's
1137 		 * safe to ignore the deadlist contents.)
1138 		 */
1139 		struct killarg ka;
1140 
1141 		ASSERT(after_branch_point || bplist_empty(&ds->ds_deadlist));
1142 		bplist_close(&ds->ds_deadlist);
1143 		bplist_destroy(mos, ds->ds_phys->ds_deadlist_obj, tx);
1144 		ds->ds_phys->ds_deadlist_obj = 0;
1145 
1146 		/*
1147 		 * Free everything that we point to (that's born after
1148 		 * the previous snapshot, if we are a clone)
1149 		 *
1150 		 * XXX we're doing this long task with the config lock held
1151 		 */
1152 		ka.usedp = &used;
1153 		ka.compressedp = &compressed;
1154 		ka.uncompressedp = &uncompressed;
1155 		ka.zio = zio;
1156 		ka.tx = tx;
1157 		err = traverse_dsl_dataset(ds, ds->ds_phys->ds_prev_snap_txg,
1158 		    ADVANCE_POST, kill_blkptr, &ka);
1159 		ASSERT3U(err, ==, 0);
1160 	}
1161 
1162 	err = zio_wait(zio);
1163 	ASSERT3U(err, ==, 0);
1164 
1165 	dsl_dir_diduse_space(dd, -used, -compressed, -uncompressed, tx);
1166 
1167 	if (ds->ds_phys->ds_snapnames_zapobj) {
1168 		err = zap_destroy(mos, ds->ds_phys->ds_snapnames_zapobj, tx);
1169 		ASSERT(err == 0);
1170 	}
1171 
1172 	if (dd->dd_phys->dd_head_dataset_obj == ds->ds_object) {
1173 		/* Erase the link in the dataset */
1174 		dmu_buf_will_dirty(dd->dd_dbuf, tx);
1175 		dd->dd_phys->dd_head_dataset_obj = 0;
1176 		/*
1177 		 * dsl_dir_sync_destroy() called us, they'll destroy
1178 		 * the dataset.
1179 		 */
1180 	} else {
1181 		/* remove from snapshot namespace */
1182 		dsl_dataset_t *ds_head;
1183 		VERIFY(0 == dsl_dataset_open_obj(dd->dd_pool,
1184 		    dd->dd_phys->dd_head_dataset_obj, NULL,
1185 		    DS_MODE_NONE, FTAG, &ds_head));
1186 #ifdef ZFS_DEBUG
1187 		{
1188 			uint64_t val;
1189 			err = zap_lookup(mos,
1190 			    ds_head->ds_phys->ds_snapnames_zapobj,
1191 			    snapname, 8, 1, &val);
1192 			ASSERT3U(err, ==, 0);
1193 			ASSERT3U(val, ==, obj);
1194 		}
1195 #endif
1196 		err = zap_remove(mos, ds_head->ds_phys->ds_snapnames_zapobj,
1197 		    snapname, tx);
1198 		ASSERT(err == 0);
1199 		dsl_dataset_close(ds_head, DS_MODE_NONE, FTAG);
1200 	}
1201 
1202 	if (ds_prev && ds->ds_prev != ds_prev)
1203 		dsl_dataset_close(ds_prev, DS_MODE_NONE, FTAG);
1204 
1205 	err = dmu_object_free(mos, obj, tx);
1206 	ASSERT(err == 0);
1207 
1208 	/*
1209 	 * Close the objset with mode NONE, thus leaving it with
1210 	 * DOS_REF_MAX set, so that noone can access it.
1211 	 */
1212 	dsl_dataset_close(ds, DS_MODE_NONE, FTAG);
1213 
1214 	if (drop_lock)
1215 		rw_exit(&dp->dp_config_rwlock);
1216 	return (0);
1217 }
1218 
1219 int
1220 dsl_dataset_snapshot_sync(dsl_dir_t *dd, void *arg, dmu_tx_t *tx)
1221 {
1222 	const char *snapname = arg;
1223 	dsl_pool_t *dp = dd->dd_pool;
1224 	dmu_buf_t *dbuf;
1225 	dsl_dataset_phys_t *dsphys;
1226 	uint64_t dsobj, value;
1227 	objset_t *mos = dp->dp_meta_objset;
1228 	dsl_dataset_t *ds;
1229 	int err;
1230 
1231 	ASSERT(dmu_tx_is_syncing(tx));
1232 
1233 	if (dd->dd_phys->dd_head_dataset_obj == 0)
1234 		return (EINVAL);
1235 	err = dsl_dataset_open_obj(dp, dd->dd_phys->dd_head_dataset_obj, NULL,
1236 	    DS_MODE_NONE, FTAG, &ds);
1237 	if (err)
1238 		return (err);
1239 
1240 	err = zap_lookup(mos, ds->ds_phys->ds_snapnames_zapobj,
1241 	    snapname, 8, 1, &value);
1242 	if (err == 0) {
1243 		dsl_dataset_close(ds, DS_MODE_NONE, FTAG);
1244 		return (EEXIST);
1245 	}
1246 	ASSERT(err == ENOENT);
1247 
1248 	/* The point of no (unsuccessful) return */
1249 
1250 	dprintf_dd(dd, "taking snapshot %s in txg %llu\n",
1251 	    snapname, tx->tx_txg);
1252 
1253 	spa_scrub_restart(dp->dp_spa, tx->tx_txg);
1254 
1255 	rw_enter(&dp->dp_config_rwlock, RW_WRITER);
1256 
1257 	dsobj = dmu_object_alloc(mos, DMU_OT_DSL_DATASET, 0,
1258 	    DMU_OT_DSL_DATASET, sizeof (dsl_dataset_phys_t), tx);
1259 	VERIFY(0 == dmu_bonus_hold(mos, dsobj, FTAG, &dbuf));
1260 	dmu_buf_will_dirty(dbuf, tx);
1261 	dsphys = dbuf->db_data;
1262 	dsphys->ds_dir_obj = dd->dd_object;
1263 	dsphys->ds_fsid_guid = unique_create();
1264 	unique_remove(dsphys->ds_fsid_guid); /* it isn't open yet */
1265 	(void) random_get_pseudo_bytes((void*)&dsphys->ds_guid,
1266 	    sizeof (dsphys->ds_guid));
1267 	dsphys->ds_prev_snap_obj = ds->ds_phys->ds_prev_snap_obj;
1268 	dsphys->ds_prev_snap_txg = ds->ds_phys->ds_prev_snap_txg;
1269 	dsphys->ds_next_snap_obj = ds->ds_object;
1270 	dsphys->ds_num_children = 1;
1271 	dsphys->ds_creation_time = gethrestime_sec();
1272 	dsphys->ds_creation_txg = tx->tx_txg;
1273 	dsphys->ds_deadlist_obj = ds->ds_phys->ds_deadlist_obj;
1274 	dsphys->ds_used_bytes = ds->ds_phys->ds_used_bytes;
1275 	dsphys->ds_compressed_bytes = ds->ds_phys->ds_compressed_bytes;
1276 	dsphys->ds_uncompressed_bytes = ds->ds_phys->ds_uncompressed_bytes;
1277 	dsphys->ds_restoring = ds->ds_phys->ds_restoring;
1278 	dsphys->ds_bp = ds->ds_phys->ds_bp;
1279 	dmu_buf_rele(dbuf, FTAG);
1280 
1281 	if (ds->ds_phys->ds_prev_snap_obj != 0) {
1282 		dsl_dataset_t *ds_prev;
1283 
1284 		VERIFY(0 == dsl_dataset_open_obj(dp,
1285 		    ds->ds_phys->ds_prev_snap_obj, NULL,
1286 		    DS_MODE_NONE, FTAG, &ds_prev));
1287 		ASSERT(ds_prev->ds_phys->ds_next_snap_obj ==
1288 		    ds->ds_object ||
1289 		    ds_prev->ds_phys->ds_num_children > 1);
1290 		if (ds_prev->ds_phys->ds_next_snap_obj == ds->ds_object) {
1291 			dmu_buf_will_dirty(ds_prev->ds_dbuf, tx);
1292 			ASSERT3U(ds->ds_phys->ds_prev_snap_txg, ==,
1293 			    ds_prev->ds_phys->ds_creation_txg);
1294 			ds_prev->ds_phys->ds_next_snap_obj = dsobj;
1295 		}
1296 		dsl_dataset_close(ds_prev, DS_MODE_NONE, FTAG);
1297 	} else {
1298 		ASSERT3U(ds->ds_phys->ds_prev_snap_txg, ==, 0);
1299 	}
1300 
1301 	bplist_close(&ds->ds_deadlist);
1302 	dmu_buf_will_dirty(ds->ds_dbuf, tx);
1303 	ASSERT3U(ds->ds_phys->ds_prev_snap_txg, <, dsphys->ds_creation_txg);
1304 	ds->ds_phys->ds_prev_snap_obj = dsobj;
1305 	ds->ds_phys->ds_prev_snap_txg = dsphys->ds_creation_txg;
1306 	ds->ds_phys->ds_unique_bytes = 0;
1307 	ds->ds_phys->ds_deadlist_obj =
1308 	    bplist_create(mos, DSL_DEADLIST_BLOCKSIZE, tx);
1309 	VERIFY(0 == bplist_open(&ds->ds_deadlist, mos,
1310 	    ds->ds_phys->ds_deadlist_obj));
1311 
1312 	dprintf("snap '%s' -> obj %llu\n", snapname, dsobj);
1313 	err = zap_add(mos, ds->ds_phys->ds_snapnames_zapobj,
1314 	    snapname, 8, 1, &dsobj, tx);
1315 	ASSERT(err == 0);
1316 
1317 	if (ds->ds_prev)
1318 		dsl_dataset_close(ds->ds_prev, DS_MODE_NONE, ds);
1319 	VERIFY(0 == dsl_dataset_open_obj(dp,
1320 	    ds->ds_phys->ds_prev_snap_obj, snapname,
1321 	    DS_MODE_NONE, ds, &ds->ds_prev));
1322 
1323 	rw_exit(&dp->dp_config_rwlock);
1324 	dsl_dataset_close(ds, DS_MODE_NONE, FTAG);
1325 
1326 	return (0);
1327 }
1328 
1329 void
1330 dsl_dataset_sync(dsl_dataset_t *ds, dmu_tx_t *tx)
1331 {
1332 	ASSERT(dmu_tx_is_syncing(tx));
1333 	ASSERT(ds->ds_user_ptr != NULL);
1334 	ASSERT(ds->ds_phys->ds_next_snap_obj == 0);
1335 
1336 	dmu_objset_sync(ds->ds_user_ptr, tx);
1337 	dsl_dir_dirty(ds->ds_dir, tx);
1338 	bplist_close(&ds->ds_deadlist);
1339 
1340 	dmu_buf_rele(ds->ds_dbuf, ds);
1341 }
1342 
1343 void
1344 dsl_dataset_stats(dsl_dataset_t *ds, dmu_objset_stats_t *dds)
1345 {
1346 	/* fill in properties crap */
1347 	dsl_dir_stats(ds->ds_dir, dds);
1348 
1349 	if (ds->ds_phys->ds_num_children != 0) {
1350 		dds->dds_is_snapshot = TRUE;
1351 		dds->dds_num_clones = ds->ds_phys->ds_num_children - 1;
1352 	}
1353 
1354 	dds->dds_last_txg = ds->ds_phys->ds_bp.blk_birth;
1355 
1356 	dds->dds_objects_used = ds->ds_phys->ds_bp.blk_fill;
1357 	dds->dds_objects_avail = DN_MAX_OBJECT - dds->dds_objects_used;
1358 
1359 	/* We override the dataset's creation time... they should be the same */
1360 	dds->dds_creation_time = ds->ds_phys->ds_creation_time;
1361 	dds->dds_creation_txg = ds->ds_phys->ds_creation_txg;
1362 	dds->dds_space_refd = ds->ds_phys->ds_used_bytes;
1363 	dds->dds_fsid_guid = ds->ds_phys->ds_fsid_guid;
1364 
1365 	if (ds->ds_phys->ds_next_snap_obj) {
1366 		/*
1367 		 * This is a snapshot; override the dd's space used with
1368 		 * our unique space
1369 		 */
1370 		dds->dds_space_used = ds->ds_phys->ds_unique_bytes;
1371 		dds->dds_compressed_bytes =
1372 		    ds->ds_phys->ds_compressed_bytes;
1373 		dds->dds_uncompressed_bytes =
1374 		    ds->ds_phys->ds_uncompressed_bytes;
1375 	}
1376 }
1377 
1378 dsl_pool_t *
1379 dsl_dataset_pool(dsl_dataset_t *ds)
1380 {
1381 	return (ds->ds_dir->dd_pool);
1382 }
1383 
/*
 * Argument bundle passed from dsl_dataset_rename() to
 * dsl_dataset_snapshot_rename_sync() through the sync task's void *arg.
 * The strings are borrowed from the caller, not copied.
 */
struct osrenamearg {
	const char *oldname;	/* current full name of the snapshot */
	const char *newname;	/* requested full name, with '@' component */
};
1388 
1389 static int
1390 dsl_dataset_snapshot_rename_sync(dsl_dir_t *dd, void *arg, dmu_tx_t *tx)
1391 {
1392 	struct osrenamearg *ora = arg;
1393 	objset_t *mos = dd->dd_pool->dp_meta_objset;
1394 	dsl_dir_t *nds;
1395 	const char *tail;
1396 	int err;
1397 	dsl_dataset_t *snds, *fsds;
1398 	uint64_t val;
1399 
1400 	err = dsl_dataset_open_spa(dd->dd_pool->dp_spa, ora->oldname,
1401 	    DS_MODE_READONLY | DS_MODE_STANDARD, FTAG, &snds);
1402 	if (err)
1403 		return (err);
1404 
1405 	if (snds->ds_dir != dd) {
1406 		dsl_dataset_close(snds, DS_MODE_STANDARD, FTAG);
1407 		return (EINVAL);
1408 	}
1409 
1410 	/* better be changing a snapshot */
1411 	if (snds->ds_phys->ds_next_snap_obj == 0) {
1412 		dsl_dataset_close(snds, DS_MODE_STANDARD, FTAG);
1413 		return (EINVAL);
1414 	}
1415 
1416 	/* new fs better exist */
1417 	err = dsl_dir_open_spa(dd->dd_pool->dp_spa, ora->newname,
1418 	    FTAG, &nds, &tail);
1419 	if (err) {
1420 		dsl_dataset_close(snds, DS_MODE_STANDARD, FTAG);
1421 		return (err);
1422 	}
1423 
1424 	dsl_dir_close(nds, FTAG);
1425 
1426 	/* new name better be in same fs */
1427 	if (nds != dd) {
1428 		dsl_dataset_close(snds, DS_MODE_STANDARD, FTAG);
1429 		return (EINVAL);
1430 	}
1431 
1432 	/* new name better be a snapshot */
1433 	if (tail == NULL || tail[0] != '@') {
1434 		dsl_dataset_close(snds, DS_MODE_STANDARD, FTAG);
1435 		return (EINVAL);
1436 	}
1437 
1438 	tail++;
1439 
1440 	err = dsl_dataset_open_obj(dd->dd_pool,
1441 	    dd->dd_phys->dd_head_dataset_obj, NULL, DS_MODE_NONE, FTAG, &fsds);
1442 	if (err) {
1443 		dsl_dataset_close(snds, DS_MODE_STANDARD, FTAG);
1444 		return (err);
1445 	}
1446 
1447 	/* new name better not be in use */
1448 	err = zap_lookup(mos, fsds->ds_phys->ds_snapnames_zapobj,
1449 	    tail, 8, 1, &val);
1450 	if (err != ENOENT) {
1451 		if (err == 0)
1452 			err = EEXIST;
1453 		dsl_dataset_close(fsds, DS_MODE_NONE, FTAG);
1454 		dsl_dataset_close(snds, DS_MODE_STANDARD, FTAG);
1455 		return (EEXIST);
1456 	}
1457 
1458 	/* The point of no (unsuccessful) return */
1459 
1460 	rw_enter(&dd->dd_pool->dp_config_rwlock, RW_WRITER);
1461 	VERIFY(0 == dsl_dataset_get_snapname(snds));
1462 	err = zap_remove(mos, fsds->ds_phys->ds_snapnames_zapobj,
1463 	    snds->ds_snapname, tx);
1464 	ASSERT3U(err, ==, 0);
1465 	mutex_enter(&snds->ds_lock);
1466 	(void) strcpy(snds->ds_snapname, tail);
1467 	mutex_exit(&snds->ds_lock);
1468 	err = zap_add(mos, fsds->ds_phys->ds_snapnames_zapobj,
1469 	    snds->ds_snapname, 8, 1, &snds->ds_object, tx);
1470 	ASSERT3U(err, ==, 0);
1471 	rw_exit(&dd->dd_pool->dp_config_rwlock);
1472 
1473 	dsl_dataset_close(fsds, DS_MODE_NONE, FTAG);
1474 	dsl_dataset_close(snds, DS_MODE_STANDARD, FTAG);
1475 	return (0);
1476 }
1477 
1478 #pragma weak dmu_objset_rename = dsl_dataset_rename
1479 int
1480 dsl_dataset_rename(const char *osname, const char *newname)
1481 {
1482 	dsl_dir_t *dd;
1483 	const char *tail;
1484 	struct osrenamearg ora;
1485 	int err;
1486 
1487 	err = dsl_dir_open(osname, FTAG, &dd, &tail);
1488 	if (err)
1489 		return (err);
1490 	if (tail == NULL) {
1491 		err = dsl_dir_sync_task(dd,
1492 		    dsl_dir_rename_sync, (void*)newname, 1<<12);
1493 		dsl_dir_close(dd, FTAG);
1494 		return (err);
1495 	}
1496 	if (tail[0] != '@') {
1497 		/* the name ended in a nonexistant component */
1498 		dsl_dir_close(dd, FTAG);
1499 		return (ENOENT);
1500 	}
1501 
1502 	ora.oldname = osname;
1503 	ora.newname = newname;
1504 
1505 	err = dsl_dir_sync_task(dd,
1506 	    dsl_dataset_snapshot_rename_sync, &ora, 1<<12);
1507 	dsl_dir_close(dd, FTAG);
1508 	return (err);
1509 }
1510