xref: /titanic_50/usr/src/uts/common/fs/zfs/dmu_objset.c (revision c1ecd8b9404ee0d96d93f02e82c441b9bb149a3d)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 
26 #pragma ident	"%Z%%M%	%I%	%E% SMI"
27 
28 #include <sys/cred.h>
29 #include <sys/zfs_context.h>
30 #include <sys/dmu_objset.h>
31 #include <sys/dsl_dir.h>
32 #include <sys/dsl_dataset.h>
33 #include <sys/dsl_prop.h>
34 #include <sys/dsl_pool.h>
35 #include <sys/dsl_synctask.h>
36 #include <sys/dsl_deleg.h>
37 #include <sys/dnode.h>
38 #include <sys/dbuf.h>
39 #include <sys/zvol.h>
40 #include <sys/dmu_tx.h>
41 #include <sys/zio_checksum.h>
42 #include <sys/zap.h>
43 #include <sys/zil.h>
44 #include <sys/dmu_impl.h>
45 #include <sys/zfs_ioctl.h>
46 
47 spa_t *
48 dmu_objset_spa(objset_t *os)
49 {
50 	return (os->os->os_spa);
51 }
52 
53 zilog_t *
54 dmu_objset_zil(objset_t *os)
55 {
56 	return (os->os->os_zil);
57 }
58 
59 dsl_pool_t *
60 dmu_objset_pool(objset_t *os)
61 {
62 	dsl_dataset_t *ds;
63 
64 	if ((ds = os->os->os_dsl_dataset) != NULL && ds->ds_dir)
65 		return (ds->ds_dir->dd_pool);
66 	else
67 		return (spa_get_dsl(os->os->os_spa));
68 }
69 
70 dsl_dataset_t *
71 dmu_objset_ds(objset_t *os)
72 {
73 	return (os->os->os_dsl_dataset);
74 }
75 
76 dmu_objset_type_t
77 dmu_objset_type(objset_t *os)
78 {
79 	return (os->os->os_phys->os_type);
80 }
81 
/*
 * Write the name of the objset's dataset into buf by delegating to
 * dsl_dataset_name().
 *
 * NOTE(review): no buffer length is passed; presumably the caller must
 * supply a buffer large enough for any dataset name -- confirm against
 * dsl_dataset_name().
 */
void
dmu_objset_name(objset_t *os, char *buf)
{
	dsl_dataset_name(os->os->os_dsl_dataset, buf);
}
87 
88 uint64_t
89 dmu_objset_id(objset_t *os)
90 {
91 	dsl_dataset_t *ds = os->os->os_dsl_dataset;
92 
93 	return (ds ? ds->ds_object : 0);
94 }
95 
96 static void
97 checksum_changed_cb(void *arg, uint64_t newval)
98 {
99 	objset_impl_t *osi = arg;
100 
101 	/*
102 	 * Inheritance should have been done by now.
103 	 */
104 	ASSERT(newval != ZIO_CHECKSUM_INHERIT);
105 
106 	osi->os_checksum = zio_checksum_select(newval, ZIO_CHECKSUM_ON_VALUE);
107 }
108 
109 static void
110 compression_changed_cb(void *arg, uint64_t newval)
111 {
112 	objset_impl_t *osi = arg;
113 
114 	/*
115 	 * Inheritance and range checking should have been done by now.
116 	 */
117 	ASSERT(newval != ZIO_COMPRESS_INHERIT);
118 
119 	osi->os_compress = zio_compress_select(newval, ZIO_COMPRESS_ON_VALUE);
120 }
121 
122 static void
123 copies_changed_cb(void *arg, uint64_t newval)
124 {
125 	objset_impl_t *osi = arg;
126 
127 	/*
128 	 * Inheritance and range checking should have been done by now.
129 	 */
130 	ASSERT(newval > 0);
131 	ASSERT(newval <= spa_max_replication(osi->os_spa));
132 
133 	osi->os_copies = newval;
134 }
135 
136 static void
137 primary_cache_changed_cb(void *arg, uint64_t newval)
138 {
139 	objset_impl_t *osi = arg;
140 
141 	/*
142 	 * Inheritance and range checking should have been done by now.
143 	 */
144 	ASSERT(newval == ZFS_CACHE_ALL || newval == ZFS_CACHE_NONE ||
145 	    newval == ZFS_CACHE_METADATA);
146 
147 	osi->os_primary_cache = newval;
148 }
149 
150 static void
151 secondary_cache_changed_cb(void *arg, uint64_t newval)
152 {
153 	objset_impl_t *osi = arg;
154 
155 	/*
156 	 * Inheritance and range checking should have been done by now.
157 	 */
158 	ASSERT(newval == ZFS_CACHE_ALL || newval == ZFS_CACHE_NONE ||
159 	    newval == ZFS_CACHE_METADATA);
160 
161 	osi->os_secondary_cache = newval;
162 }
163 
164 void
165 dmu_objset_byteswap(void *buf, size_t size)
166 {
167 	objset_phys_t *osp = buf;
168 
169 	ASSERT(size == sizeof (objset_phys_t));
170 	dnode_byteswap(&osp->os_meta_dnode);
171 	byteswap_uint64_array(&osp->os_zil_header, sizeof (zil_header_t));
172 	osp->os_type = BSWAP_64(osp->os_type);
173 }
174 
/*
 * Construct the in-core objset_impl_t for the objset whose root block
 * pointer is 'bp'.
 *
 *	spa	pool the objset lives in
 *	ds	owning dataset, or NULL for the meta-objset; when non-NULL
 *		the caller must hold ds->ds_opening_lock
 *	bp	root block pointer; a hole means there is no on-disk
 *		objset_phys_t yet and a zeroed one is allocated instead
 *	osip	receives the new objset_impl_t on success
 *
 * Returns 0 on success, or the error from arc_read_nolock() or
 * dsl_prop_register(); on error nothing is returned through osip and
 * the partially built objset_impl_t is freed.
 *
 * NOTE(review): if one dsl_prop_register() call fails after earlier
 * ones succeeded, the earlier callbacks are not unregistered here
 * before osi is freed -- confirm whether they can still fire.
 */
