xref: /titanic_51/usr/src/uts/common/fs/zfs/dmu.c (revision 724365f7556fc4201fdb11766ebc6bd918523130)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2006 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 
26 #pragma ident	"%Z%%M%	%I%	%E% SMI"
27 
28 #include <sys/dmu.h>
29 #include <sys/dmu_impl.h>
30 #include <sys/dmu_tx.h>
31 #include <sys/dbuf.h>
32 #include <sys/dnode.h>
33 #include <sys/zfs_context.h>
34 #include <sys/dmu_objset.h>
35 #include <sys/dmu_traverse.h>
36 #include <sys/dsl_dataset.h>
37 #include <sys/dsl_dir.h>
38 #include <sys/dsl_pool.h>
39 #include <sys/dmu_zfetch.h>
40 #include <sys/zfs_ioctl.h>
41 #include <sys/zap.h>
42 #include <sys/zio_checksum.h>
43 
44 const dmu_object_type_info_t dmu_ot[DMU_OT_NUMTYPES] = {
45 	{	byteswap_uint8_array,	TRUE,	"unallocated"		},
46 	{	zap_byteswap,		TRUE,	"object directory"	},
47 	{	byteswap_uint64_array,	TRUE,	"object array"		},
48 	{	byteswap_uint8_array,	TRUE,	"packed nvlist"		},
49 	{	byteswap_uint64_array,	TRUE,	"packed nvlist size"	},
50 	{	byteswap_uint64_array,	TRUE,	"bplist"		},
51 	{	byteswap_uint64_array,	TRUE,	"bplist header"		},
52 	{	byteswap_uint64_array,	TRUE,	"SPA space map header"	},
53 	{	byteswap_uint64_array,	TRUE,	"SPA space map"		},
54 	{	byteswap_uint64_array,	TRUE,	"ZIL intent log"	},
55 	{	dnode_buf_byteswap,	TRUE,	"DMU dnode"		},
56 	{	dmu_objset_byteswap,	TRUE,	"DMU objset"		},
57 	{	byteswap_uint64_array,	TRUE,	"DSL directory"		},
58 	{	zap_byteswap,		TRUE,	"DSL directory child map"},
59 	{	zap_byteswap,		TRUE,	"DSL dataset snap map"	},
60 	{	zap_byteswap,		TRUE,	"DSL props"		},
61 	{	byteswap_uint64_array,	TRUE,	"DSL dataset"		},
62 	{	zfs_znode_byteswap,	TRUE,	"ZFS znode"		},
63 	{	zfs_acl_byteswap,	TRUE,	"ZFS ACL"		},
64 	{	byteswap_uint8_array,	FALSE,	"ZFS plain file"	},
65 	{	zap_byteswap,		TRUE,	"ZFS directory"		},
66 	{	zap_byteswap,		TRUE,	"ZFS master node"	},
67 	{	zap_byteswap,		TRUE,	"ZFS delete queue"	},
68 	{	byteswap_uint8_array,	FALSE,	"zvol object"		},
69 	{	zap_byteswap,		TRUE,	"zvol prop"		},
70 	{	byteswap_uint8_array,	FALSE,	"other uint8[]"		},
71 	{	byteswap_uint64_array,	FALSE,	"other uint64[]"	},
72 	{	zap_byteswap,		TRUE,	"other ZAP"		},
73 	{	zap_byteswap,		TRUE,	"persistent error log"	},
74 };
75 
76 int
77 dmu_buf_hold(objset_t *os, uint64_t object, uint64_t offset,
78     void *tag, dmu_buf_t **dbp)
79 {
80 	dnode_t *dn;
81 	uint64_t blkid;
82 	dmu_buf_impl_t *db;
83 	int err;
84 
85 	err = dnode_hold(os->os, object, FTAG, &dn);
86 	if (err)
87 		return (err);
88 	blkid = dbuf_whichblock(dn, offset);
89 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
90 	db = dbuf_hold(dn, blkid, tag);
91 	rw_exit(&dn->dn_struct_rwlock);
92 	if (db == NULL) {
93 		err = EIO;
94 	} else {
95 		err = dbuf_read(db, NULL, DB_RF_CANFAIL);
96 		if (err) {
97 			dbuf_rele(db, tag);
98 			db = NULL;
99 		}
100 	}
101 
102 	dnode_rele(dn, FTAG);
103 	*dbp = &db->db;
104 	return (err);
105 }
106 
107 int
108 dmu_bonus_max(void)
109 {
110 	return (DN_MAX_BONUSLEN);
111 }
112 
113 /*
114  * returns ENOENT, EIO, or 0.
115  */
116 int
117 dmu_bonus_hold(objset_t *os, uint64_t object, void *tag, dmu_buf_t **dbp)
118 {
119 	dnode_t *dn;
120 	int err, count;
121 	dmu_buf_impl_t *db;
122 
123 	err = dnode_hold(os->os, object, FTAG, &dn);
124 	if (err)
125 		return (err);
126 
127 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
128 	if (dn->dn_bonus == NULL) {
129 		rw_exit(&dn->dn_struct_rwlock);
130 		rw_enter(&dn->dn_struct_rwlock, RW_WRITER);
131 		if (dn->dn_bonus == NULL)
132 			dn->dn_bonus = dbuf_create_bonus(dn);
133 	}
134 	db = dn->dn_bonus;
135 	rw_exit(&dn->dn_struct_rwlock);
136 	mutex_enter(&db->db_mtx);
137 	count = refcount_add(&db->db_holds, tag);
138 	mutex_exit(&db->db_mtx);
139 	if (count == 1)
140 		dnode_add_ref(dn, db);
141 	dnode_rele(dn, FTAG);
142 
143 	VERIFY(0 == dbuf_read(db, NULL, DB_RF_MUST_SUCCEED));
144 
145 	*dbp = &db->db;
146 	return (0);
147 }
148 
149 int
150 dmu_buf_hold_array(objset_t *os, uint64_t object, uint64_t offset,
151     uint64_t length, int read, void *tag, int *numbufsp, dmu_buf_t ***dbpp)
152 {
153 	dnode_t *dn;
154 	dmu_buf_t **dbp;
155 	uint64_t blkid, nblks, i;
156 	uint32_t flags;
157 	int err;
158 	zio_t *zio;
159 
160 	ASSERT(length <= DMU_MAX_ACCESS);
161 
162 	if (length == 0) {
163 		if (numbufsp)
164 			*numbufsp = 0;
165 		*dbpp = NULL;
166 		return (0);
167 	}
168 
169 	flags = DB_RF_CANFAIL | DB_RF_NEVERWAIT;
170 	if (length > zfetch_array_rd_sz)
171 		flags |= DB_RF_NOPREFETCH;
172 
173 	err = dnode_hold(os->os, object, FTAG, &dn);
174 	if (err)
175 		return (err);
176 
177 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
178 	if (dn->dn_datablkshift) {
179 		int blkshift = dn->dn_datablkshift;
180 		nblks = (P2ROUNDUP(offset+length, 1ULL<<blkshift) -
181 			P2ALIGN(offset, 1ULL<<blkshift)) >> blkshift;
182 	} else {
183 		ASSERT3U(offset + length, <=, dn->dn_datablksz);
184 		nblks = 1;
185 	}
186 	dbp = kmem_zalloc(sizeof (dmu_buf_t *) * nblks, KM_SLEEP);
187 
188 	zio = zio_root(dn->dn_objset->os_spa, NULL, NULL, TRUE);
189 	blkid = dbuf_whichblock(dn, offset);
190 	for (i = 0; i < nblks; i++) {
191 		dmu_buf_impl_t *db = dbuf_hold(dn, blkid+i, tag);
192 		if (db == NULL) {
193 			rw_exit(&dn->dn_struct_rwlock);
194 			dmu_buf_rele_array(dbp, nblks, tag);
195 			dnode_rele(dn, FTAG);
196 			zio_nowait(zio);
197 			return (EIO);
198 		}
199 		/* initiate async i/o */
200 		if (read && db->db_state == DB_UNCACHED) {
201 			rw_exit(&dn->dn_struct_rwlock);
202 			(void) dbuf_read(db, zio, flags);
203 			rw_enter(&dn->dn_struct_rwlock, RW_READER);
204 		}
205 		dbp[i] = &db->db;
206 	}
207 	rw_exit(&dn->dn_struct_rwlock);
208 	dnode_rele(dn, FTAG);
209 
210 	/* wait for async i/o */
211 	err = zio_wait(zio);
212 	if (err) {
213 		dmu_buf_rele_array(dbp, nblks, tag);
214 		return (err);
215 	}
216 
217 	/* wait for other io to complete */
218 	if (read) {
219 		for (i = 0; i < nblks; i++) {
220 			dmu_buf_impl_t *db = (dmu_buf_impl_t *)dbp[i];
221 			mutex_enter(&db->db_mtx);
222 			while (db->db_state == DB_READ ||
223 			    db->db_state == DB_FILL)
224 				cv_wait(&db->db_changed, &db->db_mtx);
225 			if (db->db_state == DB_UNCACHED)
226 				err = EIO;
227 			mutex_exit(&db->db_mtx);
228 			if (err) {
229 				dmu_buf_rele_array(dbp, nblks, tag);
230 				return (err);
231 			}
232 		}
233 	}
234 
235 	*numbufsp = nblks;
236 	*dbpp = dbp;
237 	return (0);
238 }
239 
240 void
241 dmu_buf_rele_array(dmu_buf_t **dbp_fake, int numbufs, void *tag)
242 {
243 	int i;
244 	dmu_buf_impl_t **dbp = (dmu_buf_impl_t **)dbp_fake;
245 
246 	if (numbufs == 0)
247 		return;
248 
249 	for (i = 0; i < numbufs; i++) {
250 		if (dbp[i])
251 			dbuf_rele(dbp[i], tag);
252 	}
253 
254 	kmem_free(dbp, sizeof (dmu_buf_t *) * numbufs);
255 }
256 
257 void
258 dmu_prefetch(objset_t *os, uint64_t object, uint64_t offset, uint64_t len)
259 {
260 	dnode_t *dn;
261 	uint64_t blkid;
262 	int nblks, i, err;
263 
264 	if (len == 0) {  /* they're interested in the bonus buffer */
265 		dn = os->os->os_meta_dnode;
266 
267 		if (object == 0 || object >= DN_MAX_OBJECT)
268 			return;
269 
270 		rw_enter(&dn->dn_struct_rwlock, RW_READER);
271 		blkid = dbuf_whichblock(dn, object * sizeof (dnode_phys_t));
272 		dbuf_prefetch(dn, blkid);
273 		rw_exit(&dn->dn_struct_rwlock);
274 		return;
275 	}
276 
277 	/*
278 	 * XXX - Note, if the dnode for the requested object is not
279 	 * already cached, we will do a *synchronous* read in the
280 	 * dnode_hold() call.  The same is true for any indirects.
281 	 */
282 	err = dnode_hold(os->os, object, FTAG, &dn);
283 	if (err != 0)
284 		return;
285 
286 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
287 	if (dn->dn_datablkshift) {
288 		int blkshift = dn->dn_datablkshift;
289 		nblks = (P2ROUNDUP(offset+len, 1<<blkshift) -
290 			P2ALIGN(offset, 1<<blkshift)) >> blkshift;
291 	} else {
292 		nblks = (offset < dn->dn_datablksz);
293 	}
294 
295 	if (nblks != 0) {
296 		blkid = dbuf_whichblock(dn, offset);
297 		for (i = 0; i < nblks; i++)
298 			dbuf_prefetch(dn, blkid+i);
299 	}
300 
301 	rw_exit(&dn->dn_struct_rwlock);
302 
303 	dnode_rele(dn, FTAG);
304 }
305 
306 int
307 dmu_free_range(objset_t *os, uint64_t object, uint64_t offset,
308     uint64_t size, dmu_tx_t *tx)
309 {
310 	dnode_t *dn;
311 	int err = dnode_hold(os->os, object, FTAG, &dn);
312 	if (err)
313 		return (err);
314 	ASSERT(offset < UINT64_MAX);
315 	ASSERT(size == -1ULL || size <= UINT64_MAX - offset);
316 	dnode_free_range(dn, offset, size, tx);
317 	dnode_rele(dn, FTAG);
318 	return (0);
319 }
320 
321 int
322 dmu_read(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
323     void *buf)
324 {
325 	dnode_t *dn;
326 	dmu_buf_t **dbp;
327 	int numbufs, i, err;
328 
329 	/*
330 	 * Deal with odd block sizes, where there can't be data past the
331 	 * first block.
332 	 */
333 	err = dnode_hold(os->os, object, FTAG, &dn);
334 	if (err)
335 		return (err);
336 	if (dn->dn_datablkshift == 0) {
337 		int newsz = offset > dn->dn_datablksz ? 0 :
338 		    MIN(size, dn->dn_datablksz - offset);
339 		bzero((char *)buf + newsz, size - newsz);
340 		size = newsz;
341 	}
342 	dnode_rele(dn, FTAG);
343 
344 	while (size > 0) {
345 		uint64_t mylen = MIN(size, DMU_MAX_ACCESS / 2);
346 		int err;
347 
348 		/*
349 		 * NB: we could do this block-at-a-time, but it's nice
350 		 * to be reading in parallel.
351 		 */
352 		err = dmu_buf_hold_array(os, object, offset, mylen,
353 		    TRUE, FTAG, &numbufs, &dbp);
354 		if (err)
355 			return (err);
356 
357 		for (i = 0; i < numbufs; i++) {
358 			int tocpy;
359 			int bufoff;
360 			dmu_buf_t *db = dbp[i];
361 
362 			ASSERT(size > 0);
363 
364 			bufoff = offset - db->db_offset;
365 			tocpy = (int)MIN(db->db_size - bufoff, size);
366 
367 			bcopy((char *)db->db_data + bufoff, buf, tocpy);
368 
369 			offset += tocpy;
370 			size -= tocpy;
371 			buf = (char *)buf + tocpy;
372 		}
373 		dmu_buf_rele_array(dbp, numbufs, FTAG);
374 	}
375 	return (0);
376 }
377 
378 void
379 dmu_write(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
380     const void *buf, dmu_tx_t *tx)
381 {
382 	dmu_buf_t **dbp;
383 	int numbufs, i;
384 
385 	VERIFY(0 == dmu_buf_hold_array(os, object, offset, size,
386 	    FALSE, FTAG, &numbufs, &dbp));
387 
388 	for (i = 0; i < numbufs; i++) {
389 		int tocpy;
390 		int bufoff;
391 		dmu_buf_t *db = dbp[i];
392 
393 		ASSERT(size > 0);
394 
395 		bufoff = offset - db->db_offset;
396 		tocpy = (int)MIN(db->db_size - bufoff, size);
397 
398 		ASSERT(i == 0 || i == numbufs-1 || tocpy == db->db_size);
399 
400 		if (tocpy == db->db_size)
401 			dmu_buf_will_fill(db, tx);
402 		else
403 			dmu_buf_will_dirty(db, tx);
404 
405 		bcopy(buf, (char *)db->db_data + bufoff, tocpy);
406 
407 		if (tocpy == db->db_size)
408 			dmu_buf_fill_done(db, tx);
409 
410 		offset += tocpy;
411 		size -= tocpy;
412 		buf = (char *)buf + tocpy;
413 	}
414 	dmu_buf_rele_array(dbp, numbufs, FTAG);
415 }
416 
417 #ifdef _KERNEL
418 int
419 dmu_write_uio(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
420     uio_t *uio, dmu_tx_t *tx)
421 {
422 	dmu_buf_t **dbp;
423 	int numbufs, i;
424 	int err = 0;
425 
426 	err = dmu_buf_hold_array(os, object, offset, size,
427 	    FALSE, FTAG, &numbufs, &dbp);
428 	if (err)
429 		return (err);
430 
431 	for (i = 0; i < numbufs; i++) {
432 		int tocpy;
433 		int bufoff;
434 		dmu_buf_t *db = dbp[i];
435 
436 		ASSERT(size > 0);
437 
438 		bufoff = offset - db->db_offset;
439 		tocpy = (int)MIN(db->db_size - bufoff, size);
440 
441 		ASSERT(i == 0 || i == numbufs-1 || tocpy == db->db_size);
442 
443 		if (tocpy == db->db_size)
444 			dmu_buf_will_fill(db, tx);
445 		else
446 			dmu_buf_will_dirty(db, tx);
447 
448 		/*
449 		 * XXX uiomove could block forever (eg. nfs-backed
450 		 * pages).  There needs to be a uiolockdown() function
451 		 * to lock the pages in memory, so that uiomove won't
452 		 * block.
453 		 */
454 		err = uiomove((char *)db->db_data + bufoff, tocpy,
455 		    UIO_WRITE, uio);
456 
457 		if (tocpy == db->db_size)
458 			dmu_buf_fill_done(db, tx);
459 
460 		if (err)
461 			break;
462 
463 		offset += tocpy;
464 		size -= tocpy;
465 	}
466 	dmu_buf_rele_array(dbp, numbufs, FTAG);
467 	return (err);
468 }
469 #endif
470 
471 struct backuparg {
472 	dmu_replay_record_t *drr;
473 	vnode_t *vp;
474 	objset_t *os;
475 	zio_cksum_t zc;
476 	int err;
477 };
478 
479 static int
480 dump_bytes(struct backuparg *ba, void *buf, int len)
481 {
482 	ssize_t resid; /* have to get resid to get detailed errno */
483 	ASSERT3U(len % 8, ==, 0);
484 
485 	fletcher_4_incremental_native(buf, len, &ba->zc);
486 	ba->err = vn_rdwr(UIO_WRITE, ba->vp,
487 	    (caddr_t)buf, len,
488 	    0, UIO_SYSSPACE, FAPPEND, RLIM64_INFINITY, CRED(), &resid);
489 	return (ba->err);
490 }
491 
492 static int
493 dump_free(struct backuparg *ba, uint64_t object, uint64_t offset,
494     uint64_t length)
495 {
496 	/* write a FREE record */
497 	bzero(ba->drr, sizeof (dmu_replay_record_t));
498 	ba->drr->drr_type = DRR_FREE;
499 	ba->drr->drr_u.drr_free.drr_object = object;
500 	ba->drr->drr_u.drr_free.drr_offset = offset;
501 	ba->drr->drr_u.drr_free.drr_length = length;
502 
503 	if (dump_bytes(ba, ba->drr, sizeof (dmu_replay_record_t)))
504 		return (EINTR);
505 	return (0);
506 }
507 
508 static int
509 dump_data(struct backuparg *ba, dmu_object_type_t type,
510     uint64_t object, uint64_t offset, int blksz, void *data)
511 {
512 	/* write a DATA record */
513 	bzero(ba->drr, sizeof (dmu_replay_record_t));
514 	ba->drr->drr_type = DRR_WRITE;
515 	ba->drr->drr_u.drr_write.drr_object = object;
516 	ba->drr->drr_u.drr_write.drr_type = type;
517 	ba->drr->drr_u.drr_write.drr_offset = offset;
518 	ba->drr->drr_u.drr_write.drr_length = blksz;
519 
520 	if (dump_bytes(ba, ba->drr, sizeof (dmu_replay_record_t)))
521 		return (EINTR);
522 	if (dump_bytes(ba, data, blksz))
523 		return (EINTR);
524 	return (0);
525 }
526 
527 static int
528 dump_freeobjects(struct backuparg *ba, uint64_t firstobj, uint64_t numobjs)
529 {
530 	/* write a FREEOBJECTS record */
531 	bzero(ba->drr, sizeof (dmu_replay_record_t));
532 	ba->drr->drr_type = DRR_FREEOBJECTS;
533 	ba->drr->drr_u.drr_freeobjects.drr_firstobj = firstobj;
534 	ba->drr->drr_u.drr_freeobjects.drr_numobjs = numobjs;
535 
536 	if (dump_bytes(ba, ba->drr, sizeof (dmu_replay_record_t)))
537 		return (EINTR);
538 	return (0);
539 }
540 
541 static int
542 dump_dnode(struct backuparg *ba, uint64_t object, dnode_phys_t *dnp)
543 {
544 	if (dnp == NULL || dnp->dn_type == DMU_OT_NONE)
545 		return (dump_freeobjects(ba, object, 1));
546 
547 	/* write an OBJECT record */
548 	bzero(ba->drr, sizeof (dmu_replay_record_t));
549 	ba->drr->drr_type = DRR_OBJECT;
550 	ba->drr->drr_u.drr_object.drr_object = object;
551 	ba->drr->drr_u.drr_object.drr_type = dnp->dn_type;
552 	ba->drr->drr_u.drr_object.drr_bonustype = dnp->dn_bonustype;
553 	ba->drr->drr_u.drr_object.drr_blksz =
554 	    dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT;
555 	ba->drr->drr_u.drr_object.drr_bonuslen = dnp->dn_bonuslen;
556 	ba->drr->drr_u.drr_object.drr_checksum = dnp->dn_checksum;
557 	ba->drr->drr_u.drr_object.drr_compress = dnp->dn_compress;
558 
559 	if (dump_bytes(ba, ba->drr, sizeof (dmu_replay_record_t)))
560 		return (EINTR);
561 
562 	if (dump_bytes(ba, DN_BONUS(dnp), P2ROUNDUP(dnp->dn_bonuslen, 8)))
563 		return (EINTR);
564 
565 	/* free anything past the end of the file */
566 	if (dump_free(ba, object, (dnp->dn_maxblkid + 1) *
567 	    (dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT), -1ULL))
568 		return (EINTR);
569 	if (ba->err)
570 		return (EINTR);
571 	return (0);
572 }
573 
574 #define	BP_SPAN(dnp, level) \
575 	(((uint64_t)dnp->dn_datablkszsec) << (SPA_MINBLOCKSHIFT + \
576 	(level) * (dnp->dn_indblkshift - SPA_BLKPTRSHIFT)))
577 
578 static int
579 backup_cb(traverse_blk_cache_t *bc, spa_t *spa, void *arg)
580 {
581 	struct backuparg *ba = arg;
582 	uint64_t object = bc->bc_bookmark.zb_object;
583 	int level = bc->bc_bookmark.zb_level;
584 	uint64_t blkid = bc->bc_bookmark.zb_blkid;
585 	blkptr_t *bp = bc->bc_blkptr.blk_birth ? &bc->bc_blkptr : NULL;
586 	dmu_object_type_t type = bp ? BP_GET_TYPE(bp) : DMU_OT_NONE;
587 	void *data = bc->bc_data;
588 	int err = 0;
589 
590 	if (issig(JUSTLOOKING) && issig(FORREAL))
591 		return (EINTR);
592 
593 	ASSERT(data || bp == NULL);
594 
595 	if (bp == NULL && object == 0) {
596 		uint64_t span = BP_SPAN(bc->bc_dnode, level);
597 		uint64_t dnobj = (blkid * span) >> DNODE_SHIFT;
598 		err = dump_freeobjects(ba, dnobj, span >> DNODE_SHIFT);
599 	} else if (bp == NULL) {
600 		uint64_t span = BP_SPAN(bc->bc_dnode, level);
601 		err = dump_free(ba, object, blkid * span, span);
602 	} else if (data && level == 0 && type == DMU_OT_DNODE) {
603 		dnode_phys_t *blk = data;
604 		int i;
605 		int blksz = BP_GET_LSIZE(bp);
606 
607 		for (i = 0; i < blksz >> DNODE_SHIFT; i++) {
608 			uint64_t dnobj =
609 			    (blkid << (DNODE_BLOCK_SHIFT - DNODE_SHIFT)) + i;
610 			err = dump_dnode(ba, dnobj, blk+i);
611 			if (err)
612 				break;
613 		}
614 	} else if (level == 0 &&
615 	    type != DMU_OT_DNODE && type != DMU_OT_OBJSET) {
616 		int blksz = BP_GET_LSIZE(bp);
617 		if (data == NULL) {
618 			arc_buf_t *abuf;
619 			zbookmark_t zb;
620 
621 			zb.zb_objset = ba->os->os->os_dsl_dataset->ds_object;
622 			zb.zb_object = object;
623 			zb.zb_level = level;
624 			zb.zb_blkid = blkid;
625 			(void) arc_read(NULL, spa, bp,
626 			    dmu_ot[type].ot_byteswap, arc_getbuf_func, &abuf,
627 			    ZIO_PRIORITY_ASYNC_READ, ZIO_FLAG_MUSTSUCCEED,
628 			    ARC_WAIT, &zb);
629 
630 			if (abuf) {
631 				err = dump_data(ba, type, object, blkid * blksz,
632 				    blksz, abuf->b_data);
633 				(void) arc_buf_remove_ref(abuf, &abuf);
634 			}
635 		} else {
636 			err = dump_data(ba, type, object, blkid * blksz,
637 			    blksz, data);
638 		}
639 	}
640 
641 	ASSERT(err == 0 || err == EINTR);
642 	return (err);
643 }
644 
645 int
646 dmu_sendbackup(objset_t *tosnap, objset_t *fromsnap, vnode_t *vp)
647 {
648 	dsl_dataset_t *ds = tosnap->os->os_dsl_dataset;
649 	dsl_dataset_t *fromds = fromsnap ? fromsnap->os->os_dsl_dataset : NULL;
650 	dmu_replay_record_t *drr;
651 	struct backuparg ba;
652 	int err;
653 
654 	/* tosnap must be a snapshot */
655 	if (ds->ds_phys->ds_next_snap_obj == 0)
656 		return (EINVAL);
657 
658 	/* fromsnap must be an earlier snapshot from the same fs as tosnap */
659 	if (fromds && (ds->ds_dir != fromds->ds_dir ||
660 	    fromds->ds_phys->ds_creation_txg >=
661 	    ds->ds_phys->ds_creation_txg))
662 		return (EXDEV);
663 
664 	drr = kmem_zalloc(sizeof (dmu_replay_record_t), KM_SLEEP);
665 	drr->drr_type = DRR_BEGIN;
666 	drr->drr_u.drr_begin.drr_magic = DMU_BACKUP_MAGIC;
667 	drr->drr_u.drr_begin.drr_version = DMU_BACKUP_VERSION;
668 	drr->drr_u.drr_begin.drr_creation_time =
669 	    ds->ds_phys->ds_creation_time;
670 	drr->drr_u.drr_begin.drr_type = tosnap->os->os_phys->os_type;
671 	drr->drr_u.drr_begin.drr_toguid = ds->ds_phys->ds_guid;
672 	if (fromds)
673 		drr->drr_u.drr_begin.drr_fromguid = fromds->ds_phys->ds_guid;
674 	dsl_dataset_name(ds, drr->drr_u.drr_begin.drr_toname);
675 
676 	ba.drr = drr;
677 	ba.vp = vp;
678 	ba.os = tosnap;
679 	ZIO_SET_CHECKSUM(&ba.zc, 0, 0, 0, 0);
680 
681 	if (dump_bytes(&ba, drr, sizeof (dmu_replay_record_t))) {
682 		kmem_free(drr, sizeof (dmu_replay_record_t));
683 		return (ba.err);
684 	}
685 
686 	err = traverse_dsl_dataset(ds,
687 	    fromds ? fromds->ds_phys->ds_creation_txg : 0,
688 	    ADVANCE_PRE | ADVANCE_HOLES | ADVANCE_DATA | ADVANCE_NOLOCK,
689 	    backup_cb, &ba);
690 
691 	if (err) {
692 		if (err == EINTR && ba.err)
693 			err = ba.err;
694 		return (err);
695 	}
696 
697 	bzero(drr, sizeof (dmu_replay_record_t));
698 	drr->drr_type = DRR_END;
699 	drr->drr_u.drr_end.drr_checksum = ba.zc;
700 
701 	if (dump_bytes(&ba, drr, sizeof (dmu_replay_record_t)))
702 		return (ba.err);
703 
704 	kmem_free(drr, sizeof (dmu_replay_record_t));
705 
706 	return (0);
707 }
708 
709 struct restorearg {
710 	int err;
711 	int byteswap;
712 	vnode_t *vp;
713 	char *buf;
714 	uint64_t voff;
715 	int buflen; /* number of valid bytes in buf */
716 	int bufoff; /* next offset to read */
717 	int bufsize; /* amount of memory allocated for buf */
718 	zio_cksum_t zc;
719 };
720 
721 static int
722 replay_incremental_sync(dsl_dir_t *dd, void *arg, dmu_tx_t *tx)
723 {
724 	struct drr_begin *drrb = arg;
725 	dsl_dataset_t *ds = NULL;
726 	dsl_dataset_t *ds_prev = NULL;
727 	const char *snapname;
728 	int err = EINVAL;
729 	uint64_t val;
730 
731 	/* this must be a filesytem */
732 	if (dd->dd_phys->dd_head_dataset_obj == 0)
733 		goto die;
734 
735 	err = dsl_dataset_open_obj(dd->dd_pool,
736 	    dd->dd_phys->dd_head_dataset_obj,
737 	    NULL, DS_MODE_EXCLUSIVE, FTAG, &ds);
738 	if (err)
739 		goto die;
740 
741 	if (ds == NULL) {
742 		err = EBUSY;
743 		goto die;
744 	}
745 
746 	/* must already be a snapshot of this fs */
747 	if (ds->ds_phys->ds_prev_snap_obj == 0) {
748 		err = ENODEV;
749 		goto die;
750 	}
751 
752 	/* most recent snapshot must match fromguid */
753 	err = dsl_dataset_open_obj(dd->dd_pool,
754 	    ds->ds_phys->ds_prev_snap_obj, NULL,
755 	    DS_MODE_STANDARD | DS_MODE_READONLY, FTAG, &ds_prev);
756 	if (err)
757 		goto die;
758 	if (ds_prev->ds_phys->ds_guid != drrb->drr_fromguid) {
759 		err = ENODEV;
760 		goto die;
761 	}
762 
763 	/* must not have any changes since most recent snapshot */
764 	if (ds->ds_phys->ds_bp.blk_birth >
765 	    ds_prev->ds_phys->ds_creation_txg) {
766 		err = ETXTBSY;
767 		goto die;
768 	}
769 
770 	/* new snapshot name must not exist */
771 	snapname = strrchr(drrb->drr_toname, '@');
772 	if (snapname == NULL) {
773 		err = EEXIST;
774 		goto die;
775 	}
776 	snapname++;
777 	err = zap_lookup(dd->dd_pool->dp_meta_objset,
778 	    ds->ds_phys->ds_snapnames_zapobj, snapname, 8, 1, &val);
779 	if (err != ENOENT) {
780 		if (err == 0)
781 			err = EEXIST;
782 		dsl_dataset_close(ds, DS_MODE_EXCLUSIVE, FTAG);
783 		dsl_dataset_close(ds_prev, DS_MODE_STANDARD, FTAG);
784 		return (err);
785 	}
786 
787 	dsl_dataset_close(ds_prev, DS_MODE_STANDARD, FTAG);
788 
789 	/* The point of no (unsuccessful) return. */
790 
791 	dmu_buf_will_dirty(ds->ds_dbuf, tx);
792 	ds->ds_phys->ds_inconsistent = TRUE;
793 
794 	dsl_dataset_close(ds, DS_MODE_EXCLUSIVE, FTAG);
795 	return (0);
796 
797 die:
798 	if (ds_prev)
799 		dsl_dataset_close(ds_prev, DS_MODE_STANDARD, FTAG);
800 	if (ds)
801 		dsl_dataset_close(ds, DS_MODE_EXCLUSIVE, FTAG);
802 	return (err);
803 }
804 
805 static int
806 replay_full_sync(dsl_dir_t *dd, void *arg, dmu_tx_t *tx)
807 {
808 	struct drr_begin *drrb = arg;
809 	int err;
810 	char *fsfullname, *fslastname, *cp;
811 	dsl_dataset_t *ds;
812 
813 	fsfullname = kmem_alloc(MAXNAMELEN, KM_SLEEP);
814 	(void) strncpy(fsfullname, drrb->drr_toname, MAXNAMELEN);
815 	cp = strchr(fsfullname, '@');
816 	if (cp == NULL) {
817 		kmem_free(fsfullname, MAXNAMELEN);
818 		return (EINVAL);
819 	}
820 	*cp = '\0';
821 	fslastname = strrchr(fsfullname, '/');
822 	if (fslastname == NULL) {
823 		kmem_free(fsfullname, MAXNAMELEN);
824 		return (EINVAL);
825 	}
826 	fslastname++;
827 
828 	err = dsl_dataset_create_sync(dd, fsfullname, fslastname, NULL, tx);
829 	if (err) {
830 		kmem_free(fsfullname, MAXNAMELEN);
831 		return (err);
832 	}
833 
834 	/* the point of no (unsuccessful) return */
835 
836 	VERIFY(0 == dsl_dataset_open_spa(dd->dd_pool->dp_spa, fsfullname,
837 	    DS_MODE_EXCLUSIVE, FTAG, &ds));
838 	kmem_free(fsfullname, MAXNAMELEN);
839 
840 	(void) dmu_objset_create_impl(dsl_dataset_get_spa(ds),
841 	    ds, drrb->drr_type, tx);
842 
843 	dmu_buf_will_dirty(ds->ds_dbuf, tx);
844 	ds->ds_phys->ds_inconsistent = TRUE;
845 
846 	dsl_dataset_close(ds, DS_MODE_EXCLUSIVE, FTAG);
847 	return (0);
848 }
849 
850 static int
851 replay_end_sync(dsl_dir_t *dd, void *arg, dmu_tx_t *tx)
852 {
853 	struct drr_begin *drrb = arg;
854 	int err;
855 	char *snapname;
856 	dsl_dataset_t *ds;
857 
858 	/* XXX verify that drr_toname is in dd */
859 
860 	snapname = strchr(drrb->drr_toname, '@');
861 	if (snapname == NULL)
862 		return (EINVAL);
863 	snapname++;
864 
865 	/* create snapshot */
866 	err = dsl_dataset_snapshot_sync(dd, snapname, tx);
867 	if (err)
868 		return (err);
869 
870 	/* set snapshot's creation time and guid */
871 	VERIFY(0 == dsl_dataset_open_spa(dd->dd_pool->dp_spa, drrb->drr_toname,
872 	    DS_MODE_PRIMARY | DS_MODE_READONLY | DS_MODE_INCONSISTENT,
873 	    FTAG, &ds));
874 
875 	dmu_buf_will_dirty(ds->ds_dbuf, tx);
876 	ds->ds_phys->ds_creation_time = drrb->drr_creation_time;
877 	ds->ds_phys->ds_guid = drrb->drr_toguid;
878 	ds->ds_phys->ds_inconsistent = FALSE;
879 
880 	dsl_dataset_close(ds, DS_MODE_PRIMARY, FTAG);
881 
882 	VERIFY(0 == dsl_dataset_open_obj(dd->dd_pool,
883 	    dd->dd_phys->dd_head_dataset_obj,
884 	    NULL, DS_MODE_STANDARD | DS_MODE_INCONSISTENT, FTAG, &ds));
885 	dmu_buf_will_dirty(ds->ds_dbuf, tx);
886 	ds->ds_phys->ds_inconsistent = FALSE;
887 	dsl_dataset_close(ds, DS_MODE_STANDARD, FTAG);
888 
889 	return (0);
890 }
891 
892 void *
893 restore_read(struct restorearg *ra, int len)
894 {
895 	void *rv;
896 
897 	/* some things will require 8-byte alignment, so everything must */
898 	ASSERT3U(len % 8, ==, 0);
899 
900 	while (ra->buflen - ra->bufoff < len) {
901 		ssize_t resid;
902 		int leftover = ra->buflen - ra->bufoff;
903 
904 		(void) memmove(ra->buf, ra->buf + ra->bufoff, leftover);
905 		ra->err = vn_rdwr(UIO_READ, ra->vp,
906 		    (caddr_t)ra->buf + leftover, ra->bufsize - leftover,
907 		    ra->voff, UIO_SYSSPACE, FAPPEND,
908 		    RLIM64_INFINITY, CRED(), &resid);
909 
910 		ra->voff += ra->bufsize - leftover - resid;
911 		ra->buflen = ra->bufsize - resid;
912 		ra->bufoff = 0;
913 		if (resid == ra->bufsize - leftover)
914 			ra->err = EINVAL;
915 		if (ra->err)
916 			return (NULL);
917 		/* Could compute checksum here? */
918 	}
919 
920 	ASSERT3U(ra->bufoff % 8, ==, 0);
921 	ASSERT3U(ra->buflen - ra->bufoff, >=, len);
922 	rv = ra->buf + ra->bufoff;
923 	ra->bufoff += len;
924 	if (ra->byteswap)
925 		fletcher_4_incremental_byteswap(rv, len, &ra->zc);
926 	else
927 		fletcher_4_incremental_native(rv, len, &ra->zc);
928 	return (rv);
929 }
930 
931 static void
932 backup_byteswap(dmu_replay_record_t *drr)
933 {
934 #define	DO64(X) (drr->drr_u.X = BSWAP_64(drr->drr_u.X))
935 #define	DO32(X) (drr->drr_u.X = BSWAP_32(drr->drr_u.X))
936 	drr->drr_type = BSWAP_32(drr->drr_type);
937 	switch (drr->drr_type) {
938 	case DRR_BEGIN:
939 		DO64(drr_begin.drr_magic);
940 		DO64(drr_begin.drr_version);
941 		DO64(drr_begin.drr_creation_time);
942 		DO32(drr_begin.drr_type);
943 		DO64(drr_begin.drr_toguid);
944 		DO64(drr_begin.drr_fromguid);
945 		break;
946 	case DRR_OBJECT:
947 		DO64(drr_object.drr_object);
948 		/* DO64(drr_object.drr_allocation_txg); */
949 		DO32(drr_object.drr_type);
950 		DO32(drr_object.drr_bonustype);
951 		DO32(drr_object.drr_blksz);
952 		DO32(drr_object.drr_bonuslen);
953 		break;
954 	case DRR_FREEOBJECTS:
955 		DO64(drr_freeobjects.drr_firstobj);
956 		DO64(drr_freeobjects.drr_numobjs);
957 		break;
958 	case DRR_WRITE:
959 		DO64(drr_write.drr_object);
960 		DO32(drr_write.drr_type);
961 		DO64(drr_write.drr_offset);
962 		DO64(drr_write.drr_length);
963 		break;
964 	case DRR_FREE:
965 		DO64(drr_free.drr_object);
966 		DO64(drr_free.drr_offset);
967 		DO64(drr_free.drr_length);
968 		break;
969 	case DRR_END:
970 		DO64(drr_end.drr_checksum.zc_word[0]);
971 		DO64(drr_end.drr_checksum.zc_word[1]);
972 		DO64(drr_end.drr_checksum.zc_word[2]);
973 		DO64(drr_end.drr_checksum.zc_word[3]);
974 		break;
975 	}
976 #undef DO64
977 #undef DO32
978 }
979 
980 static int
981 restore_object(struct restorearg *ra, objset_t *os, struct drr_object *drro)
982 {
983 	int err;
984 	dmu_tx_t *tx;
985 
986 	err = dmu_object_info(os, drro->drr_object, NULL);
987 
988 	if (err != 0 && err != ENOENT)
989 		return (EINVAL);
990 
991 	if (drro->drr_type == DMU_OT_NONE ||
992 	    drro->drr_type >= DMU_OT_NUMTYPES ||
993 	    drro->drr_bonustype >= DMU_OT_NUMTYPES ||
994 	    drro->drr_checksum >= ZIO_CHECKSUM_FUNCTIONS ||
995 	    drro->drr_compress >= ZIO_COMPRESS_FUNCTIONS ||
996 	    P2PHASE(drro->drr_blksz, SPA_MINBLOCKSIZE) ||
997 	    drro->drr_blksz < SPA_MINBLOCKSIZE ||
998 	    drro->drr_blksz > SPA_MAXBLOCKSIZE ||
999 	    drro->drr_bonuslen > DN_MAX_BONUSLEN) {
1000 		return (EINVAL);
1001 	}
1002 
1003 	tx = dmu_tx_create(os);
1004 
1005 	if (err == ENOENT) {
1006 		/* currently free, want to be allocated */
1007 		dmu_tx_hold_bonus(tx, DMU_NEW_OBJECT);
1008 		dmu_tx_hold_write(tx, DMU_NEW_OBJECT, 0, 1);
1009 		err = dmu_tx_assign(tx, TXG_WAIT);
1010 		if (err) {
1011 			dmu_tx_abort(tx);
1012 			return (err);
1013 		}
1014 		err = dmu_object_claim(os, drro->drr_object,
1015 		    drro->drr_type, drro->drr_blksz,
1016 		    drro->drr_bonustype, drro->drr_bonuslen, tx);
1017 	} else {
1018 		/* currently allocated, want to be allocated */
1019 		dmu_tx_hold_bonus(tx, drro->drr_object);
1020 		/*
1021 		 * We may change blocksize, so need to
1022 		 * hold_write
1023 		 */
1024 		dmu_tx_hold_write(tx, drro->drr_object, 0, 1);
1025 		err = dmu_tx_assign(tx, TXG_WAIT);
1026 		if (err) {
1027 			dmu_tx_abort(tx);
1028 			return (err);
1029 		}
1030 
1031 		err = dmu_object_reclaim(os, drro->drr_object,
1032 		    drro->drr_type, drro->drr_blksz,
1033 		    drro->drr_bonustype, drro->drr_bonuslen, tx);
1034 	}
1035 	if (err) {
1036 		dmu_tx_commit(tx);
1037 		return (EINVAL);
1038 	}
1039 
1040 	dmu_object_set_checksum(os, drro->drr_object, drro->drr_checksum, tx);
1041 	dmu_object_set_compress(os, drro->drr_object, drro->drr_compress, tx);
1042 
1043 	if (drro->drr_bonuslen) {
1044 		dmu_buf_t *db;
1045 		void *data;
1046 		VERIFY(0 == dmu_bonus_hold(os, drro->drr_object, FTAG, &db));
1047 		dmu_buf_will_dirty(db, tx);
1048 
1049 		ASSERT3U(db->db_size, ==, drro->drr_bonuslen);
1050 		data = restore_read(ra, P2ROUNDUP(db->db_size, 8));
1051 		if (data == NULL) {
1052 			dmu_tx_commit(tx);
1053 			return (ra->err);
1054 		}
1055 		bcopy(data, db->db_data, db->db_size);
1056 		if (ra->byteswap) {
1057 			dmu_ot[drro->drr_bonustype].ot_byteswap(db->db_data,
1058 			    drro->drr_bonuslen);
1059 		}
1060 		dmu_buf_rele(db, FTAG);
1061 	}
1062 	dmu_tx_commit(tx);
1063 	return (0);
1064 }
1065 
1066 /* ARGSUSED */
1067 static int
1068 restore_freeobjects(struct restorearg *ra, objset_t *os,
1069     struct drr_freeobjects *drrfo)
1070 {
1071 	uint64_t obj;
1072 
1073 	if (drrfo->drr_firstobj + drrfo->drr_numobjs < drrfo->drr_firstobj)
1074 		return (EINVAL);
1075 
1076 	for (obj = drrfo->drr_firstobj;
1077 	    obj < drrfo->drr_firstobj + drrfo->drr_numobjs; obj++) {
1078 		dmu_tx_t *tx;
1079 		int err;
1080 
1081 		if (dmu_object_info(os, obj, NULL) != 0)
1082 			continue;
1083 
1084 		tx = dmu_tx_create(os);
1085 		dmu_tx_hold_bonus(tx, obj);
1086 		err = dmu_tx_assign(tx, TXG_WAIT);
1087 		if (err) {
1088 			dmu_tx_abort(tx);
1089 			return (err);
1090 		}
1091 		err = dmu_object_free(os, obj, tx);
1092 		dmu_tx_commit(tx);
1093 		if (err && err != ENOENT)
1094 			return (EINVAL);
1095 	}
1096 	return (0);
1097 }
1098 
1099 static int
1100 restore_write(struct restorearg *ra, objset_t *os,
1101     struct drr_write *drrw)
1102 {
1103 	dmu_tx_t *tx;
1104 	void *data;
1105 	int err;
1106 
1107 	if (drrw->drr_offset + drrw->drr_length < drrw->drr_offset ||
1108 	    drrw->drr_type >= DMU_OT_NUMTYPES)
1109 		return (EINVAL);
1110 
1111 	data = restore_read(ra, drrw->drr_length);
1112 	if (data == NULL)
1113 		return (ra->err);
1114 
1115 	if (dmu_object_info(os, drrw->drr_object, NULL) != 0)
1116 		return (EINVAL);
1117 
1118 	tx = dmu_tx_create(os);
1119 
1120 	dmu_tx_hold_write(tx, drrw->drr_object,
1121 	    drrw->drr_offset, drrw->drr_length);
1122 	err = dmu_tx_assign(tx, TXG_WAIT);
1123 	if (err) {
1124 		dmu_tx_abort(tx);
1125 		return (err);
1126 	}
1127 	if (ra->byteswap)
1128 		dmu_ot[drrw->drr_type].ot_byteswap(data, drrw->drr_length);
1129 	dmu_write(os, drrw->drr_object,
1130 	    drrw->drr_offset, drrw->drr_length, data, tx);
1131 	dmu_tx_commit(tx);
1132 	return (0);
1133 }
1134 
1135 /* ARGSUSED */
1136 static int
1137 restore_free(struct restorearg *ra, objset_t *os,
1138     struct drr_free *drrf)
1139 {
1140 	dmu_tx_t *tx;
1141 	int err;
1142 
1143 	if (drrf->drr_length != -1ULL &&
1144 	    drrf->drr_offset + drrf->drr_length < drrf->drr_offset)
1145 		return (EINVAL);
1146 
1147 	if (dmu_object_info(os, drrf->drr_object, NULL) != 0)
1148 		return (EINVAL);
1149 
1150 	tx = dmu_tx_create(os);
1151 
1152 	dmu_tx_hold_free(tx, drrf->drr_object,
1153 	    drrf->drr_offset, drrf->drr_length);
1154 	err = dmu_tx_assign(tx, TXG_WAIT);
1155 	if (err) {
1156 		dmu_tx_abort(tx);
1157 		return (err);
1158 	}
1159 	err = dmu_free_range(os, drrf->drr_object,
1160 	    drrf->drr_offset, drrf->drr_length, tx);
1161 	dmu_tx_commit(tx);
1162 	return (err);
1163 }
1164 
1165 int
1166 dmu_recvbackup(char *tosnap, struct drr_begin *drrb, uint64_t *sizep,
1167     vnode_t *vp, uint64_t voffset)
1168 {
1169 	struct restorearg ra;
1170 	dmu_replay_record_t *drr;
1171 	char *cp;
1172 	dsl_dir_t *dd = NULL;
1173 	objset_t *os = NULL;
1174 	zio_cksum_t pzc;
1175 
1176 	bzero(&ra, sizeof (ra));
1177 	ra.vp = vp;
1178 	ra.voff = voffset;
1179 	ra.bufsize = 1<<20;
1180 	ra.buf = kmem_alloc(ra.bufsize, KM_SLEEP);
1181 
1182 	if (drrb->drr_magic == DMU_BACKUP_MAGIC) {
1183 		ra.byteswap = FALSE;
1184 	} else if (drrb->drr_magic == BSWAP_64(DMU_BACKUP_MAGIC)) {
1185 		ra.byteswap = TRUE;
1186 	} else {
1187 		ra.err = EINVAL;
1188 		goto out;
1189 	}
1190 
1191 	/*
1192 	 * NB: this assumes that struct drr_begin will be the largest in
1193 	 * dmu_replay_record_t's drr_u, and thus we don't need to pad it
1194 	 * with zeros to make it the same length as we wrote out.
1195 	 */
1196 	((dmu_replay_record_t *)ra.buf)->drr_type = DRR_BEGIN;
1197 	((dmu_replay_record_t *)ra.buf)->drr_pad = 0;
1198 	((dmu_replay_record_t *)ra.buf)->drr_u.drr_begin = *drrb;
1199 	if (ra.byteswap) {
1200 		fletcher_4_incremental_byteswap(ra.buf,
1201 		    sizeof (dmu_replay_record_t), &ra.zc);
1202 	} else {
1203 		fletcher_4_incremental_native(ra.buf,
1204 		    sizeof (dmu_replay_record_t), &ra.zc);
1205 	}
1206 	(void) strcpy(drrb->drr_toname, tosnap); /* for the sync funcs */
1207 
1208 	if (ra.byteswap) {
1209 		drrb->drr_magic = BSWAP_64(drrb->drr_magic);
1210 		drrb->drr_version = BSWAP_64(drrb->drr_version);
1211 		drrb->drr_creation_time = BSWAP_64(drrb->drr_creation_time);
1212 		drrb->drr_type = BSWAP_32(drrb->drr_type);
1213 		drrb->drr_toguid = BSWAP_64(drrb->drr_toguid);
1214 		drrb->drr_fromguid = BSWAP_64(drrb->drr_fromguid);
1215 	}
1216 
1217 	ASSERT3U(drrb->drr_magic, ==, DMU_BACKUP_MAGIC);
1218 
1219 	if (drrb->drr_version != DMU_BACKUP_VERSION ||
1220 	    drrb->drr_type >= DMU_OST_NUMTYPES ||
1221 	    strchr(drrb->drr_toname, '@') == NULL) {
1222 		ra.err = EINVAL;
1223 		goto out;
1224 	}
1225 
1226 	/*
1227 	 * Process the begin in syncing context.
1228 	 */
1229 	if (drrb->drr_fromguid) {
1230 		/* incremental backup */
1231 
1232 		cp = strchr(tosnap, '@');
1233 		*cp = '\0';
1234 		ra.err = dsl_dir_open(tosnap, FTAG, &dd, NULL);
1235 		*cp = '@';
1236 		if (ra.err)
1237 			goto out;
1238 
1239 		ra.err = dsl_dir_sync_task(dd, replay_incremental_sync,
1240 		    drrb, 1<<20);
1241 	} else {
1242 		/* full backup */
1243 		const char *tail;
1244 
1245 		cp = strchr(tosnap, '@');
1246 		*cp = '\0';
1247 		ra.err = dsl_dir_open(tosnap, FTAG, &dd, &tail);
1248 		*cp = '@';
1249 		if (ra.err)
1250 			goto out;
1251 		if (tail == NULL) {
1252 			ra.err = EEXIST;
1253 			goto out;
1254 		}
1255 
1256 		ra.err = dsl_dir_sync_task(dd, replay_full_sync,
1257 		    drrb, 1<<20);
1258 	}
1259 	if (ra.err)
1260 		goto out;
1261 
1262 	/*
1263 	 * Open the objset we are modifying.
1264 	 */
1265 
1266 	cp = strchr(tosnap, '@');
1267 	*cp = '\0';
1268 	ra.err = dmu_objset_open(tosnap, DMU_OST_ANY,
1269 	    DS_MODE_PRIMARY | DS_MODE_INCONSISTENT, &os);
1270 	*cp = '@';
1271 	ASSERT3U(ra.err, ==, 0);
1272 
1273 	/*
1274 	 * Read records and process them.
1275 	 */
1276 	pzc = ra.zc;
1277 	while (ra.err == 0 &&
1278 	    NULL != (drr = restore_read(&ra, sizeof (*drr)))) {
1279 		if (issig(JUSTLOOKING) && issig(FORREAL)) {
1280 			ra.err = EINTR;
1281 			goto out;
1282 		}
1283 
1284 		if (ra.byteswap)
1285 			backup_byteswap(drr);
1286 
1287 		switch (drr->drr_type) {
1288 		case DRR_OBJECT:
1289 		{
1290 			/*
1291 			 * We need to make a copy of the record header,
1292 			 * because restore_{object,write} may need to
1293 			 * restore_read(), which will invalidate drr.
1294 			 */
1295 			struct drr_object drro = drr->drr_u.drr_object;
1296 			ra.err = restore_object(&ra, os, &drro);
1297 			break;
1298 		}
1299 		case DRR_FREEOBJECTS:
1300 		{
1301 			struct drr_freeobjects drrfo =
1302 			    drr->drr_u.drr_freeobjects;
1303 			ra.err = restore_freeobjects(&ra, os, &drrfo);
1304 			break;
1305 		}
1306 		case DRR_WRITE:
1307 		{
1308 			struct drr_write drrw = drr->drr_u.drr_write;
1309 			ra.err = restore_write(&ra, os, &drrw);
1310 			break;
1311 		}
1312 		case DRR_FREE:
1313 		{
1314 			struct drr_free drrf = drr->drr_u.drr_free;
1315 			ra.err = restore_free(&ra, os, &drrf);
1316 			break;
1317 		}
1318 		case DRR_END:
1319 		{
1320 			struct drr_end drre = drr->drr_u.drr_end;
1321 			/*
1322 			 * We compare against the *previous* checksum
1323 			 * value, because the stored checksum is of
1324 			 * everything before the DRR_END record.
1325 			 */
1326 			if (drre.drr_checksum.zc_word[0] != 0 &&
1327 			    ((drre.drr_checksum.zc_word[0] - pzc.zc_word[0]) |
1328 			    (drre.drr_checksum.zc_word[1] - pzc.zc_word[1]) |
1329 			    (drre.drr_checksum.zc_word[2] - pzc.zc_word[2]) |
1330 			    (drre.drr_checksum.zc_word[3] - pzc.zc_word[3]))) {
1331 				ra.err = ECKSUM;
1332 				goto out;
1333 			}
1334 
1335 			/*
1336 			 * dd may be the parent of the dd we are
1337 			 * restoring into (eg. if it's a full backup).
1338 			 */
1339 			ra.err = dsl_dir_sync_task(dmu_objset_ds(os)->
1340 			    ds_dir, replay_end_sync, drrb, 1<<20);
1341 			goto out;
1342 		}
1343 		default:
1344 			ra.err = EINVAL;
1345 			goto out;
1346 		}
1347 		pzc = ra.zc;
1348 	}
1349 
1350 out:
1351 	if (os)
1352 		dmu_objset_close(os);
1353 
1354 	/*
1355 	 * Make sure we don't rollback/destroy unless we actually
1356 	 * processed the begin properly.  'os' will only be set if this
1357 	 * is the case.
1358 	 */
1359 	if (ra.err && os && dd && tosnap && strchr(tosnap, '@')) {
1360 		/*
1361 		 * rollback or destroy what we created, so we don't
1362 		 * leave it in the restoring state.
1363 		 */
1364 		txg_wait_synced(dd->dd_pool, 0);
1365 		if (drrb->drr_fromguid) {
1366 			/* incremental: rollback to most recent snapshot */
1367 			(void) dsl_dir_sync_task(dd,
1368 			    dsl_dataset_rollback_sync, NULL, 0);
1369 		} else {
1370 			/* full: destroy whole fs */
1371 			cp = strchr(tosnap, '@');
1372 			*cp = '\0';
1373 			cp = strchr(tosnap, '/');
1374 			if (cp) {
1375 				(void) dsl_dir_sync_task(dd,
1376 				    dsl_dir_destroy_sync, cp+1, 0);
1377 			}
1378 			cp = strchr(tosnap, '\0');
1379 			*cp = '@';
1380 		}
1381 
1382 	}
1383 
1384 	if (dd)
1385 		dsl_dir_close(dd, FTAG);
1386 	kmem_free(ra.buf, ra.bufsize);
1387 	if (sizep)
1388 		*sizep = ra.voff;
1389 	return (ra.err);
1390 }
1391 
1392 /*
1393  * Intent log support: sync the block at <os, object, offset> to disk.
1394  * N.B. and XXX: the caller is responsible for serializing dmu_sync()s
1395  * of the same block, and for making sure that the data isn't changing
1396  * while dmu_sync() is writing it.
1397  *
1398  * Return values:
1399  *
1400  *	EALREADY: this txg has already been synced, so there's nothing to to.
1401  *		The caller should not log the write.
1402  *
1403  *	ENOENT: the block was dbuf_free_range()'d, so there's nothing to do.
1404  *		The caller should not log the write.
1405  *
1406  *	EINPROGRESS: the block is in the process of being synced by the
1407  *		usual mechanism (spa_sync()), so we can't sync it here.
1408  *		The caller should txg_wait_synced() and not log the write.
1409  *
1410  *	EBUSY: another thread is trying to dmu_sync() the same dbuf.
1411  *		(This case cannot arise under the current locking rules.)
1412  *		The caller should txg_wait_synced() and not log the write.
1413  *
1414  *	ESTALE: the block was dirtied or freed while we were writing it,
1415  *		so the data is no longer valid.
1416  *		The caller should txg_wait_synced() and not log the write.
1417  *
1418  *	0: success.  Sets *bp to the blkptr just written, and sets
1419  *		*blkoff to the data's offset within that block.
1420  *		The caller should log this blkptr/blkoff in its lr_write_t.
1421  */
1422 int
1423 dmu_sync(objset_t *os, uint64_t object, uint64_t offset, uint64_t *blkoff,
1424     blkptr_t *bp, uint64_t txg)
1425 {
1426 	objset_impl_t *osi = os->os;
1427 	dsl_pool_t *dp = osi->os_dsl_dataset->ds_dir->dd_pool;
1428 	tx_state_t *tx = &dp->dp_tx;
1429 	dmu_buf_impl_t *db;
1430 	blkptr_t *blk;
1431 	int err;
1432 	zbookmark_t zb;
1433 
1434 	ASSERT(RW_LOCK_HELD(&tx->tx_suspend));
1435 	ASSERT(BP_IS_HOLE(bp));
1436 	ASSERT(txg != 0);
1437 
1438 	dprintf("dmu_sync txg=%llu, s,o,q %llu %llu %llu\n",
1439 	    txg, tx->tx_synced_txg, tx->tx_open_txg, tx->tx_quiesced_txg);
1440 
1441 	/*
1442 	 * XXX why is this routine using dmu_buf_*() and casting between
1443 	 * dmu_buf_impl_t and dmu_buf_t?
1444 	 */
1445 
1446 	/*
1447 	 * If this txg already synced, there's nothing to do.
1448 	 */
1449 	if (txg <= tx->tx_synced_txg) {
1450 		/*
1451 		 * If we're running ziltest, we need the blkptr regardless.
1452 		 */
1453 		if (txg > spa_freeze_txg(dp->dp_spa)) {
1454 			err = dmu_buf_hold(os, object, offset,
1455 			    FTAG, (dmu_buf_t **)&db);
1456 			if (err)
1457 				return (err);
1458 			/* if db_blkptr == NULL, this was an empty write */
1459 			if (db->db_blkptr)
1460 				*bp = *db->db_blkptr; /* structure assignment */
1461 			else
1462 				bzero(bp, sizeof (blkptr_t));
1463 			*blkoff = offset - db->db.db_offset;
1464 			ASSERT3U(*blkoff, <, db->db.db_size);
1465 			dmu_buf_rele((dmu_buf_t *)db, FTAG);
1466 			return (0);
1467 		}
1468 		return (EALREADY);
1469 	}
1470 
1471 	/*
1472 	 * If this txg is in the middle of syncing, just wait for it.
1473 	 */
1474 	if (txg == tx->tx_syncing_txg) {
1475 		ASSERT(txg != tx->tx_open_txg);
1476 		return (EINPROGRESS);
1477 	}
1478 
1479 	err = dmu_buf_hold(os, object, offset, FTAG, (dmu_buf_t **)&db);
1480 	if (err)
1481 		return (err);
1482 
1483 	mutex_enter(&db->db_mtx);
1484 
1485 	/*
1486 	 * If this dbuf isn't dirty, must have been free_range'd.
1487 	 * There's no need to log writes to freed blocks, so we're done.
1488 	 */
1489 	if (!list_link_active(&db->db_dirty_node[txg&TXG_MASK])) {
1490 		mutex_exit(&db->db_mtx);
1491 		dmu_buf_rele((dmu_buf_t *)db, FTAG);
1492 		return (ENOENT);
1493 	}
1494 
1495 	blk = db->db_d.db_overridden_by[txg&TXG_MASK];
1496 
1497 	/*
1498 	 * If we already did a dmu_sync() of this dbuf in this txg,
1499 	 * free the old block before writing the new one.
1500 	 */
1501 	if (blk != NULL) {
1502 		ASSERT(blk != IN_DMU_SYNC);
1503 		if (blk == IN_DMU_SYNC) {
1504 			mutex_exit(&db->db_mtx);
1505 			dmu_buf_rele((dmu_buf_t *)db, FTAG);
1506 			return (EBUSY);
1507 		}
1508 		arc_release(db->db_d.db_data_old[txg&TXG_MASK], db);
1509 		if (!BP_IS_HOLE(blk)) {
1510 			(void) arc_free(NULL, osi->os_spa, txg, blk,
1511 			    NULL, NULL, ARC_WAIT);
1512 		}
1513 		kmem_free(blk, sizeof (blkptr_t));
1514 	}
1515 
1516 	db->db_d.db_overridden_by[txg&TXG_MASK] = IN_DMU_SYNC;
1517 	mutex_exit(&db->db_mtx);
1518 
1519 	blk = kmem_alloc(sizeof (blkptr_t), KM_SLEEP);
1520 	blk->blk_birth = 0; /* mark as invalid */
1521 
1522 	zb.zb_objset = osi->os_dsl_dataset->ds_object;
1523 	zb.zb_object = db->db.db_object;
1524 	zb.zb_level = db->db_level;
1525 	zb.zb_blkid = db->db_blkid;
1526 	err = arc_write(NULL, osi->os_spa,
1527 	    zio_checksum_select(db->db_dnode->dn_checksum, osi->os_checksum),
1528 	    zio_compress_select(db->db_dnode->dn_compress, osi->os_compress),
1529 	    dmu_get_replication_level(osi->os_spa, &zb, db->db_dnode->dn_type),
1530 	    txg, blk, db->db_d.db_data_old[txg&TXG_MASK], NULL, NULL,
1531 	    ZIO_PRIORITY_SYNC_WRITE, ZIO_FLAG_MUSTSUCCEED, ARC_WAIT, &zb);
1532 	ASSERT(err == 0);
1533 
1534 	if (!BP_IS_HOLE(blk)) {
1535 		blk->blk_fill = 1;
1536 		BP_SET_TYPE(blk, db->db_dnode->dn_type);
1537 		BP_SET_LEVEL(blk, 0);
1538 	}
1539 
1540 	/* copy the block pointer back to caller */
1541 	*bp = *blk; /* structure assignment */
1542 	*blkoff = offset - db->db.db_offset;
1543 	ASSERT3U(*blkoff, <, db->db.db_size);
1544 
1545 	mutex_enter(&db->db_mtx);
1546 	if (db->db_d.db_overridden_by[txg&TXG_MASK] != IN_DMU_SYNC) {
1547 		/* we were dirtied/freed during the sync */
1548 		ASSERT3P(db->db_d.db_overridden_by[txg&TXG_MASK], ==, NULL);
1549 		arc_release(db->db_d.db_data_old[txg&TXG_MASK], db);
1550 		mutex_exit(&db->db_mtx);
1551 		dmu_buf_rele((dmu_buf_t *)db, FTAG);
1552 		/* Note that this block does not free on disk until txg syncs */
1553 
1554 		/*
1555 		 * XXX can we use ARC_NOWAIT here?
1556 		 * XXX should we be ignoring the return code?
1557 		 */
1558 		if (!BP_IS_HOLE(blk)) {
1559 			(void) arc_free(NULL, osi->os_spa, txg, blk,
1560 			    NULL, NULL, ARC_WAIT);
1561 		}
1562 		kmem_free(blk, sizeof (blkptr_t));
1563 		return (ESTALE);
1564 	}
1565 
1566 	db->db_d.db_overridden_by[txg&TXG_MASK] = blk;
1567 	mutex_exit(&db->db_mtx);
1568 	dmu_buf_rele((dmu_buf_t *)db, FTAG);
1569 	ASSERT3U(txg, >, tx->tx_syncing_txg);
1570 	return (0);
1571 }
1572 
1573 uint64_t
1574 dmu_object_max_nonzero_offset(objset_t *os, uint64_t object)
1575 {
1576 	dnode_t *dn;
1577 
1578 	/* XXX assumes dnode_hold will not get an i/o error */
1579 	(void) dnode_hold(os->os, object, FTAG, &dn);
1580 	uint64_t rv = dnode_max_nonzero_offset(dn);
1581 	dnode_rele(dn, FTAG);
1582 	return (rv);
1583 }
1584 
1585 int
1586 dmu_object_set_blocksize(objset_t *os, uint64_t object, uint64_t size, int ibs,
1587 	dmu_tx_t *tx)
1588 {
1589 	dnode_t *dn;
1590 	int err;
1591 
1592 	err = dnode_hold(os->os, object, FTAG, &dn);
1593 	if (err)
1594 		return (err);
1595 	err = dnode_set_blksz(dn, size, ibs, tx);
1596 	dnode_rele(dn, FTAG);
1597 	return (err);
1598 }
1599 
1600 void
1601 dmu_object_set_checksum(objset_t *os, uint64_t object, uint8_t checksum,
1602 	dmu_tx_t *tx)
1603 {
1604 	dnode_t *dn;
1605 
1606 	/* XXX assumes dnode_hold will not get an i/o error */
1607 	(void) dnode_hold(os->os, object, FTAG, &dn);
1608 	ASSERT(checksum < ZIO_CHECKSUM_FUNCTIONS);
1609 	dn->dn_checksum = checksum;
1610 	dnode_setdirty(dn, tx);
1611 	dnode_rele(dn, FTAG);
1612 }
1613 
1614 void
1615 dmu_object_set_compress(objset_t *os, uint64_t object, uint8_t compress,
1616 	dmu_tx_t *tx)
1617 {
1618 	dnode_t *dn;
1619 
1620 	/* XXX assumes dnode_hold will not get an i/o error */
1621 	(void) dnode_hold(os->os, object, FTAG, &dn);
1622 	ASSERT(compress < ZIO_COMPRESS_FUNCTIONS);
1623 	dn->dn_compress = compress;
1624 	dnode_setdirty(dn, tx);
1625 	dnode_rele(dn, FTAG);
1626 }
1627 
1628 /*
1629  * XXX - eventually, this should take into account per-dataset (or
1630  *       even per-object?) user requests for higher levels of replication.
1631  */
1632 int
1633 dmu_get_replication_level(spa_t *spa, zbookmark_t *zb, dmu_object_type_t ot)
1634 {
1635 	int ncopies = 1;
1636 
1637 	if (dmu_ot[ot].ot_metadata)
1638 		ncopies++;
1639 	if (zb->zb_level != 0)
1640 		ncopies++;
1641 	if (zb->zb_objset == 0 && zb->zb_object == 0)
1642 		ncopies++;
1643 	return (MIN(ncopies, spa_max_replication(spa)));
1644 }
1645 
1646 int
1647 dmu_offset_next(objset_t *os, uint64_t object, boolean_t hole, uint64_t *off)
1648 {
1649 	dnode_t *dn;
1650 	int i, err;
1651 
1652 	err = dnode_hold(os->os, object, FTAG, &dn);
1653 	if (err)
1654 		return (err);
1655 	/*
1656 	 * Sync any current changes before
1657 	 * we go trundling through the block pointers.
1658 	 */
1659 	for (i = 0; i < TXG_SIZE; i++) {
1660 		if (list_link_active(&dn->dn_dirty_link[i]))
1661 			break;
1662 	}
1663 	if (i != TXG_SIZE) {
1664 		dnode_rele(dn, FTAG);
1665 		txg_wait_synced(dmu_objset_pool(os), 0);
1666 		err = dnode_hold(os->os, object, FTAG, &dn);
1667 		if (err)
1668 			return (err);
1669 	}
1670 
1671 	err = dnode_next_offset(dn, hole, off, 1, 1);
1672 	dnode_rele(dn, FTAG);
1673 
1674 	return (err);
1675 }
1676 
1677 void
1678 dmu_object_info_from_dnode(dnode_t *dn, dmu_object_info_t *doi)
1679 {
1680 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
1681 	mutex_enter(&dn->dn_mtx);
1682 
1683 	doi->doi_data_block_size = dn->dn_datablksz;
1684 	doi->doi_metadata_block_size = dn->dn_indblkshift ?
1685 	    1ULL << dn->dn_indblkshift : 0;
1686 	doi->doi_indirection = dn->dn_nlevels;
1687 	doi->doi_checksum = dn->dn_checksum;
1688 	doi->doi_compress = dn->dn_compress;
1689 	doi->doi_physical_blks = dn->dn_phys->dn_secphys;
1690 	doi->doi_max_block_offset = dn->dn_phys->dn_maxblkid;
1691 	doi->doi_type = dn->dn_type;
1692 	doi->doi_bonus_size = dn->dn_bonuslen;
1693 	doi->doi_bonus_type = dn->dn_bonustype;
1694 
1695 	mutex_exit(&dn->dn_mtx);
1696 	rw_exit(&dn->dn_struct_rwlock);
1697 }
1698 
1699 /*
1700  * Get information on a DMU object.
1701  * If doi is NULL, just indicates whether the object exists.
1702  */
1703 int
1704 dmu_object_info(objset_t *os, uint64_t object, dmu_object_info_t *doi)
1705 {
1706 	dnode_t *dn;
1707 	int err = dnode_hold(os->os, object, FTAG, &dn);
1708 
1709 	if (err)
1710 		return (err);
1711 
1712 	if (doi != NULL)
1713 		dmu_object_info_from_dnode(dn, doi);
1714 
1715 	dnode_rele(dn, FTAG);
1716 	return (0);
1717 }
1718 
1719 /*
1720  * As above, but faster; can be used when you have a held dbuf in hand.
1721  */
1722 void
1723 dmu_object_info_from_db(dmu_buf_t *db, dmu_object_info_t *doi)
1724 {
1725 	dmu_object_info_from_dnode(((dmu_buf_impl_t *)db)->db_dnode, doi);
1726 }
1727 
1728 /*
1729  * Faster still when you only care about the size.
1730  * This is specifically optimized for zfs_getattr().
1731  */
1732 void
1733 dmu_object_size_from_db(dmu_buf_t *db, uint32_t *blksize, u_longlong_t *nblk512)
1734 {
1735 	dnode_t *dn = ((dmu_buf_impl_t *)db)->db_dnode;
1736 
1737 	*blksize = dn->dn_datablksz;
1738 	*nblk512 = dn->dn_phys->dn_secphys + 1;	/* add 1 for dnode space */
1739 }
1740 
1741 /*
1742  * Given a bookmark, return the name of the dataset, object, and range in
1743  * human-readable format.
1744  */
1745 int
1746 spa_bookmark_name(spa_t *spa, zbookmark_t *zb, char *dsname, size_t dslen,
1747     char *objname, size_t objlen, char *range, size_t rangelen)
1748 {
1749 	dsl_pool_t *dp;
1750 	dsl_dataset_t *ds = NULL;
1751 	objset_t *os = NULL;
1752 	dnode_t *dn = NULL;
1753 	int err, shift;
1754 
1755 	if (dslen < MAXNAMELEN || objlen < 32 || rangelen < 64)
1756 		return (ENOSPC);
1757 
1758 	dp = spa_get_dsl(spa);
1759 	if (zb->zb_objset != 0) {
1760 		rw_enter(&dp->dp_config_rwlock, RW_READER);
1761 		err = dsl_dataset_open_obj(dp, zb->zb_objset,
1762 		    NULL, DS_MODE_NONE, FTAG, &ds);
1763 		if (err) {
1764 			rw_exit(&dp->dp_config_rwlock);
1765 			return (err);
1766 		}
1767 		dsl_dataset_name(ds, dsname);
1768 		dsl_dataset_close(ds, DS_MODE_NONE, FTAG);
1769 		rw_exit(&dp->dp_config_rwlock);
1770 
1771 		err = dmu_objset_open(dsname, DMU_OST_ANY, DS_MODE_NONE, &os);
1772 		if (err)
1773 			goto out;
1774 
1775 	} else {
1776 		dsl_dataset_name(NULL, dsname);
1777 		os = dp->dp_meta_objset;
1778 	}
1779 
1780 
1781 	if (zb->zb_object == DMU_META_DNODE_OBJECT) {
1782 		(void) strncpy(objname, "mdn", objlen);
1783 	} else {
1784 		(void) snprintf(objname, objlen, "%lld",
1785 		    (longlong_t)zb->zb_object);
1786 	}
1787 
1788 	err = dnode_hold(os->os, zb->zb_object, FTAG, &dn);
1789 	if (err)
1790 		goto out;
1791 
1792 	shift = (dn->dn_datablkshift?dn->dn_datablkshift:SPA_MAXBLOCKSHIFT) +
1793 	    zb->zb_level * (dn->dn_indblkshift - SPA_BLKPTRSHIFT);
1794 	(void) snprintf(range, rangelen, "%llu-%llu",
1795 	    (u_longlong_t)(zb->zb_blkid << shift),
1796 	    (u_longlong_t)((zb->zb_blkid+1) << shift));
1797 
1798 out:
1799 	if (dn)
1800 		dnode_rele(dn, FTAG);
1801 	if (os && os != dp->dp_meta_objset)
1802 		dmu_objset_close(os);
1803 	return (err);
1804 }
1805 
1806 void
1807 byteswap_uint64_array(void *vbuf, size_t size)
1808 {
1809 	uint64_t *buf = vbuf;
1810 	size_t count = size >> 3;
1811 	int i;
1812 
1813 	ASSERT((size & 7) == 0);
1814 
1815 	for (i = 0; i < count; i++)
1816 		buf[i] = BSWAP_64(buf[i]);
1817 }
1818 
1819 void
1820 byteswap_uint32_array(void *vbuf, size_t size)
1821 {
1822 	uint32_t *buf = vbuf;
1823 	size_t count = size >> 2;
1824 	int i;
1825 
1826 	ASSERT((size & 3) == 0);
1827 
1828 	for (i = 0; i < count; i++)
1829 		buf[i] = BSWAP_32(buf[i]);
1830 }
1831 
1832 void
1833 byteswap_uint16_array(void *vbuf, size_t size)
1834 {
1835 	uint16_t *buf = vbuf;
1836 	size_t count = size >> 1;
1837 	int i;
1838 
1839 	ASSERT((size & 1) == 0);
1840 
1841 	for (i = 0; i < count; i++)
1842 		buf[i] = BSWAP_16(buf[i]);
1843 }
1844 
1845 /* ARGSUSED */
1846 void
1847 byteswap_uint8_array(void *vbuf, size_t size)
1848 {
1849 }
1850 
1851 void
1852 dmu_init(void)
1853 {
1854 	dbuf_init();
1855 	dnode_init();
1856 	arc_init();
1857 }
1858 
1859 void
1860 dmu_fini(void)
1861 {
1862 	arc_fini();
1863 	dnode_fini();
1864 	dbuf_fini();
1865 }
1866