xref: /titanic_51/usr/src/uts/common/fs/zfs/dmu_objset.c (revision b7661ccca92e6bf5160f4d5d2601efaeaa1f5161)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2007 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 
26 #pragma ident	"%Z%%M%	%I%	%E% SMI"
27 
28 #include <sys/zfs_context.h>
29 #include <sys/dmu_objset.h>
30 #include <sys/dsl_dir.h>
31 #include <sys/dsl_dataset.h>
32 #include <sys/dsl_prop.h>
33 #include <sys/dsl_pool.h>
34 #include <sys/dsl_synctask.h>
35 #include <sys/dnode.h>
36 #include <sys/dbuf.h>
37 #include <sys/zvol.h>
38 #include <sys/dmu_tx.h>
39 #include <sys/zio_checksum.h>
40 #include <sys/zap.h>
41 #include <sys/zil.h>
42 #include <sys/dmu_impl.h>
43 
44 
45 spa_t *
46 dmu_objset_spa(objset_t *os)
47 {
48 	return (os->os->os_spa);
49 }
50 
51 zilog_t *
52 dmu_objset_zil(objset_t *os)
53 {
54 	return (os->os->os_zil);
55 }
56 
57 dsl_pool_t *
58 dmu_objset_pool(objset_t *os)
59 {
60 	dsl_dataset_t *ds;
61 
62 	if ((ds = os->os->os_dsl_dataset) != NULL && ds->ds_dir)
63 		return (ds->ds_dir->dd_pool);
64 	else
65 		return (spa_get_dsl(os->os->os_spa));
66 }
67 
68 dsl_dataset_t *
69 dmu_objset_ds(objset_t *os)
70 {
71 	return (os->os->os_dsl_dataset);
72 }
73 
74 dmu_objset_type_t
75 dmu_objset_type(objset_t *os)
76 {
77 	return (os->os->os_phys->os_type);
78 }
79 
80 void
81 dmu_objset_name(objset_t *os, char *buf)
82 {
83 	dsl_dataset_name(os->os->os_dsl_dataset, buf);
84 }
85 
86 uint64_t
87 dmu_objset_id(objset_t *os)
88 {
89 	dsl_dataset_t *ds = os->os->os_dsl_dataset;
90 
91 	return (ds ? ds->ds_object : 0);
92 }
93 
94 static void
95 checksum_changed_cb(void *arg, uint64_t newval)
96 {
97 	objset_impl_t *osi = arg;
98 
99 	/*
100 	 * Inheritance should have been done by now.
101 	 */
102 	ASSERT(newval != ZIO_CHECKSUM_INHERIT);
103 
104 	osi->os_checksum = zio_checksum_select(newval, ZIO_CHECKSUM_ON_VALUE);
105 }
106 
107 static void
108 compression_changed_cb(void *arg, uint64_t newval)
109 {
110 	objset_impl_t *osi = arg;
111 
112 	/*
113 	 * Inheritance and range checking should have been done by now.
114 	 */
115 	ASSERT(newval != ZIO_COMPRESS_INHERIT);
116 
117 	osi->os_compress = zio_compress_select(newval, ZIO_COMPRESS_ON_VALUE);
118 }
119 
120 static void
121 copies_changed_cb(void *arg, uint64_t newval)
122 {
123 	objset_impl_t *osi = arg;
124 
125 	/*
126 	 * Inheritance and range checking should have been done by now.
127 	 */
128 	ASSERT(newval > 0);
129 	ASSERT(newval <= spa_max_replication(osi->os_spa));
130 
131 	osi->os_copies = newval;
132 }
133 
134 void
135 dmu_objset_byteswap(void *buf, size_t size)
136 {
137 	objset_phys_t *osp = buf;
138 
139 	ASSERT(size == sizeof (objset_phys_t));
140 	dnode_byteswap(&osp->os_meta_dnode);
141 	byteswap_uint64_array(&osp->os_zil_header, sizeof (zil_header_t));
142 	osp->os_type = BSWAP_64(osp->os_type);
143 }
144 
145 int
146 dmu_objset_open_impl(spa_t *spa, dsl_dataset_t *ds, blkptr_t *bp,
147     objset_impl_t **osip)
148 {
149 	objset_impl_t *winner, *osi;
150 	int i, err, checksum;
151 
152 	osi = kmem_zalloc(sizeof (objset_impl_t), KM_SLEEP);
153 	osi->os.os = osi;
154 	osi->os_dsl_dataset = ds;
155 	osi->os_spa = spa;
156 	osi->os_rootbp = bp;
157 	if (!BP_IS_HOLE(osi->os_rootbp)) {
158 		uint32_t aflags = ARC_WAIT;
159 		zbookmark_t zb;
160 		zb.zb_objset = ds ? ds->ds_object : 0;
161 		zb.zb_object = 0;
162 		zb.zb_level = -1;
163 		zb.zb_blkid = 0;
164 
165 		dprintf_bp(osi->os_rootbp, "reading %s", "");
166 		err = arc_read(NULL, spa, osi->os_rootbp,
167 		    dmu_ot[DMU_OT_OBJSET].ot_byteswap,
168 		    arc_getbuf_func, &osi->os_phys_buf,
169 		    ZIO_PRIORITY_SYNC_READ, ZIO_FLAG_CANFAIL, &aflags, &zb);
170 		if (err) {
171 			kmem_free(osi, sizeof (objset_impl_t));
172 			return (err);
173 		}
174 		osi->os_phys = osi->os_phys_buf->b_data;
175 		arc_release(osi->os_phys_buf, &osi->os_phys_buf);
176 	} else {
177 		osi->os_phys_buf = arc_buf_alloc(spa, sizeof (objset_phys_t),
178 		    &osi->os_phys_buf, ARC_BUFC_METADATA);
179 		osi->os_phys = osi->os_phys_buf->b_data;
180 		bzero(osi->os_phys, sizeof (objset_phys_t));
181 	}
182 
183 	/*
184 	 * Note: the changed_cb will be called once before the register
185 	 * func returns, thus changing the checksum/compression from the
186 	 * default (fletcher2/off).  Snapshots don't need to know, and
187 	 * registering would complicate clone promotion.
188 	 */
189 	if (ds && ds->ds_phys->ds_num_children == 0) {
190 		err = dsl_prop_register(ds, "checksum",
191 		    checksum_changed_cb, osi);
192 		if (err == 0)
193 			err = dsl_prop_register(ds, "compression",
194 			    compression_changed_cb, osi);
195 		if (err == 0)
196 			err = dsl_prop_register(ds, "copies",
197 			    copies_changed_cb, osi);
198 		if (err) {
199 			VERIFY(arc_buf_remove_ref(osi->os_phys_buf,
200 			    &osi->os_phys_buf) == 1);
201 			kmem_free(osi, sizeof (objset_impl_t));
202 			return (err);
203 		}
204 	} else if (ds == NULL) {
205 		/* It's the meta-objset. */
206 		osi->os_checksum = ZIO_CHECKSUM_FLETCHER_4;
207 		osi->os_compress = ZIO_COMPRESS_LZJB;
208 		osi->os_copies = spa_max_replication(spa);
209 	}
210 
211 	osi->os_zil = zil_alloc(&osi->os, &osi->os_phys->os_zil_header);
212 
213 	/*
214 	 * Metadata always gets compressed and checksummed.
215 	 * If the data checksum is multi-bit correctable, and it's not
216 	 * a ZBT-style checksum, then it's suitable for metadata as well.
217 	 * Otherwise, the metadata checksum defaults to fletcher4.
218 	 */
219 	checksum = osi->os_checksum;
220 
221 	if (zio_checksum_table[checksum].ci_correctable &&
222 	    !zio_checksum_table[checksum].ci_zbt)
223 		osi->os_md_checksum = checksum;
224 	else
225 		osi->os_md_checksum = ZIO_CHECKSUM_FLETCHER_4;
226 	osi->os_md_compress = ZIO_COMPRESS_LZJB;
227 
228 	for (i = 0; i < TXG_SIZE; i++) {
229 		list_create(&osi->os_dirty_dnodes[i], sizeof (dnode_t),
230 		    offsetof(dnode_t, dn_dirty_link[i]));
231 		list_create(&osi->os_free_dnodes[i], sizeof (dnode_t),
232 		    offsetof(dnode_t, dn_dirty_link[i]));
233 	}
234 	list_create(&osi->os_dnodes, sizeof (dnode_t),
235 	    offsetof(dnode_t, dn_link));
236 	list_create(&osi->os_downgraded_dbufs, sizeof (dmu_buf_impl_t),
237 	    offsetof(dmu_buf_impl_t, db_link));
238 
239 	mutex_init(&osi->os_lock, NULL, MUTEX_DEFAULT, NULL);
240 	mutex_init(&osi->os_obj_lock, NULL, MUTEX_DEFAULT, NULL);
241 
242 	osi->os_meta_dnode = dnode_special_open(osi,
243 	    &osi->os_phys->os_meta_dnode, DMU_META_DNODE_OBJECT);
244 
245 	if (ds != NULL) {
246 		winner = dsl_dataset_set_user_ptr(ds, osi, dmu_objset_evict);
247 		if (winner) {
248 			dmu_objset_evict(ds, osi);
249 			osi = winner;
250 		}
251 	}
252 
253 	*osip = osi;
254 	return (0);
255 }
256 
257 /* called from zpl */
258 int
259 dmu_objset_open(const char *name, dmu_objset_type_t type, int mode,
260     objset_t **osp)
261 {
262 	dsl_dataset_t *ds;
263 	int err;
264 	objset_t *os;
265 	objset_impl_t *osi;
266 
267 	os = kmem_alloc(sizeof (objset_t), KM_SLEEP);
268 	err = dsl_dataset_open(name, mode, os, &ds);
269 	if (err) {
270 		kmem_free(os, sizeof (objset_t));
271 		return (err);
272 	}
273 
274 	osi = dsl_dataset_get_user_ptr(ds);
275 	if (osi == NULL) {
276 		err = dmu_objset_open_impl(dsl_dataset_get_spa(ds),
277 		    ds, &ds->ds_phys->ds_bp, &osi);
278 		if (err) {
279 			dsl_dataset_close(ds, mode, os);
280 			kmem_free(os, sizeof (objset_t));
281 			return (err);
282 		}
283 	}
284 
285 	os->os = osi;
286 	os->os_mode = mode;
287 
288 	if (type != DMU_OST_ANY && type != os->os->os_phys->os_type) {
289 		dmu_objset_close(os);
290 		return (EINVAL);
291 	}
292 	*osp = os;
293 	return (0);
294 }
295 
296 void
297 dmu_objset_close(objset_t *os)
298 {
299 	dsl_dataset_close(os->os->os_dsl_dataset, os->os_mode, os);
300 	kmem_free(os, sizeof (objset_t));
301 }
302 
/*
 * Walk the objset's dnode list and call dnode_evict_dbufs() on every
 * dnode that currently has holds, visiting the meta-dnode last.
 * 'try' is passed through unchanged to dnode_evict_dbufs().
 *
 * Returns 1 as soon as dnode_evict_dbufs() returns nonzero for a dnode
 * (the remaining dnodes are not visited), otherwise 0.
 */
