xref: /titanic_51/usr/src/uts/common/fs/zfs/dmu.c (revision 3db30c357c20c1eb09687fd0194e0ca62d6358cb)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2007 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 
26 #pragma ident	"%Z%%M%	%I%	%E% SMI"
27 
28 #include <sys/dmu.h>
29 #include <sys/dmu_impl.h>
30 #include <sys/dmu_tx.h>
31 #include <sys/dbuf.h>
32 #include <sys/dnode.h>
33 #include <sys/zfs_context.h>
34 #include <sys/dmu_objset.h>
35 #include <sys/dmu_traverse.h>
36 #include <sys/dsl_dataset.h>
37 #include <sys/dsl_dir.h>
38 #include <sys/dsl_pool.h>
39 #include <sys/dsl_synctask.h>
40 #include <sys/dsl_prop.h>
41 #include <sys/dmu_zfetch.h>
42 #include <sys/zfs_ioctl.h>
43 #include <sys/zap.h>
44 #include <sys/zio_checksum.h>
45 #ifdef _KERNEL
46 #include <sys/vmsystm.h>
47 #endif
48 
/*
 * Per-object-type information table with DMU_OT_NUMTYPES slots, indexed by
 * object type (see the dmu_ot[...] lookups in dmu_sync() and
 * dmu_get_replication_level()).
 *
 * Each entry supplies, in order:
 *	- the routine used to byteswap a buffer of that type,
 *	- a boolean flag (presumably the ot_metadata member tested by the
 *	  lookups above; confirm against dmu_object_type_info_t),
 *	- a human-readable name for the type.
 *
 * NOTE(review): entry order presumably has to match the DMU_OT_*
 * enumeration; verify against sys/dmu.h before adding or moving entries.
 */