int
dmu_objset_open_impl(spa_t *spa, dsl_dataset_t *ds, blkptr_t *bp,
    objset_impl_t **osip)
{
	objset_impl_t *osi;
	int i, err;

	ASSERT(ds == NULL || MUTEX_HELD(&ds->ds_opening_lock));

	osi = kmem_zalloc(sizeof (objset_impl_t), KM_SLEEP);
	/* The embedded objset_t handle points back at its implementation. */
	osi->os.os = osi;
	osi->os_dsl_dataset = ds;
	osi->os_spa = spa;
	osi->os_rootbp = bp;
	if (!BP_IS_HOLE(osi->os_rootbp)) {
		uint32_t aflags = ARC_WAIT;
		zbookmark_t zb;
		/* Bookmark identifying the objset's root block. */
		zb.zb_objset = ds ? ds->ds_object : 0;
		zb.zb_object = 0;
		zb.zb_level = -1;
		zb.zb_blkid = 0;
		if (DMU_OS_IS_L2CACHEABLE(osi))
			aflags |= ARC_L2CACHE;

		dprintf_bp(osi->os_rootbp, "reading %s", "");
		/*
		 * NB: when bprewrite scrub can change the bp,
		 * and this is called from dmu_objset_open_ds_os, the bp
		 * could change, and we'll need a lock.
		 */
		err = arc_read_nolock(NULL, spa, osi->os_rootbp,
		    arc_getbuf_func, &osi->os_phys_buf,
		    ZIO_PRIORITY_SYNC_READ, ZIO_FLAG_CANFAIL, &aflags, &zb);
		if (err) {
			kmem_free(osi, sizeof (objset_impl_t));
			return (err);
		}
		osi->os_phys = osi->os_phys_buf->b_data;
	} else {
		/* No on-disk objset yet: start from a zeroed objset_phys_t. */
		osi->os_phys_buf = arc_buf_alloc(spa, sizeof (objset_phys_t),
		    &osi->os_phys_buf, ARC_BUFC_METADATA);
		osi->os_phys = osi->os_phys_buf->b_data;
		bzero(osi->os_phys, sizeof (objset_phys_t));
	}

	/*
	 * Note: the changed_cb will be called once before the register
	 * func returns, thus changing the checksum/compression from the
	 * default (fletcher2/off).  Snapshots don't need to know about
	 * checksum/compression/copies.
	 */
	if (ds) {
		err = dsl_prop_register(ds, "primarycache",
		    primary_cache_changed_cb, osi);
		if (err == 0)
			err = dsl_prop_register(ds, "secondarycache",
			    secondary_cache_changed_cb, osi);
		if (!dsl_dataset_is_snapshot(ds)) {
			if (err == 0)
				err = dsl_prop_register(ds, "checksum",
				    checksum_changed_cb, osi);
			if (err == 0)
				err = dsl_prop_register(ds, "compression",
				    compression_changed_cb, osi);
			if (err == 0)
				err = dsl_prop_register(ds, "copies",
				    copies_changed_cb, osi);
		}
		if (err) {
			VERIFY(arc_buf_remove_ref(osi->os_phys_buf,
			    &osi->os_phys_buf) == 1);
			kmem_free(osi, sizeof (objset_impl_t));
			return (err);
		}
	} else if (ds == NULL) {
		/* It's the meta-objset. */
		osi->os_checksum = ZIO_CHECKSUM_FLETCHER_4;
		osi->os_compress = ZIO_COMPRESS_LZJB;
		osi->os_copies = spa_max_replication(spa);
		osi->os_primary_cache = ZFS_CACHE_ALL;
		osi->os_secondary_cache = ZFS_CACHE_ALL;
	}

	/* Keep an in-core copy of the ZIL header and set up the ZIL. */
	osi->os_zil_header = osi->os_phys->os_zil_header;
	osi->os_zil = zil_alloc(&osi->os, &osi->os_zil_header);

	/*
	 * Per-txg dirty and free dnode lists; both are linked through
	 * the same dn_dirty_link[i] node.
	 */
	for (i = 0; i < TXG_SIZE; i++) {
		list_create(&osi->os_dirty_dnodes[i], sizeof (dnode_t),
		    offsetof(dnode_t, dn_dirty_link[i]));
		list_create(&osi->os_free_dnodes[i], sizeof (dnode_t),
		    offsetof(dnode_t, dn_dirty_link[i]));
	}
	list_create(&osi->os_dnodes, sizeof (dnode_t),
	    offsetof(dnode_t, dn_link));
	list_create(&osi->os_downgraded_dbufs, sizeof (dmu_buf_impl_t),
	    offsetof(dmu_buf_impl_t, db_link));

	mutex_init(&osi->os_lock, NULL, MUTEX_DEFAULT, NULL);
	mutex_init(&osi->os_obj_lock, NULL, MUTEX_DEFAULT, NULL);
	mutex_init(&osi->os_user_ptr_lock, NULL, MUTEX_DEFAULT, NULL);

	osi->os_meta_dnode = dnode_special_open(osi,
	    &osi->os_phys->os_meta_dnode, DMU_META_DNODE_OBJECT);

	/*
	 * We should be the only thread trying to do this because we
	 * have ds_opening_lock
	 */
	if (ds) {
		VERIFY(NULL == dsl_dataset_set_user_ptr(ds, osi,
		    dmu_objset_evict));
	}

	*osip = osi;
	return (0);
}
291 
292 static int
293 dmu_objset_open_ds_os(dsl_dataset_t *ds, objset_t *os, dmu_objset_type_t type)
294 {
295 	objset_impl_t *osi;
296 
297 	mutex_enter(&ds->ds_opening_lock);
298 	osi = dsl_dataset_get_user_ptr(ds);
299 	if (osi == NULL) {
300 		int err;
301 
302 		err = dmu_objset_open_impl(dsl_dataset_get_spa(ds),
303 		    ds, &ds->ds_phys->ds_bp, &osi);
304 		if (err) {
305 			mutex_exit(&ds->ds_opening_lock);
306 			return (err);
307 		}
308 	}
309 	mutex_exit(&ds->ds_opening_lock);
310 
311 	os->os = osi;
312 	os->os_mode = DS_MODE_NOHOLD;
313 
314 	if (type != DMU_OST_ANY && type != os->os->os_phys->os_type)
315 		return (EINVAL);
316 	return (0);
317 }
318 
319 int
320 dmu_objset_open_ds(dsl_dataset_t *ds, dmu_objset_type_t type, objset_t **osp)
321 {
322 	objset_t *os;
323 	int err;
324 
325 	os = kmem_alloc(sizeof (objset_t), KM_SLEEP);
326 	err = dmu_objset_open_ds_os(ds, os, type);
327 	if (err)
328 		kmem_free(os, sizeof (objset_t));
329 	else
330 		*osp = os;
331 	return (err);
332 }
333 
334 /* called from zpl */
335 int
336 dmu_objset_open(const char *name, dmu_objset_type_t type, int mode,
337     objset_t **osp)
338 {
339 	objset_t *os;
340 	dsl_dataset_t *ds;
341 	int err;
342 
343 	ASSERT(DS_MODE_TYPE(mode) == DS_MODE_USER ||
344 	    DS_MODE_TYPE(mode) == DS_MODE_OWNER);
345 
346 	os = kmem_alloc(sizeof (objset_t), KM_SLEEP);
347 	if (DS_MODE_TYPE(mode) == DS_MODE_USER)
348 		err = dsl_dataset_hold(name, os, &ds);
349 	else
350 		err = dsl_dataset_own(name, mode, os, &ds);
351 	if (err) {
352 		kmem_free(os, sizeof (objset_t));
353 		return (err);
354 	}
355 
356 	err = dmu_objset_open_ds_os(ds, os, type);
357 	if (err) {
358 		if (DS_MODE_TYPE(mode) == DS_MODE_USER)
359 			dsl_dataset_rele(ds, os);
360 		else
361 			dsl_dataset_disown(ds, os);
362 		kmem_free(os, sizeof (objset_t));
363 	} else {
364 		os->os_mode = mode;
365 		*osp = os;
366 	}
367 	return (err);
368 }
369 
370 void
371 dmu_objset_close(objset_t *os)
372 {
373 	ASSERT(DS_MODE_TYPE(os->os_mode) == DS_MODE_USER ||
374 	    DS_MODE_TYPE(os->os_mode) == DS_MODE_OWNER ||
375 	    DS_MODE_TYPE(os->os_mode) == DS_MODE_NOHOLD);
376 
377 	if (DS_MODE_TYPE(os->os_mode) == DS_MODE_USER)
378 		dsl_dataset_rele(os->os->os_dsl_dataset, os);
379 	else if (DS_MODE_TYPE(os->os_mode) == DS_MODE_OWNER)
380 		dsl_dataset_disown(os->os->os_dsl_dataset, os);
381 	kmem_free(os, sizeof (objset_t));
382 }
383 
/*
 * Walk the objset's dnode list and call dnode_evict_dbufs() on every
 * dnode on which a hold can be taken.
 *
 * Returns non-zero if, after the pass, the head of os_dnodes is not
 * the meta-dnode (i.e. some other dnode is still on the list), and
 * zero otherwise.
 */
