xref: /titanic_50/usr/src/uts/common/fs/zfs/dmu.c (revision cef4cb45384ad79f5c070e9b8d8dff2ed3481ed4)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2007 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 
26 #pragma ident	"%Z%%M%	%I%	%E% SMI"
27 
28 #include <sys/dmu.h>
29 #include <sys/dmu_impl.h>
30 #include <sys/dmu_tx.h>
31 #include <sys/dbuf.h>
32 #include <sys/dnode.h>
33 #include <sys/zfs_context.h>
34 #include <sys/dmu_objset.h>
35 #include <sys/dmu_traverse.h>
36 #include <sys/dsl_dataset.h>
37 #include <sys/dsl_dir.h>
38 #include <sys/dsl_pool.h>
39 #include <sys/dsl_synctask.h>
40 #include <sys/dsl_prop.h>
41 #include <sys/dmu_zfetch.h>
42 #include <sys/zfs_ioctl.h>
43 #include <sys/zap.h>
44 #include <sys/zio_checksum.h>
45 #ifdef _KERNEL
46 #include <sys/vmsystm.h>
47 #endif
48 
/*
 * Per-object-type information table, indexed by object type (see the
 * dmu_ot[ot].ot_metadata lookup in dmu_get_replication_level()).
 *
 * Each entry holds, in order: a byteswap routine, a TRUE/FALSE flag, and
 * a human-readable type name.
 *
 * NOTE(review): the flag is presumably the ot_metadata member consulted by
 * dmu_get_replication_level(), and the entry order presumably has to match
 * the object type enumeration in <sys/dmu.h> -- confirm both against the
 * dmu_object_type_info_t declaration.
 */