const dmu_object_type_info_t dmu_ot[DMU_OT_NUMTYPES] = {
	{	byteswap_uint8_array,	TRUE,	"unallocated"		},
	{	zap_byteswap,		TRUE,	"object directory"	},
	{	byteswap_uint64_array,	TRUE,	"object array"		},
	{	byteswap_uint8_array,	TRUE,	"packed nvlist"		},
	{	byteswap_uint64_array,	TRUE,	"packed nvlist size"	},
	{	byteswap_uint64_array,	TRUE,	"bplist"		},
	{	byteswap_uint64_array,	TRUE,	"bplist header"		},
	{	byteswap_uint64_array,	TRUE,	"SPA space map header"	},
	{	byteswap_uint64_array,	TRUE,	"SPA space map"		},
	{	byteswap_uint64_array,	TRUE,	"ZIL intent log"	},
	{	dnode_buf_byteswap,	TRUE,	"DMU dnode"		},
	{	dmu_objset_byteswap,	TRUE,	"DMU objset"		},
	{	byteswap_uint64_array,	TRUE,	"DSL directory"		},
	{	zap_byteswap,		TRUE,	"DSL directory child map"},
	{	zap_byteswap,		TRUE,	"DSL dataset snap map"	},
	{	zap_byteswap,		TRUE,	"DSL props"		},
	{	byteswap_uint64_array,	TRUE,	"DSL dataset"		},
	{	zfs_znode_byteswap,	TRUE,	"ZFS znode"		},
	{	zfs_oldacl_byteswap,	TRUE,	"ZFS V0 ACL"		},
	{	byteswap_uint8_array,	FALSE,	"ZFS plain file"	},
	{	zap_byteswap,		TRUE,	"ZFS directory"		},
	{	zap_byteswap,		TRUE,	"ZFS master node"	},
	{	zap_byteswap,		TRUE,	"ZFS delete queue"	},
	{	byteswap_uint8_array,	FALSE,	"zvol object"		},
	{	zap_byteswap,		TRUE,	"zvol prop"		},
	{	byteswap_uint8_array,	FALSE,	"other uint8[]"		},
	{	byteswap_uint64_array,	FALSE,	"other uint64[]"	},
	{	zap_byteswap,		TRUE,	"other ZAP"		},
	{	zap_byteswap,		TRUE,	"persistent error log"	},
	{	byteswap_uint8_array,	TRUE,	"SPA history"		},
	{	byteswap_uint64_array,	TRUE,	"SPA history offsets"	},
	{	zap_byteswap,		TRUE,	"Pool properties"	},
	{	zap_byteswap,		TRUE,	"DSL permissions"	},
	{	zfs_acl_byteswap,	TRUE,	"ZFS ACL"		},
	{	byteswap_uint8_array,	TRUE,	"ZFS SYSACL"		},
	{	byteswap_uint8_array,	TRUE,	"FUID table"		},
	{	byteswap_uint64_array,	TRUE,	"FUID table size"	},
};
88 
89 int
90 dmu_buf_hold(objset_t *os, uint64_t object, uint64_t offset,
91     void *tag, dmu_buf_t **dbp)
92 {
93 	dnode_t *dn;
94 	uint64_t blkid;
95 	dmu_buf_impl_t *db;
96 	int err;
97 
98 	err = dnode_hold(os->os, object, FTAG, &dn);
99 	if (err)
100 		return (err);
101 	blkid = dbuf_whichblock(dn, offset);
102 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
103 	db = dbuf_hold(dn, blkid, tag);
104 	rw_exit(&dn->dn_struct_rwlock);
105 	if (db == NULL) {
106 		err = EIO;
107 	} else {
108 		err = dbuf_read(db, NULL, DB_RF_CANFAIL);
109 		if (err) {
110 			dbuf_rele(db, tag);
111 			db = NULL;
112 		}
113 	}
114 
115 	dnode_rele(dn, FTAG);
116 	*dbp = &db->db;
117 	return (err);
118 }
119 
120 int
121 dmu_bonus_max(void)
122 {
123 	return (DN_MAX_BONUSLEN);
124 }
125 
126 int
127 dmu_set_bonus(dmu_buf_t *db, int newsize, dmu_tx_t *tx)
128 {
129 	dnode_t *dn = ((dmu_buf_impl_t *)db)->db_dnode;
130 
131 	if (dn->dn_bonus != (dmu_buf_impl_t *)db)
132 		return (EINVAL);
133 	if (newsize < 0 || newsize > db->db_size)
134 		return (EINVAL);
135 	dnode_setbonuslen(dn, newsize, tx);
136 	return (0);
137 }
138 
139 /*
140  * returns ENOENT, EIO, or 0.
141  */
142 int
143 dmu_bonus_hold(objset_t *os, uint64_t object, void *tag, dmu_buf_t **dbp)
144 {
145 	dnode_t *dn;
146 	dmu_buf_impl_t *db;
147 	int error;
148 
149 	error = dnode_hold(os->os, object, FTAG, &dn);
150 	if (error)
151 		return (error);
152 
153 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
154 	if (dn->dn_bonus == NULL) {
155 		rw_exit(&dn->dn_struct_rwlock);
156 		rw_enter(&dn->dn_struct_rwlock, RW_WRITER);
157 		if (dn->dn_bonus == NULL)
158 			dbuf_create_bonus(dn);
159 	}
160 	db = dn->dn_bonus;
161 	rw_exit(&dn->dn_struct_rwlock);
162 
163 	/* as long as the bonus buf is held, the dnode will be held */
164 	if (refcount_add(&db->db_holds, tag) == 1)
165 		VERIFY(dnode_add_ref(dn, db));
166 
167 	dnode_rele(dn, FTAG);
168 
169 	VERIFY(0 == dbuf_read(db, NULL, DB_RF_MUST_SUCCEED));
170 
171 	*dbp = &db->db;
172 	return (0);
173 }
174 
175 /*
176  * Note: longer-term, we should modify all of the dmu_buf_*() interfaces
177  * to take a held dnode rather than <os, object> -- the lookup is wasteful,
178  * and can induce severe lock contention when writing to several files
179  * whose dnodes are in the same block.
180  */
181 static int
182 dmu_buf_hold_array_by_dnode(dnode_t *dn, uint64_t offset,
183     uint64_t length, int read, void *tag, int *numbufsp, dmu_buf_t ***dbpp)
184 {
185 	dmu_buf_t **dbp;
186 	uint64_t blkid, nblks, i;
187 	uint32_t flags;
188 	int err;
189 	zio_t *zio;
190 
191 	ASSERT(length <= DMU_MAX_ACCESS);
192 
193 	flags = DB_RF_CANFAIL | DB_RF_NEVERWAIT;
194 	if (length > zfetch_array_rd_sz)
195 		flags |= DB_RF_NOPREFETCH;
196 
197 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
198 	if (dn->dn_datablkshift) {
199 		int blkshift = dn->dn_datablkshift;
200 		nblks = (P2ROUNDUP(offset+length, 1ULL<<blkshift) -
201 		    P2ALIGN(offset, 1ULL<<blkshift)) >> blkshift;
202 	} else {
203 		if (offset + length > dn->dn_datablksz) {
204 			zfs_panic_recover("zfs: accessing past end of object "
205 			    "%llx/%llx (size=%u access=%llu+%llu)",
206 			    (longlong_t)dn->dn_objset->
207 			    os_dsl_dataset->ds_object,
208 			    (longlong_t)dn->dn_object, dn->dn_datablksz,
209 			    (longlong_t)offset, (longlong_t)length);
210 			return (EIO);
211 		}
212 		nblks = 1;
213 	}
214 	dbp = kmem_zalloc(sizeof (dmu_buf_t *) * nblks, KM_SLEEP);
215 
216 	zio = zio_root(dn->dn_objset->os_spa, NULL, NULL, TRUE);
217 	blkid = dbuf_whichblock(dn, offset);
218 	for (i = 0; i < nblks; i++) {
219 		dmu_buf_impl_t *db = dbuf_hold(dn, blkid+i, tag);
220 		if (db == NULL) {
221 			rw_exit(&dn->dn_struct_rwlock);
222 			dmu_buf_rele_array(dbp, nblks, tag);
223 			zio_nowait(zio);
224 			return (EIO);
225 		}
226 		/* initiate async i/o */
227 		if (read) {
228 			rw_exit(&dn->dn_struct_rwlock);
229 			(void) dbuf_read(db, zio, flags);
230 			rw_enter(&dn->dn_struct_rwlock, RW_READER);
231 		}
232 		dbp[i] = &db->db;
233 	}
234 	rw_exit(&dn->dn_struct_rwlock);
235 
236 	/* wait for async i/o */
237 	err = zio_wait(zio);
238 	if (err) {
239 		dmu_buf_rele_array(dbp, nblks, tag);
240 		return (err);
241 	}
242 
243 	/* wait for other io to complete */
244 	if (read) {
245 		for (i = 0; i < nblks; i++) {
246 			dmu_buf_impl_t *db = (dmu_buf_impl_t *)dbp[i];
247 			mutex_enter(&db->db_mtx);
248 			while (db->db_state == DB_READ ||
249 			    db->db_state == DB_FILL)
250 				cv_wait(&db->db_changed, &db->db_mtx);
251 			if (db->db_state == DB_UNCACHED)
252 				err = EIO;
253 			mutex_exit(&db->db_mtx);
254 			if (err) {
255 				dmu_buf_rele_array(dbp, nblks, tag);
256 				return (err);
257 			}
258 		}
259 	}
260 
261 	*numbufsp = nblks;
262 	*dbpp = dbp;
263 	return (0);
264 }
265 
266 static int
267 dmu_buf_hold_array(objset_t *os, uint64_t object, uint64_t offset,
268     uint64_t length, int read, void *tag, int *numbufsp, dmu_buf_t ***dbpp)
269 {
270 	dnode_t *dn;
271 	int err;
272 
273 	err = dnode_hold(os->os, object, FTAG, &dn);
274 	if (err)
275 		return (err);
276 
277 	err = dmu_buf_hold_array_by_dnode(dn, offset, length, read, tag,
278 	    numbufsp, dbpp);
279 
280 	dnode_rele(dn, FTAG);
281 
282 	return (err);
283 }
284 
285 int
286 dmu_buf_hold_array_by_bonus(dmu_buf_t *db, uint64_t offset,
287     uint64_t length, int read, void *tag, int *numbufsp, dmu_buf_t ***dbpp)
288 {
289 	dnode_t *dn = ((dmu_buf_impl_t *)db)->db_dnode;
290 	int err;
291 
292 	err = dmu_buf_hold_array_by_dnode(dn, offset, length, read, tag,
293 	    numbufsp, dbpp);
294 
295 	return (err);
296 }
297 
298 void
299 dmu_buf_rele_array(dmu_buf_t **dbp_fake, int numbufs, void *tag)
300 {
301 	int i;
302 	dmu_buf_impl_t **dbp = (dmu_buf_impl_t **)dbp_fake;
303 
304 	if (numbufs == 0)
305 		return;
306 
307 	for (i = 0; i < numbufs; i++) {
308 		if (dbp[i])
309 			dbuf_rele(dbp[i], tag);
310 	}
311 
312 	kmem_free(dbp, sizeof (dmu_buf_t *) * numbufs);
313 }
314 
315 void
316 dmu_prefetch(objset_t *os, uint64_t object, uint64_t offset, uint64_t len)
317 {
318 	dnode_t *dn;
319 	uint64_t blkid;
320 	int nblks, i, err;
321 
322 	if (zfs_prefetch_disable)
323 		return;
324 
325 	if (len == 0) {  /* they're interested in the bonus buffer */
326 		dn = os->os->os_meta_dnode;
327 
328 		if (object == 0 || object >= DN_MAX_OBJECT)
329 			return;
330 
331 		rw_enter(&dn->dn_struct_rwlock, RW_READER);
332 		blkid = dbuf_whichblock(dn, object * sizeof (dnode_phys_t));
333 		dbuf_prefetch(dn, blkid);
334 		rw_exit(&dn->dn_struct_rwlock);
335 		return;
336 	}
337 
338 	/*
339 	 * XXX - Note, if the dnode for the requested object is not
340 	 * already cached, we will do a *synchronous* read in the
341 	 * dnode_hold() call.  The same is true for any indirects.
342 	 */
343 	err = dnode_hold(os->os, object, FTAG, &dn);
344 	if (err != 0)
345 		return;
346 
347 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
348 	if (dn->dn_datablkshift) {
349 		int blkshift = dn->dn_datablkshift;
350 		nblks = (P2ROUNDUP(offset+len, 1<<blkshift) -
351 		    P2ALIGN(offset, 1<<blkshift)) >> blkshift;
352 	} else {
353 		nblks = (offset < dn->dn_datablksz);
354 	}
355 
356 	if (nblks != 0) {
357 		blkid = dbuf_whichblock(dn, offset);
358 		for (i = 0; i < nblks; i++)
359 			dbuf_prefetch(dn, blkid+i);
360 	}
361 
362 	rw_exit(&dn->dn_struct_rwlock);
363 
364 	dnode_rele(dn, FTAG);
365 }
366 
367 int
368 dmu_free_range(objset_t *os, uint64_t object, uint64_t offset,
369     uint64_t size, dmu_tx_t *tx)
370 {
371 	dnode_t *dn;
372 	int err = dnode_hold(os->os, object, FTAG, &dn);
373 	if (err)
374 		return (err);
375 	ASSERT(offset < UINT64_MAX);
376 	ASSERT(size == -1ULL || size <= UINT64_MAX - offset);
377 	dnode_free_range(dn, offset, size, tx);
378 	dnode_rele(dn, FTAG);
379 	return (0);
380 }
381 
382 int
383 dmu_read(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
384     void *buf)
385 {
386 	dnode_t *dn;
387 	dmu_buf_t **dbp;
388 	int numbufs, i, err;
389 
390 	err = dnode_hold(os->os, object, FTAG, &dn);
391 	if (err)
392 		return (err);
393 
394 	/*
395 	 * Deal with odd block sizes, where there can't be data past the first
396 	 * block.  If we ever do the tail block optimization, we will need to
397 	 * handle that here as well.
398 	 */
399 	if (dn->dn_datablkshift == 0) {
400 		int newsz = offset > dn->dn_datablksz ? 0 :
401 		    MIN(size, dn->dn_datablksz - offset);
402 		bzero((char *)buf + newsz, size - newsz);
403 		size = newsz;
404 	}
405 
406 	while (size > 0) {
407 		uint64_t mylen = MIN(size, DMU_MAX_ACCESS / 2);
408 
409 		/*
410 		 * NB: we could do this block-at-a-time, but it's nice
411 		 * to be reading in parallel.
412 		 */
413 		err = dmu_buf_hold_array_by_dnode(dn, offset, mylen,
414 		    TRUE, FTAG, &numbufs, &dbp);
415 		if (err)
416 			break;
417 
418 		for (i = 0; i < numbufs; i++) {
419 			int tocpy;
420 			int bufoff;
421 			dmu_buf_t *db = dbp[i];
422 
423 			ASSERT(size > 0);
424 
425 			bufoff = offset - db->db_offset;
426 			tocpy = (int)MIN(db->db_size - bufoff, size);
427 
428 			bcopy((char *)db->db_data + bufoff, buf, tocpy);
429 
430 			offset += tocpy;
431 			size -= tocpy;
432 			buf = (char *)buf + tocpy;
433 		}
434 		dmu_buf_rele_array(dbp, numbufs, FTAG);
435 	}
436 	dnode_rele(dn, FTAG);
437 	return (err);
438 }
439 
440 void
441 dmu_write(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
442     const void *buf, dmu_tx_t *tx)
443 {
444 	dmu_buf_t **dbp;
445 	int numbufs, i;
446 
447 	if (size == 0)
448 		return;
449 
450 	VERIFY(0 == dmu_buf_hold_array(os, object, offset, size,
451 	    FALSE, FTAG, &numbufs, &dbp));
452 
453 	for (i = 0; i < numbufs; i++) {
454 		int tocpy;
455 		int bufoff;
456 		dmu_buf_t *db = dbp[i];
457 
458 		ASSERT(size > 0);
459 
460 		bufoff = offset - db->db_offset;
461 		tocpy = (int)MIN(db->db_size - bufoff, size);
462 
463 		ASSERT(i == 0 || i == numbufs-1 || tocpy == db->db_size);
464 
465 		if (tocpy == db->db_size)
466 			dmu_buf_will_fill(db, tx);
467 		else
468 			dmu_buf_will_dirty(db, tx);
469 
470 		bcopy(buf, (char *)db->db_data + bufoff, tocpy);
471 
472 		if (tocpy == db->db_size)
473 			dmu_buf_fill_done(db, tx);
474 
475 		offset += tocpy;
476 		size -= tocpy;
477 		buf = (char *)buf + tocpy;
478 	}
479 	dmu_buf_rele_array(dbp, numbufs, FTAG);
480 }
481 
482 #ifdef _KERNEL
483 int
484 dmu_read_uio(objset_t *os, uint64_t object, uio_t *uio, uint64_t size)
485 {
486 	dmu_buf_t **dbp;
487 	int numbufs, i, err;
488 
489 	/*
490 	 * NB: we could do this block-at-a-time, but it's nice
491 	 * to be reading in parallel.
492 	 */
493 	err = dmu_buf_hold_array(os, object, uio->uio_loffset, size, TRUE, FTAG,
494 	    &numbufs, &dbp);
495 	if (err)
496 		return (err);
497 
498 	for (i = 0; i < numbufs; i++) {
499 		int tocpy;
500 		int bufoff;
501 		dmu_buf_t *db = dbp[i];
502 
503 		ASSERT(size > 0);
504 
505 		bufoff = uio->uio_loffset - db->db_offset;
506 		tocpy = (int)MIN(db->db_size - bufoff, size);
507 
508 		err = uiomove((char *)db->db_data + bufoff, tocpy,
509 		    UIO_READ, uio);
510 		if (err)
511 			break;
512 
513 		size -= tocpy;
514 	}
515 	dmu_buf_rele_array(dbp, numbufs, FTAG);
516 
517 	return (err);
518 }
519 
520 int
521 dmu_write_uio(objset_t *os, uint64_t object, uio_t *uio, uint64_t size,
522     dmu_tx_t *tx)
523 {
524 	dmu_buf_t **dbp;
525 	int numbufs, i;
526 	int err = 0;
527 
528 	if (size == 0)
529 		return (0);
530 
531 	err = dmu_buf_hold_array(os, object, uio->uio_loffset, size,
532 	    FALSE, FTAG, &numbufs, &dbp);
533 	if (err)
534 		return (err);
535 
536 	for (i = 0; i < numbufs; i++) {
537 		int tocpy;
538 		int bufoff;
539 		dmu_buf_t *db = dbp[i];
540 
541 		ASSERT(size > 0);
542 
543 		bufoff = uio->uio_loffset - db->db_offset;
544 		tocpy = (int)MIN(db->db_size - bufoff, size);
545 
546 		ASSERT(i == 0 || i == numbufs-1 || tocpy == db->db_size);
547 
548 		if (tocpy == db->db_size)
549 			dmu_buf_will_fill(db, tx);
550 		else
551 			dmu_buf_will_dirty(db, tx);
552 
553 		/*
554 		 * XXX uiomove could block forever (eg. nfs-backed
555 		 * pages).  There needs to be a uiolockdown() function
556 		 * to lock the pages in memory, so that uiomove won't
557 		 * block.
558 		 */
559 		err = uiomove((char *)db->db_data + bufoff, tocpy,
560 		    UIO_WRITE, uio);
561 
562 		if (tocpy == db->db_size)
563 			dmu_buf_fill_done(db, tx);
564 
565 		if (err)
566 			break;
567 
568 		size -= tocpy;
569 	}
570 	dmu_buf_rele_array(dbp, numbufs, FTAG);
571 	return (err);
572 }
573 
574 int
575 dmu_write_pages(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
576     page_t *pp, dmu_tx_t *tx)
577 {
578 	dmu_buf_t **dbp;
579 	int numbufs, i;
580 	int err;
581 
582 	if (size == 0)
583 		return (0);
584 
585 	err = dmu_buf_hold_array(os, object, offset, size,
586 	    FALSE, FTAG, &numbufs, &dbp);
587 	if (err)
588 		return (err);
589 
590 	for (i = 0; i < numbufs; i++) {
591 		int tocpy, copied, thiscpy;
592 		int bufoff;
593 		dmu_buf_t *db = dbp[i];
594 		caddr_t va;
595 
596 		ASSERT(size > 0);
597 		ASSERT3U(db->db_size, >=, PAGESIZE);
598 
599 		bufoff = offset - db->db_offset;
600 		tocpy = (int)MIN(db->db_size - bufoff, size);
601 
602 		ASSERT(i == 0 || i == numbufs-1 || tocpy == db->db_size);
603 
604 		if (tocpy == db->db_size)
605 			dmu_buf_will_fill(db, tx);
606 		else
607 			dmu_buf_will_dirty(db, tx);
608 
609 		for (copied = 0; copied < tocpy; copied += PAGESIZE) {
610 			ASSERT3U(pp->p_offset, ==, db->db_offset + bufoff);
611 			thiscpy = MIN(PAGESIZE, tocpy - copied);
612 			va = ppmapin(pp, PROT_READ, (caddr_t)-1);
613 			bcopy(va, (char *)db->db_data + bufoff, thiscpy);
614 			ppmapout(va);
615 			pp = pp->p_next;
616 			bufoff += PAGESIZE;
617 		}
618 
619 		if (tocpy == db->db_size)
620 			dmu_buf_fill_done(db, tx);
621 
622 		if (err)
623 			break;
624 
625 		offset += tocpy;
626 		size -= tocpy;
627 	}
628 	dmu_buf_rele_array(dbp, numbufs, FTAG);
629 	return (err);
630 }
631 #endif
632 
/*
 * Context handed from dmu_sync() to its write-completion callback,
 * dmu_sync_done().  Allocated in dmu_sync() and freed in dmu_sync_done().
 */
