xref: /titanic_50/usr/src/uts/common/fs/zfs/dmu.c (revision 2321aa36382ca9bc1d3f0437d553acc4e342c81b)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 
26 #pragma ident	"%Z%%M%	%I%	%E% SMI"
27 
28 #include <sys/dmu.h>
29 #include <sys/dmu_impl.h>
30 #include <sys/dmu_tx.h>
31 #include <sys/dbuf.h>
32 #include <sys/dnode.h>
33 #include <sys/zfs_context.h>
34 #include <sys/dmu_objset.h>
35 #include <sys/dmu_traverse.h>
36 #include <sys/dsl_dataset.h>
37 #include <sys/dsl_dir.h>
38 #include <sys/dsl_pool.h>
39 #include <sys/dsl_synctask.h>
40 #include <sys/dsl_prop.h>
41 #include <sys/dmu_zfetch.h>
42 #include <sys/zfs_ioctl.h>
43 #include <sys/zap.h>
44 #include <sys/zio_checksum.h>
45 #ifdef _KERNEL
46 #include <sys/vmsystm.h>
47 #endif
48 
/*
 * Per-object-type information table, indexed by DMU object type.
 *
 * Each entry supplies, in order: a byteswap routine for buffers of that
 * type, a boolean that the rest of this file reads as ot_metadata (types
 * flagged TRUE are treated as metadata when choosing zio flags and the
 * replication level), and a human-readable name for the type.
 *
 * NOTE(review): the entry order presumably has to match the
 * dmu_object_type_t enumeration, which is declared elsewhere -- confirm
 * before inserting or reordering entries.
 */