const dmu_object_type_info_t dmu_ot[DMU_OT_NUMTYPES] = {
	{	byteswap_uint8_array,	TRUE,	"unallocated"		},
	{	zap_byteswap,		TRUE,	"object directory"	},
	{	byteswap_uint64_array,	TRUE,	"object array"		},
	{	byteswap_uint8_array,	TRUE,	"packed nvlist"		},
	{	byteswap_uint64_array,	TRUE,	"packed nvlist size"	},
	{	byteswap_uint64_array,	TRUE,	"bplist"		},
	{	byteswap_uint64_array,	TRUE,	"bplist header"		},
	{	byteswap_uint64_array,	TRUE,	"SPA space map header"	},
	{	byteswap_uint64_array,	TRUE,	"SPA space map"		},
	{	byteswap_uint64_array,	TRUE,	"ZIL intent log"	},
	{	dnode_buf_byteswap,	TRUE,	"DMU dnode"		},
	{	dmu_objset_byteswap,	TRUE,	"DMU objset"		},
	{	byteswap_uint64_array,	TRUE,	"DSL directory"		},
	{	zap_byteswap,		TRUE,	"DSL directory child map"},
	{	zap_byteswap,		TRUE,	"DSL dataset snap map"	},
	{	zap_byteswap,		TRUE,	"DSL props"		},
	{	byteswap_uint64_array,	TRUE,	"DSL dataset"		},
	{	zfs_znode_byteswap,	TRUE,	"ZFS znode"		},
	{	zfs_acl_byteswap,	TRUE,	"ZFS ACL"		},
	/* the FALSE entries below are the only ones not flagged TRUE */
	{	byteswap_uint8_array,	FALSE,	"ZFS plain file"	},
	{	zap_byteswap,		TRUE,	"ZFS directory"		},
	{	zap_byteswap,		TRUE,	"ZFS master node"	},
	{	zap_byteswap,		TRUE,	"ZFS delete queue"	},
	{	byteswap_uint8_array,	FALSE,	"zvol object"		},
	{	zap_byteswap,		TRUE,	"zvol prop"		},
	{	byteswap_uint8_array,	FALSE,	"other uint8[]"		},
	{	byteswap_uint64_array,	FALSE,	"other uint64[]"	},
	{	zap_byteswap,		TRUE,	"other ZAP"		},
	{	zap_byteswap,		TRUE,	"persistent error log"	},
	{	byteswap_uint8_array,	TRUE,	"SPA history"		},
	{	byteswap_uint64_array,	TRUE,	"SPA history offsets"	},
};
82 
83 int
84 dmu_buf_hold(objset_t *os, uint64_t object, uint64_t offset,
85     void *tag, dmu_buf_t **dbp)
86 {
87 	dnode_t *dn;
88 	uint64_t blkid;
89 	dmu_buf_impl_t *db;
90 	int err;
91 
92 	err = dnode_hold(os->os, object, FTAG, &dn);
93 	if (err)
94 		return (err);
95 	blkid = dbuf_whichblock(dn, offset);
96 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
97 	db = dbuf_hold(dn, blkid, tag);
98 	rw_exit(&dn->dn_struct_rwlock);
99 	if (db == NULL) {
100 		err = EIO;
101 	} else {
102 		err = dbuf_read(db, NULL, DB_RF_CANFAIL);
103 		if (err) {
104 			dbuf_rele(db, tag);
105 			db = NULL;
106 		}
107 	}
108 
109 	dnode_rele(dn, FTAG);
110 	*dbp = &db->db;
111 	return (err);
112 }
113 
114 int
115 dmu_bonus_max(void)
116 {
117 	return (DN_MAX_BONUSLEN);
118 }
119 
120 /*
121  * returns ENOENT, EIO, or 0.
122  */
123 int
124 dmu_bonus_hold(objset_t *os, uint64_t object, void *tag, dmu_buf_t **dbp)
125 {
126 	dnode_t *dn;
127 	int err, count;
128 	dmu_buf_impl_t *db;
129 
130 	err = dnode_hold(os->os, object, FTAG, &dn);
131 	if (err)
132 		return (err);
133 
134 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
135 	if (dn->dn_bonus == NULL) {
136 		rw_exit(&dn->dn_struct_rwlock);
137 		rw_enter(&dn->dn_struct_rwlock, RW_WRITER);
138 		if (dn->dn_bonus == NULL)
139 			dn->dn_bonus = dbuf_create_bonus(dn);
140 	}
141 	db = dn->dn_bonus;
142 	rw_exit(&dn->dn_struct_rwlock);
143 	mutex_enter(&db->db_mtx);
144 	count = refcount_add(&db->db_holds, tag);
145 	mutex_exit(&db->db_mtx);
146 	if (count == 1)
147 		dnode_add_ref(dn, db);
148 	dnode_rele(dn, FTAG);
149 
150 	VERIFY(0 == dbuf_read(db, NULL, DB_RF_MUST_SUCCEED));
151 
152 	*dbp = &db->db;
153 	return (0);
154 }
155 
156 /*
157  * Note: longer-term, we should modify all of the dmu_buf_*() interfaces
158  * to take a held dnode rather than <os, object> -- the lookup is wasteful,
159  * and can induce severe lock contention when writing to several files
160  * whose dnodes are in the same block.
161  */
162 static int
163 dmu_buf_hold_array_by_dnode(dnode_t *dn, uint64_t offset,
164     uint64_t length, int read, void *tag, int *numbufsp, dmu_buf_t ***dbpp)
165 {
166 	dmu_buf_t **dbp;
167 	uint64_t blkid, nblks, i;
168 	uint32_t flags;
169 	int err;
170 	zio_t *zio;
171 
172 	ASSERT(length <= DMU_MAX_ACCESS);
173 
174 	flags = DB_RF_CANFAIL | DB_RF_NEVERWAIT;
175 	if (length > zfetch_array_rd_sz)
176 		flags |= DB_RF_NOPREFETCH;
177 
178 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
179 	if (dn->dn_datablkshift) {
180 		int blkshift = dn->dn_datablkshift;
181 		nblks = (P2ROUNDUP(offset+length, 1ULL<<blkshift) -
182 			P2ALIGN(offset, 1ULL<<blkshift)) >> blkshift;
183 	} else {
184 		ASSERT3U(offset + length, <=, dn->dn_datablksz);
185 		nblks = 1;
186 	}
187 	dbp = kmem_zalloc(sizeof (dmu_buf_t *) * nblks, KM_SLEEP);
188 
189 	zio = zio_root(dn->dn_objset->os_spa, NULL, NULL, TRUE);
190 	blkid = dbuf_whichblock(dn, offset);
191 	for (i = 0; i < nblks; i++) {
192 		dmu_buf_impl_t *db = dbuf_hold(dn, blkid+i, tag);
193 		if (db == NULL) {
194 			rw_exit(&dn->dn_struct_rwlock);
195 			dmu_buf_rele_array(dbp, nblks, tag);
196 			zio_nowait(zio);
197 			return (EIO);
198 		}
199 		/* initiate async i/o */
200 		if (read) {
201 			rw_exit(&dn->dn_struct_rwlock);
202 			(void) dbuf_read(db, zio, flags);
203 			rw_enter(&dn->dn_struct_rwlock, RW_READER);
204 		}
205 		dbp[i] = &db->db;
206 	}
207 	rw_exit(&dn->dn_struct_rwlock);
208 
209 	/* wait for async i/o */
210 	err = zio_wait(zio);
211 	if (err) {
212 		dmu_buf_rele_array(dbp, nblks, tag);
213 		return (err);
214 	}
215 
216 	/* wait for other io to complete */
217 	if (read) {
218 		for (i = 0; i < nblks; i++) {
219 			dmu_buf_impl_t *db = (dmu_buf_impl_t *)dbp[i];
220 			mutex_enter(&db->db_mtx);
221 			while (db->db_state == DB_READ ||
222 			    db->db_state == DB_FILL)
223 				cv_wait(&db->db_changed, &db->db_mtx);
224 			if (db->db_state == DB_UNCACHED)
225 				err = EIO;
226 			mutex_exit(&db->db_mtx);
227 			if (err) {
228 				dmu_buf_rele_array(dbp, nblks, tag);
229 				return (err);
230 			}
231 		}
232 	}
233 
234 	*numbufsp = nblks;
235 	*dbpp = dbp;
236 	return (0);
237 }
238 
239 static int
240 dmu_buf_hold_array(objset_t *os, uint64_t object, uint64_t offset,
241     uint64_t length, int read, void *tag, int *numbufsp, dmu_buf_t ***dbpp)
242 {
243 	dnode_t *dn;
244 	int err;
245 
246 	err = dnode_hold(os->os, object, FTAG, &dn);
247 	if (err)
248 		return (err);
249 
250 	err = dmu_buf_hold_array_by_dnode(dn, offset, length, read, tag,
251 	    numbufsp, dbpp);
252 
253 	dnode_rele(dn, FTAG);
254 
255 	return (err);
256 }
257 
258 int
259 dmu_buf_hold_array_by_bonus(dmu_buf_t *db, uint64_t offset,
260     uint64_t length, int read, void *tag, int *numbufsp, dmu_buf_t ***dbpp)
261 {
262 	dnode_t *dn = ((dmu_buf_impl_t *)db)->db_dnode;
263 	int err;
264 
265 	err = dmu_buf_hold_array_by_dnode(dn, offset, length, read, tag,
266 	    numbufsp, dbpp);
267 
268 	return (err);
269 }
270 
271 void
272 dmu_buf_rele_array(dmu_buf_t **dbp_fake, int numbufs, void *tag)
273 {
274 	int i;
275 	dmu_buf_impl_t **dbp = (dmu_buf_impl_t **)dbp_fake;
276 
277 	if (numbufs == 0)
278 		return;
279 
280 	for (i = 0; i < numbufs; i++) {
281 		if (dbp[i])
282 			dbuf_rele(dbp[i], tag);
283 	}
284 
285 	kmem_free(dbp, sizeof (dmu_buf_t *) * numbufs);
286 }
287 
288 void
289 dmu_prefetch(objset_t *os, uint64_t object, uint64_t offset, uint64_t len)
290 {
291 	dnode_t *dn;
292 	uint64_t blkid;
293 	int nblks, i, err;
294 
295 	if (zfs_prefetch_disable)
296 		return;
297 
298 	if (len == 0) {  /* they're interested in the bonus buffer */
299 		dn = os->os->os_meta_dnode;
300 
301 		if (object == 0 || object >= DN_MAX_OBJECT)
302 			return;
303 
304 		rw_enter(&dn->dn_struct_rwlock, RW_READER);
305 		blkid = dbuf_whichblock(dn, object * sizeof (dnode_phys_t));
306 		dbuf_prefetch(dn, blkid);
307 		rw_exit(&dn->dn_struct_rwlock);
308 		return;
309 	}
310 
311 	/*
312 	 * XXX - Note, if the dnode for the requested object is not
313 	 * already cached, we will do a *synchronous* read in the
314 	 * dnode_hold() call.  The same is true for any indirects.
315 	 */
316 	err = dnode_hold(os->os, object, FTAG, &dn);
317 	if (err != 0)
318 		return;
319 
320 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
321 	if (dn->dn_datablkshift) {
322 		int blkshift = dn->dn_datablkshift;
323 		nblks = (P2ROUNDUP(offset+len, 1<<blkshift) -
324 			P2ALIGN(offset, 1<<blkshift)) >> blkshift;
325 	} else {
326 		nblks = (offset < dn->dn_datablksz);
327 	}
328 
329 	if (nblks != 0) {
330 		blkid = dbuf_whichblock(dn, offset);
331 		for (i = 0; i < nblks; i++)
332 			dbuf_prefetch(dn, blkid+i);
333 	}
334 
335 	rw_exit(&dn->dn_struct_rwlock);
336 
337 	dnode_rele(dn, FTAG);
338 }
339 
340 int
341 dmu_free_range(objset_t *os, uint64_t object, uint64_t offset,
342     uint64_t size, dmu_tx_t *tx)
343 {
344 	dnode_t *dn;
345 	int err = dnode_hold(os->os, object, FTAG, &dn);
346 	if (err)
347 		return (err);
348 	ASSERT(offset < UINT64_MAX);
349 	ASSERT(size == -1ULL || size <= UINT64_MAX - offset);
350 	dnode_free_range(dn, offset, size, tx);
351 	dnode_rele(dn, FTAG);
352 	return (0);
353 }
354 
355 int
356 dmu_read(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
357     void *buf)
358 {
359 	dnode_t *dn;
360 	dmu_buf_t **dbp;
361 	int numbufs, i, err;
362 
363 	/*
364 	 * Deal with odd block sizes, where there can't be data past the
365 	 * first block.
366 	 */
367 	err = dnode_hold(os->os, object, FTAG, &dn);
368 	if (err)
369 		return (err);
370 	if (dn->dn_datablkshift == 0) {
371 		int newsz = offset > dn->dn_datablksz ? 0 :
372 		    MIN(size, dn->dn_datablksz - offset);
373 		bzero((char *)buf + newsz, size - newsz);
374 		size = newsz;
375 	}
376 
377 	while (size > 0) {
378 		uint64_t mylen = MIN(size, DMU_MAX_ACCESS / 2);
379 		int err;
380 
381 		/*
382 		 * NB: we could do this block-at-a-time, but it's nice
383 		 * to be reading in parallel.
384 		 */
385 		err = dmu_buf_hold_array_by_dnode(dn, offset, mylen,
386 		    TRUE, FTAG, &numbufs, &dbp);
387 		if (err)
388 			return (err);
389 
390 		for (i = 0; i < numbufs; i++) {
391 			int tocpy;
392 			int bufoff;
393 			dmu_buf_t *db = dbp[i];
394 
395 			ASSERT(size > 0);
396 
397 			bufoff = offset - db->db_offset;
398 			tocpy = (int)MIN(db->db_size - bufoff, size);
399 
400 			bcopy((char *)db->db_data + bufoff, buf, tocpy);
401 
402 			offset += tocpy;
403 			size -= tocpy;
404 			buf = (char *)buf + tocpy;
405 		}
406 		dmu_buf_rele_array(dbp, numbufs, FTAG);
407 	}
408 	dnode_rele(dn, FTAG);
409 	return (0);
410 }
411 
412 void
413 dmu_write(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
414     const void *buf, dmu_tx_t *tx)
415 {
416 	dmu_buf_t **dbp;
417 	int numbufs, i;
418 
419 	if (size == 0)
420 		return;
421 
422 	VERIFY(0 == dmu_buf_hold_array(os, object, offset, size,
423 	    FALSE, FTAG, &numbufs, &dbp));
424 
425 	for (i = 0; i < numbufs; i++) {
426 		int tocpy;
427 		int bufoff;
428 		dmu_buf_t *db = dbp[i];
429 
430 		ASSERT(size > 0);
431 
432 		bufoff = offset - db->db_offset;
433 		tocpy = (int)MIN(db->db_size - bufoff, size);
434 
435 		ASSERT(i == 0 || i == numbufs-1 || tocpy == db->db_size);
436 
437 		if (tocpy == db->db_size)
438 			dmu_buf_will_fill(db, tx);
439 		else
440 			dmu_buf_will_dirty(db, tx);
441 
442 		bcopy(buf, (char *)db->db_data + bufoff, tocpy);
443 
444 		if (tocpy == db->db_size)
445 			dmu_buf_fill_done(db, tx);
446 
447 		offset += tocpy;
448 		size -= tocpy;
449 		buf = (char *)buf + tocpy;
450 	}
451 	dmu_buf_rele_array(dbp, numbufs, FTAG);
452 }
453 
454 #ifdef _KERNEL
455 int
456 dmu_write_uio(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
457     uio_t *uio, dmu_tx_t *tx)
458 {
459 	dmu_buf_t **dbp;
460 	int numbufs, i;
461 	int err = 0;
462 
463 	if (size == 0)
464 		return (0);
465 
466 	err = dmu_buf_hold_array(os, object, offset, size,
467 	    FALSE, FTAG, &numbufs, &dbp);
468 	if (err)
469 		return (err);
470 
471 	for (i = 0; i < numbufs; i++) {
472 		int tocpy;
473 		int bufoff;
474 		dmu_buf_t *db = dbp[i];
475 
476 		ASSERT(size > 0);
477 
478 		bufoff = offset - db->db_offset;
479 		tocpy = (int)MIN(db->db_size - bufoff, size);
480 
481 		ASSERT(i == 0 || i == numbufs-1 || tocpy == db->db_size);
482 
483 		if (tocpy == db->db_size)
484 			dmu_buf_will_fill(db, tx);
485 		else
486 			dmu_buf_will_dirty(db, tx);
487 
488 		/*
489 		 * XXX uiomove could block forever (eg. nfs-backed
490 		 * pages).  There needs to be a uiolockdown() function
491 		 * to lock the pages in memory, so that uiomove won't
492 		 * block.
493 		 */
494 		err = uiomove((char *)db->db_data + bufoff, tocpy,
495 		    UIO_WRITE, uio);
496 
497 		if (tocpy == db->db_size)
498 			dmu_buf_fill_done(db, tx);
499 
500 		if (err)
501 			break;
502 
503 		offset += tocpy;
504 		size -= tocpy;
505 	}
506 	dmu_buf_rele_array(dbp, numbufs, FTAG);
507 	return (err);
508 }
509 
510 int
511 dmu_write_pages(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
512     page_t *pp, dmu_tx_t *tx)
513 {
514 	dmu_buf_t **dbp;
515 	int numbufs, i;
516 	int err;
517 
518 	if (size == 0)
519 		return (0);
520 
521 	err = dmu_buf_hold_array(os, object, offset, size,
522 	    FALSE, FTAG, &numbufs, &dbp);
523 	if (err)
524 		return (err);
525 
526 	for (i = 0; i < numbufs; i++) {
527 		int tocpy, copied, thiscpy;
528 		int bufoff;
529 		dmu_buf_t *db = dbp[i];
530 		caddr_t va;
531 
532 		ASSERT(size > 0);
533 		ASSERT3U(db->db_size, >=, PAGESIZE);
534 
535 		bufoff = offset - db->db_offset;
536 		tocpy = (int)MIN(db->db_size - bufoff, size);
537 
538 		ASSERT(i == 0 || i == numbufs-1 || tocpy == db->db_size);
539 
540 		if (tocpy == db->db_size)
541 			dmu_buf_will_fill(db, tx);
542 		else
543 			dmu_buf_will_dirty(db, tx);
544 
545 		for (copied = 0; copied < tocpy; copied += PAGESIZE) {
546 			ASSERT3U(pp->p_offset, ==, db->db_offset + bufoff);
547 			thiscpy = MIN(PAGESIZE, tocpy - copied);
548 			va = ppmapin(pp, PROT_READ, (caddr_t)-1);
549 			bcopy(va, (char *)db->db_data + bufoff, thiscpy);
550 			ppmapout(va);
551 			pp = pp->p_next;
552 			bufoff += PAGESIZE;
553 		}
554 
555 		if (tocpy == db->db_size)
556 			dmu_buf_fill_done(db, tx);
557 
558 		if (err)
559 			break;
560 
561 		offset += tocpy;
562 		size -= tocpy;
563 	}
564 	dmu_buf_rele_array(dbp, numbufs, FTAG);
565 	return (err);
566 }
567 #endif
568 
/*
 * Context allocated by dmu_sync() and handed to dmu_sync_done(), the
 * completion callback of the sync write, which frees it.
 */