typedef struct {
	dbuf_dirty_record_t	*dr;	/* dirty record being written */
	dmu_sync_cb_t		*done;	/* caller's callback; may be NULL */
	void			*arg;	/* argument passed on to done() */
} dmu_sync_arg_t;
638 
639 /* ARGSUSED */
640 static void
641 dmu_sync_done(zio_t *zio, arc_buf_t *buf, void *varg)
642 {
643 	dmu_sync_arg_t *in = varg;
644 	dbuf_dirty_record_t *dr = in->dr;
645 	dmu_buf_impl_t *db = dr->dr_dbuf;
646 	dmu_sync_cb_t *done = in->done;
647 
648 	if (!BP_IS_HOLE(zio->io_bp)) {
649 		zio->io_bp->blk_fill = 1;
650 		BP_SET_TYPE(zio->io_bp, db->db_dnode->dn_type);
651 		BP_SET_LEVEL(zio->io_bp, 0);
652 	}
653 
654 	mutex_enter(&db->db_mtx);
655 	ASSERT(dr->dt.dl.dr_override_state == DR_IN_DMU_SYNC);
656 	dr->dt.dl.dr_overridden_by = *zio->io_bp; /* structure assignment */
657 	dr->dt.dl.dr_override_state = DR_OVERRIDDEN;
658 	cv_broadcast(&db->db_changed);
659 	mutex_exit(&db->db_mtx);
660 
661 	if (done)
662 		done(&(db->db), in->arg);
663 
664 	kmem_free(in, sizeof (dmu_sync_arg_t));
665 }
666 
/*
 * Intent log support: sync the block associated with db to disk.
 * N.B. and XXX: the caller is responsible for making sure that the
 * data isn't changing while dmu_sync() is writing it.
 *
 * If a parent zio (pio) is supplied, the write is issued without waiting
 * and `done' is invoked with `arg' from the completion callback; otherwise
 * this routine waits for the write to finish.
 *
 * Return values:
 *
 *	EEXIST: this txg has already been synced, so there's nothing to do.
 *		The caller should not log the write.
 *
 *	ENOENT: the block was dbuf_free_range()'d, so there's nothing to do.
 *		The caller should not log the write.
 *
 *	EALREADY: this block is already in the process of being synced.
 *		The caller should track its progress (somehow).
 *
 *	EINPROGRESS: the IO has been initiated.
 *		The caller should log this blkptr in the callback.
 *
 *	0: completed.  Sets *bp to the blkptr just written.
 *		The caller should log this blkptr immediately.
 */