int
dmu_objset_evict_dbufs(objset_t *os, int try)
{
	objset_impl_t *osi = os->os;
	dnode_t *dn;

	mutex_enter(&osi->os_lock);

	/* process the mdn last, since the other dnodes have holds on it */
	list_remove(&osi->os_dnodes, osi->os_meta_dnode);
	list_insert_tail(&osi->os_dnodes, osi->os_meta_dnode);

	/*
	 * Find the first dnode with holds.  We have to do this dance
	 * because dnode_add_ref() only works if you already have a
	 * hold.  If there are no holds then it has no dbufs so OK to
	 * skip.
	 */
	for (dn = list_head(&osi->os_dnodes);
	    dn && refcount_is_zero(&dn->dn_holds);
	    dn = list_next(&osi->os_dnodes, dn))
		continue;
	if (dn)
		dnode_add_ref(dn, FTAG);

	while (dn) {
		dnode_t *next_dn = dn;

		/*
		 * Pick the next held dnode and take our own hold on it
		 * while os_lock is still held, before the lock is dropped
		 * for the eviction call below.
		 */
		do {
			next_dn = list_next(&osi->os_dnodes, next_dn);
		} while (next_dn && refcount_is_zero(&next_dn->dn_holds));
		if (next_dn)
			dnode_add_ref(next_dn, FTAG);

		/* dnode_evict_dbufs() is called without os_lock held. */
		mutex_exit(&osi->os_lock);
		if (dnode_evict_dbufs(dn, try)) {
			/* Give up: drop both of our holds and report it. */
			dnode_rele(dn, FTAG);
			if (next_dn)
				dnode_rele(next_dn, FTAG);
			return (1);
		}
		dnode_rele(dn, FTAG);
		mutex_enter(&osi->os_lock);
		dn = next_dn;
	}
	mutex_exit(&osi->os_lock);
	return (0);
}
351 
352 void
353 dmu_objset_evict(dsl_dataset_t *ds, void *arg)
354 {
355 	objset_impl_t *osi = arg;
356 	objset_t os;
357 	int i;
358 
359 	for (i = 0; i < TXG_SIZE; i++) {
360 		ASSERT(list_head(&osi->os_dirty_dnodes[i]) == NULL);
361 		ASSERT(list_head(&osi->os_free_dnodes[i]) == NULL);
362 	}
363 
364 	if (ds && ds->ds_phys->ds_num_children == 0) {
365 		VERIFY(0 == dsl_prop_unregister(ds, "checksum",
366 		    checksum_changed_cb, osi));
367 		VERIFY(0 == dsl_prop_unregister(ds, "compression",
368 		    compression_changed_cb, osi));
369 		VERIFY(0 == dsl_prop_unregister(ds, "copies",
370 		    copies_changed_cb, osi));
371 	}
372 
373 	/*
374 	 * We should need only a single pass over the dnode list, since
375 	 * nothing can be added to the list at this point.
376 	 */
377 	os.os = osi;
378 	(void) dmu_objset_evict_dbufs(&os, 0);
379 
380 	ASSERT3P(list_head(&osi->os_dnodes), ==, osi->os_meta_dnode);
381 	ASSERT3P(list_tail(&osi->os_dnodes), ==, osi->os_meta_dnode);
382 	ASSERT3P(list_head(&osi->os_meta_dnode->dn_dbufs), ==, NULL);
383 
384 	dnode_special_close(osi->os_meta_dnode);
385 	zil_free(osi->os_zil);
386 
387 	VERIFY(arc_buf_remove_ref(osi->os_phys_buf, &osi->os_phys_buf) == 1);
388 	mutex_destroy(&osi->os_lock);
389 	mutex_destroy(&osi->os_obj_lock);
390 	kmem_free(osi, sizeof (objset_impl_t));
391 }
392 
/*
 * called from dsl for meta-objset
 *
 * Create a new objset of the given type at 'bp' in syncing context.
 * Opens the objset_impl_t (the open must succeed), allocates the
 * meta-dnode, stores 'type' in the objset's physical header and dirties
 * the dataset.  'ds' is NULL for the MOS.  Returns the new objset_impl_t.
 */