typedef struct {
	dbuf_dirty_record_t	*dr;	/* dirty record being written */
	dmu_sync_cb_t		*done;	/* caller's callback, may be NULL */
	void			*arg;	/* argument passed to 'done' */
} dmu_sync_arg_t;
574 
575 /* ARGSUSED */
576 static void
577 dmu_sync_done(zio_t *zio, arc_buf_t *buf, void *varg)
578 {
579 	dmu_sync_arg_t *in = varg;
580 	dbuf_dirty_record_t *dr = in->dr;
581 	dmu_buf_impl_t *db = dr->dr_dbuf;
582 	dmu_sync_cb_t *done = in->done;
583 
584 	if (!BP_IS_HOLE(zio->io_bp)) {
585 		zio->io_bp->blk_fill = 1;
586 		BP_SET_TYPE(zio->io_bp, db->db_dnode->dn_type);
587 		BP_SET_LEVEL(zio->io_bp, 0);
588 	}
589 
590 	mutex_enter(&db->db_mtx);
591 	ASSERT(dr->dt.dl.dr_override_state == DR_IN_DMU_SYNC);
592 	dr->dt.dl.dr_overridden_by = *zio->io_bp; /* structure assignment */
593 	dr->dt.dl.dr_override_state = DR_OVERRIDDEN;
594 	cv_broadcast(&db->db_changed);
595 	mutex_exit(&db->db_mtx);
596 
597 	if (done)
598 		done(&(db->db), in->arg);
599 
600 	kmem_free(in, sizeof (dmu_sync_arg_t));
601 }
602 
/*
 * Intent log support: sync the block associated with db to disk.
 * N.B. and XXX: the caller is responsible for making sure that the
 * data isn't changing while dmu_sync() is writing it.
 *
 * Arguments:
 *
 *	pio:	optional parent zio.  If non-NULL the write is issued
 *		without waiting and EINPROGRESS is returned; if NULL this
 *		function waits for the write to complete.
 *	db_fake: the buffer to sync.
 *	bp:	block pointer to fill in; must be a hole on entry.
 *	txg:	the (non-zero) transaction group the data was dirtied in.
 *	done, arg: optional callback and its argument, invoked from
 *		dmu_sync_done() when the write issued here completes.
 *
 * Return values:
 *
 *	EEXIST: this txg has already been synced, so there's nothing to do.
 *		The caller should not log the write.
 *
 *	ENOENT: the block was dbuf_free_range()'d, so there's nothing to do.
 *		The caller should not log the write.
 *
 *	EALREADY: this block is already in the process of being synced.
 *		The caller should track its progress (somehow).
 *
 *	EINPROGRESS: the IO has been initiated.
 *		The caller should log this blkptr in the callback.
 *
 *	0: completed.  Sets *bp to the blkptr just written.
 *		The caller should log this blkptr immediately.
 */
