xref: /illumos-gate/usr/src/uts/common/fs/zfs/dmu.c (revision 2e83744e07e0937d9ade0801c0a4d8316ac3071e)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 
26 #pragma ident	"%Z%%M%	%I%	%E% SMI"
27 
28 #include <sys/dmu.h>
29 #include <sys/dmu_impl.h>
30 #include <sys/dmu_tx.h>
31 #include <sys/dbuf.h>
32 #include <sys/dnode.h>
33 #include <sys/zfs_context.h>
34 #include <sys/dmu_objset.h>
35 #include <sys/dmu_traverse.h>
36 #include <sys/dsl_dataset.h>
37 #include <sys/dsl_dir.h>
38 #include <sys/dsl_pool.h>
39 #include <sys/dsl_synctask.h>
40 #include <sys/dsl_prop.h>
41 #include <sys/dmu_zfetch.h>
42 #include <sys/zfs_ioctl.h>
43 #include <sys/zap.h>
44 #include <sys/zio_checksum.h>
45 #ifdef _KERNEL
46 #include <sys/vmsystm.h>
47 #endif
48 
/*
 * Table of per-object-type information, DMU_OT_NUMTYPES rows long.
 * Each row lists, in order: the routine used to byteswap buffers of that
 * type, a boolean flag, and a human-readable name for the type.
 *
 * dmu_sync() indexes this table by dnode type and reads the ot_metadata
 * member to decide whether to issue the write with ZIO_FLAG_METADATA;
 * presumably ot_metadata is the boolean column below (the structure is
 * declared outside this file -- confirm against its definition).
 */