objset_impl_t *
dmu_objset_create_impl(spa_t *spa, dsl_dataset_t *ds, blkptr_t *bp,
    dmu_objset_type_t type, dmu_tx_t *tx)
{
	objset_impl_t *osi;
	dnode_t *mdn;

	ASSERT(dmu_tx_is_syncing(tx));
	VERIFY(0 == dmu_objset_open_impl(spa, ds, bp, &osi));
	mdn = osi->os_meta_dnode;

	dnode_allocate(mdn, DMU_OT_DNODE, 1 << DNODE_BLOCK_SHIFT,
	    DN_MAX_INDBLKSHIFT, DMU_OT_NONE, 0, tx);

	/*
	 * We don't want to have to increase the meta-dnode's nlevels
	 * later, because then we could do it in quiescing context while
	 * we are also accessing it in open context.
	 *
	 * This precaution is not necessary for the MOS (ds == NULL),
	 * because the MOS is only updated in syncing context.
	 * This is most fortunate: the MOS is the only objset that
	 * needs to be synced multiple times as spa_sync() iterates
	 * to convergence, so minimizing its dn_nlevels matters.
	 */
	if (ds != NULL) {
		int levels = 1;

		/*
		 * Determine the number of levels necessary for the meta-dnode
		 * to contain DN_MAX_OBJECT dnodes.
		 */
		while ((uint64_t)mdn->dn_nblkptr << (mdn->dn_datablkshift +
		    (levels - 1) * (mdn->dn_indblkshift - SPA_BLKPTRSHIFT)) <
		    DN_MAX_OBJECT * sizeof (dnode_phys_t))
			levels++;

		/* Set both the current and this txg's pending level count. */
		mdn->dn_next_nlevels[tx->tx_txg & TXG_MASK] =
		    mdn->dn_nlevels = levels;
	}

	ASSERT(type != DMU_OST_NONE);
	ASSERT(type != DMU_OST_ANY);
	ASSERT(type < DMU_OST_NUMTYPES);
	osi->os_phys->os_type = type;

	dsl_dataset_dirty(ds, tx);

	return (osi);
}
444 
/*
 * Arguments handed from dmu_objset_create() to its check and sync
 * functions.
 */