const dmu_object_type_info_t dmu_ot[DMU_OT_NUMTYPES] = {
	{	byteswap_uint8_array,	TRUE,	"unallocated"		},
	{	zap_byteswap,		TRUE,	"object directory"	},
	{	byteswap_uint64_array,	TRUE,	"object array"		},
	{	byteswap_uint8_array,	TRUE,	"packed nvlist"		},
	{	byteswap_uint64_array,	TRUE,	"packed nvlist size"	},
	{	byteswap_uint64_array,	TRUE,	"bplist"		},
	{	byteswap_uint64_array,	TRUE,	"bplist header"		},
	{	byteswap_uint64_array,	TRUE,	"SPA space map header"	},
	{	byteswap_uint64_array,	TRUE,	"SPA space map"		},
	{	byteswap_uint64_array,	TRUE,	"ZIL intent log"	},
	{	dnode_buf_byteswap,	TRUE,	"DMU dnode"		},
	{	dmu_objset_byteswap,	TRUE,	"DMU objset"		},
	{	byteswap_uint64_array,	TRUE,	"DSL directory"		},
	{	zap_byteswap,		TRUE,	"DSL directory child map"},
	{	zap_byteswap,		TRUE,	"DSL dataset snap map"	},
	{	zap_byteswap,		TRUE,	"DSL props"		},
	{	byteswap_uint64_array,	TRUE,	"DSL dataset"		},
	{	zfs_znode_byteswap,	TRUE,	"ZFS znode"		},
	{	zfs_oldacl_byteswap,	TRUE,	"ZFS V0 ACL"		},
	{	byteswap_uint8_array,	FALSE,	"ZFS plain file"	},
	{	zap_byteswap,		TRUE,	"ZFS directory"		},
	{	zap_byteswap,		TRUE,	"ZFS master node"	},
	{	zap_byteswap,		TRUE,	"ZFS delete queue"	},
	{	byteswap_uint8_array,	FALSE,	"zvol object"		},
	{	zap_byteswap,		TRUE,	"zvol prop"		},
	{	byteswap_uint8_array,	FALSE,	"other uint8[]"		},
	{	byteswap_uint64_array,	FALSE,	"other uint64[]"	},
	{	zap_byteswap,		TRUE,	"other ZAP"		},
	{	zap_byteswap,		TRUE,	"persistent error log"	},
	{	byteswap_uint8_array,	TRUE,	"SPA history"		},
	{	byteswap_uint64_array,	TRUE,	"SPA history offsets"	},
	{	zap_byteswap,		TRUE,	"Pool properties"	},
	{	zap_byteswap,		TRUE,	"DSL permissions"	},
	{	zfs_acl_byteswap,	TRUE,	"ZFS ACL"		},
	{	byteswap_uint8_array,	TRUE,	"ZFS SYSACL"		},
	{	byteswap_uint8_array,	TRUE,	"FUID table"		},
	{	byteswap_uint64_array,	TRUE,	"FUID table size"	},
};
88 
89 int
90 dmu_buf_hold(objset_t *os, uint64_t object, uint64_t offset,
91     void *tag, dmu_buf_t **dbp)
92 {
93 	dnode_t *dn;
94 	uint64_t blkid;
95 	dmu_buf_impl_t *db;
96 	int err;
97 
98 	err = dnode_hold(os->os, object, FTAG, &dn);
99 	if (err)
100 		return (err);
101 	blkid = dbuf_whichblock(dn, offset);
102 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
103 	db = dbuf_hold(dn, blkid, tag);
104 	rw_exit(&dn->dn_struct_rwlock);
105 	if (db == NULL) {
106 		err = EIO;
107 	} else {
108 		err = dbuf_read(db, NULL, DB_RF_CANFAIL);
109 		if (err) {
110 			dbuf_rele(db, tag);
111 			db = NULL;
112 		}
113 	}
114 
115 	dnode_rele(dn, FTAG);
116 	*dbp = &db->db;
117 	return (err);
118 }
119 
120 int
121 dmu_bonus_max(void)
122 {
123 	return (DN_MAX_BONUSLEN);
124 }
125 
126 int
127 dmu_set_bonus(dmu_buf_t *db, int newsize, dmu_tx_t *tx)
128 {
129 	dnode_t *dn = ((dmu_buf_impl_t *)db)->db_dnode;
130 
131 	if (dn->dn_bonus != (dmu_buf_impl_t *)db)
132 		return (EINVAL);
133 	if (newsize < 0 || newsize > db->db_size)
134 		return (EINVAL);
135 	dnode_setbonuslen(dn, newsize, tx);
136 	return (0);
137 }
138 
139 /*
140  * returns ENOENT, EIO, or 0.
141  */
142 int
143 dmu_bonus_hold(objset_t *os, uint64_t object, void *tag, dmu_buf_t **dbp)
144 {
145 	dnode_t *dn;
146 	dmu_buf_impl_t *db;
147 	int error;
148 
149 	error = dnode_hold(os->os, object, FTAG, &dn);
150 	if (error)
151 		return (error);
152 
153 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
154 	if (dn->dn_bonus == NULL) {
155 		rw_exit(&dn->dn_struct_rwlock);
156 		rw_enter(&dn->dn_struct_rwlock, RW_WRITER);
157 		if (dn->dn_bonus == NULL)
158 			dbuf_create_bonus(dn);
159 	}
160 	db = dn->dn_bonus;
161 	rw_exit(&dn->dn_struct_rwlock);
162 
163 	/* as long as the bonus buf is held, the dnode will be held */
164 	if (refcount_add(&db->db_holds, tag) == 1)
165 		VERIFY(dnode_add_ref(dn, db));
166 
167 	dnode_rele(dn, FTAG);
168 
169 	VERIFY(0 == dbuf_read(db, NULL, DB_RF_MUST_SUCCEED));
170 
171 	*dbp = &db->db;
172 	return (0);
173 }
174 
175 /*
176  * Note: longer-term, we should modify all of the dmu_buf_*() interfaces
177  * to take a held dnode rather than <os, object> -- the lookup is wasteful,
178  * and can induce severe lock contention when writing to several files
179  * whose dnodes are in the same block.
180  */
181 static int
182 dmu_buf_hold_array_by_dnode(dnode_t *dn, uint64_t offset,
183     uint64_t length, int read, void *tag, int *numbufsp, dmu_buf_t ***dbpp)
184 {
185 	dmu_buf_t **dbp;
186 	uint64_t blkid, nblks, i;
187 	uint32_t flags;
188 	int err;
189 	zio_t *zio;
190 
191 	ASSERT(length <= DMU_MAX_ACCESS);
192 
193 	flags = DB_RF_CANFAIL | DB_RF_NEVERWAIT;
194 	if (length > zfetch_array_rd_sz)
195 		flags |= DB_RF_NOPREFETCH;
196 
197 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
198 	if (dn->dn_datablkshift) {
199 		int blkshift = dn->dn_datablkshift;
200 		nblks = (P2ROUNDUP(offset+length, 1ULL<<blkshift) -
201 		    P2ALIGN(offset, 1ULL<<blkshift)) >> blkshift;
202 	} else {
203 		if (offset + length > dn->dn_datablksz) {
204 			zfs_panic_recover("zfs: accessing past end of object "
205 			    "%llx/%llx (size=%u access=%llu+%llu)",
206 			    (longlong_t)dn->dn_objset->
207 			    os_dsl_dataset->ds_object,
208 			    (longlong_t)dn->dn_object, dn->dn_datablksz,
209 			    (longlong_t)offset, (longlong_t)length);
210 			return (EIO);
211 		}
212 		nblks = 1;
213 	}
214 	dbp = kmem_zalloc(sizeof (dmu_buf_t *) * nblks, KM_SLEEP);
215 
216 	zio = zio_root(dn->dn_objset->os_spa, NULL, NULL, TRUE);
217 	blkid = dbuf_whichblock(dn, offset);
218 	for (i = 0; i < nblks; i++) {
219 		dmu_buf_impl_t *db = dbuf_hold(dn, blkid+i, tag);
220 		if (db == NULL) {
221 			rw_exit(&dn->dn_struct_rwlock);
222 			dmu_buf_rele_array(dbp, nblks, tag);
223 			zio_nowait(zio);
224 			return (EIO);
225 		}
226 		/* initiate async i/o */
227 		if (read) {
228 			rw_exit(&dn->dn_struct_rwlock);
229 			(void) dbuf_read(db, zio, flags);
230 			rw_enter(&dn->dn_struct_rwlock, RW_READER);
231 		}
232 		dbp[i] = &db->db;
233 	}
234 	rw_exit(&dn->dn_struct_rwlock);
235 
236 	/* wait for async i/o */
237 	err = zio_wait(zio);
238 	if (err) {
239 		dmu_buf_rele_array(dbp, nblks, tag);
240 		return (err);
241 	}
242 
243 	/* wait for other io to complete */
244 	if (read) {
245 		for (i = 0; i < nblks; i++) {
246 			dmu_buf_impl_t *db = (dmu_buf_impl_t *)dbp[i];
247 			mutex_enter(&db->db_mtx);
248 			while (db->db_state == DB_READ ||
249 			    db->db_state == DB_FILL)
250 				cv_wait(&db->db_changed, &db->db_mtx);
251 			if (db->db_state == DB_UNCACHED)
252 				err = EIO;
253 			mutex_exit(&db->db_mtx);
254 			if (err) {
255 				dmu_buf_rele_array(dbp, nblks, tag);
256 				return (err);
257 			}
258 		}
259 	}
260 
261 	*numbufsp = nblks;
262 	*dbpp = dbp;
263 	return (0);
264 }
265 
266 static int
267 dmu_buf_hold_array(objset_t *os, uint64_t object, uint64_t offset,
268     uint64_t length, int read, void *tag, int *numbufsp, dmu_buf_t ***dbpp)
269 {
270 	dnode_t *dn;
271 	int err;
272 
273 	err = dnode_hold(os->os, object, FTAG, &dn);
274 	if (err)
275 		return (err);
276 
277 	err = dmu_buf_hold_array_by_dnode(dn, offset, length, read, tag,
278 	    numbufsp, dbpp);
279 
280 	dnode_rele(dn, FTAG);
281 
282 	return (err);
283 }
284 
285 int
286 dmu_buf_hold_array_by_bonus(dmu_buf_t *db, uint64_t offset,
287     uint64_t length, int read, void *tag, int *numbufsp, dmu_buf_t ***dbpp)
288 {
289 	dnode_t *dn = ((dmu_buf_impl_t *)db)->db_dnode;
290 	int err;
291 
292 	err = dmu_buf_hold_array_by_dnode(dn, offset, length, read, tag,
293 	    numbufsp, dbpp);
294 
295 	return (err);
296 }
297 
298 void
299 dmu_buf_rele_array(dmu_buf_t **dbp_fake, int numbufs, void *tag)
300 {
301 	int i;
302 	dmu_buf_impl_t **dbp = (dmu_buf_impl_t **)dbp_fake;
303 
304 	if (numbufs == 0)
305 		return;
306 
307 	for (i = 0; i < numbufs; i++) {
308 		if (dbp[i])
309 			dbuf_rele(dbp[i], tag);
310 	}
311 
312 	kmem_free(dbp, sizeof (dmu_buf_t *) * numbufs);
313 }
314 
315 void
316 dmu_prefetch(objset_t *os, uint64_t object, uint64_t offset, uint64_t len)
317 {
318 	dnode_t *dn;
319 	uint64_t blkid;
320 	int nblks, i, err;
321 
322 	if (zfs_prefetch_disable)
323 		return;
324 
325 	if (len == 0) {  /* they're interested in the bonus buffer */
326 		dn = os->os->os_meta_dnode;
327 
328 		if (object == 0 || object >= DN_MAX_OBJECT)
329 			return;
330 
331 		rw_enter(&dn->dn_struct_rwlock, RW_READER);
332 		blkid = dbuf_whichblock(dn, object * sizeof (dnode_phys_t));
333 		dbuf_prefetch(dn, blkid);
334 		rw_exit(&dn->dn_struct_rwlock);
335 		return;
336 	}
337 
338 	/*
339 	 * XXX - Note, if the dnode for the requested object is not
340 	 * already cached, we will do a *synchronous* read in the
341 	 * dnode_hold() call.  The same is true for any indirects.
342 	 */
343 	err = dnode_hold(os->os, object, FTAG, &dn);
344 	if (err != 0)
345 		return;
346 
347 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
348 	if (dn->dn_datablkshift) {
349 		int blkshift = dn->dn_datablkshift;
350 		nblks = (P2ROUNDUP(offset+len, 1<<blkshift) -
351 		    P2ALIGN(offset, 1<<blkshift)) >> blkshift;
352 	} else {
353 		nblks = (offset < dn->dn_datablksz);
354 	}
355 
356 	if (nblks != 0) {
357 		blkid = dbuf_whichblock(dn, offset);
358 		for (i = 0; i < nblks; i++)
359 			dbuf_prefetch(dn, blkid+i);
360 	}
361 
362 	rw_exit(&dn->dn_struct_rwlock);
363 
364 	dnode_rele(dn, FTAG);
365 }
366 
367 static int
368 get_next_chunk(dnode_t *dn, uint64_t *offset, uint64_t limit)
369 {
370 	uint64_t len = limit - *offset;
371 	uint64_t chunk_len = dn->dn_datablksz * DMU_MAX_DELETEBLKCNT;
372 	uint64_t dn_used;
373 	int err;
374 
375 	ASSERT(limit <= *offset);
376 
377 	dn_used = dn->dn_phys->dn_used <<
378 	    (dn->dn_phys->dn_flags & DNODE_FLAG_USED_BYTES ? 0 : DEV_BSHIFT);
379 	if (len <= chunk_len || dn_used <= chunk_len) {
380 		*offset = limit;
381 		return (0);
382 	}
383 
384 	while (*offset > limit) {
385 		uint64_t initial_offset = *offset;
386 		uint64_t delta;
387 
388 		/* skip over allocated data */
389 		err = dnode_next_offset(dn,
390 		    DNODE_FIND_HOLE|DNODE_FIND_BACKWARDS, offset, 1, 1, 0);
391 		if (err == ESRCH)
392 			*offset = limit;
393 		else if (err)
394 			return (err);
395 
396 		ASSERT3U(*offset, <=, initial_offset);
397 		delta = initial_offset - *offset;
398 		if (delta >= chunk_len) {
399 			*offset += delta - chunk_len;
400 			return (0);
401 		}
402 		chunk_len -= delta;
403 
404 		/* skip over unallocated data */
405 		err = dnode_next_offset(dn,
406 		    DNODE_FIND_BACKWARDS, offset, 1, 1, 0);
407 		if (err == ESRCH)
408 			*offset = limit;
409 		else if (err)
410 			return (err);
411 
412 		if (*offset < limit)
413 			*offset = limit;
414 		ASSERT3U(*offset, <, initial_offset);
415 	}
416 	return (0);
417 }
418 
419 static int
420 dmu_free_long_range_impl(objset_t *os, dnode_t *dn, uint64_t offset,
421     uint64_t length, boolean_t free_dnode)
422 {
423 	dmu_tx_t *tx;
424 	uint64_t object_size, start, end, len;
425 	boolean_t trunc = (length == DMU_OBJECT_END);
426 	int align, err;
427 
428 	align = 1 << dn->dn_datablkshift;
429 	ASSERT(align > 0);
430 	object_size = align == 1 ? dn->dn_datablksz :
431 	    (dn->dn_maxblkid + 1) << dn->dn_datablkshift;
432 
433 	if (trunc || (end = offset + length) > object_size)
434 		end = object_size;
435 	if (end <= offset)
436 		return (0);
437 	length = end - offset;
438 
439 	while (length) {
440 		start = end;
441 		err = get_next_chunk(dn, &start, offset);
442 		if (err)
443 			return (err);
444 		len = trunc ? DMU_OBJECT_END : end - start;
445 
446 		tx = dmu_tx_create(os);
447 		dmu_tx_hold_free(tx, dn->dn_object, start, len);
448 		err = dmu_tx_assign(tx, TXG_WAIT);
449 		if (err) {
450 			dmu_tx_abort(tx);
451 			return (err);
452 		}
453 
454 		dnode_free_range(dn, start, trunc ? -1 : len, tx);
455 
456 		if (start == 0 && trunc && free_dnode)
457 			dnode_free(dn, tx);
458 
459 		length -= end - start;
460 
461 		dmu_tx_commit(tx);
462 		end = start;
463 		trunc = FALSE;
464 	}
465 	return (0);
466 }
467 
468 int
469 dmu_free_long_range(objset_t *os, uint64_t object,
470     uint64_t offset, uint64_t length)
471 {
472 	dnode_t *dn;
473 	int err;
474 
475 	err = dnode_hold(os->os, object, FTAG, &dn);
476 	if (err != 0)
477 		return (err);
478 	err = dmu_free_long_range_impl(os, dn, offset, length, FALSE);
479 	dnode_rele(dn, FTAG);
480 	return (err);
481 }
482 
483 int
484 dmu_free_object(objset_t *os, uint64_t object)
485 {
486 	dnode_t *dn;
487 	dmu_tx_t *tx;
488 	int err;
489 
490 	err = dnode_hold_impl(os->os, object, DNODE_MUST_BE_ALLOCATED,
491 	    FTAG, &dn);
492 	if (err != 0)
493 		return (err);
494 	if (dn->dn_nlevels == 1) {
495 		tx = dmu_tx_create(os);
496 		dmu_tx_hold_bonus(tx, object);
497 		dmu_tx_hold_free(tx, dn->dn_object, 0, DMU_OBJECT_END);
498 		err = dmu_tx_assign(tx, TXG_WAIT);
499 		if (err == 0) {
500 			dnode_free_range(dn, 0, DMU_OBJECT_END, tx);
501 			dnode_free(dn, tx);
502 			dmu_tx_commit(tx);
503 		} else {
504 			dmu_tx_abort(tx);
505 		}
506 	} else {
507 		err = dmu_free_long_range_impl(os, dn, 0, DMU_OBJECT_END, TRUE);
508 	}
509 	dnode_rele(dn, FTAG);
510 	return (err);
511 }
512 
513 int
514 dmu_free_range(objset_t *os, uint64_t object, uint64_t offset,
515     uint64_t size, dmu_tx_t *tx)
516 {
517 	dnode_t *dn;
518 	int err = dnode_hold(os->os, object, FTAG, &dn);
519 	if (err)
520 		return (err);
521 	ASSERT(offset < UINT64_MAX);
522 	ASSERT(size == -1ULL || size <= UINT64_MAX - offset);
523 	dnode_free_range(dn, offset, size, tx);
524 	dnode_rele(dn, FTAG);
525 	return (0);
526 }
527 
528 int
529 dmu_read(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
530     void *buf)
531 {
532 	dnode_t *dn;
533 	dmu_buf_t **dbp;
534 	int numbufs, i, err;
535 
536 	err = dnode_hold(os->os, object, FTAG, &dn);
537 	if (err)
538 		return (err);
539 
540 	/*
541 	 * Deal with odd block sizes, where there can't be data past the first
542 	 * block.  If we ever do the tail block optimization, we will need to
543 	 * handle that here as well.
544 	 */
545 	if (dn->dn_datablkshift == 0) {
546 		int newsz = offset > dn->dn_datablksz ? 0 :
547 		    MIN(size, dn->dn_datablksz - offset);
548 		bzero((char *)buf + newsz, size - newsz);
549 		size = newsz;
550 	}
551 
552 	while (size > 0) {
553 		uint64_t mylen = MIN(size, DMU_MAX_ACCESS / 2);
554 
555 		/*
556 		 * NB: we could do this block-at-a-time, but it's nice
557 		 * to be reading in parallel.
558 		 */
559 		err = dmu_buf_hold_array_by_dnode(dn, offset, mylen,
560 		    TRUE, FTAG, &numbufs, &dbp);
561 		if (err)
562 			break;
563 
564 		for (i = 0; i < numbufs; i++) {
565 			int tocpy;
566 			int bufoff;
567 			dmu_buf_t *db = dbp[i];
568 
569 			ASSERT(size > 0);
570 
571 			bufoff = offset - db->db_offset;
572 			tocpy = (int)MIN(db->db_size - bufoff, size);
573 
574 			bcopy((char *)db->db_data + bufoff, buf, tocpy);
575 
576 			offset += tocpy;
577 			size -= tocpy;
578 			buf = (char *)buf + tocpy;
579 		}
580 		dmu_buf_rele_array(dbp, numbufs, FTAG);
581 	}
582 	dnode_rele(dn, FTAG);
583 	return (err);
584 }
585 
586 void
587 dmu_write(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
588     const void *buf, dmu_tx_t *tx)
589 {
590 	dmu_buf_t **dbp;
591 	int numbufs, i;
592 
593 	if (size == 0)
594 		return;
595 
596 	VERIFY(0 == dmu_buf_hold_array(os, object, offset, size,
597 	    FALSE, FTAG, &numbufs, &dbp));
598 
599 	for (i = 0; i < numbufs; i++) {
600 		int tocpy;
601 		int bufoff;
602 		dmu_buf_t *db = dbp[i];
603 
604 		ASSERT(size > 0);
605 
606 		bufoff = offset - db->db_offset;
607 		tocpy = (int)MIN(db->db_size - bufoff, size);
608 
609 		ASSERT(i == 0 || i == numbufs-1 || tocpy == db->db_size);
610 
611 		if (tocpy == db->db_size)
612 			dmu_buf_will_fill(db, tx);
613 		else
614 			dmu_buf_will_dirty(db, tx);
615 
616 		bcopy(buf, (char *)db->db_data + bufoff, tocpy);
617 
618 		if (tocpy == db->db_size)
619 			dmu_buf_fill_done(db, tx);
620 
621 		offset += tocpy;
622 		size -= tocpy;
623 		buf = (char *)buf + tocpy;
624 	}
625 	dmu_buf_rele_array(dbp, numbufs, FTAG);
626 }
627 
628 #ifdef _KERNEL
629 int
630 dmu_read_uio(objset_t *os, uint64_t object, uio_t *uio, uint64_t size)
631 {
632 	dmu_buf_t **dbp;
633 	int numbufs, i, err;
634 
635 	/*
636 	 * NB: we could do this block-at-a-time, but it's nice
637 	 * to be reading in parallel.
638 	 */
639 	err = dmu_buf_hold_array(os, object, uio->uio_loffset, size, TRUE, FTAG,
640 	    &numbufs, &dbp);
641 	if (err)
642 		return (err);
643 
644 	for (i = 0; i < numbufs; i++) {
645 		int tocpy;
646 		int bufoff;
647 		dmu_buf_t *db = dbp[i];
648 
649 		ASSERT(size > 0);
650 
651 		bufoff = uio->uio_loffset - db->db_offset;
652 		tocpy = (int)MIN(db->db_size - bufoff, size);
653 
654 		err = uiomove((char *)db->db_data + bufoff, tocpy,
655 		    UIO_READ, uio);
656 		if (err)
657 			break;
658 
659 		size -= tocpy;
660 	}
661 	dmu_buf_rele_array(dbp, numbufs, FTAG);
662 
663 	return (err);
664 }
665 
666 int
667 dmu_write_uio(objset_t *os, uint64_t object, uio_t *uio, uint64_t size,
668     dmu_tx_t *tx)
669 {
670 	dmu_buf_t **dbp;
671 	int numbufs, i;
672 	int err = 0;
673 
674 	if (size == 0)
675 		return (0);
676 
677 	err = dmu_buf_hold_array(os, object, uio->uio_loffset, size,
678 	    FALSE, FTAG, &numbufs, &dbp);
679 	if (err)
680 		return (err);
681 
682 	for (i = 0; i < numbufs; i++) {
683 		int tocpy;
684 		int bufoff;
685 		dmu_buf_t *db = dbp[i];
686 
687 		ASSERT(size > 0);
688 
689 		bufoff = uio->uio_loffset - db->db_offset;
690 		tocpy = (int)MIN(db->db_size - bufoff, size);
691 
692 		ASSERT(i == 0 || i == numbufs-1 || tocpy == db->db_size);
693 
694 		if (tocpy == db->db_size)
695 			dmu_buf_will_fill(db, tx);
696 		else
697 			dmu_buf_will_dirty(db, tx);
698 
699 		/*
700 		 * XXX uiomove could block forever (eg. nfs-backed
701 		 * pages).  There needs to be a uiolockdown() function
702 		 * to lock the pages in memory, so that uiomove won't
703 		 * block.
704 		 */
705 		err = uiomove((char *)db->db_data + bufoff, tocpy,
706 		    UIO_WRITE, uio);
707 
708 		if (tocpy == db->db_size)
709 			dmu_buf_fill_done(db, tx);
710 
711 		if (err)
712 			break;
713 
714 		size -= tocpy;
715 	}
716 	dmu_buf_rele_array(dbp, numbufs, FTAG);
717 	return (err);
718 }
719 
720 int
721 dmu_write_pages(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
722     page_t *pp, dmu_tx_t *tx)
723 {
724 	dmu_buf_t **dbp;
725 	int numbufs, i;
726 	int err;
727 
728 	if (size == 0)
729 		return (0);
730 
731 	err = dmu_buf_hold_array(os, object, offset, size,
732 	    FALSE, FTAG, &numbufs, &dbp);
733 	if (err)
734 		return (err);
735 
736 	for (i = 0; i < numbufs; i++) {
737 		int tocpy, copied, thiscpy;
738 		int bufoff;
739 		dmu_buf_t *db = dbp[i];
740 		caddr_t va;
741 
742 		ASSERT(size > 0);
743 		ASSERT3U(db->db_size, >=, PAGESIZE);
744 
745 		bufoff = offset - db->db_offset;
746 		tocpy = (int)MIN(db->db_size - bufoff, size);
747 
748 		ASSERT(i == 0 || i == numbufs-1 || tocpy == db->db_size);
749 
750 		if (tocpy == db->db_size)
751 			dmu_buf_will_fill(db, tx);
752 		else
753 			dmu_buf_will_dirty(db, tx);
754 
755 		for (copied = 0; copied < tocpy; copied += PAGESIZE) {
756 			ASSERT3U(pp->p_offset, ==, db->db_offset + bufoff);
757 			thiscpy = MIN(PAGESIZE, tocpy - copied);
758 			va = ppmapin(pp, PROT_READ, (caddr_t)-1);
759 			bcopy(va, (char *)db->db_data + bufoff, thiscpy);
760 			ppmapout(va);
761 			pp = pp->p_next;
762 			bufoff += PAGESIZE;
763 		}
764 
765 		if (tocpy == db->db_size)
766 			dmu_buf_fill_done(db, tx);
767 
768 		if (err)
769 			break;
770 
771 		offset += tocpy;
772 		size -= tocpy;
773 	}
774 	dmu_buf_rele_array(dbp, numbufs, FTAG);
775 	return (err);
776 }
777 #endif
778 
/*
 * Context handed from dmu_sync() to its write-completion callback,
 * dmu_sync_done().  Allocated by dmu_sync() and freed by dmu_sync_done().
 */