const dmu_object_type_info_t dmu_ot[DMU_OT_NUMTYPES] = {
	{	byteswap_uint8_array,	TRUE,	"unallocated"		},
	{	zap_byteswap,		TRUE,	"object directory"	},
	{	byteswap_uint64_array,	TRUE,	"object array"		},
	{	byteswap_uint8_array,	TRUE,	"packed nvlist"		},
	{	byteswap_uint64_array,	TRUE,	"packed nvlist size"	},
	{	byteswap_uint64_array,	TRUE,	"bplist"		},
	{	byteswap_uint64_array,	TRUE,	"bplist header"		},
	{	byteswap_uint64_array,	TRUE,	"SPA space map header"	},
	{	byteswap_uint64_array,	TRUE,	"SPA space map"		},
	{	byteswap_uint64_array,	TRUE,	"ZIL intent log"	},
	{	dnode_buf_byteswap,	TRUE,	"DMU dnode"		},
	{	dmu_objset_byteswap,	TRUE,	"DMU objset"		},
	{	byteswap_uint64_array,	TRUE,	"DSL directory"		},
	{	zap_byteswap,		TRUE,	"DSL directory child map"},
	{	zap_byteswap,		TRUE,	"DSL dataset snap map"	},
	{	zap_byteswap,		TRUE,	"DSL props"		},
	{	byteswap_uint64_array,	TRUE,	"DSL dataset"		},
	{	zfs_znode_byteswap,	TRUE,	"ZFS znode"		},
	{	zfs_oldacl_byteswap,	TRUE,	"ZFS V0 ACL"		},
	{	byteswap_uint8_array,	FALSE,	"ZFS plain file"	},
	{	zap_byteswap,		TRUE,	"ZFS directory"		},
	{	zap_byteswap,		TRUE,	"ZFS master node"	},
	{	zap_byteswap,		TRUE,	"ZFS delete queue"	},
	{	byteswap_uint8_array,	FALSE,	"zvol object"		},
	{	zap_byteswap,		TRUE,	"zvol prop"		},
	{	byteswap_uint8_array,	FALSE,	"other uint8[]"		},
	{	byteswap_uint64_array,	FALSE,	"other uint64[]"	},
	{	zap_byteswap,		TRUE,	"other ZAP"		},
	{	zap_byteswap,		TRUE,	"persistent error log"	},
	{	byteswap_uint8_array,	TRUE,	"SPA history"		},
	{	byteswap_uint64_array,	TRUE,	"SPA history offsets"	},
	{	zap_byteswap,		TRUE,	"Pool properties"	},
	{	zap_byteswap,		TRUE,	"DSL permissions"	},
	{	zfs_acl_byteswap,	TRUE,	"ZFS ACL"		},
	{	byteswap_uint8_array,	TRUE,	"ZFS SYSACL"		},
	{	byteswap_uint8_array,	TRUE,	"FUID table"		},
	{	byteswap_uint64_array,	TRUE,	"FUID table size"	},
	{	zap_byteswap,		TRUE,	"DSL dataset next clones"},
	{	zap_byteswap,		TRUE,	"scrub work queue"	},
};
90 
91 int
92 dmu_buf_hold(objset_t *os, uint64_t object, uint64_t offset,
93     void *tag, dmu_buf_t **dbp)
94 {
95 	dnode_t *dn;
96 	uint64_t blkid;
97 	dmu_buf_impl_t *db;
98 	int err;
99 
100 	err = dnode_hold(os->os, object, FTAG, &dn);
101 	if (err)
102 		return (err);
103 	blkid = dbuf_whichblock(dn, offset);
104 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
105 	db = dbuf_hold(dn, blkid, tag);
106 	rw_exit(&dn->dn_struct_rwlock);
107 	if (db == NULL) {
108 		err = EIO;
109 	} else {
110 		err = dbuf_read(db, NULL, DB_RF_CANFAIL);
111 		if (err) {
112 			dbuf_rele(db, tag);
113 			db = NULL;
114 		}
115 	}
116 
117 	dnode_rele(dn, FTAG);
118 	*dbp = &db->db;
119 	return (err);
120 }
121 
122 int
123 dmu_bonus_max(void)
124 {
125 	return (DN_MAX_BONUSLEN);
126 }
127 
128 int
129 dmu_set_bonus(dmu_buf_t *db, int newsize, dmu_tx_t *tx)
130 {
131 	dnode_t *dn = ((dmu_buf_impl_t *)db)->db_dnode;
132 
133 	if (dn->dn_bonus != (dmu_buf_impl_t *)db)
134 		return (EINVAL);
135 	if (newsize < 0 || newsize > db->db_size)
136 		return (EINVAL);
137 	dnode_setbonuslen(dn, newsize, tx);
138 	return (0);
139 }
140 
141 /*
142  * returns ENOENT, EIO, or 0.
143  */
144 int
145 dmu_bonus_hold(objset_t *os, uint64_t object, void *tag, dmu_buf_t **dbp)
146 {
147 	dnode_t *dn;
148 	dmu_buf_impl_t *db;
149 	int error;
150 
151 	error = dnode_hold(os->os, object, FTAG, &dn);
152 	if (error)
153 		return (error);
154 
155 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
156 	if (dn->dn_bonus == NULL) {
157 		rw_exit(&dn->dn_struct_rwlock);
158 		rw_enter(&dn->dn_struct_rwlock, RW_WRITER);
159 		if (dn->dn_bonus == NULL)
160 			dbuf_create_bonus(dn);
161 	}
162 	db = dn->dn_bonus;
163 	rw_exit(&dn->dn_struct_rwlock);
164 
165 	/* as long as the bonus buf is held, the dnode will be held */
166 	if (refcount_add(&db->db_holds, tag) == 1)
167 		VERIFY(dnode_add_ref(dn, db));
168 
169 	dnode_rele(dn, FTAG);
170 
171 	VERIFY(0 == dbuf_read(db, NULL, DB_RF_MUST_SUCCEED));
172 
173 	*dbp = &db->db;
174 	return (0);
175 }
176 
177 /*
178  * Note: longer-term, we should modify all of the dmu_buf_*() interfaces
179  * to take a held dnode rather than <os, object> -- the lookup is wasteful,
180  * and can induce severe lock contention when writing to several files
181  * whose dnodes are in the same block.
182  */
183 static int
184 dmu_buf_hold_array_by_dnode(dnode_t *dn, uint64_t offset,
185     uint64_t length, int read, void *tag, int *numbufsp, dmu_buf_t ***dbpp)
186 {
187 	dmu_buf_t **dbp;
188 	uint64_t blkid, nblks, i;
189 	uint32_t flags;
190 	int err;
191 	zio_t *zio;
192 
193 	ASSERT(length <= DMU_MAX_ACCESS);
194 
195 	flags = DB_RF_CANFAIL | DB_RF_NEVERWAIT;
196 	if (length > zfetch_array_rd_sz)
197 		flags |= DB_RF_NOPREFETCH;
198 
199 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
200 	if (dn->dn_datablkshift) {
201 		int blkshift = dn->dn_datablkshift;
202 		nblks = (P2ROUNDUP(offset+length, 1ULL<<blkshift) -
203 		    P2ALIGN(offset, 1ULL<<blkshift)) >> blkshift;
204 	} else {
205 		if (offset + length > dn->dn_datablksz) {
206 			zfs_panic_recover("zfs: accessing past end of object "
207 			    "%llx/%llx (size=%u access=%llu+%llu)",
208 			    (longlong_t)dn->dn_objset->
209 			    os_dsl_dataset->ds_object,
210 			    (longlong_t)dn->dn_object, dn->dn_datablksz,
211 			    (longlong_t)offset, (longlong_t)length);
212 			return (EIO);
213 		}
214 		nblks = 1;
215 	}
216 	dbp = kmem_zalloc(sizeof (dmu_buf_t *) * nblks, KM_SLEEP);
217 
218 	zio = zio_root(dn->dn_objset->os_spa, NULL, NULL, TRUE);
219 	blkid = dbuf_whichblock(dn, offset);
220 	for (i = 0; i < nblks; i++) {
221 		dmu_buf_impl_t *db = dbuf_hold(dn, blkid+i, tag);
222 		if (db == NULL) {
223 			rw_exit(&dn->dn_struct_rwlock);
224 			dmu_buf_rele_array(dbp, nblks, tag);
225 			zio_nowait(zio);
226 			return (EIO);
227 		}
228 		/* initiate async i/o */
229 		if (read) {
230 			rw_exit(&dn->dn_struct_rwlock);
231 			(void) dbuf_read(db, zio, flags);
232 			rw_enter(&dn->dn_struct_rwlock, RW_READER);
233 		}
234 		dbp[i] = &db->db;
235 	}
236 	rw_exit(&dn->dn_struct_rwlock);
237 
238 	/* wait for async i/o */
239 	err = zio_wait(zio);
240 	if (err) {
241 		dmu_buf_rele_array(dbp, nblks, tag);
242 		return (err);
243 	}
244 
245 	/* wait for other io to complete */
246 	if (read) {
247 		for (i = 0; i < nblks; i++) {
248 			dmu_buf_impl_t *db = (dmu_buf_impl_t *)dbp[i];
249 			mutex_enter(&db->db_mtx);
250 			while (db->db_state == DB_READ ||
251 			    db->db_state == DB_FILL)
252 				cv_wait(&db->db_changed, &db->db_mtx);
253 			if (db->db_state == DB_UNCACHED)
254 				err = EIO;
255 			mutex_exit(&db->db_mtx);
256 			if (err) {
257 				dmu_buf_rele_array(dbp, nblks, tag);
258 				return (err);
259 			}
260 		}
261 	}
262 
263 	*numbufsp = nblks;
264 	*dbpp = dbp;
265 	return (0);
266 }
267 
268 static int
269 dmu_buf_hold_array(objset_t *os, uint64_t object, uint64_t offset,
270     uint64_t length, int read, void *tag, int *numbufsp, dmu_buf_t ***dbpp)
271 {
272 	dnode_t *dn;
273 	int err;
274 
275 	err = dnode_hold(os->os, object, FTAG, &dn);
276 	if (err)
277 		return (err);
278 
279 	err = dmu_buf_hold_array_by_dnode(dn, offset, length, read, tag,
280 	    numbufsp, dbpp);
281 
282 	dnode_rele(dn, FTAG);
283 
284 	return (err);
285 }
286 
287 int
288 dmu_buf_hold_array_by_bonus(dmu_buf_t *db, uint64_t offset,
289     uint64_t length, int read, void *tag, int *numbufsp, dmu_buf_t ***dbpp)
290 {
291 	dnode_t *dn = ((dmu_buf_impl_t *)db)->db_dnode;
292 	int err;
293 
294 	err = dmu_buf_hold_array_by_dnode(dn, offset, length, read, tag,
295 	    numbufsp, dbpp);
296 
297 	return (err);
298 }
299 
300 void
301 dmu_buf_rele_array(dmu_buf_t **dbp_fake, int numbufs, void *tag)
302 {
303 	int i;
304 	dmu_buf_impl_t **dbp = (dmu_buf_impl_t **)dbp_fake;
305 
306 	if (numbufs == 0)
307 		return;
308 
309 	for (i = 0; i < numbufs; i++) {
310 		if (dbp[i])
311 			dbuf_rele(dbp[i], tag);
312 	}
313 
314 	kmem_free(dbp, sizeof (dmu_buf_t *) * numbufs);
315 }
316 
317 void
318 dmu_prefetch(objset_t *os, uint64_t object, uint64_t offset, uint64_t len)
319 {
320 	dnode_t *dn;
321 	uint64_t blkid;
322 	int nblks, i, err;
323 
324 	if (zfs_prefetch_disable)
325 		return;
326 
327 	if (len == 0) {  /* they're interested in the bonus buffer */
328 		dn = os->os->os_meta_dnode;
329 
330 		if (object == 0 || object >= DN_MAX_OBJECT)
331 			return;
332 
333 		rw_enter(&dn->dn_struct_rwlock, RW_READER);
334 		blkid = dbuf_whichblock(dn, object * sizeof (dnode_phys_t));
335 		dbuf_prefetch(dn, blkid);
336 		rw_exit(&dn->dn_struct_rwlock);
337 		return;
338 	}
339 
340 	/*
341 	 * XXX - Note, if the dnode for the requested object is not
342 	 * already cached, we will do a *synchronous* read in the
343 	 * dnode_hold() call.  The same is true for any indirects.
344 	 */
345 	err = dnode_hold(os->os, object, FTAG, &dn);
346 	if (err != 0)
347 		return;
348 
349 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
350 	if (dn->dn_datablkshift) {
351 		int blkshift = dn->dn_datablkshift;
352 		nblks = (P2ROUNDUP(offset+len, 1<<blkshift) -
353 		    P2ALIGN(offset, 1<<blkshift)) >> blkshift;
354 	} else {
355 		nblks = (offset < dn->dn_datablksz);
356 	}
357 
358 	if (nblks != 0) {
359 		blkid = dbuf_whichblock(dn, offset);
360 		for (i = 0; i < nblks; i++)
361 			dbuf_prefetch(dn, blkid+i);
362 	}
363 
364 	rw_exit(&dn->dn_struct_rwlock);
365 
366 	dnode_rele(dn, FTAG);
367 }
368 
369 static int
370 get_next_chunk(dnode_t *dn, uint64_t *offset, uint64_t limit)
371 {
372 	uint64_t len = limit - *offset;
373 	uint64_t chunk_len = dn->dn_datablksz * DMU_MAX_DELETEBLKCNT;
374 	uint64_t dn_used;
375 	int err;
376 
377 	ASSERT(limit <= *offset);
378 
379 	dn_used = dn->dn_phys->dn_used <<
380 	    (dn->dn_phys->dn_flags & DNODE_FLAG_USED_BYTES ? 0 : DEV_BSHIFT);
381 	if (len <= chunk_len || dn_used <= chunk_len) {
382 		*offset = limit;
383 		return (0);
384 	}
385 
386 	while (*offset > limit) {
387 		uint64_t initial_offset = *offset;
388 		uint64_t delta;
389 
390 		/* skip over allocated data */
391 		err = dnode_next_offset(dn,
392 		    DNODE_FIND_HOLE|DNODE_FIND_BACKWARDS, offset, 1, 1, 0);
393 		if (err == ESRCH)
394 			*offset = limit;
395 		else if (err)
396 			return (err);
397 
398 		ASSERT3U(*offset, <=, initial_offset);
399 		delta = initial_offset - *offset;
400 		if (delta >= chunk_len) {
401 			*offset += delta - chunk_len;
402 			return (0);
403 		}
404 		chunk_len -= delta;
405 
406 		/* skip over unallocated data */
407 		err = dnode_next_offset(dn,
408 		    DNODE_FIND_BACKWARDS, offset, 1, 1, 0);
409 		if (err == ESRCH)
410 			*offset = limit;
411 		else if (err)
412 			return (err);
413 
414 		if (*offset < limit)
415 			*offset = limit;
416 		ASSERT3U(*offset, <, initial_offset);
417 	}
418 	return (0);
419 }
420 
421 static int
422 dmu_free_long_range_impl(objset_t *os, dnode_t *dn, uint64_t offset,
423     uint64_t length, boolean_t free_dnode)
424 {
425 	dmu_tx_t *tx;
426 	uint64_t object_size, start, end, len;
427 	boolean_t trunc = (length == DMU_OBJECT_END);
428 	int align, err;
429 
430 	align = 1 << dn->dn_datablkshift;
431 	ASSERT(align > 0);
432 	object_size = align == 1 ? dn->dn_datablksz :
433 	    (dn->dn_maxblkid + 1) << dn->dn_datablkshift;
434 
435 	if (trunc || (end = offset + length) > object_size)
436 		end = object_size;
437 	if (end <= offset)
438 		return (0);
439 	length = end - offset;
440 
441 	while (length) {
442 		start = end;
443 		err = get_next_chunk(dn, &start, offset);
444 		if (err)
445 			return (err);
446 		len = trunc ? DMU_OBJECT_END : end - start;
447 
448 		tx = dmu_tx_create(os);
449 		dmu_tx_hold_free(tx, dn->dn_object, start, len);
450 		err = dmu_tx_assign(tx, TXG_WAIT);
451 		if (err) {
452 			dmu_tx_abort(tx);
453 			return (err);
454 		}
455 
456 		dnode_free_range(dn, start, trunc ? -1 : len, tx);
457 
458 		if (start == 0 && trunc && free_dnode)
459 			dnode_free(dn, tx);
460 
461 		length -= end - start;
462 
463 		dmu_tx_commit(tx);
464 		end = start;
465 		trunc = FALSE;
466 	}
467 	return (0);
468 }
469 
470 int
471 dmu_free_long_range(objset_t *os, uint64_t object,
472     uint64_t offset, uint64_t length)
473 {
474 	dnode_t *dn;
475 	int err;
476 
477 	err = dnode_hold(os->os, object, FTAG, &dn);
478 	if (err != 0)
479 		return (err);
480 	err = dmu_free_long_range_impl(os, dn, offset, length, FALSE);
481 	dnode_rele(dn, FTAG);
482 	return (err);
483 }
484 
485 int
486 dmu_free_object(objset_t *os, uint64_t object)
487 {
488 	dnode_t *dn;
489 	dmu_tx_t *tx;
490 	int err;
491 
492 	err = dnode_hold_impl(os->os, object, DNODE_MUST_BE_ALLOCATED,
493 	    FTAG, &dn);
494 	if (err != 0)
495 		return (err);
496 	if (dn->dn_nlevels == 1) {
497 		tx = dmu_tx_create(os);
498 		dmu_tx_hold_bonus(tx, object);
499 		dmu_tx_hold_free(tx, dn->dn_object, 0, DMU_OBJECT_END);
500 		err = dmu_tx_assign(tx, TXG_WAIT);
501 		if (err == 0) {
502 			dnode_free_range(dn, 0, DMU_OBJECT_END, tx);
503 			dnode_free(dn, tx);
504 			dmu_tx_commit(tx);
505 		} else {
506 			dmu_tx_abort(tx);
507 		}
508 	} else {
509 		err = dmu_free_long_range_impl(os, dn, 0, DMU_OBJECT_END, TRUE);
510 	}
511 	dnode_rele(dn, FTAG);
512 	return (err);
513 }
514 
515 int
516 dmu_free_range(objset_t *os, uint64_t object, uint64_t offset,
517     uint64_t size, dmu_tx_t *tx)
518 {
519 	dnode_t *dn;
520 	int err = dnode_hold(os->os, object, FTAG, &dn);
521 	if (err)
522 		return (err);
523 	ASSERT(offset < UINT64_MAX);
524 	ASSERT(size == -1ULL || size <= UINT64_MAX - offset);
525 	dnode_free_range(dn, offset, size, tx);
526 	dnode_rele(dn, FTAG);
527 	return (0);
528 }
529 
530 int
531 dmu_read(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
532     void *buf)
533 {
534 	dnode_t *dn;
535 	dmu_buf_t **dbp;
536 	int numbufs, i, err;
537 
538 	err = dnode_hold(os->os, object, FTAG, &dn);
539 	if (err)
540 		return (err);
541 
542 	/*
543 	 * Deal with odd block sizes, where there can't be data past the first
544 	 * block.  If we ever do the tail block optimization, we will need to
545 	 * handle that here as well.
546 	 */
547 	if (dn->dn_datablkshift == 0) {
548 		int newsz = offset > dn->dn_datablksz ? 0 :
549 		    MIN(size, dn->dn_datablksz - offset);
550 		bzero((char *)buf + newsz, size - newsz);
551 		size = newsz;
552 	}
553 
554 	while (size > 0) {
555 		uint64_t mylen = MIN(size, DMU_MAX_ACCESS / 2);
556 
557 		/*
558 		 * NB: we could do this block-at-a-time, but it's nice
559 		 * to be reading in parallel.
560 		 */
561 		err = dmu_buf_hold_array_by_dnode(dn, offset, mylen,
562 		    TRUE, FTAG, &numbufs, &dbp);
563 		if (err)
564 			break;
565 
566 		for (i = 0; i < numbufs; i++) {
567 			int tocpy;
568 			int bufoff;
569 			dmu_buf_t *db = dbp[i];
570 
571 			ASSERT(size > 0);
572 
573 			bufoff = offset - db->db_offset;
574 			tocpy = (int)MIN(db->db_size - bufoff, size);
575 
576 			bcopy((char *)db->db_data + bufoff, buf, tocpy);
577 
578 			offset += tocpy;
579 			size -= tocpy;
580 			buf = (char *)buf + tocpy;
581 		}
582 		dmu_buf_rele_array(dbp, numbufs, FTAG);
583 	}
584 	dnode_rele(dn, FTAG);
585 	return (err);
586 }
587 
588 void
589 dmu_write(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
590     const void *buf, dmu_tx_t *tx)
591 {
592 	dmu_buf_t **dbp;
593 	int numbufs, i;
594 
595 	if (size == 0)
596 		return;
597 
598 	VERIFY(0 == dmu_buf_hold_array(os, object, offset, size,
599 	    FALSE, FTAG, &numbufs, &dbp));
600 
601 	for (i = 0; i < numbufs; i++) {
602 		int tocpy;
603 		int bufoff;
604 		dmu_buf_t *db = dbp[i];
605 
606 		ASSERT(size > 0);
607 
608 		bufoff = offset - db->db_offset;
609 		tocpy = (int)MIN(db->db_size - bufoff, size);
610 
611 		ASSERT(i == 0 || i == numbufs-1 || tocpy == db->db_size);
612 
613 		if (tocpy == db->db_size)
614 			dmu_buf_will_fill(db, tx);
615 		else
616 			dmu_buf_will_dirty(db, tx);
617 
618 		bcopy(buf, (char *)db->db_data + bufoff, tocpy);
619 
620 		if (tocpy == db->db_size)
621 			dmu_buf_fill_done(db, tx);
622 
623 		offset += tocpy;
624 		size -= tocpy;
625 		buf = (char *)buf + tocpy;
626 	}
627 	dmu_buf_rele_array(dbp, numbufs, FTAG);
628 }
629 
630 #ifdef _KERNEL
631 int
632 dmu_read_uio(objset_t *os, uint64_t object, uio_t *uio, uint64_t size)
633 {
634 	dmu_buf_t **dbp;
635 	int numbufs, i, err;
636 
637 	/*
638 	 * NB: we could do this block-at-a-time, but it's nice
639 	 * to be reading in parallel.
640 	 */
641 	err = dmu_buf_hold_array(os, object, uio->uio_loffset, size, TRUE, FTAG,
642 	    &numbufs, &dbp);
643 	if (err)
644 		return (err);
645 
646 	for (i = 0; i < numbufs; i++) {
647 		int tocpy;
648 		int bufoff;
649 		dmu_buf_t *db = dbp[i];
650 
651 		ASSERT(size > 0);
652 
653 		bufoff = uio->uio_loffset - db->db_offset;
654 		tocpy = (int)MIN(db->db_size - bufoff, size);
655 
656 		err = uiomove((char *)db->db_data + bufoff, tocpy,
657 		    UIO_READ, uio);
658 		if (err)
659 			break;
660 
661 		size -= tocpy;
662 	}
663 	dmu_buf_rele_array(dbp, numbufs, FTAG);
664 
665 	return (err);
666 }
667 
668 int
669 dmu_write_uio(objset_t *os, uint64_t object, uio_t *uio, uint64_t size,
670     dmu_tx_t *tx)
671 {
672 	dmu_buf_t **dbp;
673 	int numbufs, i;
674 	int err = 0;
675 
676 	if (size == 0)
677 		return (0);
678 
679 	err = dmu_buf_hold_array(os, object, uio->uio_loffset, size,
680 	    FALSE, FTAG, &numbufs, &dbp);
681 	if (err)
682 		return (err);
683 
684 	for (i = 0; i < numbufs; i++) {
685 		int tocpy;
686 		int bufoff;
687 		dmu_buf_t *db = dbp[i];
688 
689 		ASSERT(size > 0);
690 
691 		bufoff = uio->uio_loffset - db->db_offset;
692 		tocpy = (int)MIN(db->db_size - bufoff, size);
693 
694 		ASSERT(i == 0 || i == numbufs-1 || tocpy == db->db_size);
695 
696 		if (tocpy == db->db_size)
697 			dmu_buf_will_fill(db, tx);
698 		else
699 			dmu_buf_will_dirty(db, tx);
700 
701 		/*
702 		 * XXX uiomove could block forever (eg. nfs-backed
703 		 * pages).  There needs to be a uiolockdown() function
704 		 * to lock the pages in memory, so that uiomove won't
705 		 * block.
706 		 */
707 		err = uiomove((char *)db->db_data + bufoff, tocpy,
708 		    UIO_WRITE, uio);
709 
710 		if (tocpy == db->db_size)
711 			dmu_buf_fill_done(db, tx);
712 
713 		if (err)
714 			break;
715 
716 		size -= tocpy;
717 	}
718 	dmu_buf_rele_array(dbp, numbufs, FTAG);
719 	return (err);
720 }
721 
722 int
723 dmu_write_pages(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
724     page_t *pp, dmu_tx_t *tx)
725 {
726 	dmu_buf_t **dbp;
727 	int numbufs, i;
728 	int err;
729 
730 	if (size == 0)
731 		return (0);
732 
733 	err = dmu_buf_hold_array(os, object, offset, size,
734 	    FALSE, FTAG, &numbufs, &dbp);
735 	if (err)
736 		return (err);
737 
738 	for (i = 0; i < numbufs; i++) {
739 		int tocpy, copied, thiscpy;
740 		int bufoff;
741 		dmu_buf_t *db = dbp[i];
742 		caddr_t va;
743 
744 		ASSERT(size > 0);
745 		ASSERT3U(db->db_size, >=, PAGESIZE);
746 
747 		bufoff = offset - db->db_offset;
748 		tocpy = (int)MIN(db->db_size - bufoff, size);
749 
750 		ASSERT(i == 0 || i == numbufs-1 || tocpy == db->db_size);
751 
752 		if (tocpy == db->db_size)
753 			dmu_buf_will_fill(db, tx);
754 		else
755 			dmu_buf_will_dirty(db, tx);
756 
757 		for (copied = 0; copied < tocpy; copied += PAGESIZE) {
758 			ASSERT3U(pp->p_offset, ==, db->db_offset + bufoff);
759 			thiscpy = MIN(PAGESIZE, tocpy - copied);
760 			va = ppmapin(pp, PROT_READ, (caddr_t)-1);
761 			bcopy(va, (char *)db->db_data + bufoff, thiscpy);
762 			ppmapout(va);
763 			pp = pp->p_next;
764 			bufoff += PAGESIZE;
765 		}
766 
767 		if (tocpy == db->db_size)
768 			dmu_buf_fill_done(db, tx);
769 
770 		if (err)
771 			break;
772 
773 		offset += tocpy;
774 		size -= tocpy;
775 	}
776 	dmu_buf_rele_array(dbp, numbufs, FTAG);
777 	return (err);
778 }
779 #endif
780 
/*
 * Context handed from dmu_sync() to its write-completion callback,
 * dmu_sync_done().  Allocated in dmu_sync() and freed by dmu_sync_done().
 */
