xref: /titanic_41/usr/src/uts/common/fs/zfs/dmu.c (revision 111141478eff4400835d62aac967ab46a197dee3)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2006 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 
26 #pragma ident	"%Z%%M%	%I%	%E% SMI"
27 
28 #include <sys/dmu.h>
29 #include <sys/dmu_impl.h>
30 #include <sys/dmu_tx.h>
31 #include <sys/dbuf.h>
32 #include <sys/dnode.h>
33 #include <sys/zfs_context.h>
34 #include <sys/dmu_objset.h>
35 #include <sys/dmu_traverse.h>
36 #include <sys/dsl_dataset.h>
37 #include <sys/dsl_dir.h>
38 #include <sys/dsl_pool.h>
39 #include <sys/dsl_synctask.h>
40 #include <sys/dmu_zfetch.h>
41 #include <sys/zfs_ioctl.h>
42 #include <sys/zap.h>
43 #include <sys/zio_checksum.h>
44 #ifdef _KERNEL
45 #include <sys/vmsystm.h>
46 #endif
47 
48 const dmu_object_type_info_t dmu_ot[DMU_OT_NUMTYPES] = {
49 	{	byteswap_uint8_array,	TRUE,	"unallocated"		},
50 	{	zap_byteswap,		TRUE,	"object directory"	},
51 	{	byteswap_uint64_array,	TRUE,	"object array"		},
52 	{	byteswap_uint8_array,	TRUE,	"packed nvlist"		},
53 	{	byteswap_uint64_array,	TRUE,	"packed nvlist size"	},
54 	{	byteswap_uint64_array,	TRUE,	"bplist"		},
55 	{	byteswap_uint64_array,	TRUE,	"bplist header"		},
56 	{	byteswap_uint64_array,	TRUE,	"SPA space map header"	},
57 	{	byteswap_uint64_array,	TRUE,	"SPA space map"		},
58 	{	byteswap_uint64_array,	TRUE,	"ZIL intent log"	},
59 	{	dnode_buf_byteswap,	TRUE,	"DMU dnode"		},
60 	{	dmu_objset_byteswap,	TRUE,	"DMU objset"		},
61 	{	byteswap_uint64_array,	TRUE,	"DSL directory"		},
62 	{	zap_byteswap,		TRUE,	"DSL directory child map"},
63 	{	zap_byteswap,		TRUE,	"DSL dataset snap map"	},
64 	{	zap_byteswap,		TRUE,	"DSL props"		},
65 	{	byteswap_uint64_array,	TRUE,	"DSL dataset"		},
66 	{	zfs_znode_byteswap,	TRUE,	"ZFS znode"		},
67 	{	zfs_acl_byteswap,	TRUE,	"ZFS ACL"		},
68 	{	byteswap_uint8_array,	FALSE,	"ZFS plain file"	},
69 	{	zap_byteswap,		TRUE,	"ZFS directory"		},
70 	{	zap_byteswap,		TRUE,	"ZFS master node"	},
71 	{	zap_byteswap,		TRUE,	"ZFS delete queue"	},
72 	{	byteswap_uint8_array,	FALSE,	"zvol object"		},
73 	{	zap_byteswap,		TRUE,	"zvol prop"		},
74 	{	byteswap_uint8_array,	FALSE,	"other uint8[]"		},
75 	{	byteswap_uint64_array,	FALSE,	"other uint64[]"	},
76 	{	zap_byteswap,		TRUE,	"other ZAP"		},
77 	{	zap_byteswap,		TRUE,	"persistent error log"	},
78 };
79 
80 int
81 dmu_buf_hold(objset_t *os, uint64_t object, uint64_t offset,
82     void *tag, dmu_buf_t **dbp)
83 {
84 	dnode_t *dn;
85 	uint64_t blkid;
86 	dmu_buf_impl_t *db;
87 	int err;
88 
89 	err = dnode_hold(os->os, object, FTAG, &dn);
90 	if (err)
91 		return (err);
92 	blkid = dbuf_whichblock(dn, offset);
93 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
94 	db = dbuf_hold(dn, blkid, tag);
95 	rw_exit(&dn->dn_struct_rwlock);
96 	if (db == NULL) {
97 		err = EIO;
98 	} else {
99 		err = dbuf_read(db, NULL, DB_RF_CANFAIL);
100 		if (err) {
101 			dbuf_rele(db, tag);
102 			db = NULL;
103 		}
104 	}
105 
106 	dnode_rele(dn, FTAG);
107 	*dbp = &db->db;
108 	return (err);
109 }
110 
111 int
112 dmu_bonus_max(void)
113 {
114 	return (DN_MAX_BONUSLEN);
115 }
116 
117 /*
118  * returns ENOENT, EIO, or 0.
119  */
120 int
121 dmu_bonus_hold(objset_t *os, uint64_t object, void *tag, dmu_buf_t **dbp)
122 {
123 	dnode_t *dn;
124 	int err, count;
125 	dmu_buf_impl_t *db;
126 
127 	err = dnode_hold(os->os, object, FTAG, &dn);
128 	if (err)
129 		return (err);
130 
131 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
132 	if (dn->dn_bonus == NULL) {
133 		rw_exit(&dn->dn_struct_rwlock);
134 		rw_enter(&dn->dn_struct_rwlock, RW_WRITER);
135 		if (dn->dn_bonus == NULL)
136 			dn->dn_bonus = dbuf_create_bonus(dn);
137 	}
138 	db = dn->dn_bonus;
139 	rw_exit(&dn->dn_struct_rwlock);
140 	mutex_enter(&db->db_mtx);
141 	count = refcount_add(&db->db_holds, tag);
142 	mutex_exit(&db->db_mtx);
143 	if (count == 1)
144 		dnode_add_ref(dn, db);
145 	dnode_rele(dn, FTAG);
146 
147 	VERIFY(0 == dbuf_read(db, NULL, DB_RF_MUST_SUCCEED));
148 
149 	*dbp = &db->db;
150 	return (0);
151 }
152 
153 /*
154  * Note: longer-term, we should modify all of the dmu_buf_*() interfaces
155  * to take a held dnode rather than <os, object> -- the lookup is wasteful,
156  * and can induce severe lock contention when writing to several files
157  * whose dnodes are in the same block.
158  */
159 static int
160 dmu_buf_hold_array_by_dnode(dnode_t *dn, uint64_t offset,
161     uint64_t length, int read, void *tag, int *numbufsp, dmu_buf_t ***dbpp)
162 {
163 	dmu_buf_t **dbp;
164 	uint64_t blkid, nblks, i;
165 	uint32_t flags;
166 	int err;
167 	zio_t *zio;
168 
169 	ASSERT(length <= DMU_MAX_ACCESS);
170 
171 	flags = DB_RF_CANFAIL | DB_RF_NEVERWAIT;
172 	if (length > zfetch_array_rd_sz)
173 		flags |= DB_RF_NOPREFETCH;
174 
175 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
176 	if (dn->dn_datablkshift) {
177 		int blkshift = dn->dn_datablkshift;
178 		nblks = (P2ROUNDUP(offset+length, 1ULL<<blkshift) -
179 			P2ALIGN(offset, 1ULL<<blkshift)) >> blkshift;
180 	} else {
181 		ASSERT3U(offset + length, <=, dn->dn_datablksz);
182 		nblks = 1;
183 	}
184 	dbp = kmem_zalloc(sizeof (dmu_buf_t *) * nblks, KM_SLEEP);
185 
186 	zio = zio_root(dn->dn_objset->os_spa, NULL, NULL, TRUE);
187 	blkid = dbuf_whichblock(dn, offset);
188 	for (i = 0; i < nblks; i++) {
189 		dmu_buf_impl_t *db = dbuf_hold(dn, blkid+i, tag);
190 		if (db == NULL) {
191 			rw_exit(&dn->dn_struct_rwlock);
192 			dmu_buf_rele_array(dbp, nblks, tag);
193 			zio_nowait(zio);
194 			return (EIO);
195 		}
196 		/* initiate async i/o */
197 		if (read) {
198 			rw_exit(&dn->dn_struct_rwlock);
199 			(void) dbuf_read(db, zio, flags);
200 			rw_enter(&dn->dn_struct_rwlock, RW_READER);
201 		}
202 		dbp[i] = &db->db;
203 	}
204 	rw_exit(&dn->dn_struct_rwlock);
205 
206 	/* wait for async i/o */
207 	err = zio_wait(zio);
208 	if (err) {
209 		dmu_buf_rele_array(dbp, nblks, tag);
210 		return (err);
211 	}
212 
213 	/* wait for other io to complete */
214 	if (read) {
215 		for (i = 0; i < nblks; i++) {
216 			dmu_buf_impl_t *db = (dmu_buf_impl_t *)dbp[i];
217 			mutex_enter(&db->db_mtx);
218 			while (db->db_state == DB_READ ||
219 			    db->db_state == DB_FILL)
220 				cv_wait(&db->db_changed, &db->db_mtx);
221 			if (db->db_state == DB_UNCACHED)
222 				err = EIO;
223 			mutex_exit(&db->db_mtx);
224 			if (err) {
225 				dmu_buf_rele_array(dbp, nblks, tag);
226 				return (err);
227 			}
228 		}
229 	}
230 
231 	*numbufsp = nblks;
232 	*dbpp = dbp;
233 	return (0);
234 }
235 
236 int
237 dmu_buf_hold_array(objset_t *os, uint64_t object, uint64_t offset,
238     uint64_t length, int read, void *tag, int *numbufsp, dmu_buf_t ***dbpp)
239 {
240 	dnode_t *dn;
241 	int err;
242 
243 	err = dnode_hold(os->os, object, FTAG, &dn);
244 	if (err)
245 		return (err);
246 
247 	err = dmu_buf_hold_array_by_dnode(dn, offset, length, read, tag,
248 	    numbufsp, dbpp);
249 
250 	dnode_rele(dn, FTAG);
251 
252 	return (err);
253 }
254 
255 int
256 dmu_buf_hold_array_by_bonus(dmu_buf_t *db, uint64_t offset,
257     uint64_t length, int read, void *tag, int *numbufsp, dmu_buf_t ***dbpp)
258 {
259 	dnode_t *dn = ((dmu_buf_impl_t *)db)->db_dnode;
260 	int err;
261 
262 	err = dmu_buf_hold_array_by_dnode(dn, offset, length, read, tag,
263 	    numbufsp, dbpp);
264 
265 	return (err);
266 }
267 
268 void
269 dmu_buf_rele_array(dmu_buf_t **dbp_fake, int numbufs, void *tag)
270 {
271 	int i;
272 	dmu_buf_impl_t **dbp = (dmu_buf_impl_t **)dbp_fake;
273 
274 	if (numbufs == 0)
275 		return;
276 
277 	for (i = 0; i < numbufs; i++) {
278 		if (dbp[i])
279 			dbuf_rele(dbp[i], tag);
280 	}
281 
282 	kmem_free(dbp, sizeof (dmu_buf_t *) * numbufs);
283 }
284 
285 void
286 dmu_prefetch(objset_t *os, uint64_t object, uint64_t offset, uint64_t len)
287 {
288 	dnode_t *dn;
289 	uint64_t blkid;
290 	int nblks, i, err;
291 
292 	if (len == 0) {  /* they're interested in the bonus buffer */
293 		dn = os->os->os_meta_dnode;
294 
295 		if (object == 0 || object >= DN_MAX_OBJECT)
296 			return;
297 
298 		rw_enter(&dn->dn_struct_rwlock, RW_READER);
299 		blkid = dbuf_whichblock(dn, object * sizeof (dnode_phys_t));
300 		dbuf_prefetch(dn, blkid);
301 		rw_exit(&dn->dn_struct_rwlock);
302 		return;
303 	}
304 
305 	/*
306 	 * XXX - Note, if the dnode for the requested object is not
307 	 * already cached, we will do a *synchronous* read in the
308 	 * dnode_hold() call.  The same is true for any indirects.
309 	 */
310 	err = dnode_hold(os->os, object, FTAG, &dn);
311 	if (err != 0)
312 		return;
313 
314 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
315 	if (dn->dn_datablkshift) {
316 		int blkshift = dn->dn_datablkshift;
317 		nblks = (P2ROUNDUP(offset+len, 1<<blkshift) -
318 			P2ALIGN(offset, 1<<blkshift)) >> blkshift;
319 	} else {
320 		nblks = (offset < dn->dn_datablksz);
321 	}
322 
323 	if (nblks != 0) {
324 		blkid = dbuf_whichblock(dn, offset);
325 		for (i = 0; i < nblks; i++)
326 			dbuf_prefetch(dn, blkid+i);
327 	}
328 
329 	rw_exit(&dn->dn_struct_rwlock);
330 
331 	dnode_rele(dn, FTAG);
332 }
333 
334 int
335 dmu_free_range(objset_t *os, uint64_t object, uint64_t offset,
336     uint64_t size, dmu_tx_t *tx)
337 {
338 	dnode_t *dn;
339 	int err = dnode_hold(os->os, object, FTAG, &dn);
340 	if (err)
341 		return (err);
342 	ASSERT(offset < UINT64_MAX);
343 	ASSERT(size == -1ULL || size <= UINT64_MAX - offset);
344 	dnode_free_range(dn, offset, size, tx);
345 	dnode_rele(dn, FTAG);
346 	return (0);
347 }
348 
349 int
350 dmu_read(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
351     void *buf)
352 {
353 	dnode_t *dn;
354 	dmu_buf_t **dbp;
355 	int numbufs, i, err;
356 
357 	/*
358 	 * Deal with odd block sizes, where there can't be data past the
359 	 * first block.
360 	 */
361 	err = dnode_hold(os->os, object, FTAG, &dn);
362 	if (err)
363 		return (err);
364 	if (dn->dn_datablkshift == 0) {
365 		int newsz = offset > dn->dn_datablksz ? 0 :
366 		    MIN(size, dn->dn_datablksz - offset);
367 		bzero((char *)buf + newsz, size - newsz);
368 		size = newsz;
369 	}
370 	dnode_rele(dn, FTAG);
371 
372 	while (size > 0) {
373 		uint64_t mylen = MIN(size, DMU_MAX_ACCESS / 2);
374 		int err;
375 
376 		/*
377 		 * NB: we could do this block-at-a-time, but it's nice
378 		 * to be reading in parallel.
379 		 */
380 		err = dmu_buf_hold_array(os, object, offset, mylen,
381 		    TRUE, FTAG, &numbufs, &dbp);
382 		if (err)
383 			return (err);
384 
385 		for (i = 0; i < numbufs; i++) {
386 			int tocpy;
387 			int bufoff;
388 			dmu_buf_t *db = dbp[i];
389 
390 			ASSERT(size > 0);
391 
392 			bufoff = offset - db->db_offset;
393 			tocpy = (int)MIN(db->db_size - bufoff, size);
394 
395 			bcopy((char *)db->db_data + bufoff, buf, tocpy);
396 
397 			offset += tocpy;
398 			size -= tocpy;
399 			buf = (char *)buf + tocpy;
400 		}
401 		dmu_buf_rele_array(dbp, numbufs, FTAG);
402 	}
403 	return (0);
404 }
405 
406 void
407 dmu_write(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
408     const void *buf, dmu_tx_t *tx)
409 {
410 	dmu_buf_t **dbp;
411 	int numbufs, i;
412 
413 	if (size == 0)
414 		return;
415 
416 	VERIFY(0 == dmu_buf_hold_array(os, object, offset, size,
417 	    FALSE, FTAG, &numbufs, &dbp));
418 
419 	for (i = 0; i < numbufs; i++) {
420 		int tocpy;
421 		int bufoff;
422 		dmu_buf_t *db = dbp[i];
423 
424 		ASSERT(size > 0);
425 
426 		bufoff = offset - db->db_offset;
427 		tocpy = (int)MIN(db->db_size - bufoff, size);
428 
429 		ASSERT(i == 0 || i == numbufs-1 || tocpy == db->db_size);
430 
431 		if (tocpy == db->db_size)
432 			dmu_buf_will_fill(db, tx);
433 		else
434 			dmu_buf_will_dirty(db, tx);
435 
436 		bcopy(buf, (char *)db->db_data + bufoff, tocpy);
437 
438 		if (tocpy == db->db_size)
439 			dmu_buf_fill_done(db, tx);
440 
441 		offset += tocpy;
442 		size -= tocpy;
443 		buf = (char *)buf + tocpy;
444 	}
445 	dmu_buf_rele_array(dbp, numbufs, FTAG);
446 }
447 
448 #ifdef _KERNEL
449 int
450 dmu_write_uio(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
451     uio_t *uio, dmu_tx_t *tx)
452 {
453 	dmu_buf_t **dbp;
454 	int numbufs, i;
455 	int err = 0;
456 
457 	if (size == 0)
458 		return (0);
459 
460 	err = dmu_buf_hold_array(os, object, offset, size,
461 	    FALSE, FTAG, &numbufs, &dbp);
462 	if (err)
463 		return (err);
464 
465 	for (i = 0; i < numbufs; i++) {
466 		int tocpy;
467 		int bufoff;
468 		dmu_buf_t *db = dbp[i];
469 
470 		ASSERT(size > 0);
471 
472 		bufoff = offset - db->db_offset;
473 		tocpy = (int)MIN(db->db_size - bufoff, size);
474 
475 		ASSERT(i == 0 || i == numbufs-1 || tocpy == db->db_size);
476 
477 		if (tocpy == db->db_size)
478 			dmu_buf_will_fill(db, tx);
479 		else
480 			dmu_buf_will_dirty(db, tx);
481 
482 		/*
483 		 * XXX uiomove could block forever (eg. nfs-backed
484 		 * pages).  There needs to be a uiolockdown() function
485 		 * to lock the pages in memory, so that uiomove won't
486 		 * block.
487 		 */
488 		err = uiomove((char *)db->db_data + bufoff, tocpy,
489 		    UIO_WRITE, uio);
490 
491 		if (tocpy == db->db_size)
492 			dmu_buf_fill_done(db, tx);
493 
494 		if (err)
495 			break;
496 
497 		offset += tocpy;
498 		size -= tocpy;
499 	}
500 	dmu_buf_rele_array(dbp, numbufs, FTAG);
501 	return (err);
502 }
503 
504 int
505 dmu_write_pages(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
506     page_t *pp, dmu_tx_t *tx)
507 {
508 	dmu_buf_t **dbp;
509 	int numbufs, i;
510 	int err;
511 
512 	if (size == 0)
513 		return (0);
514 
515 	err = dmu_buf_hold_array(os, object, offset, size,
516 	    FALSE, FTAG, &numbufs, &dbp);
517 	if (err)
518 		return (err);
519 
520 	for (i = 0; i < numbufs; i++) {
521 		int tocpy, copied, thiscpy;
522 		int bufoff;
523 		dmu_buf_t *db = dbp[i];
524 		caddr_t va;
525 
526 		ASSERT(size > 0);
527 		ASSERT3U(db->db_size, >=, PAGESIZE);
528 
529 		bufoff = offset - db->db_offset;
530 		tocpy = (int)MIN(db->db_size - bufoff, size);
531 
532 		ASSERT(i == 0 || i == numbufs-1 || tocpy == db->db_size);
533 
534 		if (tocpy == db->db_size)
535 			dmu_buf_will_fill(db, tx);
536 		else
537 			dmu_buf_will_dirty(db, tx);
538 
539 		for (copied = 0; copied < tocpy; copied += PAGESIZE) {
540 			ASSERT3U(pp->p_offset, ==, db->db_offset + bufoff);
541 			thiscpy = MIN(PAGESIZE, tocpy - copied);
542 			va = ppmapin(pp, PROT_READ, (caddr_t)-1);
543 			bcopy(va, (char *)db->db_data + bufoff, thiscpy);
544 			ppmapout(va);
545 			pp = pp->p_next;
546 			bufoff += PAGESIZE;
547 		}
548 
549 		if (tocpy == db->db_size)
550 			dmu_buf_fill_done(db, tx);
551 
552 		if (err)
553 			break;
554 
555 		offset += tocpy;
556 		size -= tocpy;
557 	}
558 	dmu_buf_rele_array(dbp, numbufs, FTAG);
559 	return (err);
560 }
561 #endif
562 
563 typedef struct {
564 	uint64_t	txg;
565 	dmu_buf_impl_t	*db;
566 	dmu_sync_cb_t	*done;
567 	void		*arg;
568 } dmu_sync_cbin_t;
569 
570 typedef union {
571 	dmu_sync_cbin_t	data;
572 	blkptr_t	blk;
573 } dmu_sync_cbarg_t;
574 
575 /* ARGSUSED */
576 static void
577 dmu_sync_done(zio_t *zio, arc_buf_t *buf, void *varg)
578 {
579 	dmu_sync_cbin_t *in = (dmu_sync_cbin_t *)varg;
580 	dmu_buf_impl_t *db = in->db;
581 	uint64_t txg = in->txg;
582 	dmu_sync_cb_t *done = in->done;
583 	void *arg = in->arg;
584 	blkptr_t *blk = (blkptr_t *)varg;
585 
586 	if (!BP_IS_HOLE(zio->io_bp)) {
587 		zio->io_bp->blk_fill = 1;
588 		BP_SET_TYPE(zio->io_bp, db->db_dnode->dn_type);
589 		BP_SET_LEVEL(zio->io_bp, 0);
590 	}
591 
592 	*blk = *zio->io_bp; /* structure assignment */
593 
594 	mutex_enter(&db->db_mtx);
595 	ASSERT(db->db_d.db_overridden_by[txg&TXG_MASK] == IN_DMU_SYNC);
596 	db->db_d.db_overridden_by[txg&TXG_MASK] = blk;
597 	cv_broadcast(&db->db_changed);
598 	mutex_exit(&db->db_mtx);
599 
600 	if (done)
601 		done(&(db->db), arg);
602 }
603 
604 /*
605  * Intent log support: sync the block associated with db to disk.
606  * N.B. and XXX: the caller is responsible for making sure that the
607  * data isn't changing while dmu_sync() is writing it.
608  *
609  * Return values:
610  *
611  *	EEXIST: this txg has already been synced, so there's nothing to to.
612  *		The caller should not log the write.
613  *
614  *	ENOENT: the block was dbuf_free_range()'d, so there's nothing to do.
615  *		The caller should not log the write.
616  *
617  *	EALREADY: this block is already in the process of being synced.
618  *		The caller should track its progress (somehow).
619  *
620  *	EINPROGRESS: the IO has been initiated.
621  *		The caller should log this blkptr in the callback.
622  *
623  *	0: completed.  Sets *bp to the blkptr just written.
624  *		The caller should log this blkptr immediately.
625  */
626 int
627 dmu_sync(zio_t *pio, dmu_buf_t *db_fake,
628     blkptr_t *bp, uint64_t txg, dmu_sync_cb_t *done, void *arg)
629 {
630 	dmu_buf_impl_t *db = (dmu_buf_impl_t *)db_fake;
631 	objset_impl_t *os = db->db_objset;
632 	dsl_pool_t *dp = os->os_dsl_dataset->ds_dir->dd_pool;
633 	tx_state_t *tx = &dp->dp_tx;
634 	dmu_sync_cbin_t *in;
635 	blkptr_t *blk;
636 	zbookmark_t zb;
637 	uint32_t arc_flag;
638 	int err;
639 
640 	ASSERT(BP_IS_HOLE(bp));
641 	ASSERT(txg != 0);
642 
643 
644 	dprintf("dmu_sync txg=%llu, s,o,q %llu %llu %llu\n",
645 	    txg, tx->tx_synced_txg, tx->tx_open_txg, tx->tx_quiesced_txg);
646 
647 	/*
648 	 * XXX - would be nice if we could do this without suspending...
649 	 */
650 	txg_suspend(dp);
651 
652 	/*
653 	 * If this txg already synced, there's nothing to do.
654 	 */
655 	if (txg <= tx->tx_synced_txg) {
656 		txg_resume(dp);
657 		/*
658 		 * If we're running ziltest, we need the blkptr regardless.
659 		 */
660 		if (txg > spa_freeze_txg(dp->dp_spa)) {
661 			/* if db_blkptr == NULL, this was an empty write */
662 			if (db->db_blkptr)
663 				*bp = *db->db_blkptr; /* structure assignment */
664 			return (0);
665 		}
666 		return (EEXIST);
667 	}
668 
669 	mutex_enter(&db->db_mtx);
670 
671 	blk = db->db_d.db_overridden_by[txg&TXG_MASK];
672 	if (blk == IN_DMU_SYNC) {
673 		/*
674 		 * We have already issued a sync write for this buffer.
675 		 */
676 		mutex_exit(&db->db_mtx);
677 		txg_resume(dp);
678 		return (EALREADY);
679 	} else if (blk != NULL) {
680 		/*
681 		 * This buffer had already been synced.  It could not
682 		 * have been dirtied since, or we would have cleared blk.
683 		 */
684 		*bp = *blk; /* structure assignment */
685 		mutex_exit(&db->db_mtx);
686 		txg_resume(dp);
687 		return (0);
688 	}
689 
690 	if (txg == tx->tx_syncing_txg) {
691 		while (db->db_data_pending) {
692 			/*
693 			 * IO is in-progress.  Wait for it to finish.
694 			 * XXX - would be nice to be able to somehow "attach"
695 			 * this zio to the parent zio passed in.
696 			 */
697 			cv_wait(&db->db_changed, &db->db_mtx);
698 			if (!db->db_data_pending &&
699 			    db->db_blkptr && BP_IS_HOLE(db->db_blkptr)) {
700 				/*
701 				 * IO was compressed away
702 				 */
703 				*bp = *db->db_blkptr; /* structure assignment */
704 				mutex_exit(&db->db_mtx);
705 				txg_resume(dp);
706 				return (0);
707 			}
708 			ASSERT(db->db_data_pending ||
709 			    (db->db_blkptr && db->db_blkptr->blk_birth == txg));
710 		}
711 
712 		if (db->db_blkptr && db->db_blkptr->blk_birth == txg) {
713 			/*
714 			 * IO is already completed.
715 			 */
716 			*bp = *db->db_blkptr; /* structure assignment */
717 			mutex_exit(&db->db_mtx);
718 			txg_resume(dp);
719 			return (0);
720 		}
721 	}
722 
723 	if (db->db_d.db_data_old[txg&TXG_MASK] == NULL) {
724 		/*
725 		 * This dbuf isn't dirty, must have been free_range'd.
726 		 * There's no need to log writes to freed blocks, so we're done.
727 		 */
728 		mutex_exit(&db->db_mtx);
729 		txg_resume(dp);
730 		return (ENOENT);
731 	}
732 
733 	ASSERT(db->db_d.db_overridden_by[txg&TXG_MASK] == NULL);
734 	db->db_d.db_overridden_by[txg&TXG_MASK] = IN_DMU_SYNC;
735 	/*
736 	 * XXX - a little ugly to stash the blkptr in the callback
737 	 * buffer.  We always need to make sure the following is true:
738 	 * ASSERT(sizeof(blkptr_t) >= sizeof(dmu_sync_cbin_t));
739 	 */
740 	in = kmem_alloc(sizeof (blkptr_t), KM_SLEEP);
741 	in->db = db;
742 	in->txg = txg;
743 	in->done = done;
744 	in->arg = arg;
745 	mutex_exit(&db->db_mtx);
746 	txg_resume(dp);
747 
748 	arc_flag = pio == NULL ? ARC_WAIT : ARC_NOWAIT;
749 	zb.zb_objset = os->os_dsl_dataset->ds_object;
750 	zb.zb_object = db->db.db_object;
751 	zb.zb_level = db->db_level;
752 	zb.zb_blkid = db->db_blkid;
753 	err = arc_write(pio, os->os_spa,
754 	    zio_checksum_select(db->db_dnode->dn_checksum, os->os_checksum),
755 	    zio_compress_select(db->db_dnode->dn_compress, os->os_compress),
756 	    dmu_get_replication_level(os->os_spa, &zb, db->db_dnode->dn_type),
757 	    txg, bp, db->db_d.db_data_old[txg&TXG_MASK], dmu_sync_done, in,
758 	    ZIO_PRIORITY_SYNC_WRITE, ZIO_FLAG_MUSTSUCCEED, arc_flag, &zb);
759 	ASSERT(err == 0);
760 
761 	return (arc_flag == ARC_NOWAIT ? EINPROGRESS : 0);
762 }
763 
764 uint64_t
765 dmu_object_max_nonzero_offset(objset_t *os, uint64_t object)
766 {
767 	dnode_t *dn;
768 
769 	/* XXX assumes dnode_hold will not get an i/o error */
770 	(void) dnode_hold(os->os, object, FTAG, &dn);
771 	uint64_t rv = dnode_max_nonzero_offset(dn);
772 	dnode_rele(dn, FTAG);
773 	return (rv);
774 }
775 
776 int
777 dmu_object_set_blocksize(objset_t *os, uint64_t object, uint64_t size, int ibs,
778 	dmu_tx_t *tx)
779 {
780 	dnode_t *dn;
781 	int err;
782 
783 	err = dnode_hold(os->os, object, FTAG, &dn);
784 	if (err)
785 		return (err);
786 	err = dnode_set_blksz(dn, size, ibs, tx);
787 	dnode_rele(dn, FTAG);
788 	return (err);
789 }
790 
791 void
792 dmu_object_set_checksum(objset_t *os, uint64_t object, uint8_t checksum,
793 	dmu_tx_t *tx)
794 {
795 	dnode_t *dn;
796 
797 	/* XXX assumes dnode_hold will not get an i/o error */
798 	(void) dnode_hold(os->os, object, FTAG, &dn);
799 	ASSERT(checksum < ZIO_CHECKSUM_FUNCTIONS);
800 	dn->dn_checksum = checksum;
801 	dnode_setdirty(dn, tx);
802 	dnode_rele(dn, FTAG);
803 }
804 
805 void
806 dmu_object_set_compress(objset_t *os, uint64_t object, uint8_t compress,
807 	dmu_tx_t *tx)
808 {
809 	dnode_t *dn;
810 
811 	/* XXX assumes dnode_hold will not get an i/o error */
812 	(void) dnode_hold(os->os, object, FTAG, &dn);
813 	ASSERT(compress < ZIO_COMPRESS_FUNCTIONS);
814 	dn->dn_compress = compress;
815 	dnode_setdirty(dn, tx);
816 	dnode_rele(dn, FTAG);
817 }
818 
819 /*
820  * XXX - eventually, this should take into account per-dataset (or
821  *       even per-object?) user requests for higher levels of replication.
822  */
823 int
824 dmu_get_replication_level(spa_t *spa, zbookmark_t *zb, dmu_object_type_t ot)
825 {
826 	int ncopies = 1;
827 
828 	if (dmu_ot[ot].ot_metadata)
829 		ncopies++;
830 	if (zb->zb_level != 0)
831 		ncopies++;
832 	if (zb->zb_objset == 0 && zb->zb_object == 0)
833 		ncopies++;
834 	return (MIN(ncopies, spa_max_replication(spa)));
835 }
836 
837 int
838 dmu_offset_next(objset_t *os, uint64_t object, boolean_t hole, uint64_t *off)
839 {
840 	dnode_t *dn;
841 	int i, err;
842 
843 	err = dnode_hold(os->os, object, FTAG, &dn);
844 	if (err)
845 		return (err);
846 	/*
847 	 * Sync any current changes before
848 	 * we go trundling through the block pointers.
849 	 */
850 	for (i = 0; i < TXG_SIZE; i++) {
851 		if (list_link_active(&dn->dn_dirty_link[i]))
852 			break;
853 	}
854 	if (i != TXG_SIZE) {
855 		dnode_rele(dn, FTAG);
856 		txg_wait_synced(dmu_objset_pool(os), 0);
857 		err = dnode_hold(os->os, object, FTAG, &dn);
858 		if (err)
859 			return (err);
860 	}
861 
862 	err = dnode_next_offset(dn, hole, off, 1, 1);
863 	dnode_rele(dn, FTAG);
864 
865 	return (err);
866 }
867 
868 void
869 dmu_object_info_from_dnode(dnode_t *dn, dmu_object_info_t *doi)
870 {
871 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
872 	mutex_enter(&dn->dn_mtx);
873 
874 	doi->doi_data_block_size = dn->dn_datablksz;
875 	doi->doi_metadata_block_size = dn->dn_indblkshift ?
876 	    1ULL << dn->dn_indblkshift : 0;
877 	doi->doi_indirection = dn->dn_nlevels;
878 	doi->doi_checksum = dn->dn_checksum;
879 	doi->doi_compress = dn->dn_compress;
880 	doi->doi_physical_blks = (DN_USED_BYTES(dn->dn_phys) +
881 	    SPA_MINBLOCKSIZE/2) >> SPA_MINBLOCKSHIFT;
882 	doi->doi_max_block_offset = dn->dn_phys->dn_maxblkid;
883 	doi->doi_type = dn->dn_type;
884 	doi->doi_bonus_size = dn->dn_bonuslen;
885 	doi->doi_bonus_type = dn->dn_bonustype;
886 
887 	mutex_exit(&dn->dn_mtx);
888 	rw_exit(&dn->dn_struct_rwlock);
889 }
890 
891 /*
892  * Get information on a DMU object.
893  * If doi is NULL, just indicates whether the object exists.
894  */
895 int
896 dmu_object_info(objset_t *os, uint64_t object, dmu_object_info_t *doi)
897 {
898 	dnode_t *dn;
899 	int err = dnode_hold(os->os, object, FTAG, &dn);
900 
901 	if (err)
902 		return (err);
903 
904 	if (doi != NULL)
905 		dmu_object_info_from_dnode(dn, doi);
906 
907 	dnode_rele(dn, FTAG);
908 	return (0);
909 }
910 
911 /*
912  * As above, but faster; can be used when you have a held dbuf in hand.
913  */
914 void
915 dmu_object_info_from_db(dmu_buf_t *db, dmu_object_info_t *doi)
916 {
917 	dmu_object_info_from_dnode(((dmu_buf_impl_t *)db)->db_dnode, doi);
918 }
919 
920 /*
921  * Faster still when you only care about the size.
922  * This is specifically optimized for zfs_getattr().
923  */
924 void
925 dmu_object_size_from_db(dmu_buf_t *db, uint32_t *blksize, u_longlong_t *nblk512)
926 {
927 	dnode_t *dn = ((dmu_buf_impl_t *)db)->db_dnode;
928 
929 	*blksize = dn->dn_datablksz;
930 	/* add 1 for dnode space */
931 	*nblk512 = ((DN_USED_BYTES(dn->dn_phys) + SPA_MINBLOCKSIZE/2) >>
932 	    SPA_MINBLOCKSHIFT) + 1;
933 }
934 
935 /*
936  * Given a bookmark, return the name of the dataset, object, and range in
937  * human-readable format.
938  */
939 int
940 spa_bookmark_name(spa_t *spa, zbookmark_t *zb, nvlist_t *nvl)
941 {
942 	dsl_pool_t *dp;
943 	dsl_dataset_t *ds = NULL;
944 	objset_t *os = NULL;
945 	dnode_t *dn = NULL;
946 	int err, shift;
947 	char dsname[MAXNAMELEN];
948 	char objname[32];
949 	char range[64];
950 
951 	dp = spa_get_dsl(spa);
952 	if (zb->zb_objset != 0) {
953 		rw_enter(&dp->dp_config_rwlock, RW_READER);
954 		err = dsl_dataset_open_obj(dp, zb->zb_objset,
955 		    NULL, DS_MODE_NONE, FTAG, &ds);
956 		if (err) {
957 			rw_exit(&dp->dp_config_rwlock);
958 			return (err);
959 		}
960 		dsl_dataset_name(ds, dsname);
961 		dsl_dataset_close(ds, DS_MODE_NONE, FTAG);
962 		rw_exit(&dp->dp_config_rwlock);
963 
964 		err = dmu_objset_open(dsname, DMU_OST_ANY, DS_MODE_NONE, &os);
965 		if (err)
966 			goto out;
967 
968 	} else {
969 		dsl_dataset_name(NULL, dsname);
970 		os = dp->dp_meta_objset;
971 	}
972 
973 
974 	if (zb->zb_object == DMU_META_DNODE_OBJECT) {
975 		(void) strncpy(objname, "mdn", sizeof (objname));
976 	} else {
977 		(void) snprintf(objname, sizeof (objname), "%lld",
978 		    (longlong_t)zb->zb_object);
979 	}
980 
981 	err = dnode_hold(os->os, zb->zb_object, FTAG, &dn);
982 	if (err)
983 		goto out;
984 
985 	shift = (dn->dn_datablkshift?dn->dn_datablkshift:SPA_MAXBLOCKSHIFT) +
986 	    zb->zb_level * (dn->dn_indblkshift - SPA_BLKPTRSHIFT);
987 	(void) snprintf(range, sizeof (range), "%llu-%llu",
988 	    (u_longlong_t)(zb->zb_blkid << shift),
989 	    (u_longlong_t)((zb->zb_blkid+1) << shift));
990 
991 	if ((err = nvlist_add_string(nvl, ZPOOL_ERR_DATASET, dsname)) != 0 ||
992 	    (err = nvlist_add_string(nvl, ZPOOL_ERR_OBJECT, objname)) != 0 ||
993 	    (err = nvlist_add_string(nvl, ZPOOL_ERR_RANGE, range)) != 0)
994 		goto out;
995 
996 out:
997 	if (dn)
998 		dnode_rele(dn, FTAG);
999 	if (os && os != dp->dp_meta_objset)
1000 		dmu_objset_close(os);
1001 	return (err);
1002 }
1003 
1004 void
1005 byteswap_uint64_array(void *vbuf, size_t size)
1006 {
1007 	uint64_t *buf = vbuf;
1008 	size_t count = size >> 3;
1009 	int i;
1010 
1011 	ASSERT((size & 7) == 0);
1012 
1013 	for (i = 0; i < count; i++)
1014 		buf[i] = BSWAP_64(buf[i]);
1015 }
1016 
1017 void
1018 byteswap_uint32_array(void *vbuf, size_t size)
1019 {
1020 	uint32_t *buf = vbuf;
1021 	size_t count = size >> 2;
1022 	int i;
1023 
1024 	ASSERT((size & 3) == 0);
1025 
1026 	for (i = 0; i < count; i++)
1027 		buf[i] = BSWAP_32(buf[i]);
1028 }
1029 
1030 void
1031 byteswap_uint16_array(void *vbuf, size_t size)
1032 {
1033 	uint16_t *buf = vbuf;
1034 	size_t count = size >> 1;
1035 	int i;
1036 
1037 	ASSERT((size & 1) == 0);
1038 
1039 	for (i = 0; i < count; i++)
1040 		buf[i] = BSWAP_16(buf[i]);
1041 }
1042 
1043 /* ARGSUSED */
1044 void
1045 byteswap_uint8_array(void *vbuf, size_t size)
1046 {
1047 }
1048 
1049 void
1050 dmu_init(void)
1051 {
1052 	dbuf_init();
1053 	dnode_init();
1054 	arc_init();
1055 }
1056 
1057 void
1058 dmu_fini(void)
1059 {
1060 	arc_fini();
1061 	dnode_fini();
1062 	dbuf_fini();
1063 }
1064