int
dmu_sync(zio_t *pio, dmu_buf_t *db_fake,
    blkptr_t *bp, uint64_t txg, dmu_sync_cb_t *done, void *arg)
{
	dmu_buf_impl_t *db = (dmu_buf_impl_t *)db_fake;
	objset_impl_t *os = db->db_objset;
	dsl_pool_t *dp = os->os_dsl_dataset->ds_dir->dd_pool;
	tx_state_t *tx = &dp->dp_tx;
	dbuf_dirty_record_t *dr;
	dmu_sync_arg_t *in;
	zbookmark_t zb;
	zio_t *zio;
	int zio_flags;
	int err;

	ASSERT(BP_IS_HOLE(bp));
	ASSERT(txg != 0);


	dprintf("dmu_sync txg=%llu, s,o,q %llu %llu %llu\n",
	    txg, tx->tx_synced_txg, tx->tx_open_txg, tx->tx_quiesced_txg);

	/*
	 * XXX - would be nice if we could do this without suspending...
	 */
	txg_suspend(dp);

	/*
	 * If this txg already synced, there's nothing to do.
	 */
	if (txg <= tx->tx_synced_txg) {
		txg_resume(dp);
		/*
		 * If we're running ziltest, we need the blkptr regardless.
		 */
		if (txg > spa_freeze_txg(dp->dp_spa)) {
			/* if db_blkptr == NULL, this was an empty write */
			if (db->db_blkptr)
				*bp = *db->db_blkptr; /* structure assignment */
			return (0);
		}
		return (EEXIST);
	}

	mutex_enter(&db->db_mtx);

	if (txg == tx->tx_syncing_txg) {
		while (db->db_data_pending) {
			/*
			 * IO is in-progress.  Wait for it to finish.
			 * XXX - would be nice to be able to somehow "attach"
			 * this zio to the parent zio passed in.
			 */
			cv_wait(&db->db_changed, &db->db_mtx);
			if (!db->db_data_pending &&
			    db->db_blkptr && BP_IS_HOLE(db->db_blkptr)) {
				/*
				 * IO was compressed away
				 */
				*bp = *db->db_blkptr; /* structure assignment */
				mutex_exit(&db->db_mtx);
				txg_resume(dp);
				return (0);
			}
			ASSERT(db->db_data_pending ||
			    (db->db_blkptr && db->db_blkptr->blk_birth == txg));
		}

		if (db->db_blkptr && db->db_blkptr->blk_birth == txg) {
			/*
			 * IO is already completed.
			 */
			*bp = *db->db_blkptr; /* structure assignment */
			mutex_exit(&db->db_mtx);
			txg_resume(dp);
			return (0);
		}
	}

	/*
	 * Walk the dirty records, skipping any with a dr_txg above `txg',
	 * to find the one (if any) that belongs to this txg.
	 */
	dr = db->db_last_dirty;
	while (dr && dr->dr_txg > txg)
		dr = dr->dr_next;
	if (dr == NULL || dr->dr_txg < txg) {
		/*
		 * This dbuf isn't dirty, must have been free_range'd.
		 * There's no need to log writes to freed blocks, so we're done.
		 */
		mutex_exit(&db->db_mtx);
		txg_resume(dp);
		return (ENOENT);
	}

	ASSERT(dr->dr_txg == txg);
	if (dr->dt.dl.dr_override_state == DR_IN_DMU_SYNC) {
		/*
		 * We have already issued a sync write for this buffer.
		 */
		mutex_exit(&db->db_mtx);
		txg_resume(dp);
		return (EALREADY);
	} else if (dr->dt.dl.dr_override_state == DR_OVERRIDDEN) {
		/*
		 * This buffer has already been synced.  It could not
		 * have been dirtied since, or we would have cleared the state.
		 */
		*bp = dr->dt.dl.dr_overridden_by; /* structure assignment */
		mutex_exit(&db->db_mtx);
		txg_resume(dp);
		return (0);
	}

	/*
	 * Claim the dirty record for this sync write and package up the
	 * context for dmu_sync_done(), which frees `in'.
	 */
	dr->dt.dl.dr_override_state = DR_IN_DMU_SYNC;
	in = kmem_alloc(sizeof (dmu_sync_arg_t), KM_SLEEP);
	in->dr = dr;
	in->done = done;
	in->arg = arg;
	mutex_exit(&db->db_mtx);
	txg_resume(dp);

	/* bookmark identifying the block being written */
	zb.zb_objset = os->os_dsl_dataset->ds_object;
	zb.zb_object = db->db.db_object;
	zb.zb_level = db->db_level;
	zb.zb_blkid = db->db_blkid;
	zio_flags = ZIO_FLAG_MUSTSUCCEED;
	if (dmu_ot[db->db_dnode->dn_type].ot_metadata || zb.zb_level != 0)
		zio_flags |= ZIO_FLAG_METADATA;
	zio = arc_write(pio, os->os_spa,
	    zio_checksum_select(db->db_dnode->dn_checksum, os->os_checksum),
	    zio_compress_select(db->db_dnode->dn_compress, os->os_compress),
	    dmu_get_replication_level(os, &zb, db->db_dnode->dn_type),
	    txg, bp, dr->dt.dl.dr_data, NULL, dmu_sync_done, in,
	    ZIO_PRIORITY_SYNC_WRITE, zio_flags, &zb);

	if (pio) {
		/* with a parent zio: fire and forget, report EINPROGRESS */
		zio_nowait(zio);
		err = EINPROGRESS;
	} else {
		/* no parent zio: wait here for the write to complete */
		err = zio_wait(zio);
		ASSERT(err == 0);
	}
	return (err);
}
831 
832 int
833 dmu_object_set_blocksize(objset_t *os, uint64_t object, uint64_t size, int ibs,
834 	dmu_tx_t *tx)
835 {
836 	dnode_t *dn;
837 	int err;
838 
839 	err = dnode_hold(os->os, object, FTAG, &dn);
840 	if (err)
841 		return (err);
842 	err = dnode_set_blksz(dn, size, ibs, tx);
843 	dnode_rele(dn, FTAG);
844 	return (err);
845 }
846 
847 void
848 dmu_object_set_checksum(objset_t *os, uint64_t object, uint8_t checksum,
849 	dmu_tx_t *tx)
850 {
851 	dnode_t *dn;
852 
853 	/* XXX assumes dnode_hold will not get an i/o error */
854 	(void) dnode_hold(os->os, object, FTAG, &dn);
855 	ASSERT(checksum < ZIO_CHECKSUM_FUNCTIONS);
856 	dn->dn_checksum = checksum;
857 	dnode_setdirty(dn, tx);
858 	dnode_rele(dn, FTAG);
859 }
860 
861 void
862 dmu_object_set_compress(objset_t *os, uint64_t object, uint8_t compress,
863 	dmu_tx_t *tx)
864 {
865 	dnode_t *dn;
866 
867 	/* XXX assumes dnode_hold will not get an i/o error */
868 	(void) dnode_hold(os->os, object, FTAG, &dn);
869 	ASSERT(compress < ZIO_COMPRESS_FUNCTIONS);
870 	dn->dn_compress = compress;
871 	dnode_setdirty(dn, tx);
872 	dnode_rele(dn, FTAG);
873 }
874 
875 int
876 dmu_get_replication_level(objset_impl_t *os,
877     zbookmark_t *zb, dmu_object_type_t ot)
878 {
879 	int ncopies = os->os_copies;
880 
881 	/* If it's the mos, it should have max copies set. */
882 	ASSERT(zb->zb_objset != 0 ||
883 	    ncopies == spa_max_replication(os->os_spa));
884 
885 	if (dmu_ot[ot].ot_metadata || zb->zb_level != 0)
886 		ncopies++;
887 	return (MIN(ncopies, spa_max_replication(os->os_spa)));
888 }
889 
890 int
891 dmu_offset_next(objset_t *os, uint64_t object, boolean_t hole, uint64_t *off)
892 {
893 	dnode_t *dn;
894 	int i, err;
895 
896 	err = dnode_hold(os->os, object, FTAG, &dn);
897 	if (err)
898 		return (err);
899 	/*
900 	 * Sync any current changes before
901 	 * we go trundling through the block pointers.
902 	 */
903 	for (i = 0; i < TXG_SIZE; i++) {
904 		if (list_link_active(&dn->dn_dirty_link[i]))
905 			break;
906 	}
907 	if (i != TXG_SIZE) {
908 		dnode_rele(dn, FTAG);
909 		txg_wait_synced(dmu_objset_pool(os), 0);
910 		err = dnode_hold(os->os, object, FTAG, &dn);
911 		if (err)
912 			return (err);
913 	}
914 
915 	err = dnode_next_offset(dn, hole, off, 1, 1, 0);
916 	dnode_rele(dn, FTAG);
917 
918 	return (err);
919 }
920 
921 void
922 dmu_object_info_from_dnode(dnode_t *dn, dmu_object_info_t *doi)
923 {
924 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
925 	mutex_enter(&dn->dn_mtx);
926 
927 	doi->doi_data_block_size = dn->dn_datablksz;
928 	doi->doi_metadata_block_size = dn->dn_indblkshift ?
929 	    1ULL << dn->dn_indblkshift : 0;
930 	doi->doi_indirection = dn->dn_nlevels;
931 	doi->doi_checksum = dn->dn_checksum;
932 	doi->doi_compress = dn->dn_compress;
933 	doi->doi_physical_blks = (DN_USED_BYTES(dn->dn_phys) +
934 	    SPA_MINBLOCKSIZE/2) >> SPA_MINBLOCKSHIFT;
935 	doi->doi_max_block_offset = dn->dn_phys->dn_maxblkid;
936 	doi->doi_type = dn->dn_type;
937 	doi->doi_bonus_size = dn->dn_bonuslen;
938 	doi->doi_bonus_type = dn->dn_bonustype;
939 
940 	mutex_exit(&dn->dn_mtx);
941 	rw_exit(&dn->dn_struct_rwlock);
942 }
943 
944 /*
945  * Get information on a DMU object.
946  * If doi is NULL, just indicates whether the object exists.
947  */
948 int
949 dmu_object_info(objset_t *os, uint64_t object, dmu_object_info_t *doi)
950 {
951 	dnode_t *dn;
952 	int err = dnode_hold(os->os, object, FTAG, &dn);
953 
954 	if (err)
955 		return (err);
956 
957 	if (doi != NULL)
958 		dmu_object_info_from_dnode(dn, doi);
959 
960 	dnode_rele(dn, FTAG);
961 	return (0);
962 }
963 
964 /*
965  * As above, but faster; can be used when you have a held dbuf in hand.
966  */
967 void
968 dmu_object_info_from_db(dmu_buf_t *db, dmu_object_info_t *doi)
969 {
970 	dmu_object_info_from_dnode(((dmu_buf_impl_t *)db)->db_dnode, doi);
971 }
972 
973 /*
974  * Faster still when you only care about the size.
975  * This is specifically optimized for zfs_getattr().
976  */
977 void
978 dmu_object_size_from_db(dmu_buf_t *db, uint32_t *blksize, u_longlong_t *nblk512)
979 {
980 	dnode_t *dn = ((dmu_buf_impl_t *)db)->db_dnode;
981 
982 	*blksize = dn->dn_datablksz;
983 	/* add 1 for dnode space */
984 	*nblk512 = ((DN_USED_BYTES(dn->dn_phys) + SPA_MINBLOCKSIZE/2) >>
985 	    SPA_MINBLOCKSHIFT) + 1;
986 }
987 
/*
 * Byteswap, in place, a buffer of `size' bytes treated as an array of
 * uint64_t.  `size' must be a multiple of 8.
 */