int
dmu_objset_evict_dbufs(objset_t *os)
{
	objset_impl_t *osi = os->os;
	dnode_t *dn;

	mutex_enter(&osi->os_lock);

	/* process the mdn last, since the other dnodes have holds on it */
	list_remove(&osi->os_dnodes, osi->os_meta_dnode);
	list_insert_tail(&osi->os_dnodes, osi->os_meta_dnode);

	/*
	 * Find the first dnode with holds.  We have to do this dance
	 * because dnode_add_ref() only works if you already have a
	 * hold.  If there are no holds then it has no dbufs so OK to
	 * skip.
	 */
	for (dn = list_head(&osi->os_dnodes);
	    dn && !dnode_add_ref(dn, FTAG);
	    dn = list_next(&osi->os_dnodes, dn))
		continue;

	while (dn) {
		dnode_t *next_dn = dn;

		/*
		 * Take a hold on the next dnode that accepts one before
		 * dropping os_lock, so the position in the list is pinned
		 * while dn is being processed.
		 */
		do {
			next_dn = list_next(&osi->os_dnodes, next_dn);
		} while (next_dn && !dnode_add_ref(next_dn, FTAG));

		/* os_lock is dropped around the eviction and the release. */
		mutex_exit(&osi->os_lock);
		dnode_evict_dbufs(dn);
		dnode_rele(dn, FTAG);
		mutex_enter(&osi->os_lock);
		dn = next_dn;
	}
	mutex_exit(&osi->os_lock);
	return (list_head(&osi->os_dnodes) != osi->os_meta_dnode);
}
423 
424 void
425 dmu_objset_evict(dsl_dataset_t *ds, void *arg)
426 {
427 	objset_impl_t *osi = arg;
428 	objset_t os;
429 	int i;
430 
431 	for (i = 0; i < TXG_SIZE; i++) {
432 		ASSERT(list_head(&osi->os_dirty_dnodes[i]) == NULL);
433 		ASSERT(list_head(&osi->os_free_dnodes[i]) == NULL);
434 	}
435 
436 	if (ds) {
437 		if (!dsl_dataset_is_snapshot(ds)) {
438 			VERIFY(0 == dsl_prop_unregister(ds, "checksum",
439 			    checksum_changed_cb, osi));
440 			VERIFY(0 == dsl_prop_unregister(ds, "compression",
441 			    compression_changed_cb, osi));
442 			VERIFY(0 == dsl_prop_unregister(ds, "copies",
443 			    copies_changed_cb, osi));
444 		}
445 		VERIFY(0 == dsl_prop_unregister(ds, "primarycache",
446 		    primary_cache_changed_cb, osi));
447 		VERIFY(0 == dsl_prop_unregister(ds, "secondarycache",
448 		    secondary_cache_changed_cb, osi));
449 	}
450 
451 	/*
452 	 * We should need only a single pass over the dnode list, since
453 	 * nothing can be added to the list at this point.
454 	 */
455 	os.os = osi;
456 	(void) dmu_objset_evict_dbufs(&os);
457 
458 	ASSERT3P(list_head(&osi->os_dnodes), ==, osi->os_meta_dnode);
459 	ASSERT3P(list_tail(&osi->os_dnodes), ==, osi->os_meta_dnode);
460 	ASSERT3P(list_head(&osi->os_meta_dnode->dn_dbufs), ==, NULL);
461 
462 	dnode_special_close(osi->os_meta_dnode);
463 	zil_free(osi->os_zil);
464 
465 	VERIFY(arc_buf_remove_ref(osi->os_phys_buf, &osi->os_phys_buf) == 1);
466 	mutex_destroy(&osi->os_lock);
467 	mutex_destroy(&osi->os_obj_lock);
468 	mutex_destroy(&osi->os_user_ptr_lock);
469 	kmem_free(osi, sizeof (objset_impl_t));
470 }
471 
472 /* called from dsl for meta-objset */
473 objset_impl_t *
474 dmu_objset_create_impl(spa_t *spa, dsl_dataset_t *ds, blkptr_t *bp,
475     dmu_objset_type_t type, dmu_tx_t *tx)
476 {
477 	objset_impl_t *osi;
478 	dnode_t *mdn;
479 
480 	ASSERT(dmu_tx_is_syncing(tx));
481 	if (ds)
482 		mutex_enter(&ds->ds_opening_lock);
483 	VERIFY(0 == dmu_objset_open_impl(spa, ds, bp, &osi));
484 	if (ds)
485 		mutex_exit(&ds->ds_opening_lock);
486 	mdn = osi->os_meta_dnode;
487 
488 	dnode_allocate(mdn, DMU_OT_DNODE, 1 << DNODE_BLOCK_SHIFT,
489 	    DN_MAX_INDBLKSHIFT, DMU_OT_NONE, 0, tx);
490 
491 	/*
492 	 * We don't want to have to increase the meta-dnode's nlevels
493 	 * later, because then we could do it in quescing context while
494 	 * we are also accessing it in open context.
495 	 *
496 	 * This precaution is not necessary for the MOS (ds == NULL),
497 	 * because the MOS is only updated in syncing context.
498 	 * This is most fortunate: the MOS is the only objset that
499 	 * needs to be synced multiple times as spa_sync() iterates
500 	 * to convergence, so minimizing its dn_nlevels matters.
501 	 */
502 	if (ds != NULL) {
503 		int levels = 1;
504 
505 		/*
506 		 * Determine the number of levels necessary for the meta-dnode
507 		 * to contain DN_MAX_OBJECT dnodes.
508 		 */
509 		while ((uint64_t)mdn->dn_nblkptr << (mdn->dn_datablkshift +
510 		    (levels - 1) * (mdn->dn_indblkshift - SPA_BLKPTRSHIFT)) <
511 		    DN_MAX_OBJECT * sizeof (dnode_phys_t))
512 			levels++;
513 
514 		mdn->dn_next_nlevels[tx->tx_txg & TXG_MASK] =
515 		    mdn->dn_nlevels = levels;
516 	}
517 
518 	ASSERT(type != DMU_OST_NONE);
519 	ASSERT(type != DMU_OST_ANY);
520 	ASSERT(type < DMU_OST_NUMTYPES);
521 	osi->os_phys->os_type = type;
522 
523 	dsl_dataset_dirty(ds, tx);
524 
525 	return (osi);
526 }
527 
/*
 * Arguments handed from dmu_objset_create() to its sync-task check and
 * sync functions.
 */