struct oscarg {
	/* optional hook run on a newly created (non-clone) objset */
	void (*userfunc)(objset_t *os, void *arg, dmu_tx_t *tx);
	/* opaque argument passed to userfunc */
	void *userarg;
	/* dataset to clone from, or NULL when not cloning */
	dsl_dataset_t *clone_parent;
	/* name to create within the parent dir (tail from dsl_dir_open()) */
	const char *lastname;
	/* objset type for a newly created objset */
	dmu_objset_type_t type;
};
452 
453 /* ARGSUSED */
454 static int
455 dmu_objset_create_check(void *arg1, void *arg2, dmu_tx_t *tx)
456 {
457 	dsl_dir_t *dd = arg1;
458 	struct oscarg *oa = arg2;
459 	objset_t *mos = dd->dd_pool->dp_meta_objset;
460 	int err;
461 	uint64_t ddobj;
462 
463 	err = zap_lookup(mos, dd->dd_phys->dd_child_dir_zapobj,
464 	    oa->lastname, sizeof (uint64_t), 1, &ddobj);
465 	if (err != ENOENT)
466 		return (err ? err : EEXIST);
467 
468 	if (oa->clone_parent != NULL) {
469 		/*
470 		 * You can't clone across pools.
471 		 */
472 		if (oa->clone_parent->ds_dir->dd_pool != dd->dd_pool)
473 			return (EXDEV);
474 
475 		/*
476 		 * You can only clone snapshots, not the head datasets.
477 		 */
478 		if (oa->clone_parent->ds_phys->ds_num_children == 0)
479 			return (EINVAL);
480 	}
481 	return (0);
482 }
483 
484 static void
485 dmu_objset_create_sync(void *arg1, void *arg2, dmu_tx_t *tx)
486 {
487 	dsl_dir_t *dd = arg1;
488 	struct oscarg *oa = arg2;
489 	dsl_dataset_t *ds;
490 	blkptr_t *bp;
491 	uint64_t dsobj;
492 
493 	ASSERT(dmu_tx_is_syncing(tx));
494 
495 	dsobj = dsl_dataset_create_sync(dd, oa->lastname,
496 	    oa->clone_parent, tx);
497 
498 	VERIFY(0 == dsl_dataset_open_obj(dd->dd_pool, dsobj, NULL,
499 	    DS_MODE_STANDARD | DS_MODE_READONLY, FTAG, &ds));
500 	bp = dsl_dataset_get_blkptr(ds);
501 	if (BP_IS_HOLE(bp)) {
502 		objset_impl_t *osi;
503 
504 		/* This is an empty dmu_objset; not a clone. */
505 		osi = dmu_objset_create_impl(dsl_dataset_get_spa(ds),
506 		    ds, bp, oa->type, tx);
507 
508 		if (oa->userfunc)
509 			oa->userfunc(&osi->os, oa->userarg, tx);
510 	}
511 	dsl_dataset_close(ds, DS_MODE_STANDARD | DS_MODE_READONLY, FTAG);
512 }
513 
514 int
515 dmu_objset_create(const char *name, dmu_objset_type_t type,
516     objset_t *clone_parent,
517     void (*func)(objset_t *os, void *arg, dmu_tx_t *tx), void *arg)
518 {
519 	dsl_dir_t *pdd;
520 	const char *tail;
521 	int err = 0;
522 	struct oscarg oa = { 0 };
523 
524 	ASSERT(strchr(name, '@') == NULL);
525 	err = dsl_dir_open(name, FTAG, &pdd, &tail);
526 	if (err)
527 		return (err);
528 	if (tail == NULL) {
529 		dsl_dir_close(pdd, FTAG);
530 		return (EEXIST);
531 	}
532 
533 	dprintf("name=%s\n", name);
534 
535 	oa.userfunc = func;
536 	oa.userarg = arg;
537 	oa.lastname = tail;
538 	oa.type = type;
539 	if (clone_parent != NULL) {
540 		/*
541 		 * You can't clone to a different type.
542 		 */
543 		if (clone_parent->os->os_phys->os_type != type) {
544 			dsl_dir_close(pdd, FTAG);
545 			return (EINVAL);
546 		}
547 		oa.clone_parent = clone_parent->os->os_dsl_dataset;
548 	}
549 	err = dsl_sync_task_do(pdd->dd_pool, dmu_objset_create_check,
550 	    dmu_objset_create_sync, pdd, &oa, 5);
551 	dsl_dir_close(pdd, FTAG);
552 	return (err);
553 }
554 
555 int
556 dmu_objset_destroy(const char *name)
557 {
558 	objset_t *os;
559 	int error;
560 
561 	/*
562 	 * If it looks like we'll be able to destroy it, and there's
563 	 * an unplayed replay log sitting around, destroy the log.
564 	 * It would be nicer to do this in dsl_dataset_destroy_sync(),
565 	 * but the replay log objset is modified in open context.
566 	 */
567 	error = dmu_objset_open(name, DMU_OST_ANY, DS_MODE_EXCLUSIVE, &os);
568 	if (error == 0) {
569 		zil_destroy(dmu_objset_zil(os), B_FALSE);
570 		dmu_objset_close(os);
571 	}
572 
573 	return (dsl_dataset_destroy(name));
574 }
575 
576 int
577 dmu_objset_rollback(const char *name)
578 {
579 	int err;
580 	objset_t *os;
581 
582 	err = dmu_objset_open(name, DMU_OST_ANY,
583 	    DS_MODE_EXCLUSIVE | DS_MODE_INCONSISTENT, &os);
584 	if (err == 0) {
585 		err = zil_suspend(dmu_objset_zil(os));
586 		if (err == 0)
587 			zil_resume(dmu_objset_zil(os));
588 		if (err == 0) {
589 			/* XXX uncache everything? */
590 			err = dsl_dataset_rollback(os->os->os_dsl_dataset);
591 		}
592 		dmu_objset_close(os);
593 	}
594 	return (err);
595 }
596 
/*
 * State shared between dmu_objset_snapshot() and
 * dmu_objset_snapshot_one().
 */