typedef struct {
	dbuf_dirty_record_t	*dr;	/* dirty record being synced */
	dmu_sync_cb_t		*done;	/* caller's callback; may be NULL */
	void			*arg;	/* argument passed to done */
} dmu_sync_arg_t;
784 
785 /* ARGSUSED */
786 static void
787 dmu_sync_done(zio_t *zio, arc_buf_t *buf, void *varg)
788 {
789 	dmu_sync_arg_t *in = varg;
790 	dbuf_dirty_record_t *dr = in->dr;
791 	dmu_buf_impl_t *db = dr->dr_dbuf;
792 	dmu_sync_cb_t *done = in->done;
793 
794 	if (!BP_IS_HOLE(zio->io_bp)) {
795 		zio->io_bp->blk_fill = 1;
796 		BP_SET_TYPE(zio->io_bp, db->db_dnode->dn_type);
797 		BP_SET_LEVEL(zio->io_bp, 0);
798 	}
799 
800 	mutex_enter(&db->db_mtx);
801 	ASSERT(dr->dt.dl.dr_override_state == DR_IN_DMU_SYNC);
802 	dr->dt.dl.dr_overridden_by = *zio->io_bp; /* structure assignment */
803 	dr->dt.dl.dr_override_state = DR_OVERRIDDEN;
804 	cv_broadcast(&db->db_changed);
805 	mutex_exit(&db->db_mtx);
806 
807 	if (done)
808 		done(&(db->db), in->arg);
809 
810 	kmem_free(in, sizeof (dmu_sync_arg_t));
811 }
812 
/*
 * Intent log support: sync the block associated with db to disk.
 * N.B. and XXX: the caller is responsible for making sure that the
 * data isn't changing while dmu_sync() is writing it.
 *
 * If a write is issued and pio is non-NULL, the write is started with
 * zio_nowait() under pio and `done' (if non-NULL) is called from the
 * completion callback; if pio is NULL the write is waited for here.
 *
 * Return values:
 *
 *	EEXIST: this txg has already been synced, so there's nothing to do.
 *		The caller should not log the write.
 *
 *	ENOENT: the block was dbuf_free_range()'d, so there's nothing to do.
 *		The caller should not log the write.
 *
 *	EALREADY: this block is already in the process of being synced.
 *		The caller should track its progress (somehow).
 *
 *	EINPROGRESS: the IO has been initiated.
 *		The caller should log this blkptr in the callback.
 *
 *	0: completed.  Sets *bp to the blkptr just written.
 *		The caller should log this blkptr immediately.
 */