struct oscarg {
	/* optional callback run by the sync function on a new, empty objset */
	void (*userfunc)(objset_t *os, void *arg, cred_t *cr, dmu_tx_t *tx);
	void *userarg;			/* passed to userfunc as 'arg' */
	dsl_dataset_t *clone_parent;	/* dataset to clone, or NULL */
	const char *lastname;		/* last component of the new name */
	dmu_objset_type_t type;		/* type for a newly created objset */
	uint64_t flags;			/* for dsl_dataset_create_sync() */
};
536 
537 /*ARGSUSED*/
538 static int
539 dmu_objset_create_check(void *arg1, void *arg2, dmu_tx_t *tx)
540 {
541 	dsl_dir_t *dd = arg1;
542 	struct oscarg *oa = arg2;
543 	objset_t *mos = dd->dd_pool->dp_meta_objset;
544 	int err;
545 	uint64_t ddobj;
546 
547 	err = zap_lookup(mos, dd->dd_phys->dd_child_dir_zapobj,
548 	    oa->lastname, sizeof (uint64_t), 1, &ddobj);
549 	if (err != ENOENT)
550 		return (err ? err : EEXIST);
551 
552 	if (oa->clone_parent != NULL) {
553 		/*
554 		 * You can't clone across pools.
555 		 */
556 		if (oa->clone_parent->ds_dir->dd_pool != dd->dd_pool)
557 			return (EXDEV);
558 
559 		/*
560 		 * You can only clone snapshots, not the head datasets.
561 		 */
562 		if (oa->clone_parent->ds_phys->ds_num_children == 0)
563 			return (EINVAL);
564 	}
565 
566 	return (0);
567 }
568 
569 static void
570 dmu_objset_create_sync(void *arg1, void *arg2, cred_t *cr, dmu_tx_t *tx)
571 {
572 	dsl_dir_t *dd = arg1;
573 	struct oscarg *oa = arg2;
574 	dsl_dataset_t *ds;
575 	blkptr_t *bp;
576 	uint64_t dsobj;
577 
578 	ASSERT(dmu_tx_is_syncing(tx));
579 
580 	dsobj = dsl_dataset_create_sync(dd, oa->lastname,
581 	    oa->clone_parent, oa->flags, cr, tx);
582 
583 	VERIFY(0 == dsl_dataset_hold_obj(dd->dd_pool, dsobj, FTAG, &ds));
584 	bp = dsl_dataset_get_blkptr(ds);
585 	if (BP_IS_HOLE(bp)) {
586 		objset_impl_t *osi;
587 
588 		/* This is an empty dmu_objset; not a clone. */
589 		osi = dmu_objset_create_impl(dsl_dataset_get_spa(ds),
590 		    ds, bp, oa->type, tx);
591 
592 		if (oa->userfunc)
593 			oa->userfunc(&osi->os, oa->userarg, cr, tx);
594 	}
595 
596 	spa_history_internal_log(LOG_DS_CREATE, dd->dd_pool->dp_spa,
597 	    tx, cr, "dataset = %llu", dsobj);
598 
599 	dsl_dataset_rele(ds, FTAG);
600 }
601 
602 int
603 dmu_objset_create(const char *name, dmu_objset_type_t type,
604     objset_t *clone_parent, uint64_t flags,
605     void (*func)(objset_t *os, void *arg, cred_t *cr, dmu_tx_t *tx), void *arg)
606 {
607 	dsl_dir_t *pdd;
608 	const char *tail;
609 	int err = 0;
610 	struct oscarg oa = { 0 };
611 
612 	ASSERT(strchr(name, '@') == NULL);
613 	err = dsl_dir_open(name, FTAG, &pdd, &tail);
614 	if (err)
615 		return (err);
616 	if (tail == NULL) {
617 		dsl_dir_close(pdd, FTAG);
618 		return (EEXIST);
619 	}
620 
621 	dprintf("name=%s\n", name);
622 
623 	oa.userfunc = func;
624 	oa.userarg = arg;
625 	oa.lastname = tail;
626 	oa.type = type;
627 	oa.flags = flags;
628 
629 	if (clone_parent != NULL) {
630 		/*
631 		 * You can't clone to a different type.
632 		 */
633 		if (clone_parent->os->os_phys->os_type != type) {
634 			dsl_dir_close(pdd, FTAG);
635 			return (EINVAL);
636 		}
637 		oa.clone_parent = clone_parent->os->os_dsl_dataset;
638 	}
639 	err = dsl_sync_task_do(pdd->dd_pool, dmu_objset_create_check,
640 	    dmu_objset_create_sync, pdd, &oa, 5);
641 	dsl_dir_close(pdd, FTAG);
642 	return (err);
643 }
644 
645 int
646 dmu_objset_destroy(const char *name)
647 {
648 	objset_t *os;
649 	int error;
650 
651 	/*
652 	 * If it looks like we'll be able to destroy it, and there's
653 	 * an unplayed replay log sitting around, destroy the log.
654 	 * It would be nicer to do this in dsl_dataset_destroy_sync(),
655 	 * but the replay log objset is modified in open context.
656 	 */
657 	error = dmu_objset_open(name, DMU_OST_ANY,
658 	    DS_MODE_OWNER|DS_MODE_READONLY|DS_MODE_INCONSISTENT, &os);
659 	if (error == 0) {
660 		dsl_dataset_t *ds = os->os->os_dsl_dataset;
661 		zil_destroy(dmu_objset_zil(os), B_FALSE);
662 
663 		error = dsl_dataset_destroy(ds, os);
664 		/*
665 		 * dsl_dataset_destroy() closes the ds.
666 		 */
667 		kmem_free(os, sizeof (objset_t));
668 	}
669 
670 	return (error);
671 }
672 
673 /*
674  * This will close the objset.
675  */
676 int
677 dmu_objset_rollback(objset_t *os)
678 {
679 	int err;
680 	dsl_dataset_t *ds;
681 
682 	ds = os->os->os_dsl_dataset;
683 
684 	if (!dsl_dataset_tryown(ds, TRUE, os)) {
685 		dmu_objset_close(os);
686 		return (EBUSY);
687 	}
688 
689 	err = dsl_dataset_rollback(ds, os->os->os_phys->os_type);
690 
691 	/*
692 	 * NB: we close the objset manually because the rollback
693 	 * actually implicitly called dmu_objset_evict(), thus freeing
694 	 * the objset_impl_t.
695 	 */
696 	dsl_dataset_disown(ds, os);
697 	kmem_free(os, sizeof (objset_t));
698 	return (err);
699 }
700 
/*
 * State shared between dmu_objset_snapshot() and its per-dataset
 * helper dmu_objset_snapshot_one().
 */