struct snaparg {
	/* task group collecting one snapshot task per dataset */
	dsl_sync_task_group_t *dstg;
	/* snapshot name passed to each snapshot task */
	char *snapname;
	/* name of the dataset most recently attempted, or that failed */
	char failed[MAXPATHLEN];
};
602 
603 static int
604 dmu_objset_snapshot_one(char *name, void *arg)
605 {
606 	struct snaparg *sn = arg;
607 	objset_t *os;
608 	dmu_objset_stats_t stat;
609 	int err;
610 
611 	(void) strcpy(sn->failed, name);
612 
613 	err = dmu_objset_open(name, DMU_OST_ANY, DS_MODE_STANDARD, &os);
614 	if (err != 0)
615 		return (err);
616 
617 	/*
618 	 * If the objset is in an inconsistent state, return busy.
619 	 */
620 	dmu_objset_fast_stat(os, &stat);
621 	if (stat.dds_inconsistent) {
622 		dmu_objset_close(os);
623 		return (EBUSY);
624 	}
625 
626 	/*
627 	 * NB: we need to wait for all in-flight changes to get to disk,
628 	 * so that we snapshot those changes.  zil_suspend does this as
629 	 * a side effect.
630 	 */
631 	err = zil_suspend(dmu_objset_zil(os));
632 	if (err == 0) {
633 		dsl_sync_task_create(sn->dstg, dsl_dataset_snapshot_check,
634 		    dsl_dataset_snapshot_sync, os, sn->snapname, 3);
635 	} else {
636 		dmu_objset_close(os);
637 	}
638 
639 	return (err);
640 }
641 
642 int
643 dmu_objset_snapshot(char *fsname, char *snapname, boolean_t recursive)
644 {
645 	dsl_sync_task_t *dst;
646 	struct snaparg sn = { 0 };
647 	char *cp;
648 	spa_t *spa;
649 	int err;
650 
651 	(void) strcpy(sn.failed, fsname);
652 
653 	cp = strchr(fsname, '/');
654 	if (cp) {
655 		*cp = '\0';
656 		err = spa_open(fsname, &spa, FTAG);
657 		*cp = '/';
658 	} else {
659 		err = spa_open(fsname, &spa, FTAG);
660 	}
661 	if (err)
662 		return (err);
663 
664 	sn.dstg = dsl_sync_task_group_create(spa_get_dsl(spa));
665 	sn.snapname = snapname;
666 
667 	if (recursive) {
668 		err = dmu_objset_find(fsname,
669 		    dmu_objset_snapshot_one, &sn, DS_FIND_CHILDREN);
670 	} else {
671 		err = dmu_objset_snapshot_one(fsname, &sn);
672 	}
673 
674 	if (err)
675 		goto out;
676 
677 	err = dsl_sync_task_group_wait(sn.dstg);
678 
679 	for (dst = list_head(&sn.dstg->dstg_tasks); dst;
680 	    dst = list_next(&sn.dstg->dstg_tasks, dst)) {
681 		objset_t *os = dst->dst_arg1;
682 		if (dst->dst_err)
683 			dmu_objset_name(os, sn.failed);
684 		zil_resume(dmu_objset_zil(os));
685 		dmu_objset_close(os);
686 	}
687 out:
688 	if (err)
689 		(void) strcpy(fsname, sn.failed);
690 	dsl_sync_task_group_destroy(sn.dstg);
691 	spa_close(spa, FTAG);
692 	return (err);
693 }
694 
695 static void
696 dmu_objset_sync_dnodes(list_t *list, dmu_tx_t *tx)
697 {
698 	dnode_t *dn;
699 
700 	while (dn = list_head(list)) {
701 		ASSERT(dn->dn_object != DMU_META_DNODE_OBJECT);
702 		ASSERT(dn->dn_dbuf->db_data_pending);
703 		/*
704 		 * Initialize dn_zio outside dnode_sync()
705 		 * to accomodate meta-dnode
706 		 */
707 		dn->dn_zio = dn->dn_dbuf->db_data_pending->dr_zio;
708 		ASSERT(dn->dn_zio);
709 
710 		ASSERT3U(dn->dn_nlevels, <=, DN_MAX_LEVELS);
711 		list_remove(list, dn);
712 		dnode_sync(dn, tx);
713 	}
714 }
715 
716 /* ARGSUSED */
717 static void
718 ready(zio_t *zio, arc_buf_t *abuf, void *arg)
719 {
720 	objset_impl_t *os = arg;
721 	blkptr_t *bp = os->os_rootbp;
722 	dnode_phys_t *dnp = &os->os_phys->os_meta_dnode;
723 	int i;
724 
725 	/*
726 	 * Update rootbp fill count.
727 	 */
728 	bp->blk_fill = 1;	/* count the meta-dnode */
729 	for (i = 0; i < dnp->dn_nblkptr; i++)
730 		bp->blk_fill += dnp->dn_blkptr[i].blk_fill;
731 }
732 
733 /* ARGSUSED */
734 static void
735 killer(zio_t *zio, arc_buf_t *abuf, void *arg)
736 {
737 	objset_impl_t *os = arg;
738 
739 	ASSERT3U(zio->io_error, ==, 0);
740 
741 	BP_SET_TYPE(zio->io_bp, DMU_OT_OBJSET);
742 	BP_SET_LEVEL(zio->io_bp, 0);
743 
744 	if (!DVA_EQUAL(BP_IDENTITY(zio->io_bp),
745 	    BP_IDENTITY(&zio->io_bp_orig))) {
746 		if (zio->io_bp_orig.blk_birth == os->os_synctx->tx_txg)
747 			dsl_dataset_block_kill(os->os_dsl_dataset,
748 			    &zio->io_bp_orig, NULL, os->os_synctx);
749 		dsl_dataset_block_born(os->os_dsl_dataset, zio->io_bp,
750 		    os->os_synctx);
751 	}
752 	arc_release(os->os_phys_buf, &os->os_phys_buf);
753 }
754 
/*
 * called from dsl
 *
 * Sync this objset for the txg of 'tx' (syncing context only).  Creates
 * the root block write as a child of 'pio', syncs the meta-dnode and the
 * txg's free and dirty dnodes, issues the meta-dnode's pending dirty
 * record I/Os, syncs the ZIL and finally issues the root block write.
 */