int
dmu_sync(zio_t *pio, dmu_buf_t *db_fake,
    blkptr_t *bp, uint64_t txg, dmu_sync_cb_t *done, void *arg)
{
	dmu_buf_impl_t *db = (dmu_buf_impl_t *)db_fake;
	objset_impl_t *os = db->db_objset;
	dsl_pool_t *dp = os->os_dsl_dataset->ds_dir->dd_pool;
	tx_state_t *tx = &dp->dp_tx;
	dbuf_dirty_record_t *dr;
	dmu_sync_arg_t *in;
	zbookmark_t zb;
	zio_t *zio;
	int err;

	ASSERT(BP_IS_HOLE(bp));
	ASSERT(txg != 0);


	dprintf("dmu_sync txg=%llu, s,o,q %llu %llu %llu\n",
	    txg, tx->tx_synced_txg, tx->tx_open_txg, tx->tx_quiesced_txg);

	/*
	 * XXX - would be nice if we could do this without suspending...
	 */
	txg_suspend(dp);

	/*
	 * If this txg already synced, there's nothing to do.
	 */
	if (txg <= tx->tx_synced_txg) {
		txg_resume(dp);
		/*
		 * If we're running ziltest, we need the blkptr regardless.
		 */
		if (txg > spa_freeze_txg(dp->dp_spa)) {
			/* if db_blkptr == NULL, this was an empty write */
			if (db->db_blkptr)
				*bp = *db->db_blkptr; /* structure assignment */
			return (0);
		}
		return (EEXIST);
	}

	mutex_enter(&db->db_mtx);

	if (txg == tx->tx_syncing_txg) {
		while (db->db_data_pending) {
			/*
			 * IO is in-progress.  Wait for it to finish.
			 * XXX - would be nice to be able to somehow "attach"
			 * this zio to the parent zio passed in.
			 */
			cv_wait(&db->db_changed, &db->db_mtx);
			if (!db->db_data_pending &&
			    db->db_blkptr && BP_IS_HOLE(db->db_blkptr)) {
				/*
				 * IO was compressed away
				 */
				*bp = *db->db_blkptr; /* structure assignment */
				mutex_exit(&db->db_mtx);
				txg_resume(dp);
				return (0);
			}
			ASSERT(db->db_data_pending ||
			    (db->db_blkptr && db->db_blkptr->blk_birth == txg));
		}

		if (db->db_blkptr && db->db_blkptr->blk_birth == txg) {
			/*
			 * IO is already completed.
			 */
			*bp = *db->db_blkptr; /* structure assignment */
			mutex_exit(&db->db_mtx);
			txg_resume(dp);
			return (0);
		}
	}

	/*
	 * Find the dirty record for this txg: start at db_last_dirty and
	 * follow dr_next past every record belonging to a later txg.
	 */
	dr = db->db_last_dirty;
	while (dr && dr->dr_txg > txg)
		dr = dr->dr_next;
	if (dr == NULL || dr->dr_txg < txg) {
		/*
		 * This dbuf isn't dirty, must have been free_range'd.
		 * There's no need to log writes to freed blocks, so we're done.
		 */
		mutex_exit(&db->db_mtx);
		txg_resume(dp);
		return (ENOENT);
	}

	ASSERT(dr->dr_txg == txg);
	if (dr->dt.dl.dr_override_state == DR_IN_DMU_SYNC) {
		/*
		 * We have already issued a sync write for this buffer.
		 */
		mutex_exit(&db->db_mtx);
		txg_resume(dp);
		return (EALREADY);
	} else if (dr->dt.dl.dr_override_state == DR_OVERRIDDEN) {
		/*
		 * This buffer has already been synced.  It could not
		 * have been dirtied since, or we would have cleared the state.
		 */
		*bp = dr->dt.dl.dr_overridden_by; /* structure assignment */
		mutex_exit(&db->db_mtx);
		txg_resume(dp);
		return (0);
	}

	/*
	 * Mark the record as being synced and set up the context for
	 * dmu_sync_done(), which frees 'in' when the write completes.
	 */
	dr->dt.dl.dr_override_state = DR_IN_DMU_SYNC;
	in = kmem_alloc(sizeof (dmu_sync_arg_t), KM_SLEEP);
	in->dr = dr;
	in->done = done;
	in->arg = arg;
	mutex_exit(&db->db_mtx);
	txg_resume(dp);

	/* bookmark identifying the block being written */
	zb.zb_objset = os->os_dsl_dataset->ds_object;
	zb.zb_object = db->db.db_object;
	zb.zb_level = db->db_level;
	zb.zb_blkid = db->db_blkid;
	zio = arc_write(pio, os->os_spa,
	    zio_checksum_select(db->db_dnode->dn_checksum, os->os_checksum),
	    zio_compress_select(db->db_dnode->dn_compress, os->os_compress),
	    dmu_get_replication_level(os->os_spa, &zb, db->db_dnode->dn_type),
	    txg, bp, dr->dt.dl.dr_data, NULL, dmu_sync_done, in,
	    ZIO_PRIORITY_SYNC_WRITE, ZIO_FLAG_MUSTSUCCEED, &zb);

	if (pio) {
		/* the caller's parent zio tracks completion */
		zio_nowait(zio);
		err = EINPROGRESS;
	} else {
		err = zio_wait(zio);
		ASSERT(err == 0);
	}
	return (err);
}
763 
764 int
765 dmu_object_set_blocksize(objset_t *os, uint64_t object, uint64_t size, int ibs,
766 	dmu_tx_t *tx)
767 {
768 	dnode_t *dn;
769 	int err;
770 
771 	err = dnode_hold(os->os, object, FTAG, &dn);
772 	if (err)
773 		return (err);
774 	err = dnode_set_blksz(dn, size, ibs, tx);
775 	dnode_rele(dn, FTAG);
776 	return (err);
777 }
778 
779 void
780 dmu_object_set_checksum(objset_t *os, uint64_t object, uint8_t checksum,
781 	dmu_tx_t *tx)
782 {
783 	dnode_t *dn;
784 
785 	/* XXX assumes dnode_hold will not get an i/o error */
786 	(void) dnode_hold(os->os, object, FTAG, &dn);
787 	ASSERT(checksum < ZIO_CHECKSUM_FUNCTIONS);
788 	dn->dn_checksum = checksum;
789 	dnode_setdirty(dn, tx);
790 	dnode_rele(dn, FTAG);
791 }
792 
793 void
794 dmu_object_set_compress(objset_t *os, uint64_t object, uint8_t compress,
795 	dmu_tx_t *tx)
796 {
797 	dnode_t *dn;
798 
799 	/* XXX assumes dnode_hold will not get an i/o error */
800 	(void) dnode_hold(os->os, object, FTAG, &dn);
801 	ASSERT(compress < ZIO_COMPRESS_FUNCTIONS);
802 	dn->dn_compress = compress;
803 	dnode_setdirty(dn, tx);
804 	dnode_rele(dn, FTAG);
805 }
806 
807 /*
808  * XXX - eventually, this should take into account per-dataset (or
809  *       even per-object?) user requests for higher levels of replication.
810  */
811 int
812 dmu_get_replication_level(spa_t *spa, zbookmark_t *zb, dmu_object_type_t ot)
813 {
814 	int ncopies = 1;
815 
816 	if (dmu_ot[ot].ot_metadata)
817 		ncopies++;
818 	if (zb->zb_level != 0)
819 		ncopies++;
820 	if (zb->zb_objset == 0 && zb->zb_object == 0)
821 		ncopies++;
822 	return (MIN(ncopies, spa_max_replication(spa)));
823 }
824 
825 int
826 dmu_offset_next(objset_t *os, uint64_t object, boolean_t hole, uint64_t *off)
827 {
828 	dnode_t *dn;
829 	int i, err;
830 
831 	err = dnode_hold(os->os, object, FTAG, &dn);
832 	if (err)
833 		return (err);
834 	/*
835 	 * Sync any current changes before
836 	 * we go trundling through the block pointers.
837 	 */
838 	for (i = 0; i < TXG_SIZE; i++) {
839 		if (list_link_active(&dn->dn_dirty_link[i]))
840 			break;
841 	}
842 	if (i != TXG_SIZE) {
843 		dnode_rele(dn, FTAG);
844 		txg_wait_synced(dmu_objset_pool(os), 0);
845 		err = dnode_hold(os->os, object, FTAG, &dn);
846 		if (err)
847 			return (err);
848 	}
849 
850 	err = dnode_next_offset(dn, hole, off, 1, 1, 0);
851 	dnode_rele(dn, FTAG);
852 
853 	return (err);
854 }
855 
856 void
857 dmu_object_info_from_dnode(dnode_t *dn, dmu_object_info_t *doi)
858 {
859 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
860 	mutex_enter(&dn->dn_mtx);
861 
862 	doi->doi_data_block_size = dn->dn_datablksz;
863 	doi->doi_metadata_block_size = dn->dn_indblkshift ?
864 	    1ULL << dn->dn_indblkshift : 0;
865 	doi->doi_indirection = dn->dn_nlevels;
866 	doi->doi_checksum = dn->dn_checksum;
867 	doi->doi_compress = dn->dn_compress;
868 	doi->doi_physical_blks = (DN_USED_BYTES(dn->dn_phys) +
869 	    SPA_MINBLOCKSIZE/2) >> SPA_MINBLOCKSHIFT;
870 	doi->doi_max_block_offset = dn->dn_phys->dn_maxblkid;
871 	doi->doi_type = dn->dn_type;
872 	doi->doi_bonus_size = dn->dn_bonuslen;
873 	doi->doi_bonus_type = dn->dn_bonustype;
874 
875 	mutex_exit(&dn->dn_mtx);
876 	rw_exit(&dn->dn_struct_rwlock);
877 }
878 
879 /*
880  * Get information on a DMU object.
881  * If doi is NULL, just indicates whether the object exists.
882  */
883 int
884 dmu_object_info(objset_t *os, uint64_t object, dmu_object_info_t *doi)
885 {
886 	dnode_t *dn;
887 	int err = dnode_hold(os->os, object, FTAG, &dn);
888 
889 	if (err)
890 		return (err);
891 
892 	if (doi != NULL)
893 		dmu_object_info_from_dnode(dn, doi);
894 
895 	dnode_rele(dn, FTAG);
896 	return (0);
897 }
898 
899 /*
900  * As above, but faster; can be used when you have a held dbuf in hand.
901  */
902 void
903 dmu_object_info_from_db(dmu_buf_t *db, dmu_object_info_t *doi)
904 {
905 	dmu_object_info_from_dnode(((dmu_buf_impl_t *)db)->db_dnode, doi);
906 }
907 
908 /*
909  * Faster still when you only care about the size.
910  * This is specifically optimized for zfs_getattr().
911  */
912 void
913 dmu_object_size_from_db(dmu_buf_t *db, uint32_t *blksize, u_longlong_t *nblk512)
914 {
915 	dnode_t *dn = ((dmu_buf_impl_t *)db)->db_dnode;
916 
917 	*blksize = dn->dn_datablksz;
918 	/* add 1 for dnode space */
919 	*nblk512 = ((DN_USED_BYTES(dn->dn_phys) + SPA_MINBLOCKSIZE/2) >>
920 	    SPA_MINBLOCKSHIFT) + 1;
921 }
922 
/*
 * Byteswap, in place, an array of 64-bit words.
 *
 *	vbuf	start of the array
 *	size	length of the array in bytes; must be a multiple of 8
 */