struct snaparg {
	dsl_sync_task_group_t *dstg;	/* group collecting one task per ds */
	char *snapname;			/* snapshot name passed to each task */
	char failed[MAXPATHLEN];	/* dataset name reported on error */
	boolean_t checkperms;		/* check snapshot perms per dataset */
	list_t objsets;			/* list of struct osnode (open objsets) */
};
708 
/*
 * List element recording an objset opened by dmu_objset_snapshot_one()
 * so that dmu_objset_snapshot() can resume its ZIL and close it later.
 */
struct osnode {
	list_node_t node;	/* linkage on snaparg.objsets */
	objset_t *os;		/* the open objset */
};
713 
714 static int
715 dmu_objset_snapshot_one(char *name, void *arg)
716 {
717 	struct snaparg *sn = arg;
718 	objset_t *os;
719 	int err;
720 
721 	(void) strcpy(sn->failed, name);
722 
723 	/*
724 	 * Check permissions only when requested.  This only applies when
725 	 * doing a recursive snapshot.  The permission checks for the starting
726 	 * dataset have already been performed in zfs_secpolicy_snapshot()
727 	 */
728 	if (sn->checkperms == B_TRUE &&
729 	    (err = zfs_secpolicy_snapshot_perms(name, CRED())))
730 		return (err);
731 
732 	err = dmu_objset_open(name, DMU_OST_ANY, DS_MODE_USER, &os);
733 	if (err != 0)
734 		return (err);
735 
736 	/* If the objset is in an inconsistent state, return busy */
737 	if (os->os->os_dsl_dataset->ds_phys->ds_flags & DS_FLAG_INCONSISTENT) {
738 		dmu_objset_close(os);
739 		return (EBUSY);
740 	}
741 
742 	/*
743 	 * NB: we need to wait for all in-flight changes to get to disk,
744 	 * so that we snapshot those changes.  zil_suspend does this as
745 	 * a side effect.
746 	 */
747 	err = zil_suspend(dmu_objset_zil(os));
748 	if (err == 0) {
749 		struct osnode *osn;
750 		dsl_sync_task_create(sn->dstg, dsl_dataset_snapshot_check,
751 		    dsl_dataset_snapshot_sync, os->os->os_dsl_dataset,
752 		    sn->snapname, 3);
753 		osn = kmem_alloc(sizeof (struct osnode), KM_SLEEP);
754 		osn->os = os;
755 		list_insert_tail(&sn->objsets, osn);
756 	} else {
757 		dmu_objset_close(os);
758 	}
759 
760 	return (err);
761 }
762 
763 int
764 dmu_objset_snapshot(char *fsname, char *snapname, boolean_t recursive)
765 {
766 	dsl_sync_task_t *dst;
767 	struct osnode *osn;
768 	struct snaparg sn = { 0 };
769 	spa_t *spa;
770 	int err;
771 
772 	(void) strcpy(sn.failed, fsname);
773 
774 	err = spa_open(fsname, &spa, FTAG);
775 	if (err)
776 		return (err);
777 
778 	sn.dstg = dsl_sync_task_group_create(spa_get_dsl(spa));
779 	sn.snapname = snapname;
780 	list_create(&sn.objsets, sizeof (struct osnode),
781 	    offsetof(struct osnode, node));
782 
783 	if (recursive) {
784 		sn.checkperms = B_TRUE;
785 		err = dmu_objset_find(fsname,
786 		    dmu_objset_snapshot_one, &sn, DS_FIND_CHILDREN);
787 	} else {
788 		sn.checkperms = B_FALSE;
789 		err = dmu_objset_snapshot_one(fsname, &sn);
790 	}
791 
792 	if (err)
793 		goto out;
794 
795 	err = dsl_sync_task_group_wait(sn.dstg);
796 
797 	for (dst = list_head(&sn.dstg->dstg_tasks); dst;
798 	    dst = list_next(&sn.dstg->dstg_tasks, dst)) {
799 		dsl_dataset_t *ds = dst->dst_arg1;
800 		if (dst->dst_err)
801 			dsl_dataset_name(ds, sn.failed);
802 	}
803 
804 out:
805 	while (osn = list_head(&sn.objsets)) {
806 		list_remove(&sn.objsets, osn);
807 		zil_resume(dmu_objset_zil(osn->os));
808 		dmu_objset_close(osn->os);
809 		kmem_free(osn, sizeof (struct osnode));
810 	}
811 	list_destroy(&sn.objsets);
812 
813 	if (err)
814 		(void) strcpy(fsname, sn.failed);
815 	dsl_sync_task_group_destroy(sn.dstg);
816 	spa_close(spa, FTAG);
817 	return (err);
818 }
819 
820 static void
821 dmu_objset_sync_dnodes(list_t *list, dmu_tx_t *tx)
822 {
823 	dnode_t *dn;
824 
825 	while (dn = list_head(list)) {
826 		ASSERT(dn->dn_object != DMU_META_DNODE_OBJECT);
827 		ASSERT(dn->dn_dbuf->db_data_pending);
828 		/*
829 		 * Initialize dn_zio outside dnode_sync()
830 		 * to accomodate meta-dnode
831 		 */
832 		dn->dn_zio = dn->dn_dbuf->db_data_pending->dr_zio;
833 		ASSERT(dn->dn_zio);
834 
835 		ASSERT3U(dn->dn_nlevels, <=, DN_MAX_LEVELS);
836 		list_remove(list, dn);
837 		dnode_sync(dn, tx);
838 	}
839 }
840 
841 /* ARGSUSED */
842 static void
843 ready(zio_t *zio, arc_buf_t *abuf, void *arg)
844 {
845 	objset_impl_t *os = arg;
846 	blkptr_t *bp = os->os_rootbp;
847 	dnode_phys_t *dnp = &os->os_phys->os_meta_dnode;
848 	int i;
849 
850 	ASSERT(bp == zio->io_bp);
851 
852 	/*
853 	 * Update rootbp fill count.
854 	 */
855 	bp->blk_fill = 1;	/* count the meta-dnode */
856 	for (i = 0; i < dnp->dn_nblkptr; i++)
857 		bp->blk_fill += dnp->dn_blkptr[i].blk_fill;
858 
859 	BP_SET_TYPE(bp, DMU_OT_OBJSET);
860 	BP_SET_LEVEL(bp, 0);
861 
862 	/* We must do this after we've set the bp's type and level */
863 	if (!DVA_EQUAL(BP_IDENTITY(bp),
864 	    BP_IDENTITY(&zio->io_bp_orig))) {
865 		if (zio->io_bp_orig.blk_birth == os->os_synctx->tx_txg)
866 			(void) dsl_dataset_block_kill(os->os_dsl_dataset,
867 			    &zio->io_bp_orig, NULL, os->os_synctx);
868 		dsl_dataset_block_born(os->os_dsl_dataset, bp, os->os_synctx);
869 	}
870 }
871 
/*
 * Write out the objset for transaction 'tx'.  Called from dsl.
 *
 *	os	objset to sync
 *	pio	parent zio for the root block write
 *	tx	syncing transaction
 *
 * Creates the root block write, syncs the meta-dnode and then the free
 * and dirty dnodes for this txg, issues the meta-dnode's pending
 * level-0 writes, syncs the ZIL, and finally issues the root block
 * write without waiting for it.
 */