int
dmu_sync(zio_t *pio, dmu_buf_t *db_fake,
    blkptr_t *bp, uint64_t txg, dmu_sync_cb_t *done, void *arg)
{
	dmu_buf_impl_t *db = (dmu_buf_impl_t *)db_fake;
	objset_impl_t *os = db->db_objset;
	dsl_pool_t *dp = os->os_dsl_dataset->ds_dir->dd_pool;
	tx_state_t *tx = &dp->dp_tx;
	dbuf_dirty_record_t *dr;
	dmu_sync_arg_t *in;
	zbookmark_t zb;
	zio_t *zio;
	int zio_flags;
	int err;

	ASSERT(BP_IS_HOLE(bp));
	ASSERT(txg != 0);


	dprintf("dmu_sync txg=%llu, s,o,q %llu %llu %llu\n",
	    txg, tx->tx_synced_txg, tx->tx_open_txg, tx->tx_quiesced_txg);

	/*
	 * XXX - would be nice if we could do this without suspending...
	 *
	 * Every return path before the write is issued must pair this with
	 * a txg_resume().
	 */
	txg_suspend(dp);

	/*
	 * If this txg already synced, there's nothing to do.
	 */
	if (txg <= tx->tx_synced_txg) {
		txg_resume(dp);
		/*
		 * If we're running ziltest, we need the blkptr regardless.
		 */
		if (txg > spa_freeze_txg(dp->dp_spa)) {
			/* if db_blkptr == NULL, this was an empty write */
			if (db->db_blkptr)
				*bp = *db->db_blkptr; /* structure assignment */
			return (0);
		}
		return (EEXIST);
	}

	/* db_mtx is held from here until just before the write is issued */
	mutex_enter(&db->db_mtx);

	if (txg == tx->tx_syncing_txg) {
		while (db->db_data_pending) {
			/*
			 * IO is in-progress.  Wait for it to finish.
			 * XXX - would be nice to be able to somehow "attach"
			 * this zio to the parent zio passed in.
			 */
			cv_wait(&db->db_changed, &db->db_mtx);
			if (!db->db_data_pending &&
			    db->db_blkptr && BP_IS_HOLE(db->db_blkptr)) {
				/*
				 * IO was compressed away
				 */
				*bp = *db->db_blkptr; /* structure assignment */
				mutex_exit(&db->db_mtx);
				txg_resume(dp);
				return (0);
			}
			ASSERT(db->db_data_pending ||
			    (db->db_blkptr && db->db_blkptr->blk_birth == txg));
		}

		if (db->db_blkptr && db->db_blkptr->blk_birth == txg) {
			/*
			 * IO is already completed.
			 */
			*bp = *db->db_blkptr; /* structure assignment */
			mutex_exit(&db->db_mtx);
			txg_resume(dp);
			return (0);
		}
	}

	/*
	 * Find the dirty record for this txg.  The loop skips records with
	 * a larger dr_txg and stops at the first one that is not larger.
	 */
	dr = db->db_last_dirty;
	while (dr && dr->dr_txg > txg)
		dr = dr->dr_next;
	if (dr == NULL || dr->dr_txg < txg) {
		/*
		 * This dbuf isn't dirty, must have been free_range'd.
		 * There's no need to log writes to freed blocks, so we're done.
		 */
		mutex_exit(&db->db_mtx);
		txg_resume(dp);
		return (ENOENT);
	}

	ASSERT(dr->dr_txg == txg);
	if (dr->dt.dl.dr_override_state == DR_IN_DMU_SYNC) {
		/*
		 * We have already issued a sync write for this buffer.
		 */
		mutex_exit(&db->db_mtx);
		txg_resume(dp);
		return (EALREADY);
	} else if (dr->dt.dl.dr_override_state == DR_OVERRIDDEN) {
		/*
		 * This buffer has already been synced.  It could not
		 * have been dirtied since, or we would have cleared the state.
		 */
		*bp = dr->dt.dl.dr_overridden_by; /* structure assignment */
		mutex_exit(&db->db_mtx);
		txg_resume(dp);
		return (0);
	}

	/*
	 * Claim the record for this sync write before dropping db_mtx; a
	 * later dmu_sync() that finds DR_IN_DMU_SYNC returns EALREADY.
	 * The argument block is freed by dmu_sync_done().
	 */
	dr->dt.dl.dr_override_state = DR_IN_DMU_SYNC;
	in = kmem_alloc(sizeof (dmu_sync_arg_t), KM_SLEEP);
	in->dr = dr;
	in->done = done;
	in->arg = arg;
	mutex_exit(&db->db_mtx);
	txg_resume(dp);

	/* bookmark identifying the block being written */
	zb.zb_objset = os->os_dsl_dataset->ds_object;
	zb.zb_object = db->db.db_object;
	zb.zb_level = db->db_level;
	zb.zb_blkid = db->db_blkid;
	zio_flags = ZIO_FLAG_MUSTSUCCEED;
	if (dmu_ot[db->db_dnode->dn_type].ot_metadata || zb.zb_level != 0)
		zio_flags |= ZIO_FLAG_METADATA;
	zio = arc_write(pio, os->os_spa,
	    zio_checksum_select(db->db_dnode->dn_checksum, os->os_checksum),
	    zio_compress_select(db->db_dnode->dn_compress, os->os_compress),
	    dmu_get_replication_level(os, &zb, db->db_dnode->dn_type),
	    txg, bp, dr->dt.dl.dr_data, NULL, dmu_sync_done, in,
	    ZIO_PRIORITY_SYNC_WRITE, zio_flags, &zb);

	if (pio) {
		/* asynchronous: dmu_sync_done() runs at write completion */
		zio_nowait(zio);
		err = EINPROGRESS;
	} else {
		/* synchronous: wait for the write here */
		err = zio_wait(zio);
		ASSERT(err == 0);
	}
	return (err);
}
977 
978 int
979 dmu_object_set_blocksize(objset_t *os, uint64_t object, uint64_t size, int ibs,
980 	dmu_tx_t *tx)
981 {
982 	dnode_t *dn;
983 	int err;
984 
985 	err = dnode_hold(os->os, object, FTAG, &dn);
986 	if (err)
987 		return (err);
988 	err = dnode_set_blksz(dn, size, ibs, tx);
989 	dnode_rele(dn, FTAG);
990 	return (err);
991 }
992 
993 void
994 dmu_object_set_checksum(objset_t *os, uint64_t object, uint8_t checksum,
995 	dmu_tx_t *tx)
996 {
997 	dnode_t *dn;
998 
999 	/* XXX assumes dnode_hold will not get an i/o error */
1000 	(void) dnode_hold(os->os, object, FTAG, &dn);
1001 	ASSERT(checksum < ZIO_CHECKSUM_FUNCTIONS);
1002 	dn->dn_checksum = checksum;
1003 	dnode_setdirty(dn, tx);
1004 	dnode_rele(dn, FTAG);
1005 }
1006 
1007 void
1008 dmu_object_set_compress(objset_t *os, uint64_t object, uint8_t compress,
1009 	dmu_tx_t *tx)
1010 {
1011 	dnode_t *dn;
1012 
1013 	/* XXX assumes dnode_hold will not get an i/o error */
1014 	(void) dnode_hold(os->os, object, FTAG, &dn);
1015 	ASSERT(compress < ZIO_COMPRESS_FUNCTIONS);
1016 	dn->dn_compress = compress;
1017 	dnode_setdirty(dn, tx);
1018 	dnode_rele(dn, FTAG);
1019 }
1020 
1021 int
1022 dmu_get_replication_level(objset_impl_t *os,
1023     zbookmark_t *zb, dmu_object_type_t ot)
1024 {
1025 	int ncopies = os->os_copies;
1026 
1027 	/* If it's the mos, it should have max copies set. */
1028 	ASSERT(zb->zb_objset != 0 ||
1029 	    ncopies == spa_max_replication(os->os_spa));
1030 
1031 	if (dmu_ot[ot].ot_metadata || zb->zb_level != 0)
1032 		ncopies++;
1033 	return (MIN(ncopies, spa_max_replication(os->os_spa)));
1034 }
1035 
1036 int
1037 dmu_offset_next(objset_t *os, uint64_t object, boolean_t hole, uint64_t *off)
1038 {
1039 	dnode_t *dn;
1040 	int i, err;
1041 
1042 	err = dnode_hold(os->os, object, FTAG, &dn);
1043 	if (err)
1044 		return (err);
1045 	/*
1046 	 * Sync any current changes before
1047 	 * we go trundling through the block pointers.
1048 	 */
1049 	for (i = 0; i < TXG_SIZE; i++) {
1050 		if (list_link_active(&dn->dn_dirty_link[i]))
1051 			break;
1052 	}
1053 	if (i != TXG_SIZE) {
1054 		dnode_rele(dn, FTAG);
1055 		txg_wait_synced(dmu_objset_pool(os), 0);
1056 		err = dnode_hold(os->os, object, FTAG, &dn);
1057 		if (err)
1058 			return (err);
1059 	}
1060 
1061 	err = dnode_next_offset(dn, (hole ? DNODE_FIND_HOLE : 0), off, 1, 1, 0);
1062 	dnode_rele(dn, FTAG);
1063 
1064 	return (err);
1065 }
1066 
1067 void
1068 dmu_object_info_from_dnode(dnode_t *dn, dmu_object_info_t *doi)
1069 {
1070 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
1071 	mutex_enter(&dn->dn_mtx);
1072 
1073 	doi->doi_data_block_size = dn->dn_datablksz;
1074 	doi->doi_metadata_block_size = dn->dn_indblkshift ?
1075 	    1ULL << dn->dn_indblkshift : 0;
1076 	doi->doi_indirection = dn->dn_nlevels;
1077 	doi->doi_checksum = dn->dn_checksum;
1078 	doi->doi_compress = dn->dn_compress;
1079 	doi->doi_physical_blks = (DN_USED_BYTES(dn->dn_phys) +
1080 	    SPA_MINBLOCKSIZE/2) >> SPA_MINBLOCKSHIFT;
1081 	doi->doi_max_block_offset = dn->dn_phys->dn_maxblkid;
1082 	doi->doi_type = dn->dn_type;
1083 	doi->doi_bonus_size = dn->dn_bonuslen;
1084 	doi->doi_bonus_type = dn->dn_bonustype;
1085 
1086 	mutex_exit(&dn->dn_mtx);
1087 	rw_exit(&dn->dn_struct_rwlock);
1088 }
1089 
1090 /*
1091  * Get information on a DMU object.
1092  * If doi is NULL, just indicates whether the object exists.
1093  */
1094 int
1095 dmu_object_info(objset_t *os, uint64_t object, dmu_object_info_t *doi)
1096 {
1097 	dnode_t *dn;
1098 	int err = dnode_hold(os->os, object, FTAG, &dn);
1099 
1100 	if (err)
1101 		return (err);
1102 
1103 	if (doi != NULL)
1104 		dmu_object_info_from_dnode(dn, doi);
1105 
1106 	dnode_rele(dn, FTAG);
1107 	return (0);
1108 }
1109 
1110 /*
1111  * As above, but faster; can be used when you have a held dbuf in hand.
1112  */
1113 void
1114 dmu_object_info_from_db(dmu_buf_t *db, dmu_object_info_t *doi)
1115 {
1116 	dmu_object_info_from_dnode(((dmu_buf_impl_t *)db)->db_dnode, doi);
1117 }
1118 
1119 /*
1120  * Faster still when you only care about the size.
1121  * This is specifically optimized for zfs_getattr().
1122  */
1123 void
1124 dmu_object_size_from_db(dmu_buf_t *db, uint32_t *blksize, u_longlong_t *nblk512)
1125 {
1126 	dnode_t *dn = ((dmu_buf_impl_t *)db)->db_dnode;
1127 
1128 	*blksize = dn->dn_datablksz;
1129 	/* add 1 for dnode space */
1130 	*nblk512 = ((DN_USED_BYTES(dn->dn_phys) + SPA_MINBLOCKSIZE/2) >>
1131 	    SPA_MINBLOCKSHIFT) + 1;
1132 }
1133 
/*
 * Byte-swap, in place, each 64-bit word of the `size'-byte buffer `vbuf'.
 * `size' must be a multiple of 8.
 */