void
dmu_objset_sync(objset_impl_t *os, zio_t *pio, dmu_tx_t *tx)
{
	int txgoff;
	zbookmark_t zb;
	zio_t *zio;
	list_t *list;
	dbuf_dirty_record_t *dr;
	int zio_flags;

	dprintf_ds(os->os_dsl_dataset, "txg=%llu\n", tx->tx_txg);

	ASSERT(dmu_tx_is_syncing(tx));
	/* XXX the write_done callback should really give us the tx... */
	os->os_synctx = tx;

	if (os->os_dsl_dataset == NULL) {
		/*
		 * This is the MOS.  If we have upgraded,
		 * spa_max_replication() could change, so reset
		 * os_copies here.
		 */
		os->os_copies = spa_max_replication(os->os_spa);
	}

	/*
	 * Create the root block IO
	 */
	zb.zb_objset = os->os_dsl_dataset ? os->os_dsl_dataset->ds_object : 0;
	zb.zb_object = 0;
	zb.zb_level = -1;
	zb.zb_blkid = 0;
	zio_flags = ZIO_FLAG_MUSTSUCCEED;
	if (dmu_ot[DMU_OT_OBJSET].ot_metadata || zb.zb_level != 0)
		zio_flags |= ZIO_FLAG_METADATA;
	/* A root block older than this txg is killed before the rewrite. */
	if (BP_IS_OLDER(os->os_rootbp, tx->tx_txg))
		dsl_dataset_block_kill(os->os_dsl_dataset,
		    os->os_rootbp, pio, tx);
	/* ready() and killer() are handed to arc_write() as callbacks. */
	zio = arc_write(pio, os->os_spa, os->os_md_checksum,
	    os->os_md_compress,
	    dmu_get_replication_level(os, &zb, DMU_OT_OBJSET),
	    tx->tx_txg, os->os_rootbp, os->os_phys_buf, ready, killer, os,
	    ZIO_PRIORITY_ASYNC_WRITE, zio_flags, &zb);

	/*
	 * Sync meta-dnode - the parent IO for the sync is the root block
	 */
	os->os_meta_dnode->dn_zio = zio;
	dnode_sync(os->os_meta_dnode, tx);

	txgoff = tx->tx_txg & TXG_MASK;

	/* Free dnodes are synced before dirty dnodes. */
	dmu_objset_sync_dnodes(&os->os_free_dnodes[txgoff], tx);
	dmu_objset_sync_dnodes(&os->os_dirty_dnodes[txgoff], tx);

	/*
	 * Drain the meta-dnode's dirty records for this txg, issuing the
	 * I/O attached to each record that has one.
	 */
	list = &os->os_meta_dnode->dn_dirty_records[txgoff];
	while (dr = list_head(list)) {
		ASSERT(dr->dr_dbuf->db_level == 0);
		list_remove(list, dr);
		if (dr->dr_zio)
			zio_nowait(dr->dr_zio);
	}
	/*
	 * Free intent log blocks up to this tx.
	 */
	zil_sync(os->os_zil, tx);
	/* The root block write is issued last. */
	zio_nowait(zio);
}
824 
825 void
826 dmu_objset_space(objset_t *os, uint64_t *refdbytesp, uint64_t *availbytesp,
827     uint64_t *usedobjsp, uint64_t *availobjsp)
828 {
829 	dsl_dataset_space(os->os->os_dsl_dataset, refdbytesp, availbytesp,
830 	    usedobjsp, availobjsp);
831 }
832 
833 uint64_t
834 dmu_objset_fsid_guid(objset_t *os)
835 {
836 	return (dsl_dataset_fsid_guid(os->os->os_dsl_dataset));
837 }
838 
839 void
840 dmu_objset_fast_stat(objset_t *os, dmu_objset_stats_t *stat)
841 {
842 	stat->dds_type = os->os->os_phys->os_type;
843 	if (os->os->os_dsl_dataset)
844 		dsl_dataset_fast_stat(os->os->os_dsl_dataset, stat);
845 }
846 
847 void
848 dmu_objset_stats(objset_t *os, nvlist_t *nv)
849 {
850 	ASSERT(os->os->os_dsl_dataset ||
851 	    os->os->os_phys->os_type == DMU_OST_META);
852 
853 	if (os->os->os_dsl_dataset != NULL)
854 		dsl_dataset_stats(os->os->os_dsl_dataset, nv);
855 
856 	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_TYPE,
857 	    os->os->os_phys->os_type);
858 }
859 
860 int
861 dmu_objset_is_snapshot(objset_t *os)
862 {
863 	if (os->os->os_dsl_dataset != NULL)
864 		return (dsl_dataset_is_snapshot(os->os->os_dsl_dataset));
865 	else
866 		return (B_FALSE);
867 }
868 
869 int
870 dmu_snapshot_list_next(objset_t *os, int namelen, char *name,
871     uint64_t *idp, uint64_t *offp)
872 {
873 	dsl_dataset_t *ds = os->os->os_dsl_dataset;
874 	zap_cursor_t cursor;
875 	zap_attribute_t attr;
876 
877 	if (ds->ds_phys->ds_snapnames_zapobj == 0)
878 		return (ENOENT);
879 
880 	zap_cursor_init_serialized(&cursor,
881 	    ds->ds_dir->dd_pool->dp_meta_objset,
882 	    ds->ds_phys->ds_snapnames_zapobj, *offp);
883 
884 	if (zap_cursor_retrieve(&cursor, &attr) != 0) {
885 		zap_cursor_fini(&cursor);
886 		return (ENOENT);
887 	}
888 
889 	if (strlen(attr.za_name) + 1 > namelen) {
890 		zap_cursor_fini(&cursor);
891 		return (ENAMETOOLONG);
892 	}
893 
894 	(void) strcpy(name, attr.za_name);
895 	if (idp)
896 		*idp = attr.za_first_integer;
897 	zap_cursor_advance(&cursor);
898 	*offp = zap_cursor_serialize(&cursor);
899 	zap_cursor_fini(&cursor);
900 
901 	return (0);
902 }
903 
904 int
905 dmu_dir_list_next(objset_t *os, int namelen, char *name,
906     uint64_t *idp, uint64_t *offp)
907 {
908 	dsl_dir_t *dd = os->os->os_dsl_dataset->ds_dir;
909 	zap_cursor_t cursor;
910 	zap_attribute_t attr;
911 
912 	/* there is no next dir on a snapshot! */
913 	if (os->os->os_dsl_dataset->ds_object !=
914 	    dd->dd_phys->dd_head_dataset_obj)
915 		return (ENOENT);
916 
917 	zap_cursor_init_serialized(&cursor,
918 	    dd->dd_pool->dp_meta_objset,
919 	    dd->dd_phys->dd_child_dir_zapobj, *offp);
920 
921 	if (zap_cursor_retrieve(&cursor, &attr) != 0) {
922 		zap_cursor_fini(&cursor);
923 		return (ENOENT);
924 	}
925 
926 	if (strlen(attr.za_name) + 1 > namelen) {
927 		zap_cursor_fini(&cursor);
928 		return (ENAMETOOLONG);
929 	}
930 
931 	(void) strcpy(name, attr.za_name);
932 	if (idp)
933 		*idp = attr.za_first_integer;
934 	zap_cursor_advance(&cursor);
935 	*offp = zap_cursor_serialize(&cursor);
936 	zap_cursor_fini(&cursor);
937 
938 	return (0);
939 }
940 
/*
 * Find all objsets under name, and for each, call 'func(child_name, arg)'.
 *
 * With DS_FIND_CHILDREN the child directories are visited recursively;
 * with DS_FIND_SNAPSHOTS 'func' is also called for each snapshot of
 * 'name'.  Children and snapshots are processed before 'name' itself,
 * and 'name' is skipped when its directory has no head dataset.
 *
 * The walk stops at the first nonzero value returned by 'func' (or by a
 * recursive call) and that value is returned; otherwise 0 is returned.
 */