/* called from dsl */
void
dmu_objset_sync(objset_impl_t *os, zio_t *pio, dmu_tx_t *tx)
{
	int txgoff;
	zbookmark_t zb;
	writeprops_t wp = { 0 };
	zio_t *zio;
	list_t *list;
	dbuf_dirty_record_t *dr;

	dprintf_ds(os->os_dsl_dataset, "txg=%llu\n", tx->tx_txg);

	ASSERT(dmu_tx_is_syncing(tx));
	/* XXX the write_done callback should really give us the tx... */
	os->os_synctx = tx;

	if (os->os_dsl_dataset == NULL) {
		/*
		 * This is the MOS.  If we have upgraded,
		 * spa_max_replication() could change, so reset
		 * os_copies here.
		 */
		os->os_copies = spa_max_replication(os->os_spa);
	}

	/*
	 * Create the root block IO
	 */
	zb.zb_objset = os->os_dsl_dataset ? os->os_dsl_dataset->ds_object : 0;
	zb.zb_object = 0;
	zb.zb_level = -1;
	zb.zb_blkid = 0;
	/* A root block born in an earlier txg is released before rewriting. */
	if (BP_IS_OLDER(os->os_rootbp, tx->tx_txg)) {
		(void) dsl_dataset_block_kill(os->os_dsl_dataset,
		    os->os_rootbp, pio, tx);
	}
	wp.wp_type = DMU_OT_OBJSET;
	wp.wp_copies = os->os_copies;
	wp.wp_level = (uint8_t)-1;
	wp.wp_oschecksum = os->os_checksum;
	wp.wp_oscompress = os->os_compress;
	arc_release(os->os_phys_buf, &os->os_phys_buf);
	zio = arc_write(pio, os->os_spa, &wp,
	    DMU_OS_IS_L2CACHEABLE(os), tx->tx_txg, os->os_rootbp,
	    os->os_phys_buf, ready, NULL, os, ZIO_PRIORITY_ASYNC_WRITE,
	    ZIO_FLAG_MUSTSUCCEED | ZIO_FLAG_METADATA, &zb);

	/*
	 * Sync meta-dnode - the parent IO for the sync is the root block
	 */
	os->os_meta_dnode->dn_zio = zio;
	dnode_sync(os->os_meta_dnode, tx);

	/* Index of this txg's slot in the per-txg lists. */
	txgoff = tx->tx_txg & TXG_MASK;

	dmu_objset_sync_dnodes(&os->os_free_dnodes[txgoff], tx);
	dmu_objset_sync_dnodes(&os->os_dirty_dnodes[txgoff], tx);

	/* Issue the writes recorded for the meta-dnode's level-0 blocks. */
	list = &os->os_meta_dnode->dn_dirty_records[txgoff];
	while (dr = list_head(list)) {
		ASSERT(dr->dr_dbuf->db_level == 0);
		list_remove(list, dr);
		if (dr->dr_zio)
			zio_nowait(dr->dr_zio);
	}
	/*
	 * Free intent log blocks up to this tx.
	 */
	zil_sync(os->os_zil, tx);
	/* Copy the in-core ZIL header into the block about to be written. */
	os->os_phys->os_zil_header = os->os_zil_header;
	zio_nowait(zio);
}
945 
/*
 * Report space and object usage for the objset by delegating to
 * dsl_dataset_space() on its dataset; the four out-parameters are
 * passed through unchanged.
 */