void
byteswap_uint64_array(void *vbuf, size_t size)
{
	uint64_t *buf = vbuf;
	size_t count = size >> 3;
	size_t i;	/* same type as count: no signed/unsigned compare */

	ASSERT((size & 7) == 0);

	for (i = 0; i < count; i++)
		buf[i] = BSWAP_64(buf[i]);
}
1146 
/*
 * Byte-swap, in place, each 32-bit word of the `size'-byte buffer `vbuf'.
 * `size' must be a multiple of 4.
 */
void
byteswap_uint32_array(void *vbuf, size_t size)
{
	uint32_t *buf = vbuf;
	size_t count = size >> 2;
	size_t i;	/* same type as count: no signed/unsigned compare */

	ASSERT((size & 3) == 0);

	for (i = 0; i < count; i++)
		buf[i] = BSWAP_32(buf[i]);
}
1159 
/*
 * Byte-swap, in place, each 16-bit word of the `size'-byte buffer `vbuf'.
 * `size' must be a multiple of 2.
 */
void
byteswap_uint16_array(void *vbuf, size_t size)
{
	uint16_t *buf = vbuf;
	size_t count = size >> 1;
	size_t i;	/* same type as count: no signed/unsigned compare */

	ASSERT((size & 1) == 0);

	for (i = 0; i < count; i++)
		buf[i] = BSWAP_16(buf[i]);
}
1172 
/*
 * Byteswap routine for byte arrays: deliberately does nothing and leaves
 * the buffer untouched.  It exists so that byte-typed entries can share
 * the same function signature as the other byteswap_*_array() routines.
 */
/* ARGSUSED */
void
byteswap_uint8_array(void *vbuf, size_t size)
{
	/* nothing to swap */
}
1178 
/*
 * Initialize the subsystems the DMU sits on: dbuf, dnode, ARC and L2ARC,
 * in that order.
 */
void
dmu_init(void)
{
	/* the call order is kept exactly as it was */
	dbuf_init();
	dnode_init();
	arc_init();
	l2arc_init();
}
1187 
/*
 * Tear down what dmu_init() set up: ARC, dnode, dbuf and L2ARC, in that
 * order.
 */
void
dmu_fini(void)
{
	/* the call order is kept exactly as it was */
	arc_fini();
	dnode_fini();
	dbuf_fini();
	l2arc_fini();
}
1196