void
byteswap_uint64_array(void *vbuf, size_t size)
{
	uint64_t *buf = vbuf;
	size_t count = size >> 3;
	size_t i;	/* same type as count: no signed/unsigned compare */

	ASSERT((size & 7) == 0);

	for (i = 0; i < count; i++)
		buf[i] = BSWAP_64(buf[i]);
}
935 
/*
 * Byteswap, in place, an array of 32-bit words.
 *
 *	vbuf	start of the array
 *	size	length of the array in bytes; must be a multiple of 4
 */
void
byteswap_uint32_array(void *vbuf, size_t size)
{
	uint32_t *buf = vbuf;
	size_t count = size >> 2;
	size_t i;	/* same type as count: no signed/unsigned compare */

	ASSERT((size & 3) == 0);

	for (i = 0; i < count; i++)
		buf[i] = BSWAP_32(buf[i]);
}
948 
/*
 * Byteswap, in place, an array of 16-bit words.
 *
 *	vbuf	start of the array
 *	size	length of the array in bytes; must be a multiple of 2
 */
void
byteswap_uint16_array(void *vbuf, size_t size)
{
	uint16_t *buf = vbuf;
	size_t count = size >> 1;
	size_t i;	/* same type as count: no signed/unsigned compare */

	ASSERT((size & 1) == 0);

	for (i = 0; i < count; i++)
		buf[i] = BSWAP_16(buf[i]);
}
961 
/*
 * "Byteswap" an array of single bytes.  Intentionally empty: it exists so
 * that byte-array types have a routine with the same signature as the
 * other byteswap_*_array() functions (see its use in dmu_ot[]).
 */
/* ARGSUSED */
void
byteswap_uint8_array(void *vbuf, size_t size)
{
}
967 
/*
 * Set up the DMU by calling dbuf_init(), dnode_init() and arc_init(), in
 * that order.  dmu_fini() undoes this in the reverse order.
 */
void
dmu_init(void)
{
	dbuf_init();
	dnode_init();
	arc_init();
}
975 
/*
 * Tear down the DMU by calling arc_fini(), dnode_fini() and dbuf_fini(),
 * the reverse of the order used by dmu_init().
 */
void
dmu_fini(void)
{
	arc_fini();
	dnode_fini();
	dbuf_fini();
}
983