typedef struct {
	dbuf_dirty_record_t	*dr;	/* dirty record being written */
	dmu_sync_cb_t		*done;	/* caller's callback; may be NULL */
	void			*arg;	/* passed through to 'done' */
} dmu_sync_arg_t;
786 
787 /* ARGSUSED */
788 static void
789 dmu_sync_done(zio_t *zio, arc_buf_t *buf, void *varg)
790 {
791 	dmu_sync_arg_t *in = varg;
792 	dbuf_dirty_record_t *dr = in->dr;
793 	dmu_buf_impl_t *db = dr->dr_dbuf;
794 	dmu_sync_cb_t *done = in->done;
795 
796 	if (!BP_IS_HOLE(zio->io_bp)) {
797 		zio->io_bp->blk_fill = 1;
798 		BP_SET_TYPE(zio->io_bp, db->db_dnode->dn_type);
799 		BP_SET_LEVEL(zio->io_bp, 0);
800 	}
801 
802 	mutex_enter(&db->db_mtx);
803 	ASSERT(dr->dt.dl.dr_override_state == DR_IN_DMU_SYNC);
804 	dr->dt.dl.dr_overridden_by = *zio->io_bp; /* structure assignment */
805 	dr->dt.dl.dr_override_state = DR_OVERRIDDEN;
806 	cv_broadcast(&db->db_changed);
807 	mutex_exit(&db->db_mtx);
808 
809 	if (done)
810 		done(&(db->db), in->arg);
811 
812 	kmem_free(in, sizeof (dmu_sync_arg_t));
813 }
814 
/*
 * Intent log support: sync the block associated with db to disk.
 * N.B. and XXX: the caller is responsible for making sure that the
 * data isn't changing while dmu_sync() is writing it.
 *
 * Arguments:
 *
 *	pio:	optional parent zio.  If non-NULL the write is issued
 *		without waiting and EINPROGRESS is returned; if NULL the
 *		write is waited for.
 *	db_fake: the buffer to sync.
 *	bp:	out: block pointer of the data; must be a hole on entry.
 *	txg:	the (nonzero) txg in which the buffer was dirtied.
 *	done, arg: optional callback and its argument, invoked from
 *		dmu_sync_done() when the write completes.
 *
 * Return values:
 *
 *	EEXIST: this txg has already been synced, so there's nothing to do.
 *		The caller should not log the write.
 *
 *	ENOENT: the block was dbuf_free_range()'d, so there's nothing to do.
 *		The caller should not log the write.
 *
 *	EALREADY: this block is already in the process of being synced.
 *		The caller should track its progress (somehow).
 *
 *	EINPROGRESS: the IO has been initiated.
 *		The caller should log this blkptr in the callback.
 *
 *	0: completed.  Sets *bp to the blkptr just written.
 *		The caller should log this blkptr immediately.
 */