int
dmu_objset_find(char *name, int func(char *, void *), void *arg, int flags)
{
	dsl_dir_t *dd;
	objset_t *os;
	uint64_t snapobj;
	zap_cursor_t zc;
	zap_attribute_t *attr;
	char *child;
	int do_self, err;

	err = dsl_dir_open(name, FTAG, &dd, NULL);
	if (err)
		return (err);

	/* NB: the $MOS dir doesn't have a head dataset */
	do_self = (dd->dd_phys->dd_head_dataset_obj != 0);
	/* Heap-allocated rather than placed on the (recursive) stack. */
	attr = kmem_alloc(sizeof (zap_attribute_t), KM_SLEEP);

	/*
	 * Iterate over all children.
	 */
	if (flags & DS_FIND_CHILDREN) {
		for (zap_cursor_init(&zc, dd->dd_pool->dp_meta_objset,
		    dd->dd_phys->dd_child_dir_zapobj);
		    zap_cursor_retrieve(&zc, attr) == 0;
		    (void) zap_cursor_advance(&zc)) {
			ASSERT(attr->za_integer_length == sizeof (uint64_t));
			ASSERT(attr->za_num_integers == 1);

			/*
			 * Build "<dir name>/<child name>"; the separating
			 * '/' is appended explicitly.
			 */
			child = kmem_alloc(MAXPATHLEN, KM_SLEEP);
			/* XXX could probably just use name here */
			dsl_dir_name(dd, child);
			(void) strcat(child, "/");
			(void) strcat(child, attr->za_name);
			err = dmu_objset_find(child, func, arg, flags);
			kmem_free(child, MAXPATHLEN);
			if (err)
				break;
		}
		zap_cursor_fini(&zc);

		if (err) {
			dsl_dir_close(dd, FTAG);
			kmem_free(attr, sizeof (zap_attribute_t));
			return (err);
		}
	}

	/*
	 * Iterate over all snapshots.
	 * If the objset cannot be opened, the snapshots are skipped
	 * without reporting an error.
	 */
	if ((flags & DS_FIND_SNAPSHOTS) &&
	    dmu_objset_open(name, DMU_OST_ANY,
	    DS_MODE_STANDARD | DS_MODE_READONLY, &os) == 0) {

		/* Only the snapshot ZAP object number is needed. */
		snapobj = os->os->os_dsl_dataset->ds_phys->ds_snapnames_zapobj;
		dmu_objset_close(os);

		for (zap_cursor_init(&zc, dd->dd_pool->dp_meta_objset, snapobj);
		    zap_cursor_retrieve(&zc, attr) == 0;
		    (void) zap_cursor_advance(&zc)) {
			ASSERT(attr->za_integer_length == sizeof (uint64_t));
			ASSERT(attr->za_num_integers == 1);

			/* Build "<dir name>@<snapshot name>". */
			child = kmem_alloc(MAXPATHLEN, KM_SLEEP);
			/* XXX could probably just use name here */
			dsl_dir_name(dd, child);
			(void) strcat(child, "@");
			(void) strcat(child, attr->za_name);
			err = func(child, arg);
			kmem_free(child, MAXPATHLEN);
			if (err)
				break;
		}
		zap_cursor_fini(&zc);
	}

	dsl_dir_close(dd, FTAG);
	kmem_free(attr, sizeof (zap_attribute_t));

	if (err)
		return (err);

	/*
	 * Apply to self if appropriate.
	 */
	if (do_self)
		err = func(name, arg);
	return (err);
}
1038