void
dmu_objset_space(objset_t *os, uint64_t *refdbytesp, uint64_t *availbytesp,
    uint64_t *usedobjsp, uint64_t *availobjsp)
{
	dsl_dataset_space(os->os->os_dsl_dataset, refdbytesp, availbytesp,
	    usedobjsp, availobjsp);
}
953 
954 uint64_t
955 dmu_objset_fsid_guid(objset_t *os)
956 {
957 	return (dsl_dataset_fsid_guid(os->os->os_dsl_dataset));
958 }
959 
960 void
961 dmu_objset_fast_stat(objset_t *os, dmu_objset_stats_t *stat)
962 {
963 	stat->dds_type = os->os->os_phys->os_type;
964 	if (os->os->os_dsl_dataset)
965 		dsl_dataset_fast_stat(os->os->os_dsl_dataset, stat);
966 }
967 
968 void
969 dmu_objset_stats(objset_t *os, nvlist_t *nv)
970 {
971 	ASSERT(os->os->os_dsl_dataset ||
972 	    os->os->os_phys->os_type == DMU_OST_META);
973 
974 	if (os->os->os_dsl_dataset != NULL)
975 		dsl_dataset_stats(os->os->os_dsl_dataset, nv);
976 
977 	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_TYPE,
978 	    os->os->os_phys->os_type);
979 }
980 
981 int
982 dmu_objset_is_snapshot(objset_t *os)
983 {
984 	if (os->os->os_dsl_dataset != NULL)
985 		return (dsl_dataset_is_snapshot(os->os->os_dsl_dataset));
986 	else
987 		return (B_FALSE);
988 }
989 
990 int
991 dmu_snapshot_realname(objset_t *os, char *name, char *real, int maxlen,
992     boolean_t *conflict)
993 {
994 	dsl_dataset_t *ds = os->os->os_dsl_dataset;
995 	uint64_t ignored;
996 
997 	if (ds->ds_phys->ds_snapnames_zapobj == 0)
998 		return (ENOENT);
999 
1000 	return (zap_lookup_norm(ds->ds_dir->dd_pool->dp_meta_objset,
1001 	    ds->ds_phys->ds_snapnames_zapobj, name, 8, 1, &ignored, MT_FIRST,
1002 	    real, maxlen, conflict));
1003 }
1004 
1005 int
1006 dmu_snapshot_list_next(objset_t *os, int namelen, char *name,
1007     uint64_t *idp, uint64_t *offp, boolean_t *case_conflict)
1008 {
1009 	dsl_dataset_t *ds = os->os->os_dsl_dataset;
1010 	zap_cursor_t cursor;
1011 	zap_attribute_t attr;
1012 
1013 	if (ds->ds_phys->ds_snapnames_zapobj == 0)
1014 		return (ENOENT);
1015 
1016 	zap_cursor_init_serialized(&cursor,
1017 	    ds->ds_dir->dd_pool->dp_meta_objset,
1018 	    ds->ds_phys->ds_snapnames_zapobj, *offp);
1019 
1020 	if (zap_cursor_retrieve(&cursor, &attr) != 0) {
1021 		zap_cursor_fini(&cursor);
1022 		return (ENOENT);
1023 	}
1024 
1025 	if (strlen(attr.za_name) + 1 > namelen) {
1026 		zap_cursor_fini(&cursor);
1027 		return (ENAMETOOLONG);
1028 	}
1029 
1030 	(void) strcpy(name, attr.za_name);
1031 	if (idp)
1032 		*idp = attr.za_first_integer;
1033 	if (case_conflict)
1034 		*case_conflict = attr.za_normalization_conflict;
1035 	zap_cursor_advance(&cursor);
1036 	*offp = zap_cursor_serialize(&cursor);
1037 	zap_cursor_fini(&cursor);
1038 
1039 	return (0);
1040 }
1041 
1042 int
1043 dmu_dir_list_next(objset_t *os, int namelen, char *name,
1044     uint64_t *idp, uint64_t *offp)
1045 {
1046 	dsl_dir_t *dd = os->os->os_dsl_dataset->ds_dir;
1047 	zap_cursor_t cursor;
1048 	zap_attribute_t attr;
1049 
1050 	/* there is no next dir on a snapshot! */
1051 	if (os->os->os_dsl_dataset->ds_object !=
1052 	    dd->dd_phys->dd_head_dataset_obj)
1053 		return (ENOENT);
1054 
1055 	zap_cursor_init_serialized(&cursor,
1056 	    dd->dd_pool->dp_meta_objset,
1057 	    dd->dd_phys->dd_child_dir_zapobj, *offp);
1058 
1059 	if (zap_cursor_retrieve(&cursor, &attr) != 0) {
1060 		zap_cursor_fini(&cursor);
1061 		return (ENOENT);
1062 	}
1063 
1064 	if (strlen(attr.za_name) + 1 > namelen) {
1065 		zap_cursor_fini(&cursor);
1066 		return (ENAMETOOLONG);
1067 	}
1068 
1069 	(void) strcpy(name, attr.za_name);
1070 	if (idp)
1071 		*idp = attr.za_first_integer;
1072 	zap_cursor_advance(&cursor);
1073 	*offp = zap_cursor_serialize(&cursor);
1074 	zap_cursor_fini(&cursor);
1075 
1076 	return (0);
1077 }
1078 
/*
 * Adapter state used by dmu_objset_find() to call a name-only callback
 * through the dmu_objset_find_spa() callback signature.
 */