int
dmu_sync(zio_t *pio, dmu_buf_t *db_fake,
    blkptr_t *bp, uint64_t txg, dmu_sync_cb_t *done, void *arg)
{
	dmu_buf_impl_t *db = (dmu_buf_impl_t *)db_fake;
	objset_impl_t *os = db->db_objset;
	dsl_pool_t *dp = os->os_dsl_dataset->ds_dir->dd_pool;
	tx_state_t *tx = &dp->dp_tx;
	dbuf_dirty_record_t *dr;
	dmu_sync_arg_t *in;
	zbookmark_t zb;
	writeprops_t wp = { 0 };
	zio_t *zio;
	int zio_flags;
	int err;

	ASSERT(BP_IS_HOLE(bp));
	ASSERT(txg != 0);


	dprintf("dmu_sync txg=%llu, s,o,q %llu %llu %llu\n",
	    txg, tx->tx_synced_txg, tx->tx_open_txg, tx->tx_quiesced_txg);

	/*
	 * XXX - would be nice if we could do this without suspending...
	 */
	txg_suspend(dp);

	/*
	 * If this txg already synced, there's nothing to do.
	 */
	if (txg <= tx->tx_synced_txg) {
		txg_resume(dp);
		/*
		 * If we're running ziltest, we need the blkptr regardless.
		 */
		if (txg > spa_freeze_txg(dp->dp_spa)) {
			/* if db_blkptr == NULL, this was an empty write */
			if (db->db_blkptr)
				*bp = *db->db_blkptr; /* structure assignment */
			return (0);
		}
		return (EEXIST);
	}

	/* from here on every return path drops db_mtx and resumes the txg */
	mutex_enter(&db->db_mtx);

	if (txg == tx->tx_syncing_txg) {
		while (db->db_data_pending) {
			/*
			 * IO is in-progress.  Wait for it to finish.
			 * XXX - would be nice to be able to somehow "attach"
			 * this zio to the parent zio passed in.
			 */
			cv_wait(&db->db_changed, &db->db_mtx);
			if (!db->db_data_pending &&
			    db->db_blkptr && BP_IS_HOLE(db->db_blkptr)) {
				/*
				 * IO was compressed away
				 */
				*bp = *db->db_blkptr; /* structure assignment */
				mutex_exit(&db->db_mtx);
				txg_resume(dp);
				return (0);
			}
			ASSERT(db->db_data_pending ||
			    (db->db_blkptr && db->db_blkptr->blk_birth == txg));
		}

		if (db->db_blkptr && db->db_blkptr->blk_birth == txg) {
			/*
			 * IO is already completed.
			 */
			*bp = *db->db_blkptr; /* structure assignment */
			mutex_exit(&db->db_mtx);
			txg_resume(dp);
			return (0);
		}
	}

	/*
	 * Find the dirty record for this txg.  The loop skips records with
	 * a larger dr_txg, so it relies on the list being ordered newest
	 * first.
	 */
	dr = db->db_last_dirty;
	while (dr && dr->dr_txg > txg)
		dr = dr->dr_next;
	if (dr == NULL || dr->dr_txg < txg) {
		/*
		 * This dbuf isn't dirty, must have been free_range'd.
		 * There's no need to log writes to freed blocks, so we're done.
		 */
		mutex_exit(&db->db_mtx);
		txg_resume(dp);
		return (ENOENT);
	}

	ASSERT(dr->dr_txg == txg);
	if (dr->dt.dl.dr_override_state == DR_IN_DMU_SYNC) {
		/*
		 * We have already issued a sync write for this buffer.
		 */
		mutex_exit(&db->db_mtx);
		txg_resume(dp);
		return (EALREADY);
	} else if (dr->dt.dl.dr_override_state == DR_OVERRIDDEN) {
		/*
		 * This buffer has already been synced.  It could not
		 * have been dirtied since, or we would have cleared the state.
		 */
		*bp = dr->dt.dl.dr_overridden_by; /* structure assignment */
		mutex_exit(&db->db_mtx);
		txg_resume(dp);
		return (0);
	}

	/*
	 * Claim the record for this sync write while still holding db_mtx,
	 * so a concurrent dmu_sync() of the same buffer sees DR_IN_DMU_SYNC
	 * and returns EALREADY.  dmu_sync_done() frees 'in'.
	 */
	dr->dt.dl.dr_override_state = DR_IN_DMU_SYNC;
	in = kmem_alloc(sizeof (dmu_sync_arg_t), KM_SLEEP);
	in->dr = dr;
	in->done = done;
	in->arg = arg;
	mutex_exit(&db->db_mtx);
	txg_resume(dp);

	/* bookmark identifying the block being written */
	zb.zb_objset = os->os_dsl_dataset->ds_object;
	zb.zb_object = db->db.db_object;
	zb.zb_level = db->db_level;
	zb.zb_blkid = db->db_blkid;
	zio_flags = ZIO_FLAG_MUSTSUCCEED;
	if (dmu_ot[db->db_dnode->dn_type].ot_metadata || zb.zb_level != 0)
		zio_flags |= ZIO_FLAG_METADATA;
	/* write properties come from the dnode and its objset */
	wp.wp_type = db->db_dnode->dn_type;
	wp.wp_copies = os->os_copies;
	wp.wp_level = db->db_level;
	wp.wp_dnchecksum = db->db_dnode->dn_checksum;
	wp.wp_oschecksum = os->os_checksum;
	wp.wp_dncompress = db->db_dnode->dn_compress;
	wp.wp_oscompress = os->os_compress;
	zio = arc_write(pio, os->os_spa, &wp,
	    txg, bp, dr->dt.dl.dr_data, NULL, dmu_sync_done, in,
	    ZIO_PRIORITY_SYNC_WRITE, zio_flags, &zb);

	if (pio) {
		/* asynchronous: the parent zio collects completion */
		zio_nowait(zio);
		err = EINPROGRESS;
	} else {
		err = zio_wait(zio);
		ASSERT(err == 0);
	}
	return (err);
}
984 
985 int
986 dmu_object_set_blocksize(objset_t *os, uint64_t object, uint64_t size, int ibs,
987 	dmu_tx_t *tx)
988 {
989 	dnode_t *dn;
990 	int err;
991 
992 	err = dnode_hold(os->os, object, FTAG, &dn);
993 	if (err)
994 		return (err);
995 	err = dnode_set_blksz(dn, size, ibs, tx);
996 	dnode_rele(dn, FTAG);
997 	return (err);
998 }
999 
1000 void
1001 dmu_object_set_checksum(objset_t *os, uint64_t object, uint8_t checksum,
1002 	dmu_tx_t *tx)
1003 {
1004 	dnode_t *dn;
1005 
1006 	/* XXX assumes dnode_hold will not get an i/o error */
1007 	(void) dnode_hold(os->os, object, FTAG, &dn);
1008 	ASSERT(checksum < ZIO_CHECKSUM_FUNCTIONS);
1009 	dn->dn_checksum = checksum;
1010 	dnode_setdirty(dn, tx);
1011 	dnode_rele(dn, FTAG);
1012 }
1013 
1014 void
1015 dmu_object_set_compress(objset_t *os, uint64_t object, uint8_t compress,
1016 	dmu_tx_t *tx)
1017 {
1018 	dnode_t *dn;
1019 
1020 	/* XXX assumes dnode_hold will not get an i/o error */
1021 	(void) dnode_hold(os->os, object, FTAG, &dn);
1022 	ASSERT(compress < ZIO_COMPRESS_FUNCTIONS);
1023 	dn->dn_compress = compress;
1024 	dnode_setdirty(dn, tx);
1025 	dnode_rele(dn, FTAG);
1026 }
1027 
1028 int
1029 dmu_offset_next(objset_t *os, uint64_t object, boolean_t hole, uint64_t *off)
1030 {
1031 	dnode_t *dn;
1032 	int i, err;
1033 
1034 	err = dnode_hold(os->os, object, FTAG, &dn);
1035 	if (err)
1036 		return (err);
1037 	/*
1038 	 * Sync any current changes before
1039 	 * we go trundling through the block pointers.
1040 	 */
1041 	for (i = 0; i < TXG_SIZE; i++) {
1042 		if (list_link_active(&dn->dn_dirty_link[i]))
1043 			break;
1044 	}
1045 	if (i != TXG_SIZE) {
1046 		dnode_rele(dn, FTAG);
1047 		txg_wait_synced(dmu_objset_pool(os), 0);
1048 		err = dnode_hold(os->os, object, FTAG, &dn);
1049 		if (err)
1050 			return (err);
1051 	}
1052 
1053 	err = dnode_next_offset(dn, (hole ? DNODE_FIND_HOLE : 0), off, 1, 1, 0);
1054 	dnode_rele(dn, FTAG);
1055 
1056 	return (err);
1057 }
1058 
1059 void
1060 dmu_object_info_from_dnode(dnode_t *dn, dmu_object_info_t *doi)
1061 {
1062 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
1063 	mutex_enter(&dn->dn_mtx);
1064 
1065 	doi->doi_data_block_size = dn->dn_datablksz;
1066 	doi->doi_metadata_block_size = dn->dn_indblkshift ?
1067 	    1ULL << dn->dn_indblkshift : 0;
1068 	doi->doi_indirection = dn->dn_nlevels;
1069 	doi->doi_checksum = dn->dn_checksum;
1070 	doi->doi_compress = dn->dn_compress;
1071 	doi->doi_physical_blks = (DN_USED_BYTES(dn->dn_phys) +
1072 	    SPA_MINBLOCKSIZE/2) >> SPA_MINBLOCKSHIFT;
1073 	doi->doi_max_block_offset = dn->dn_phys->dn_maxblkid;
1074 	doi->doi_type = dn->dn_type;
1075 	doi->doi_bonus_size = dn->dn_bonuslen;
1076 	doi->doi_bonus_type = dn->dn_bonustype;
1077 
1078 	mutex_exit(&dn->dn_mtx);
1079 	rw_exit(&dn->dn_struct_rwlock);
1080 }
1081 
1082 /*
1083  * Get information on a DMU object.
1084  * If doi is NULL, just indicates whether the object exists.
1085  */
1086 int
1087 dmu_object_info(objset_t *os, uint64_t object, dmu_object_info_t *doi)
1088 {
1089 	dnode_t *dn;
1090 	int err = dnode_hold(os->os, object, FTAG, &dn);
1091 
1092 	if (err)
1093 		return (err);
1094 
1095 	if (doi != NULL)
1096 		dmu_object_info_from_dnode(dn, doi);
1097 
1098 	dnode_rele(dn, FTAG);
1099 	return (0);
1100 }
1101 
1102 /*
1103  * As above, but faster; can be used when you have a held dbuf in hand.
1104  */
1105 void
1106 dmu_object_info_from_db(dmu_buf_t *db, dmu_object_info_t *doi)
1107 {
1108 	dmu_object_info_from_dnode(((dmu_buf_impl_t *)db)->db_dnode, doi);
1109 }
1110 
1111 /*
1112  * Faster still when you only care about the size.
1113  * This is specifically optimized for zfs_getattr().
1114  */
1115 void
1116 dmu_object_size_from_db(dmu_buf_t *db, uint32_t *blksize, u_longlong_t *nblk512)
1117 {
1118 	dnode_t *dn = ((dmu_buf_impl_t *)db)->db_dnode;
1119 
1120 	*blksize = dn->dn_datablksz;
1121 	/* add 1 for dnode space */
1122 	*nblk512 = ((DN_USED_BYTES(dn->dn_phys) + SPA_MINBLOCKSIZE/2) >>
1123 	    SPA_MINBLOCKSHIFT) + 1;
1124 }
1125 
/*
 * Byteswap, in place, every 64-bit word of the 'size'-byte buffer 'vbuf'.
 * 'size' must be a multiple of 8.
 */
