xref: /titanic_41/usr/src/uts/common/fs/zfs/dmu_objset.c (revision 2fb876ae0cefcbd01f8d8490242aa4501caddbc3)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2006 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 
26 #pragma ident	"%Z%%M%	%I%	%E% SMI"
27 
28 #include <sys/zfs_context.h>
29 #include <sys/dmu_objset.h>
30 #include <sys/dsl_dir.h>
31 #include <sys/dsl_dataset.h>
32 #include <sys/dsl_prop.h>
33 #include <sys/dsl_pool.h>
34 #include <sys/dnode.h>
35 #include <sys/dbuf.h>
36 #include <sys/dmu_tx.h>
37 #include <sys/zio_checksum.h>
38 #include <sys/zap.h>
39 #include <sys/zil.h>
40 #include <sys/dmu_impl.h>
41 
42 
43 spa_t *
44 dmu_objset_spa(objset_t *os)
45 {
46 	return (os->os->os_spa);
47 }
48 
49 zilog_t *
50 dmu_objset_zil(objset_t *os)
51 {
52 	return (os->os->os_zil);
53 }
54 
55 dsl_pool_t *
56 dmu_objset_pool(objset_t *os)
57 {
58 	dsl_dataset_t *ds;
59 
60 	if ((ds = os->os->os_dsl_dataset) != NULL && ds->ds_dir)
61 		return (ds->ds_dir->dd_pool);
62 	else
63 		return (spa_get_dsl(os->os->os_spa));
64 }
65 
66 dsl_dataset_t *
67 dmu_objset_ds(objset_t *os)
68 {
69 	return (os->os->os_dsl_dataset);
70 }
71 
72 dmu_objset_type_t
73 dmu_objset_type(objset_t *os)
74 {
75 	return (os->os->os_phys->os_type);
76 }
77 
78 void
79 dmu_objset_name(objset_t *os, char *buf)
80 {
81 	dsl_dataset_name(os->os->os_dsl_dataset, buf);
82 }
83 
84 uint64_t
85 dmu_objset_id(objset_t *os)
86 {
87 	dsl_dataset_t *ds = os->os->os_dsl_dataset;
88 
89 	return (ds ? ds->ds_object : 0);
90 }
91 
92 static void
93 checksum_changed_cb(void *arg, uint64_t newval)
94 {
95 	objset_impl_t *osi = arg;
96 
97 	/*
98 	 * Inheritance should have been done by now.
99 	 */
100 	ASSERT(newval != ZIO_CHECKSUM_INHERIT);
101 
102 	osi->os_checksum = zio_checksum_select(newval, ZIO_CHECKSUM_ON_VALUE);
103 }
104 
105 static void
106 compression_changed_cb(void *arg, uint64_t newval)
107 {
108 	objset_impl_t *osi = arg;
109 
110 	/*
111 	 * Inheritance and range checking should have been done by now.
112 	 */
113 	ASSERT(newval != ZIO_COMPRESS_INHERIT);
114 
115 	osi->os_compress = zio_compress_select(newval, ZIO_COMPRESS_ON_VALUE);
116 }
117 
118 void
119 dmu_objset_byteswap(void *buf, size_t size)
120 {
121 	objset_phys_t *osp = buf;
122 
123 	ASSERT(size == sizeof (objset_phys_t));
124 	dnode_byteswap(&osp->os_meta_dnode);
125 	byteswap_uint64_array(&osp->os_zil_header, sizeof (zil_header_t));
126 	osp->os_type = BSWAP_64(osp->os_type);
127 }
128 
129 int
130 dmu_objset_open_impl(spa_t *spa, dsl_dataset_t *ds, blkptr_t *bp,
131     objset_impl_t **osip)
132 {
133 	objset_impl_t *winner, *osi;
134 	int i, err, checksum;
135 
136 	osi = kmem_zalloc(sizeof (objset_impl_t), KM_SLEEP);
137 	osi->os.os = osi;
138 	osi->os_dsl_dataset = ds;
139 	osi->os_spa = spa;
140 	if (bp)
141 		osi->os_rootbp = *bp;
142 	osi->os_phys = zio_buf_alloc(sizeof (objset_phys_t));
143 	if (!BP_IS_HOLE(&osi->os_rootbp)) {
144 		zbookmark_t zb;
145 		zb.zb_objset = ds ? ds->ds_object : 0;
146 		zb.zb_object = 0;
147 		zb.zb_level = -1;
148 		zb.zb_blkid = 0;
149 
150 		dprintf_bp(&osi->os_rootbp, "reading %s", "");
151 		err = arc_read(NULL, spa, &osi->os_rootbp,
152 		    dmu_ot[DMU_OT_OBJSET].ot_byteswap,
153 		    arc_bcopy_func, osi->os_phys,
154 		    ZIO_PRIORITY_SYNC_READ, ZIO_FLAG_CANFAIL, ARC_WAIT, &zb);
155 		if (err) {
156 			zio_buf_free(osi->os_phys, sizeof (objset_phys_t));
157 			kmem_free(osi, sizeof (objset_impl_t));
158 			return (err);
159 		}
160 	} else {
161 		bzero(osi->os_phys, sizeof (objset_phys_t));
162 	}
163 
164 	/*
165 	 * Note: the changed_cb will be called once before the register
166 	 * func returns, thus changing the checksum/compression from the
167 	 * default (fletcher2/off).  Snapshots don't need to know, and
168 	 * registering would complicate clone promotion.
169 	 */
170 	if (ds && ds->ds_phys->ds_num_children == 0) {
171 		err = dsl_prop_register(ds, "checksum",
172 		    checksum_changed_cb, osi);
173 		if (err == 0)
174 			err = dsl_prop_register(ds, "compression",
175 			    compression_changed_cb, osi);
176 		if (err) {
177 			zio_buf_free(osi->os_phys, sizeof (objset_phys_t));
178 			kmem_free(osi, sizeof (objset_impl_t));
179 			return (err);
180 		}
181 	} else if (ds == NULL) {
182 		/* It's the meta-objset. */
183 		osi->os_checksum = ZIO_CHECKSUM_FLETCHER_4;
184 		osi->os_compress = ZIO_COMPRESS_LZJB;
185 	}
186 
187 	osi->os_zil = zil_alloc(&osi->os, &osi->os_phys->os_zil_header);
188 
189 	/*
190 	 * Metadata always gets compressed and checksummed.
191 	 * If the data checksum is multi-bit correctable, and it's not
192 	 * a ZBT-style checksum, then it's suitable for metadata as well.
193 	 * Otherwise, the metadata checksum defaults to fletcher4.
194 	 */
195 	checksum = osi->os_checksum;
196 
197 	if (zio_checksum_table[checksum].ci_correctable &&
198 	    !zio_checksum_table[checksum].ci_zbt)
199 		osi->os_md_checksum = checksum;
200 	else
201 		osi->os_md_checksum = ZIO_CHECKSUM_FLETCHER_4;
202 	osi->os_md_compress = ZIO_COMPRESS_LZJB;
203 
204 	for (i = 0; i < TXG_SIZE; i++) {
205 		list_create(&osi->os_dirty_dnodes[i], sizeof (dnode_t),
206 		    offsetof(dnode_t, dn_dirty_link[i]));
207 		list_create(&osi->os_free_dnodes[i], sizeof (dnode_t),
208 		    offsetof(dnode_t, dn_dirty_link[i]));
209 	}
210 	list_create(&osi->os_dnodes, sizeof (dnode_t),
211 	    offsetof(dnode_t, dn_link));
212 	list_create(&osi->os_downgraded_dbufs, sizeof (dmu_buf_impl_t),
213 	    offsetof(dmu_buf_impl_t, db_link));
214 
215 	osi->os_meta_dnode = dnode_special_open(osi,
216 	    &osi->os_phys->os_meta_dnode, DMU_META_DNODE_OBJECT);
217 
218 	if (ds != NULL) {
219 		winner = dsl_dataset_set_user_ptr(ds, osi, dmu_objset_evict);
220 		if (winner) {
221 			dmu_objset_evict(ds, osi);
222 			osi = winner;
223 		}
224 	}
225 
226 	*osip = osi;
227 	return (0);
228 }
229 
230 /* called from zpl */
231 int
232 dmu_objset_open(const char *name, dmu_objset_type_t type, int mode,
233     objset_t **osp)
234 {
235 	dsl_dataset_t *ds;
236 	int err;
237 	objset_t *os;
238 	objset_impl_t *osi;
239 
240 	os = kmem_alloc(sizeof (objset_t), KM_SLEEP);
241 	err = dsl_dataset_open(name, mode, os, &ds);
242 	if (err) {
243 		kmem_free(os, sizeof (objset_t));
244 		return (err);
245 	}
246 
247 	osi = dsl_dataset_get_user_ptr(ds);
248 	if (osi == NULL) {
249 		blkptr_t bp;
250 
251 		dsl_dataset_get_blkptr(ds, &bp);
252 		err = dmu_objset_open_impl(dsl_dataset_get_spa(ds),
253 		    ds, &bp, &osi);
254 		if (err) {
255 			dsl_dataset_close(ds, mode, os);
256 			kmem_free(os, sizeof (objset_t));
257 			return (err);
258 		}
259 	}
260 
261 	os->os = osi;
262 	os->os_mode = mode;
263 
264 	if (type != DMU_OST_ANY && type != os->os->os_phys->os_type) {
265 		dmu_objset_close(os);
266 		return (EINVAL);
267 	}
268 	*osp = os;
269 	return (0);
270 }
271 
272 void
273 dmu_objset_close(objset_t *os)
274 {
275 	dsl_dataset_close(os->os->os_dsl_dataset, os->os_mode, os);
276 	kmem_free(os, sizeof (objset_t));
277 }
278 
279 int
280 dmu_objset_evict_dbufs(objset_t *os, int try)
281 {
282 	objset_impl_t *osi = os->os;
283 	dnode_t *dn;
284 
285 	mutex_enter(&osi->os_lock);
286 
287 	/* process the mdn last, since the other dnodes have holds on it */
288 	list_remove(&osi->os_dnodes, osi->os_meta_dnode);
289 	list_insert_tail(&osi->os_dnodes, osi->os_meta_dnode);
290 
291 	/*
292 	 * Find the first dnode with holds.  We have to do this dance
293 	 * because dnode_add_ref() only works if you already have a
294 	 * hold.  If there are no holds then it has no dbufs so OK to
295 	 * skip.
296 	 */
297 	for (dn = list_head(&osi->os_dnodes);
298 	    dn && refcount_is_zero(&dn->dn_holds);
299 	    dn = list_next(&osi->os_dnodes, dn))
300 		continue;
301 	if (dn)
302 		dnode_add_ref(dn, FTAG);
303 
304 	while (dn) {
305 		dnode_t *next_dn = dn;
306 
307 		do {
308 			next_dn = list_next(&osi->os_dnodes, next_dn);
309 		} while (next_dn && refcount_is_zero(&next_dn->dn_holds));
310 		if (next_dn)
311 			dnode_add_ref(next_dn, FTAG);
312 
313 		mutex_exit(&osi->os_lock);
314 		if (dnode_evict_dbufs(dn, try)) {
315 			dnode_rele(dn, FTAG);
316 			if (next_dn)
317 				dnode_rele(next_dn, FTAG);
318 			return (1);
319 		}
320 		dnode_rele(dn, FTAG);
321 		mutex_enter(&osi->os_lock);
322 		dn = next_dn;
323 	}
324 	mutex_exit(&osi->os_lock);
325 	return (0);
326 }
327 
328 void
329 dmu_objset_evict(dsl_dataset_t *ds, void *arg)
330 {
331 	objset_impl_t *osi = arg;
332 	objset_t os;
333 	int i;
334 
335 	for (i = 0; i < TXG_SIZE; i++) {
336 		ASSERT(list_head(&osi->os_dirty_dnodes[i]) == NULL);
337 		ASSERT(list_head(&osi->os_free_dnodes[i]) == NULL);
338 	}
339 
340 	if (ds && ds->ds_phys->ds_num_children == 0) {
341 		VERIFY(0 == dsl_prop_unregister(ds, "checksum",
342 		    checksum_changed_cb, osi));
343 		VERIFY(0 == dsl_prop_unregister(ds, "compression",
344 		    compression_changed_cb, osi));
345 	}
346 
347 	/*
348 	 * We should need only a single pass over the dnode list, since
349 	 * nothing can be added to the list at this point.
350 	 */
351 	os.os = osi;
352 	(void) dmu_objset_evict_dbufs(&os, 0);
353 
354 	ASSERT3P(list_head(&osi->os_dnodes), ==, osi->os_meta_dnode);
355 	ASSERT3P(list_tail(&osi->os_dnodes), ==, osi->os_meta_dnode);
356 	ASSERT3P(list_head(&osi->os_meta_dnode->dn_dbufs), ==, NULL);
357 
358 	dnode_special_close(osi->os_meta_dnode);
359 	zil_free(osi->os_zil);
360 
361 	zio_buf_free(osi->os_phys, sizeof (objset_phys_t));
362 	kmem_free(osi, sizeof (objset_impl_t));
363 }
364 
365 /* called from dsl for meta-objset */
366 objset_impl_t *
367 dmu_objset_create_impl(spa_t *spa, dsl_dataset_t *ds, dmu_objset_type_t type,
368     dmu_tx_t *tx)
369 {
370 	objset_impl_t *osi;
371 	dnode_t *mdn;
372 
373 	ASSERT(dmu_tx_is_syncing(tx));
374 	VERIFY(0 == dmu_objset_open_impl(spa, ds, NULL, &osi));
375 	mdn = osi->os_meta_dnode;
376 
377 	dnode_allocate(mdn, DMU_OT_DNODE, 1 << DNODE_BLOCK_SHIFT,
378 	    DN_MAX_INDBLKSHIFT, DMU_OT_NONE, 0, tx);
379 
380 	/*
381 	 * We don't want to have to increase the meta-dnode's nlevels
382 	 * later, because then we could do it in quescing context while
383 	 * we are also accessing it in open context.
384 	 *
385 	 * This precaution is not necessary for the MOS (ds == NULL),
386 	 * because the MOS is only updated in syncing context.
387 	 * This is most fortunate: the MOS is the only objset that
388 	 * needs to be synced multiple times as spa_sync() iterates
389 	 * to convergence, so minimizing its dn_nlevels matters.
390 	 */
391 	if (ds != NULL) {
392 		int levels = 1;
393 
394 		/*
395 		 * Determine the number of levels necessary for the meta-dnode
396 		 * to contain DN_MAX_OBJECT dnodes.
397 		 */
398 		while ((uint64_t)mdn->dn_nblkptr << (mdn->dn_datablkshift +
399 		    (levels - 1) * (mdn->dn_indblkshift - SPA_BLKPTRSHIFT)) <
400 		    DN_MAX_OBJECT * sizeof (dnode_phys_t))
401 			levels++;
402 
403 		mdn->dn_next_nlevels[tx->tx_txg & TXG_MASK] =
404 		    mdn->dn_nlevels = levels;
405 	}
406 
407 	ASSERT(type != DMU_OST_NONE);
408 	ASSERT(type != DMU_OST_ANY);
409 	ASSERT(type < DMU_OST_NUMTYPES);
410 	osi->os_phys->os_type = type;
411 
412 	dsl_dataset_dirty(ds, tx);
413 
414 	return (osi);
415 }
416 
417 struct oscarg {
418 	void (*userfunc)(objset_t *os, void *arg, dmu_tx_t *tx);
419 	void *userarg;
420 	dsl_dataset_t *clone_parent;
421 	const char *fullname;
422 	const char *lastname;
423 	dmu_objset_type_t type;
424 };
425 
426 static int
427 dmu_objset_create_sync(dsl_dir_t *dd, void *arg, dmu_tx_t *tx)
428 {
429 	struct oscarg *oa = arg;
430 	dsl_dataset_t *ds;
431 	int err;
432 	blkptr_t bp;
433 
434 	ASSERT(dmu_tx_is_syncing(tx));
435 
436 	err = dsl_dataset_create_sync(dd, oa->fullname, oa->lastname,
437 	    oa->clone_parent, tx);
438 	dprintf_dd(dd, "fn=%s ln=%s err=%d\n",
439 	    oa->fullname, oa->lastname, err);
440 	if (err)
441 		return (err);
442 
443 	VERIFY(0 == dsl_dataset_open_spa(dd->dd_pool->dp_spa, oa->fullname,
444 	    DS_MODE_STANDARD | DS_MODE_READONLY, FTAG, &ds));
445 	dsl_dataset_get_blkptr(ds, &bp);
446 	if (BP_IS_HOLE(&bp)) {
447 		objset_impl_t *osi;
448 
449 		/* This is an empty dmu_objset; not a clone. */
450 		osi = dmu_objset_create_impl(dsl_dataset_get_spa(ds),
451 		    ds, oa->type, tx);
452 
453 		if (oa->userfunc)
454 			oa->userfunc(&osi->os, oa->userarg, tx);
455 	}
456 	dsl_dataset_close(ds, DS_MODE_STANDARD | DS_MODE_READONLY, FTAG);
457 
458 	return (0);
459 }
460 
461 int
462 dmu_objset_create(const char *name, dmu_objset_type_t type,
463     objset_t *clone_parent,
464     void (*func)(objset_t *os, void *arg, dmu_tx_t *tx), void *arg)
465 {
466 	dsl_dir_t *pds;
467 	const char *tail;
468 	int err = 0;
469 
470 	err = dsl_dir_open(name, FTAG, &pds, &tail);
471 	if (err)
472 		return (err);
473 	if (tail == NULL) {
474 		dsl_dir_close(pds, FTAG);
475 		return (EEXIST);
476 	}
477 
478 	dprintf("name=%s\n", name);
479 
480 	if (tail[0] == '@') {
481 		/*
482 		 * If we're creating a snapshot, make sure everything
483 		 * they might want is on disk.  XXX Sketchy to know
484 		 * about snapshots here, better to put in DSL.
485 		 */
486 		objset_t *os;
487 		size_t plen = strchr(name, '@') - name + 1;
488 		char *pbuf = kmem_alloc(plen, KM_SLEEP);
489 		bcopy(name, pbuf, plen - 1);
490 		pbuf[plen - 1] = '\0';
491 
492 		err = dmu_objset_open(pbuf, DMU_OST_ANY, DS_MODE_STANDARD, &os);
493 		if (err == 0) {
494 			err = zil_suspend(dmu_objset_zil(os));
495 			if (err == 0) {
496 				err = dsl_dir_sync_task(pds,
497 				    dsl_dataset_snapshot_sync,
498 				    (void*)(tail+1), 16*1024);
499 				zil_resume(dmu_objset_zil(os));
500 			}
501 			dmu_objset_close(os);
502 		}
503 		kmem_free(pbuf, plen);
504 	} else {
505 		struct oscarg oa = { 0 };
506 		oa.userfunc = func;
507 		oa.userarg = arg;
508 		oa.fullname = name;
509 		oa.lastname = tail;
510 		oa.type = type;
511 		if (clone_parent != NULL) {
512 			/*
513 			 * You can't clone to a different type.
514 			 */
515 			if (clone_parent->os->os_phys->os_type != type) {
516 				dsl_dir_close(pds, FTAG);
517 				return (EINVAL);
518 			}
519 			oa.clone_parent = clone_parent->os->os_dsl_dataset;
520 		}
521 		err = dsl_dir_sync_task(pds, dmu_objset_create_sync, &oa,
522 		    256*1024);
523 	}
524 	dsl_dir_close(pds, FTAG);
525 	return (err);
526 }
527 
528 int
529 dmu_objset_destroy(const char *name)
530 {
531 	objset_t *os;
532 	int error;
533 
534 	/*
535 	 * If it looks like we'll be able to destroy it, and there's
536 	 * an unplayed replay log sitting around, destroy the log.
537 	 * It would be nicer to do this in dsl_dataset_destroy_sync(),
538 	 * but the replay log objset is modified in open context.
539 	 */
540 	error = dmu_objset_open(name, DMU_OST_ANY, DS_MODE_EXCLUSIVE, &os);
541 	if (error == 0) {
542 		zil_destroy(dmu_objset_zil(os), B_FALSE);
543 		dmu_objset_close(os);
544 	}
545 
546 	/* XXX uncache everything? */
547 	return (dsl_dataset_destroy(name));
548 }
549 
550 int
551 dmu_objset_rollback(const char *name)
552 {
553 	int err;
554 	objset_t *os;
555 
556 	err = dmu_objset_open(name, DMU_OST_ANY, DS_MODE_EXCLUSIVE, &os);
557 	if (err == 0) {
558 		err = zil_suspend(dmu_objset_zil(os));
559 		if (err == 0)
560 			zil_resume(dmu_objset_zil(os));
561 		dmu_objset_close(os);
562 		if (err == 0) {
563 			/* XXX uncache everything? */
564 			err = dsl_dataset_rollback(name);
565 		}
566 	}
567 	return (err);
568 }
569 
570 static void
571 dmu_objset_sync_dnodes(objset_impl_t *os, list_t *list, dmu_tx_t *tx)
572 {
573 	dnode_t *dn = list_head(list);
574 	int level, err;
575 
576 	for (level = 0; dn = list_head(list); level++) {
577 		zio_t *zio;
578 		zio = zio_root(os->os_spa, NULL, NULL, ZIO_FLAG_MUSTSUCCEED);
579 
580 		ASSERT3U(level, <=, DN_MAX_LEVELS);
581 
582 		while (dn) {
583 			dnode_t *next = list_next(list, dn);
584 
585 			list_remove(list, dn);
586 			if (dnode_sync(dn, level, zio, tx) == 0) {
587 				/*
588 				 * This dnode requires syncing at higher
589 				 * levels; put it back onto the list.
590 				 */
591 				if (next)
592 					list_insert_before(list, next, dn);
593 				else
594 					list_insert_tail(list, dn);
595 			}
596 			dn = next;
597 		}
598 		err = zio_wait(zio);
599 		ASSERT(err == 0);
600 	}
601 }
602 
603 /* ARGSUSED */
604 static void
605 killer(zio_t *zio, arc_buf_t *abuf, void *arg)
606 {
607 	objset_impl_t *os = arg;
608 	objset_phys_t *osphys = zio->io_data;
609 	dnode_phys_t *dnp = &osphys->os_meta_dnode;
610 	int i;
611 
612 	ASSERT3U(zio->io_error, ==, 0);
613 
614 	/*
615 	 * Update rootbp fill count.
616 	 */
617 	os->os_rootbp.blk_fill = 1;	/* count the meta-dnode */
618 	for (i = 0; i < dnp->dn_nblkptr; i++)
619 		os->os_rootbp.blk_fill += dnp->dn_blkptr[i].blk_fill;
620 
621 	BP_SET_TYPE(zio->io_bp, DMU_OT_OBJSET);
622 	BP_SET_LEVEL(zio->io_bp, 0);
623 
624 	if (!DVA_EQUAL(BP_IDENTITY(zio->io_bp),
625 	    BP_IDENTITY(&zio->io_bp_orig))) {
626 		dsl_dataset_block_kill(os->os_dsl_dataset, &zio->io_bp_orig,
627 		    os->os_synctx);
628 		dsl_dataset_block_born(os->os_dsl_dataset, zio->io_bp,
629 		    os->os_synctx);
630 	}
631 }
632 
633 
634 /* called from dsl */
635 void
636 dmu_objset_sync(objset_impl_t *os, dmu_tx_t *tx)
637 {
638 	extern taskq_t *dbuf_tq;
639 	int txgoff;
640 	list_t *dirty_list;
641 	int err;
642 	zbookmark_t zb;
643 	arc_buf_t *abuf =
644 	    arc_buf_alloc(os->os_spa, sizeof (objset_phys_t), FTAG);
645 
646 	ASSERT(dmu_tx_is_syncing(tx));
647 	ASSERT(os->os_synctx == NULL);
648 	/* XXX the write_done callback should really give us the tx... */
649 	os->os_synctx = tx;
650 
651 	dprintf_ds(os->os_dsl_dataset, "txg=%llu\n", tx->tx_txg);
652 
653 	txgoff = tx->tx_txg & TXG_MASK;
654 
655 	dmu_objset_sync_dnodes(os, &os->os_free_dnodes[txgoff], tx);
656 	dmu_objset_sync_dnodes(os, &os->os_dirty_dnodes[txgoff], tx);
657 
658 	/*
659 	 * Free intent log blocks up to this tx.
660 	 */
661 	zil_sync(os->os_zil, tx);
662 
663 	/*
664 	 * Sync meta-dnode
665 	 */
666 	dirty_list = &os->os_dirty_dnodes[txgoff];
667 	ASSERT(list_head(dirty_list) == NULL);
668 	list_insert_tail(dirty_list, os->os_meta_dnode);
669 	dmu_objset_sync_dnodes(os, dirty_list, tx);
670 
671 	/*
672 	 * Sync the root block.
673 	 */
674 	bcopy(os->os_phys, abuf->b_data, sizeof (objset_phys_t));
675 	zb.zb_objset = os->os_dsl_dataset ? os->os_dsl_dataset->ds_object : 0;
676 	zb.zb_object = 0;
677 	zb.zb_level = -1;
678 	zb.zb_blkid = 0;
679 	err = arc_write(NULL, os->os_spa, os->os_md_checksum,
680 	    os->os_md_compress,
681 	    dmu_get_replication_level(os->os_spa, &zb, DMU_OT_OBJSET),
682 	    tx->tx_txg, &os->os_rootbp, abuf, killer, os,
683 	    ZIO_PRIORITY_ASYNC_WRITE, ZIO_FLAG_MUSTSUCCEED, ARC_WAIT, &zb);
684 	ASSERT(err == 0);
685 	VERIFY(arc_buf_remove_ref(abuf, FTAG) == 1);
686 
687 	dsl_dataset_set_blkptr(os->os_dsl_dataset, &os->os_rootbp, tx);
688 
689 	ASSERT3P(os->os_synctx, ==, tx);
690 	taskq_wait(dbuf_tq);
691 	os->os_synctx = NULL;
692 }
693 
694 void
695 dmu_objset_stats(objset_t *os, dmu_objset_stats_t *dds)
696 {
697 	if (os->os->os_dsl_dataset != NULL) {
698 		dsl_dataset_stats(os->os->os_dsl_dataset, dds);
699 	} else {
700 		ASSERT(os->os->os_phys->os_type == DMU_OST_META);
701 		bzero(dds, sizeof (*dds));
702 	}
703 	dds->dds_type = os->os->os_phys->os_type;
704 }
705 
706 int
707 dmu_objset_is_snapshot(objset_t *os)
708 {
709 	if (os->os->os_dsl_dataset != NULL)
710 		return (dsl_dataset_is_snapshot(os->os->os_dsl_dataset));
711 	else
712 		return (B_FALSE);
713 }
714 
715 int
716 dmu_snapshot_list_next(objset_t *os, int namelen, char *name,
717     uint64_t *idp, uint64_t *offp)
718 {
719 	dsl_dataset_t *ds = os->os->os_dsl_dataset;
720 	zap_cursor_t cursor;
721 	zap_attribute_t attr;
722 
723 	if (ds->ds_phys->ds_snapnames_zapobj == 0)
724 		return (ENOENT);
725 
726 	zap_cursor_init_serialized(&cursor,
727 	    ds->ds_dir->dd_pool->dp_meta_objset,
728 	    ds->ds_phys->ds_snapnames_zapobj, *offp);
729 
730 	if (zap_cursor_retrieve(&cursor, &attr) != 0) {
731 		zap_cursor_fini(&cursor);
732 		return (ENOENT);
733 	}
734 
735 	if (strlen(attr.za_name) + 1 > namelen) {
736 		zap_cursor_fini(&cursor);
737 		return (ENAMETOOLONG);
738 	}
739 
740 	(void) strcpy(name, attr.za_name);
741 	if (idp)
742 		*idp = attr.za_first_integer;
743 	zap_cursor_advance(&cursor);
744 	*offp = zap_cursor_serialize(&cursor);
745 	zap_cursor_fini(&cursor);
746 
747 	return (0);
748 }
749 
750 int
751 dmu_dir_list_next(objset_t *os, int namelen, char *name,
752     uint64_t *idp, uint64_t *offp)
753 {
754 	dsl_dir_t *dd = os->os->os_dsl_dataset->ds_dir;
755 	zap_cursor_t cursor;
756 	zap_attribute_t attr;
757 
758 	if (dd->dd_phys->dd_child_dir_zapobj == 0)
759 		return (ENOENT);
760 
761 	/* there is no next dir on a snapshot! */
762 	if (os->os->os_dsl_dataset->ds_object !=
763 	    dd->dd_phys->dd_head_dataset_obj)
764 		return (ENOENT);
765 
766 	zap_cursor_init_serialized(&cursor,
767 	    dd->dd_pool->dp_meta_objset,
768 	    dd->dd_phys->dd_child_dir_zapobj, *offp);
769 
770 	if (zap_cursor_retrieve(&cursor, &attr) != 0) {
771 		zap_cursor_fini(&cursor);
772 		return (ENOENT);
773 	}
774 
775 	if (strlen(attr.za_name) + 1 > namelen) {
776 		zap_cursor_fini(&cursor);
777 		return (ENAMETOOLONG);
778 	}
779 
780 	(void) strcpy(name, attr.za_name);
781 	if (idp)
782 		*idp = attr.za_first_integer;
783 	zap_cursor_advance(&cursor);
784 	*offp = zap_cursor_serialize(&cursor);
785 	zap_cursor_fini(&cursor);
786 
787 	return (0);
788 }
789 
790 /*
791  * Find all objsets under name, and for each, call 'func(child_name, arg)'.
792  */
793 void
794 dmu_objset_find(char *name, void func(char *, void *), void *arg, int flags)
795 {
796 	dsl_dir_t *dd;
797 	objset_t *os;
798 	uint64_t snapobj;
799 	zap_cursor_t zc;
800 	zap_attribute_t attr;
801 	char *child;
802 	int do_self, err;
803 
804 	err = dsl_dir_open(name, FTAG, &dd, NULL);
805 	if (err)
806 		return;
807 
808 	do_self = (dd->dd_phys->dd_head_dataset_obj != 0);
809 
810 	/*
811 	 * Iterate over all children.
812 	 */
813 	if (dd->dd_phys->dd_child_dir_zapobj != 0) {
814 		for (zap_cursor_init(&zc, dd->dd_pool->dp_meta_objset,
815 		    dd->dd_phys->dd_child_dir_zapobj);
816 		    zap_cursor_retrieve(&zc, &attr) == 0;
817 		    (void) zap_cursor_advance(&zc)) {
818 			ASSERT(attr.za_integer_length == sizeof (uint64_t));
819 			ASSERT(attr.za_num_integers == 1);
820 
821 			/*
822 			 * No separating '/' because parent's name ends in /.
823 			 */
824 			child = kmem_alloc(MAXPATHLEN, KM_SLEEP);
825 			/* XXX could probably just use name here */
826 			dsl_dir_name(dd, child);
827 			(void) strcat(child, "/");
828 			(void) strcat(child, attr.za_name);
829 			dmu_objset_find(child, func, arg, flags);
830 			kmem_free(child, MAXPATHLEN);
831 		}
832 		zap_cursor_fini(&zc);
833 	}
834 
835 	/*
836 	 * Iterate over all snapshots.
837 	 */
838 	if ((flags & DS_FIND_SNAPSHOTS) &&
839 	    dmu_objset_open(name, DMU_OST_ANY,
840 	    DS_MODE_STANDARD | DS_MODE_READONLY, &os) == 0) {
841 
842 		snapobj = os->os->os_dsl_dataset->ds_phys->ds_snapnames_zapobj;
843 		dmu_objset_close(os);
844 
845 		for (zap_cursor_init(&zc, dd->dd_pool->dp_meta_objset, snapobj);
846 		    zap_cursor_retrieve(&zc, &attr) == 0;
847 		    (void) zap_cursor_advance(&zc)) {
848 			ASSERT(attr.za_integer_length == sizeof (uint64_t));
849 			ASSERT(attr.za_num_integers == 1);
850 
851 			child = kmem_alloc(MAXPATHLEN, KM_SLEEP);
852 			/* XXX could probably just use name here */
853 			dsl_dir_name(dd, child);
854 			(void) strcat(child, "@");
855 			(void) strcat(child, attr.za_name);
856 			func(child, arg);
857 			kmem_free(child, MAXPATHLEN);
858 		}
859 		zap_cursor_fini(&zc);
860 	}
861 
862 	dsl_dir_close(dd, FTAG);
863 
864 	/*
865 	 * Apply to self if appropriate.
866 	 */
867 	if (do_self)
868 		func(name, arg);
869 }
870