struct findarg {
	int (*func)(char *, void *);	/* caller's callback */
	void *arg;			/* caller's callback argument */
};
1083 
1084 /* ARGSUSED */
1085 static int
1086 findfunc(spa_t *spa, uint64_t dsobj, const char *dsname, void *arg)
1087 {
1088 	struct findarg *fa = arg;
1089 	return (fa->func((char *)dsname, fa->arg));
1090 }
1091 
1092 /*
1093  * Find all objsets under name, and for each, call 'func(child_name, arg)'.
1094  * Perhaps change all callers to use dmu_objset_find_spa()?
1095  */
1096 int
1097 dmu_objset_find(char *name, int func(char *, void *), void *arg, int flags)
1098 {
1099 	struct findarg fa;
1100 	fa.func = func;
1101 	fa.arg = arg;
1102 	return (dmu_objset_find_spa(NULL, name, findfunc, &fa, flags));
1103 }
1104 
/*
 * Find all objsets under name, call func on each
 *
 *	spa	pool to search; passed through to dsl_dir_open_spa() and
 *		to func
 *	name	starting dataset name; if NULL, spa_name(spa) is used
 *	func	callback invoked as func(spa, dsobj, dsname, arg)
 *	arg	opaque argument for func
 *	flags	DS_FIND_CHILDREN to recurse into child directories,
 *		DS_FIND_SNAPSHOTS to visit snapshots of each dataset
 *
 * Children are visited first (recursively), then snapshots, then the
 * dataset itself.  The walk stops at, and returns, the first non-zero
 * value from func or from a recursive call.  Directories whose name
 * starts with '$' are skipped and 0 is returned for them.
 */
int
dmu_objset_find_spa(spa_t *spa, const char *name,
    int func(spa_t *, uint64_t, const char *, void *), void *arg, int flags)
{
	dsl_dir_t *dd;
	dsl_pool_t *dp;
	dsl_dataset_t *ds;
	zap_cursor_t zc;
	zap_attribute_t *attr;
	char *child;
	uint64_t thisobj;
	int err;

	if (name == NULL)
		name = spa_name(spa);
	err = dsl_dir_open_spa(spa, name, FTAG, &dd, NULL);
	if (err)
		return (err);

	/* Don't visit hidden ($MOS & $ORIGIN) objsets. */
	if (dd->dd_myname[0] == '$') {
		dsl_dir_close(dd, FTAG);
		return (0);
	}

	thisobj = dd->dd_phys->dd_head_dataset_obj;
	/* Scratch attribute for both cursor loops, taken from the heap. */
	attr = kmem_alloc(sizeof (zap_attribute_t), KM_SLEEP);
	dp = dd->dd_pool;

	/*
	 * Iterate over all children.
	 */
	if (flags & DS_FIND_CHILDREN) {
		for (zap_cursor_init(&zc, dp->dp_meta_objset,
		    dd->dd_phys->dd_child_dir_zapobj);
		    zap_cursor_retrieve(&zc, attr) == 0;
		    (void) zap_cursor_advance(&zc)) {
			ASSERT(attr->za_integer_length == sizeof (uint64_t));
			ASSERT(attr->za_num_integers == 1);

			/* Build "<name>/<child>" and recurse into it. */
			child = kmem_alloc(MAXPATHLEN, KM_SLEEP);
			(void) strcpy(child, name);
			(void) strcat(child, "/");
			(void) strcat(child, attr->za_name);
			err = dmu_objset_find_spa(spa, child, func, arg, flags);
			kmem_free(child, MAXPATHLEN);
			if (err)
				break;
		}
		zap_cursor_fini(&zc);

		if (err) {
			dsl_dir_close(dd, FTAG);
			kmem_free(attr, sizeof (zap_attribute_t));
			return (err);
		}
	}

	/*
	 * Iterate over all snapshots.
	 */
	if (flags & DS_FIND_SNAPSHOTS) {
		/* The config lock is only taken outside of sync context. */
		if (!dsl_pool_sync_context(dp))
			rw_enter(&dp->dp_config_rwlock, RW_READER);
		err = dsl_dataset_hold_obj(dp, thisobj, FTAG, &ds);
		if (!dsl_pool_sync_context(dp))
			rw_exit(&dp->dp_config_rwlock);

		if (err == 0) {
			/* Only the ZAP object number is needed from ds. */
			uint64_t snapobj = ds->ds_phys->ds_snapnames_zapobj;
			dsl_dataset_rele(ds, FTAG);

			for (zap_cursor_init(&zc, dp->dp_meta_objset, snapobj);
			    zap_cursor_retrieve(&zc, attr) == 0;
			    (void) zap_cursor_advance(&zc)) {
				ASSERT(attr->za_integer_length ==
				    sizeof (uint64_t));
				ASSERT(attr->za_num_integers == 1);

				/* Build "<name>@<snapshot>" for the callback. */
				child = kmem_alloc(MAXPATHLEN, KM_SLEEP);
				(void) strcpy(child, name);
				(void) strcat(child, "@");
				(void) strcat(child, attr->za_name);
				err = func(spa, attr->za_first_integer,
				    child, arg);
				kmem_free(child, MAXPATHLEN);
				if (err)
					break;
			}
			zap_cursor_fini(&zc);
		}
	}

	dsl_dir_close(dd, FTAG);
	kmem_free(attr, sizeof (zap_attribute_t));

	if (err)
		return (err);

	/*
	 * Apply to self if appropriate.
	 */
	err = func(spa, thisobj, name, arg);
	return (err);
}
1213 
/*
 * Store the consumer's private pointer in the objset.  The caller must
 * hold os_user_ptr_lock (asserted).
 */
void
dmu_objset_set_user(objset_t *os, void *user_ptr)
{
	ASSERT(MUTEX_HELD(&os->os->os_user_ptr_lock));
	os->os->os_user_ptr = user_ptr;
}
1220 
1221 void *
1222 dmu_objset_get_user(objset_t *os)
1223 {
1224 	ASSERT(MUTEX_HELD(&os->os->os_user_ptr_lock));
1225 	return (os->os->os_user_ptr);
1226 }
1227