void
byteswap_uint64_array(void *vbuf, size_t size)
{
	uint64_t *buf = vbuf;
	size_t count = size >> 3;
	size_t i;	/* same type as count: no signed/unsigned compare */

	ASSERT((size & 7) == 0);

	for (i = 0; i < count; i++)
		buf[i] = BSWAP_64(buf[i]);
}
1000 
/*
 * Byteswap, in place, a buffer of `size' bytes treated as an array of
 * uint32_t.  `size' must be a multiple of 4.
 */
void
byteswap_uint32_array(void *vbuf, size_t size)
{
	uint32_t *buf = vbuf;
	size_t count = size >> 2;
	size_t i;	/* same type as count: no signed/unsigned compare */

	ASSERT((size & 3) == 0);

	for (i = 0; i < count; i++)
		buf[i] = BSWAP_32(buf[i]);
}
1013 
/*
 * Byteswap, in place, a buffer of `size' bytes treated as an array of
 * uint16_t.  `size' must be a multiple of 2.
 */
void
byteswap_uint16_array(void *vbuf, size_t size)
{
	uint16_t *buf = vbuf;
	size_t count = size >> 1;
	size_t i;	/* same type as count: no signed/unsigned compare */

	ASSERT((size & 1) == 0);

	for (i = 0; i < count; i++)
		buf[i] = BSWAP_16(buf[i]);
}
1026 
/*
 * Byteswap routine for byte arrays: intentionally does nothing, since
 * single bytes have no byte order.  It exists so that byte-array object
 * types have a routine to list in the dmu_ot[] table.
 */
/* ARGSUSED */
void
byteswap_uint8_array(void *vbuf, size_t size)
{
}
1032 
/*
 * Initialize the subsystems used by this file: calls dbuf_init(),
 * dnode_init(), arc_init() and l2arc_init(), in that order.
 */
void
dmu_init(void)
{
	dbuf_init();
	dnode_init();
	arc_init();
	l2arc_init();
}
1041 
/*
 * Tear down what dmu_init() set up: calls arc_fini(), dnode_fini(),
 * dbuf_fini() and l2arc_fini(), in that order.
 *
 * NOTE(review): this is not the exact reverse of the dmu_init() order
 * (l2arc_fini() runs last although l2arc_init() also ran last); confirm
 * that the ordering is intentional.
 */
void
dmu_fini(void)
{
	arc_fini();
	dnode_fini();
	dbuf_fini();
	l2arc_fini();
}
1050