void
byteswap_uint64_array(void *vbuf, size_t size)
{
	uint64_t *buf = vbuf;
	size_t count = size >> 3;
	size_t i;	/* same type as count: no sign-compare or overflow */

	ASSERT((size & 7) == 0);

	for (i = 0; i < count; i++)
		buf[i] = BSWAP_64(buf[i]);
}
1138 
/*
 * Byteswap, in place, every 32-bit word of the 'size'-byte buffer 'vbuf'.
 * 'size' must be a multiple of 4.
 */
void
byteswap_uint32_array(void *vbuf, size_t size)
{
	uint32_t *buf = vbuf;
	size_t count = size >> 2;
	size_t i;	/* same type as count: no sign-compare or overflow */

	ASSERT((size & 3) == 0);

	for (i = 0; i < count; i++)
		buf[i] = BSWAP_32(buf[i]);
}
1151 
/*
 * Byteswap, in place, every 16-bit word of the 'size'-byte buffer 'vbuf'.
 * 'size' must be a multiple of 2.
 */
void
byteswap_uint16_array(void *vbuf, size_t size)
{
	uint16_t *buf = vbuf;
	size_t count = size >> 1;
	size_t i;	/* same type as count: no sign-compare or overflow */

	ASSERT((size & 1) == 0);

	for (i = 0; i < count; i++)
		buf[i] = BSWAP_16(buf[i]);
}
1164 
/*
 * Byteswap routine for byte arrays: intentionally does nothing, since
 * single bytes have no byte order.  Both arguments are unused.
 */
/* ARGSUSED */
void
byteswap_uint8_array(void *vbuf, size_t size)
{
}
1170 
/*
 * Initialize the subsystems used by the DMU by calling their init
 * routines in this order: dbuf, dnode, ARC, L2ARC.
 * NOTE(review): any ordering dependency between these routines is not
 * visible from this file; keep the order as is unless confirmed otherwise.
 */
void
dmu_init(void)
{
	dbuf_init();
	dnode_init();
	arc_init();
	l2arc_init();
}
1179 
/*
 * Tear down the subsystems set up by dmu_init() by calling their fini
 * routines in this order: ARC, dnode, dbuf, L2ARC.
 * NOTE(review): this is not the exact reverse of dmu_init() (l2arc_fini()
 * runs last); whether that is required is not visible from this file.
 */
void
dmu_fini(void)
{
	arc_fini();
	dnode_fini();
	dbuf_fini();
	l2arc_fini();
}
1188