xref: /titanic_44/usr/src/uts/common/fs/zfs/dbuf.c (revision 1b5f7228f8eb94767e87bc125455926c72572e72)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2006 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 
26 #pragma ident	"%Z%%M%	%I%	%E% SMI"
27 
28 #include <sys/zfs_context.h>
29 #include <sys/dmu.h>
30 #include <sys/dmu_impl.h>
31 #include <sys/dbuf.h>
32 #include <sys/dmu_objset.h>
33 #include <sys/dsl_dataset.h>
34 #include <sys/dsl_dir.h>
35 #include <sys/dmu_tx.h>
36 #include <sys/spa.h>
37 #include <sys/zio.h>
38 #include <sys/dmu_zfetch.h>
39 
/*
 * Forward declarations for routines defined later in this file.
 * dbuf_destroy() is used by dbuf_evict() and dbuf_undirty() by
 * dbuf_free_range().
 */
static void dbuf_destroy(dmu_buf_impl_t *db);
static int dbuf_undirty(dmu_buf_impl_t *db, dmu_tx_t *tx);
static arc_done_func_t dbuf_write_done;

/*
 * NOTE(review): presumably a tunable that disables metadata compression;
 * it is not referenced in this part of the file -- confirm against its users.
 */
int zfs_mdcomp_disable = 0;

/*
 * Global data structures and functions for the dbuf cache.
 */
/* Task queue created in dbuf_init() and destroyed in dbuf_fini(). */
taskq_t *dbuf_tq;
/* kmem cache of dmu_buf_impl_t, set up with dbuf_cons()/dbuf_dest(). */
static kmem_cache_t *dbuf_cache;
51 
52 /* ARGSUSED */
53 static int
54 dbuf_cons(void *vdb, void *unused, int kmflag)
55 {
56 	dmu_buf_impl_t *db = vdb;
57 	bzero(db, sizeof (dmu_buf_impl_t));
58 
59 	mutex_init(&db->db_mtx, NULL, MUTEX_DEFAULT, NULL);
60 	cv_init(&db->db_changed, NULL, CV_DEFAULT, NULL);
61 	refcount_create(&db->db_holds);
62 	return (0);
63 }
64 
65 /* ARGSUSED */
66 static void
67 dbuf_dest(void *vdb, void *unused)
68 {
69 	dmu_buf_impl_t *db = vdb;
70 	mutex_destroy(&db->db_mtx);
71 	cv_destroy(&db->db_changed);
72 	refcount_destroy(&db->db_holds);
73 }
74 
75 /*
76  * dbuf hash table routines
77  */
/* The one global hash table; allocated in dbuf_init(), freed in dbuf_fini(). */
static dbuf_hash_table_t dbuf_hash_table;

/*
 * Number of dbufs currently linked into the hash table.  Adjusted with
 * atomic_add_64() by dbuf_hash_insert() and dbuf_hash_remove().
 */
static uint64_t dbuf_hash_count;
81 
82 static uint64_t
83 dbuf_hash(void *os, uint64_t obj, uint8_t lvl, uint64_t blkid)
84 {
85 	uintptr_t osv = (uintptr_t)os;
86 	uint64_t crc = -1ULL;
87 
88 	ASSERT(zfs_crc64_table[128] == ZFS_CRC64_POLY);
89 	crc = (crc >> 8) ^ zfs_crc64_table[(crc ^ (lvl)) & 0xFF];
90 	crc = (crc >> 8) ^ zfs_crc64_table[(crc ^ (osv >> 6)) & 0xFF];
91 	crc = (crc >> 8) ^ zfs_crc64_table[(crc ^ (obj >> 0)) & 0xFF];
92 	crc = (crc >> 8) ^ zfs_crc64_table[(crc ^ (obj >> 8)) & 0xFF];
93 	crc = (crc >> 8) ^ zfs_crc64_table[(crc ^ (blkid >> 0)) & 0xFF];
94 	crc = (crc >> 8) ^ zfs_crc64_table[(crc ^ (blkid >> 8)) & 0xFF];
95 
96 	crc ^= (osv>>14) ^ (obj>>16) ^ (blkid>>16);
97 
98 	return (crc);
99 }
100 
/*
 * Hash value for the dbuf identified by (os, obj, level, blkid).
 * The expansion deliberately has no trailing semicolon, so the macro is a
 * plain expression and does not leave a stray empty statement behind when
 * it is used as an initializer.
 */
#define	DBUF_HASH(os, obj, level, blkid) dbuf_hash(os, obj, level, blkid)
102 
/*
 * True when dbuf is the buffer for block blkid at the given level of
 * object obj in objset os.
 */
#define	DBUF_EQUAL(dbuf, os, obj, level, blkid)				\
	(((dbuf)->db.db_object == (obj)) &&				\
	((dbuf)->db_objset == (os)) &&					\
	((dbuf)->db_level == (level)) &&				\
	((dbuf)->db_blkid == (blkid)))
108 
109 dmu_buf_impl_t *
110 dbuf_find(dnode_t *dn, uint8_t level, uint64_t blkid)
111 {
112 	dbuf_hash_table_t *h = &dbuf_hash_table;
113 	objset_impl_t *os = dn->dn_objset;
114 	uint64_t obj = dn->dn_object;
115 	uint64_t hv = DBUF_HASH(os, obj, level, blkid);
116 	uint64_t idx = hv & h->hash_table_mask;
117 	dmu_buf_impl_t *db;
118 
119 	mutex_enter(DBUF_HASH_MUTEX(h, idx));
120 	for (db = h->hash_table[idx]; db != NULL; db = db->db_hash_next) {
121 		if (DBUF_EQUAL(db, os, obj, level, blkid)) {
122 			mutex_enter(&db->db_mtx);
123 			if (db->db_state != DB_EVICTING) {
124 				mutex_exit(DBUF_HASH_MUTEX(h, idx));
125 				return (db);
126 			}
127 			mutex_exit(&db->db_mtx);
128 		}
129 	}
130 	mutex_exit(DBUF_HASH_MUTEX(h, idx));
131 	return (NULL);
132 }
133 
134 /*
135  * Insert an entry into the hash table.  If there is already an element
136  * equal to elem in the hash table, then the already existing element
137  * will be returned and the new element will not be inserted.
138  * Otherwise returns NULL.
139  */
140 static dmu_buf_impl_t *
141 dbuf_hash_insert(dmu_buf_impl_t *db)
142 {
143 	dbuf_hash_table_t *h = &dbuf_hash_table;
144 	objset_impl_t *os = db->db_objset;
145 	uint64_t obj = db->db.db_object;
146 	int level = db->db_level;
147 	uint64_t blkid = db->db_blkid;
148 	uint64_t hv = DBUF_HASH(os, obj, level, blkid);
149 	uint64_t idx = hv & h->hash_table_mask;
150 	dmu_buf_impl_t *dbf;
151 
152 	mutex_enter(DBUF_HASH_MUTEX(h, idx));
153 	for (dbf = h->hash_table[idx]; dbf != NULL; dbf = dbf->db_hash_next) {
154 		if (DBUF_EQUAL(dbf, os, obj, level, blkid)) {
155 			mutex_enter(&dbf->db_mtx);
156 			if (dbf->db_state != DB_EVICTING) {
157 				mutex_exit(DBUF_HASH_MUTEX(h, idx));
158 				return (dbf);
159 			}
160 			mutex_exit(&dbf->db_mtx);
161 		}
162 	}
163 
164 	mutex_enter(&db->db_mtx);
165 	db->db_hash_next = h->hash_table[idx];
166 	h->hash_table[idx] = db;
167 	mutex_exit(DBUF_HASH_MUTEX(h, idx));
168 	atomic_add_64(&dbuf_hash_count, 1);
169 
170 	return (NULL);
171 }
172 
173 /*
174  * Remove an entry from the hash table.  This operation will
175  * fail if there are any existing holds on the db.
176  */
177 static void
178 dbuf_hash_remove(dmu_buf_impl_t *db)
179 {
180 	dbuf_hash_table_t *h = &dbuf_hash_table;
181 	uint64_t hv = DBUF_HASH(db->db_objset, db->db.db_object,
182 	    db->db_level, db->db_blkid);
183 	uint64_t idx = hv & h->hash_table_mask;
184 	dmu_buf_impl_t *dbf, **dbp;
185 
186 	/*
187 	 * We musn't hold db_mtx to maintin lock ordering:
188 	 * DBUF_HASH_MUTEX > db_mtx.
189 	 */
190 	ASSERT(refcount_is_zero(&db->db_holds));
191 	ASSERT(db->db_state == DB_EVICTING);
192 	ASSERT(!MUTEX_HELD(&db->db_mtx));
193 
194 	mutex_enter(DBUF_HASH_MUTEX(h, idx));
195 	dbp = &h->hash_table[idx];
196 	while ((dbf = *dbp) != db) {
197 		dbp = &dbf->db_hash_next;
198 		ASSERT(dbf != NULL);
199 	}
200 	*dbp = db->db_hash_next;
201 	db->db_hash_next = NULL;
202 	mutex_exit(DBUF_HASH_MUTEX(h, idx));
203 	atomic_add_64(&dbuf_hash_count, -1);
204 }
205 
/*
 * Forward declaration: dbuf_do_evict is the callback dbuf_set_data()
 * registers on ARC buffers via arc_set_callback().
 */
static arc_evict_func_t dbuf_do_evict;
207 
208 static void
209 dbuf_evict_user(dmu_buf_impl_t *db)
210 {
211 	ASSERT(MUTEX_HELD(&db->db_mtx));
212 
213 	if (db->db_level != 0 || db->db_d.db_evict_func == NULL)
214 		return;
215 
216 	if (db->db_d.db_user_data_ptr_ptr)
217 		*db->db_d.db_user_data_ptr_ptr = db->db.db_data;
218 	db->db_d.db_evict_func(&db->db, db->db_d.db_user_ptr);
219 	db->db_d.db_user_ptr = NULL;
220 	db->db_d.db_user_data_ptr_ptr = NULL;
221 	db->db_d.db_evict_func = NULL;
222 }
223 
224 void
225 dbuf_evict(dmu_buf_impl_t *db)
226 {
227 	int i;
228 
229 	ASSERT(MUTEX_HELD(&db->db_mtx));
230 	ASSERT(db->db_buf == NULL);
231 
232 #ifdef ZFS_DEBUG
233 	for (i = 0; i < TXG_SIZE; i++) {
234 		ASSERT(!list_link_active(&db->db_dirty_node[i]));
235 		ASSERT(db->db_level != 0 || db->db_d.db_data_old[i] == NULL);
236 	}
237 #endif
238 	dbuf_clear(db);
239 	dbuf_destroy(db);
240 }
241 
242 void
243 dbuf_init(void)
244 {
245 	uint64_t hsize = 1ULL << 16;
246 	dbuf_hash_table_t *h = &dbuf_hash_table;
247 	int i;
248 
249 	/*
250 	 * The hash table is big enough to fill all of physical memory
251 	 * with an average 4K block size.  The table will take up
252 	 * totalmem*sizeof(void*)/4K (i.e. 2MB/GB with 8-byte pointers).
253 	 */
254 	while (hsize * 4096 < physmem * PAGESIZE)
255 		hsize <<= 1;
256 
257 retry:
258 	h->hash_table_mask = hsize - 1;
259 	h->hash_table = kmem_zalloc(hsize * sizeof (void *), KM_NOSLEEP);
260 	if (h->hash_table == NULL) {
261 		/* XXX - we should really return an error instead of assert */
262 		ASSERT(hsize > (1ULL << 10));
263 		hsize >>= 1;
264 		goto retry;
265 	}
266 
267 	dbuf_cache = kmem_cache_create("dmu_buf_impl_t",
268 	    sizeof (dmu_buf_impl_t),
269 	    0, dbuf_cons, dbuf_dest, NULL, NULL, NULL, 0);
270 	dbuf_tq = taskq_create("dbuf_tq", 8, maxclsyspri, 50, INT_MAX,
271 	    TASKQ_PREPOPULATE);
272 
273 	for (i = 0; i < DBUF_MUTEXES; i++)
274 		mutex_init(&h->hash_mutexes[i], NULL, MUTEX_DEFAULT, NULL);
275 }
276 
277 void
278 dbuf_fini(void)
279 {
280 	dbuf_hash_table_t *h = &dbuf_hash_table;
281 	int i;
282 
283 	taskq_destroy(dbuf_tq);
284 	dbuf_tq = NULL;
285 
286 	for (i = 0; i < DBUF_MUTEXES; i++)
287 		mutex_destroy(&h->hash_mutexes[i]);
288 	kmem_free(h->hash_table, (h->hash_table_mask + 1) * sizeof (void *));
289 	kmem_cache_destroy(dbuf_cache);
290 }
291 
292 /*
293  * Other stuff.
294  */
295 
296 #ifdef ZFS_DEBUG
297 static void
298 dbuf_verify(dmu_buf_impl_t *db)
299 {
300 	int i;
301 	dnode_t *dn = db->db_dnode;
302 
303 	ASSERT(MUTEX_HELD(&db->db_mtx));
304 
305 	if (!(zfs_flags & ZFS_DEBUG_DBUF_VERIFY))
306 		return;
307 
308 	ASSERT(db->db_objset != NULL);
309 	if (dn == NULL) {
310 		ASSERT(db->db_parent == NULL);
311 		ASSERT(db->db_blkptr == NULL);
312 	} else {
313 		ASSERT3U(db->db.db_object, ==, dn->dn_object);
314 		ASSERT3P(db->db_objset, ==, dn->dn_objset);
315 		ASSERT3U(db->db_level, <, dn->dn_nlevels);
316 		ASSERT(db->db_blkid == DB_BONUS_BLKID ||
317 		    list_head(&dn->dn_dbufs));
318 	}
319 	if (db->db_blkid == DB_BONUS_BLKID) {
320 		ASSERT(dn != NULL);
321 		ASSERT3U(db->db.db_size, ==, dn->dn_bonuslen);
322 		ASSERT3U(db->db.db_offset, ==, DB_BONUS_BLKID);
323 	} else {
324 		ASSERT3U(db->db.db_offset, ==, db->db_blkid * db->db.db_size);
325 	}
326 
327 	if (db->db_level == 0) {
328 		/* we can be momentarily larger in dnode_set_blksz() */
329 		if (db->db_blkid != DB_BONUS_BLKID && dn) {
330 			ASSERT3U(db->db.db_size, >=, dn->dn_datablksz);
331 		}
332 		if (db->db.db_object == DMU_META_DNODE_OBJECT) {
333 			for (i = 0; i < TXG_SIZE; i++) {
334 				/*
335 				 * it should only be modified in syncing
336 				 * context, so make sure we only have
337 				 * one copy of the data.
338 				 */
339 				ASSERT(db->db_d.db_data_old[i] == NULL ||
340 				    db->db_d.db_data_old[i] == db->db_buf);
341 			}
342 		}
343 	}
344 
345 	/* verify db->db_blkptr */
346 	if (db->db_blkptr) {
347 		if (db->db_parent == dn->dn_dbuf) {
348 			/* db is pointed to by the dnode */
349 			/* ASSERT3U(db->db_blkid, <, dn->dn_nblkptr); */
350 			if (db->db.db_object == DMU_META_DNODE_OBJECT)
351 				ASSERT(db->db_parent == NULL);
352 			else
353 				ASSERT(db->db_parent != NULL);
354 			ASSERT3P(db->db_blkptr, ==,
355 			    &dn->dn_phys->dn_blkptr[db->db_blkid]);
356 		} else {
357 			/* db is pointed to by an indirect block */
358 			int epb = db->db_parent->db.db_size >> SPA_BLKPTRSHIFT;
359 			ASSERT3U(db->db_parent->db_level, ==, db->db_level+1);
360 			ASSERT3U(db->db_parent->db.db_object, ==,
361 			    db->db.db_object);
362 			/*
363 			 * dnode_grow_indblksz() can make this fail if we don't
364 			 * have the struct_rwlock.  XXX indblksz no longer
365 			 * grows.  safe to do this now?
366 			 */
367 			if (RW_WRITE_HELD(&db->db_dnode->dn_struct_rwlock)) {
368 				ASSERT3P(db->db_blkptr, ==,
369 				    ((blkptr_t *)db->db_parent->db.db_data +
370 				    db->db_blkid % epb));
371 			}
372 		}
373 	}
374 	if ((db->db_blkptr == NULL || BP_IS_HOLE(db->db_blkptr)) &&
375 	    db->db.db_data && db->db_blkid != DB_BONUS_BLKID &&
376 	    db->db_state != DB_FILL && !dn->dn_free_txg) {
377 		/*
378 		 * If the blkptr isn't set but they have nonzero data,
379 		 * it had better be dirty, otherwise we'll lose that
380 		 * data when we evict this buffer.
381 		 */
382 		if (db->db_dirtycnt == 0) {
383 			uint64_t *buf = db->db.db_data;
384 			int i;
385 
386 			for (i = 0; i < db->db.db_size >> 3; i++) {
387 				ASSERT(buf[i] == 0);
388 			}
389 		}
390 	}
391 }
392 #endif
393 
394 static void
395 dbuf_update_data(dmu_buf_impl_t *db)
396 {
397 	ASSERT(MUTEX_HELD(&db->db_mtx));
398 	if (db->db_level == 0 && db->db_d.db_user_data_ptr_ptr) {
399 		ASSERT(!refcount_is_zero(&db->db_holds));
400 		*db->db_d.db_user_data_ptr_ptr = db->db.db_data;
401 	}
402 }
403 
404 static void
405 dbuf_set_data(dmu_buf_impl_t *db, arc_buf_t *buf)
406 {
407 	ASSERT(MUTEX_HELD(&db->db_mtx));
408 	ASSERT(db->db_buf == NULL || !arc_has_callback(db->db_buf));
409 	db->db_buf = buf;
410 	if (buf != NULL) {
411 		ASSERT(buf->b_data != NULL);
412 		db->db.db_data = buf->b_data;
413 		if (!arc_released(buf))
414 			arc_set_callback(buf, dbuf_do_evict, db);
415 		dbuf_update_data(db);
416 	} else {
417 		dbuf_evict_user(db);
418 		db->db.db_data = NULL;
419 		db->db_state = DB_UNCACHED;
420 	}
421 }
422 
423 uint64_t
424 dbuf_whichblock(dnode_t *dn, uint64_t offset)
425 {
426 	if (dn->dn_datablkshift) {
427 		return (offset >> dn->dn_datablkshift);
428 	} else {
429 		ASSERT3U(offset, <, dn->dn_datablksz);
430 		return (0);
431 	}
432 }
433 
434 static void
435 dbuf_read_done(zio_t *zio, arc_buf_t *buf, void *vdb)
436 {
437 	dmu_buf_impl_t *db = vdb;
438 
439 	mutex_enter(&db->db_mtx);
440 	ASSERT3U(db->db_state, ==, DB_READ);
441 	/*
442 	 * All reads are synchronous, so we must have a hold on the dbuf
443 	 */
444 	ASSERT(refcount_count(&db->db_holds) > 0);
445 	ASSERT(db->db_buf == NULL);
446 	ASSERT(db->db.db_data == NULL);
447 	if (db->db_level == 0 && db->db_d.db_freed_in_flight) {
448 		/* we were freed in flight; disregard any error */
449 		arc_release(buf, db);
450 		bzero(buf->b_data, db->db.db_size);
451 		arc_buf_freeze(buf);
452 		db->db_d.db_freed_in_flight = FALSE;
453 		dbuf_set_data(db, buf);
454 		db->db_state = DB_CACHED;
455 	} else if (zio == NULL || zio->io_error == 0) {
456 		dbuf_set_data(db, buf);
457 		db->db_state = DB_CACHED;
458 	} else {
459 		ASSERT(db->db_blkid != DB_BONUS_BLKID);
460 		ASSERT3P(db->db_buf, ==, NULL);
461 		VERIFY(arc_buf_remove_ref(buf, db) == 1);
462 		db->db_state = DB_UNCACHED;
463 	}
464 	cv_broadcast(&db->db_changed);
465 	mutex_exit(&db->db_mtx);
466 	dbuf_rele(db, NULL);
467 }
468 
/*
 * Start filling an uncached dbuf with data.
 *
 * Called with db_mtx held and the dbuf in the DB_UNCACHED state; db_mtx
 * is dropped on every return path.  Three cases are handled:
 *   - bonus buffers are copied out of the dnode's bonus area;
 *   - freed level-0 blocks and holes get a newly allocated, zero-filled
 *     ARC buffer;
 *   - everything else is read with arc_read(), passing zio as its first
 *     argument and dbuf_read_done() as the completion callback.
 *
 * DB_RF_CACHED is or'ed into *flags when no I/O was needed for a
 * non-bonus dbuf (hole or freed block, or arc_read() set ARC_CACHED).
 */
static void
dbuf_read_impl(dmu_buf_impl_t *db, zio_t *zio, uint32_t *flags)
{
	blkptr_t *bp;
	zbookmark_t zb;
	uint32_t aflags = ARC_NOWAIT;

	ASSERT(!refcount_is_zero(&db->db_holds));
	/* We need the struct_rwlock to prevent db_blkptr from changing. */
	ASSERT(RW_LOCK_HELD(&db->db_dnode->dn_struct_rwlock));
	ASSERT(MUTEX_HELD(&db->db_mtx));
	ASSERT(db->db_state == DB_UNCACHED);
	ASSERT(db->db_buf == NULL);

	if (db->db_blkid == DB_BONUS_BLKID) {
		ASSERT3U(db->db_dnode->dn_bonuslen, ==, db->db.db_size);
		/*
		 * The buffer is always DN_MAX_BONUSLEN bytes; zero it first
		 * when the bonus data is shorter so the tail is defined.
		 */
		db->db.db_data = zio_buf_alloc(DN_MAX_BONUSLEN);
		if (db->db.db_size < DN_MAX_BONUSLEN)
			bzero(db->db.db_data, DN_MAX_BONUSLEN);
		bcopy(DN_BONUS(db->db_dnode->dn_phys), db->db.db_data,
		    db->db.db_size);
		dbuf_update_data(db);
		db->db_state = DB_CACHED;
		mutex_exit(&db->db_mtx);
		return;
	}

	/* A level-0 block the dnode reports as freed is treated as a hole. */
	if (db->db_level == 0 && dnode_block_freed(db->db_dnode, db->db_blkid))
		bp = NULL;
	else
		bp = db->db_blkptr;

	if (bp == NULL)
		dprintf_dbuf(db, "blkptr: %s\n", "NULL");
	else
		dprintf_dbuf_bp(db, bp, "%s", "blkptr:");

	if (bp == NULL || BP_IS_HOLE(bp)) {
		/* NOTE(review): this assertion repeats the enclosing test. */
		ASSERT(bp == NULL || BP_IS_HOLE(bp));
		dbuf_set_data(db, arc_buf_alloc(db->db_dnode->dn_objset->os_spa,
		    db->db.db_size, db));
		bzero(db->db.db_data, db->db.db_size);
		db->db_state = DB_CACHED;
		*flags |= DB_RF_CACHED;
		mutex_exit(&db->db_mtx);
		return;
	}

	/* Mark the read as in progress before letting go of db_mtx. */
	db->db_state = DB_READ;
	mutex_exit(&db->db_mtx);

	/* Objsets without a dataset use objset id 0 in the bookmark. */
	zb.zb_objset = db->db_objset->os_dsl_dataset ?
	    db->db_objset->os_dsl_dataset->ds_object : 0;
	zb.zb_object = db->db.db_object;
	zb.zb_level = db->db_level;
	zb.zb_blkid = db->db_blkid;

	/* This hold is dropped by the dbuf_rele() in dbuf_read_done(). */
	dbuf_add_ref(db, NULL);
	/* ZIO_FLAG_CANFAIL callers have to check the parent zio's error */
	(void) arc_read(zio, db->db_dnode->dn_objset->os_spa, bp,
	    db->db_level > 0 ? byteswap_uint64_array :
	    dmu_ot[db->db_dnode->dn_type].ot_byteswap,
	    dbuf_read_done, db, ZIO_PRIORITY_SYNC_READ,
	    (*flags & DB_RF_CANFAIL) ? ZIO_FLAG_CANFAIL : ZIO_FLAG_MUSTSUCCEED,
	    &aflags, &zb);
	if (aflags & ARC_CACHED)
		*flags |= DB_RF_CACHED;
}
537 
/*
 * Make sure the contents of a held dbuf have been, or are being, read in.
 *
 * db    - the dbuf; the caller must have a hold on it.
 * zio   - optional parent zio.  When NULL and a read has to be started,
 *         a root zio is created here and waited on before returning.
 * flags - DB_RF_* flags:
 *         DB_RF_HAVESTRUCT  the caller already holds dn_struct_rwlock,
 *                           so it is neither taken nor dropped here;
 *         DB_RF_NOPREFETCH  skip the dmu_zfetch() call;
 *         DB_RF_NEVERWAIT   do not wait for a read or fill that is
 *                           already in progress;
 *         DB_RF_CANFAIL     examined by dbuf_read_impl().
 *
 * Returns 0, the zio_wait() result for a read started here without a
 * caller-supplied zio, or EIO when the dbuf is DB_UNCACHED after waiting
 * for another thread's read or fill to finish.
 */
int
dbuf_read(dmu_buf_impl_t *db, zio_t *zio, uint32_t flags)
{
	int err = 0;
	int havepzio = (zio != NULL);
	int prefetch;

	/*
	 * We don't have to hold the mutex to check db_state because it
	 * can't be freed while we have a hold on the buffer.
	 */
	ASSERT(!refcount_is_zero(&db->db_holds));

	if ((flags & DB_RF_HAVESTRUCT) == 0)
		rw_enter(&db->db_dnode->dn_struct_rwlock, RW_READER);

	/*
	 * Prefetch only for level-0, non-bonus dbufs.
	 * NOTE(review): when DB_RF_HAVESTRUCT is clear, db->db_dnode was
	 * already dereferenced by the rw_enter() above, so the NULL test
	 * here can only matter for DB_RF_HAVESTRUCT callers.
	 */
	prefetch = db->db_level == 0 && db->db_blkid != DB_BONUS_BLKID &&
	    (flags & DB_RF_NOPREFETCH) == 0 && db->db_dnode != NULL;

	mutex_enter(&db->db_mtx);
	if (db->db_state == DB_CACHED) {
		/* Data is already here; just feed the prefetcher. */
		mutex_exit(&db->db_mtx);
		if (prefetch)
			dmu_zfetch(&db->db_dnode->dn_zfetch, db->db.db_offset,
			    db->db.db_size, TRUE);
		if ((flags & DB_RF_HAVESTRUCT) == 0)
			rw_exit(&db->db_dnode->dn_struct_rwlock);
	} else if (db->db_state == DB_UNCACHED) {
		/* Nobody is reading this dbuf yet: start the read ourselves. */
		if (zio == NULL) {
			zio = zio_root(db->db_dnode->dn_objset->os_spa,
			    NULL, NULL, ZIO_FLAG_CANFAIL);
		}
		dbuf_read_impl(db, zio, &flags);

		/* dbuf_read_impl has dropped db_mtx for us */

		if (prefetch)
			dmu_zfetch(&db->db_dnode->dn_zfetch, db->db.db_offset,
			    db->db.db_size, flags & DB_RF_CACHED);

		if ((flags & DB_RF_HAVESTRUCT) == 0)
			rw_exit(&db->db_dnode->dn_struct_rwlock);

		/* Only wait for a zio that was created in this function. */
		if (!havepzio)
			err = zio_wait(zio);
	} else {
		/* Any other state: drop the locks, then optionally wait. */
		mutex_exit(&db->db_mtx);
		if (prefetch)
			dmu_zfetch(&db->db_dnode->dn_zfetch, db->db.db_offset,
			    db->db.db_size, TRUE);
		if ((flags & DB_RF_HAVESTRUCT) == 0)
			rw_exit(&db->db_dnode->dn_struct_rwlock);

		mutex_enter(&db->db_mtx);
		if ((flags & DB_RF_NEVERWAIT) == 0) {
			while (db->db_state == DB_READ ||
			    db->db_state == DB_FILL) {
				ASSERT(db->db_state == DB_READ ||
				    (flags & DB_RF_HAVESTRUCT) == 0);
				cv_wait(&db->db_changed, &db->db_mtx);
			}
			/* The read or fill we waited for left no data. */
			if (db->db_state == DB_UNCACHED)
				err = EIO;
		}
		mutex_exit(&db->db_mtx);
	}

	ASSERT(err || havepzio || db->db_state == DB_CACHED);
	return (err);
}
608 
609 static void
610 dbuf_noread(dmu_buf_impl_t *db)
611 {
612 	ASSERT(!refcount_is_zero(&db->db_holds));
613 	ASSERT(db->db_blkid != DB_BONUS_BLKID);
614 	mutex_enter(&db->db_mtx);
615 	while (db->db_state == DB_READ || db->db_state == DB_FILL)
616 		cv_wait(&db->db_changed, &db->db_mtx);
617 	if (db->db_state == DB_UNCACHED) {
618 		ASSERT(db->db_buf == NULL);
619 		ASSERT(db->db.db_data == NULL);
620 		dbuf_set_data(db, arc_buf_alloc(db->db_dnode->dn_objset->os_spa,
621 		    db->db.db_size, db));
622 		db->db_state = DB_FILL;
623 	} else {
624 		ASSERT3U(db->db_state, ==, DB_CACHED);
625 	}
626 	mutex_exit(&db->db_mtx);
627 }
628 
/*
 * This is our just-in-time copy function.  It makes a copy of
 * buffers, that have been modified in a previous transaction
 * group, before we modify them in the current active group.
 *
 * This function is used in two places: when we are dirtying a
 * buffer for the first time in a txg, and when we are freeing
 * a range in a dnode that includes this buffer.
 *
 * Note that when we are called from dbuf_free_range() we do
 * not put a hold on the buffer, we just traverse the active
 * dbuf list for the dnode.
 *
 * db  - a non-bonus dbuf with data attached; db_mtx must be held.
 * txg - the txg about to modify the buffer; the slots for txg-1 and
 *       txg-2 in db_data_old[] are the ones examined.
 */
static void
dbuf_fix_old_data(dmu_buf_impl_t *db, uint64_t txg)
{
	arc_buf_t **quiescing, **syncing;

	ASSERT(MUTEX_HELD(&db->db_mtx));
	ASSERT(db->db.db_data != NULL);
	ASSERT(db->db_blkid != DB_BONUS_BLKID);

	quiescing = &db->db_d.db_data_old[(txg-1)&TXG_MASK];
	syncing = &db->db_d.db_data_old[(txg-2)&TXG_MASK];

	/*
	 * If this buffer is referenced from the current quiescing
	 * transaction group: either make a copy and reset the reference
	 * to point to the copy, or (if there are no active holders) just
	 * null out the current db_data pointer.
	 */
	if (*quiescing == db->db_buf) {
		/*
		 * If the quiescing txg is "dirty", then we better not
		 * be referencing the same buffer from the syncing txg.
		 */
		ASSERT(*syncing != db->db_buf);
		/* More holds than dirty records means someone else has a hold. */
		if (refcount_count(&db->db_holds) > db->db_dirtycnt) {
			int size = db->db.db_size;
			*quiescing = arc_buf_alloc(
			    db->db_dnode->dn_objset->os_spa, size, db);
			bcopy(db->db.db_data, (*quiescing)->b_data, size);
		} else {
			dbuf_set_data(db, NULL);
		}
		return;
	}

	/*
	 * If this buffer is referenced from the current syncing
	 * transaction group: either
	 *	1 - make a copy and reset the reference, or
	 *	2 - if there are no holders, just null the current db_data.
	 */
	if (*syncing == db->db_buf) {
		ASSERT3P(*quiescing, ==, NULL);
		ASSERT3U(db->db_dirtycnt, ==, 1);
		if (refcount_count(&db->db_holds) > db->db_dirtycnt) {
			int size = db->db.db_size;
			/* we can't copy if we have already started a write */
			ASSERT(*syncing != db->db_data_pending);
			*syncing = arc_buf_alloc(
			    db->db_dnode->dn_objset->os_spa, size, db);
			bcopy(db->db.db_data, (*syncing)->b_data, size);
		} else {
			dbuf_set_data(db, NULL);
		}
	}
}
698 
699 /*
700  * This is the "bonus buffer" version of the above routine
701  */
702 static void
703 dbuf_fix_old_bonus_data(dmu_buf_impl_t *db, uint64_t txg)
704 {
705 	arc_buf_t **quiescing, **syncing;
706 
707 	ASSERT(MUTEX_HELD(&db->db_mtx));
708 	ASSERT(db->db.db_data != NULL);
709 	ASSERT(db->db_blkid == DB_BONUS_BLKID);
710 
711 	quiescing = &db->db_d.db_data_old[(txg-1)&TXG_MASK];
712 	syncing = &db->db_d.db_data_old[(txg-2)&TXG_MASK];
713 
714 	if (*quiescing == db->db.db_data) {
715 		ASSERT(*syncing != db->db.db_data);
716 		*quiescing = zio_buf_alloc(DN_MAX_BONUSLEN);
717 		bcopy(db->db.db_data, *quiescing, DN_MAX_BONUSLEN);
718 	} else if (*syncing == db->db.db_data) {
719 		ASSERT3P(*quiescing, ==, NULL);
720 		ASSERT3U(db->db_dirtycnt, ==, 1);
721 		*syncing = zio_buf_alloc(DN_MAX_BONUSLEN);
722 		bcopy(db->db.db_data, *syncing, DN_MAX_BONUSLEN);
723 	}
724 }
725 
726 void
727 dbuf_unoverride(dmu_buf_impl_t *db, uint64_t txg)
728 {
729 	ASSERT(db->db_blkid != DB_BONUS_BLKID);
730 	ASSERT(MUTEX_HELD(&db->db_mtx));
731 	ASSERT(db->db_d.db_overridden_by[txg&TXG_MASK] != IN_DMU_SYNC);
732 
733 	if (db->db_d.db_overridden_by[txg&TXG_MASK] != NULL) {
734 		/* free this block */
735 		ASSERT(list_link_active(&db->db_dirty_node[txg&TXG_MASK]) ||
736 		    db->db_dnode->dn_free_txg == txg);
737 		if (!BP_IS_HOLE(db->db_d.db_overridden_by[txg&TXG_MASK])) {
738 			/* XXX can get silent EIO here */
739 			(void) arc_free(NULL, db->db_dnode->dn_objset->os_spa,
740 			    txg, db->db_d.db_overridden_by[txg&TXG_MASK],
741 			    NULL, NULL, ARC_WAIT);
742 		}
743 		kmem_free(db->db_d.db_overridden_by[txg&TXG_MASK],
744 		    sizeof (blkptr_t));
745 		db->db_d.db_overridden_by[txg&TXG_MASK] = NULL;
746 		/*
747 		 * Release the already-written buffer, so we leave it in
748 		 * a consistent dirty state.  Note that all callers are
749 		 * modifying the buffer, so they will immediately do
750 		 * another (redundant) arc_release().  Therefore, leave
751 		 * the buf thawed to save the effort of freezing &
752 		 * immediately re-thawing it.
753 		 */
754 		arc_release(db->db_d.db_data_old[txg&TXG_MASK], db);
755 	}
756 }
757 
/*
 * Update the in-memory dbufs of dn for a free of nblks level-0 blocks
 * starting at blkid, in the txg of tx.
 *
 * Walks dn->dn_dbufs under dn_dbufs_mtx.  For each level-0 dbuf in the
 * range for which dbuf_undirty() returned zero:
 *   - uncached or evicting dbufs are skipped;
 *   - dbufs being read or filled are flagged db_freed_in_flight;
 *   - cached dbufs with no holds are passed to dbuf_clear();
 *   - cached, held dbufs either have older txgs' data preserved
 *     (dbuf_fix_old_data) or any override for this txg dropped
 *     (dbuf_unoverride); then, if still DB_CACHED, the buffer is
 *     released from the ARC, zeroed and frozen again.
 */
void
dbuf_free_range(dnode_t *dn, uint64_t blkid, uint64_t nblks, dmu_tx_t *tx)
{
	dmu_buf_impl_t *db, *db_next;
	uint64_t txg = tx->tx_txg;

	dprintf_dnode(dn, "blkid=%llu nblks=%llu\n", blkid, nblks);
	mutex_enter(&dn->dn_dbufs_mtx);
	/* db_next is fetched up front, before db can be removed from the list */
	for (db = list_head(&dn->dn_dbufs); db; db = db_next) {
		db_next = list_next(&dn->dn_dbufs, db);
		ASSERT(db->db_blkid != DB_BONUS_BLKID);
		if (db->db_level != 0)
			continue;
		dprintf_dbuf(db, "found buf %s\n", "");
		if (db->db_blkid < blkid ||
		    db->db_blkid >= blkid+nblks)
			continue;

		/* found a level 0 buffer in the range */
		if (dbuf_undirty(db, tx))
			continue;

		mutex_enter(&db->db_mtx);
		if (db->db_state == DB_UNCACHED ||
		    db->db_state == DB_EVICTING) {
			ASSERT(db->db.db_data == NULL);
			mutex_exit(&db->db_mtx);
			continue;
		}
		if (db->db_state == DB_READ || db->db_state == DB_FILL) {
			/* will be handled in dbuf_read_done or dbuf_rele */
			db->db_d.db_freed_in_flight = TRUE;
			mutex_exit(&db->db_mtx);
			continue;
		}
		if (refcount_count(&db->db_holds) == 0) {
			ASSERT(db->db_buf);
			/*
			 * NOTE(review): there is no mutex_exit() on this
			 * path, so dbuf_clear() presumably drops db_mtx --
			 * confirm.
			 */
			dbuf_clear(db);
			continue;
		}
		/* The dbuf is CACHED and referenced */

		if (!list_link_active(&db->db_dirty_node[txg & TXG_MASK])) {
			/*
			 * This dbuf is not currently dirty.  Either
			 * uncache it (if its not referenced in the open
			 * context) or reset its contents to empty.
			 */
			dbuf_fix_old_data(db, txg);
		} else {
			if (db->db_d.db_overridden_by[txg & TXG_MASK] != NULL) {
				/*
				 * This dbuf is overridden.  Clear that state.
				 */
				dbuf_unoverride(db, txg);
			}
			if (db->db_blkid > dn->dn_maxblkid)
				dn->dn_maxblkid = db->db_blkid;
		}
		/* fill in with appropriate data */
		if (db->db_state == DB_CACHED) {
			ASSERT(db->db.db_data != NULL);
			arc_release(db->db_buf, db);
			bzero(db->db.db_data, db->db.db_size);
			arc_buf_freeze(db->db_buf);
		}

		mutex_exit(&db->db_mtx);
	}
	mutex_exit(&dn->dn_dbufs_mtx);
}
829 
830 static int
831 dbuf_new_block(dmu_buf_impl_t *db)
832 {
833 	dsl_dataset_t *ds = db->db_objset->os_dsl_dataset;
834 	uint64_t birth_txg = 0;
835 
836 	/* Don't count meta-objects */
837 	if (ds == NULL)
838 		return (FALSE);
839 
840 	/*
841 	 * We don't need any locking to protect db_blkptr:
842 	 * If it's syncing, then db_dirtied will be set so we'll
843 	 * ignore db_blkptr.
844 	 */
845 	ASSERT(MUTEX_HELD(&db->db_mtx)); /* XXX strictly necessary? */
846 	/* If we have been dirtied since the last snapshot, its not new */
847 	if (db->db_dirtied)
848 		birth_txg = db->db_dirtied;
849 	else if (db->db_blkptr)
850 		birth_txg = db->db_blkptr->blk_birth;
851 
852 	if (birth_txg)
853 		return (!dsl_dataset_block_freeable(ds, birth_txg));
854 	else
855 		return (TRUE);
856 }
857 
/*
 * Change the size of a non-bonus dbuf to size bytes in tx.
 *
 * The dbuf is dirtied, a new ARC buffer of the new size is allocated and
 * filled with the old contents (truncated, or zero-extended when
 * growing), and the reference on the old buffer is dropped.  For level-0
 * dbufs the new buffer also becomes this txg's db_data_old[] entry.
 * The caller must hold dn_struct_rwlock as writer.
 */
void
dbuf_new_size(dmu_buf_impl_t *db, int size, dmu_tx_t *tx)
{
	arc_buf_t *buf, *obuf;
	int osize = db->db.db_size;

	ASSERT(db->db_blkid != DB_BONUS_BLKID);

	/* XXX does *this* func really need the lock? */
	ASSERT(RW_WRITE_HELD(&db->db_dnode->dn_struct_rwlock));

	/*
	 * This call to dbuf_will_dirty() with the dn_struct_rwlock held
	 * is OK, because there can be no other references to the db
	 * when we are changing its size, so no concurrent DB_FILL can
	 * be happening.
	 */
	/*
	 * XXX we should be doing a dbuf_read, checking the return
	 * value and returning that up to our callers
	 */
	dbuf_will_dirty(db, tx);

	/* create the data buffer for the new block */
	buf = arc_buf_alloc(db->db_dnode->dn_objset->os_spa, size, db);

	/* copy old block data to the new block */
	obuf = db->db_buf;
	bcopy(obuf->b_data, buf->b_data, MIN(osize, size));
	/* zero the remainder */
	if (size > osize)
		bzero((uint8_t *)buf->b_data + osize, size - osize);

	/* Swap in the new buffer before dropping the old one's reference. */
	mutex_enter(&db->db_mtx);
	dbuf_set_data(db, buf);
	VERIFY(arc_buf_remove_ref(obuf, db) == 1);
	db->db.db_size = size;

	if (db->db_level == 0)
		db->db_d.db_data_old[tx->tx_txg&TXG_MASK] = buf;
	mutex_exit(&db->db_mtx);

	/* Account for the size delta (negative when shrinking). */
	dnode_willuse_space(db->db_dnode, size-osize, tx);
}
902 
903 void
904 dbuf_dirty(dmu_buf_impl_t *db, dmu_tx_t *tx)
905 {
906 	dnode_t *dn = db->db_dnode;
907 	objset_impl_t *os = dn->dn_objset;
908 	int drop_struct_lock = FALSE;
909 	int txgoff = tx->tx_txg & TXG_MASK;
910 
911 	ASSERT(tx->tx_txg != 0);
912 	ASSERT(!refcount_is_zero(&db->db_holds));
913 	DMU_TX_DIRTY_BUF(tx, db);
914 
915 	/*
916 	 * Shouldn't dirty a regular buffer in syncing context.  Private
917 	 * objects may be dirtied in syncing context, but only if they
918 	 * were already pre-dirtied in open context.
919 	 * XXX We may want to prohibit dirtying in syncing context even
920 	 * if they did pre-dirty.
921 	 */
922 	ASSERT(!(dmu_tx_is_syncing(tx) &&
923 	    !BP_IS_HOLE(&dn->dn_objset->os_rootbp) &&
924 	    dn->dn_object != DMU_META_DNODE_OBJECT &&
925 	    dn->dn_objset->os_dsl_dataset != NULL &&
926 	    !dsl_dir_is_private(
927 	    dn->dn_objset->os_dsl_dataset->ds_dir)));
928 
929 	/*
930 	 * We make this assert for private objects as well, but after we
931 	 * check if we're already dirty.  They are allowed to re-dirty
932 	 * in syncing context.
933 	 */
934 	ASSERT(dn->dn_object == DMU_META_DNODE_OBJECT ||
935 	    dn->dn_dirtyctx == DN_UNDIRTIED ||
936 	    dn->dn_dirtyctx ==
937 	    (dmu_tx_is_syncing(tx) ? DN_DIRTY_SYNC : DN_DIRTY_OPEN));
938 
939 	mutex_enter(&db->db_mtx);
940 	/* XXX make this true for indirects too? */
941 	ASSERT(db->db_level != 0 || db->db_state == DB_CACHED ||
942 	    db->db_state == DB_FILL);
943 
944 	/*
945 	 * If this buffer is currently part of an "overridden" region,
946 	 * we now need to remove it from that region.
947 	 */
948 	if (db->db_level == 0 && db->db_blkid != DB_BONUS_BLKID &&
949 	    db->db_d.db_overridden_by[txgoff] != NULL) {
950 		dbuf_unoverride(db, tx->tx_txg);
951 	}
952 
953 	mutex_enter(&dn->dn_mtx);
954 	/*
955 	 * Don't set dirtyctx to SYNC if we're just modifying this as we
956 	 * initialize the objset.
957 	 */
958 	if (dn->dn_dirtyctx == DN_UNDIRTIED &&
959 	    !BP_IS_HOLE(&dn->dn_objset->os_rootbp)) {
960 		dn->dn_dirtyctx =
961 		    (dmu_tx_is_syncing(tx) ? DN_DIRTY_SYNC : DN_DIRTY_OPEN);
962 		ASSERT(dn->dn_dirtyctx_firstset == NULL);
963 		dn->dn_dirtyctx_firstset = kmem_alloc(1, KM_SLEEP);
964 	}
965 	mutex_exit(&dn->dn_mtx);
966 
967 	/*
968 	 * If this buffer is already dirty, we're done.
969 	 */
970 	if (list_link_active(&db->db_dirty_node[txgoff])) {
971 		if (db->db_blkid != DB_BONUS_BLKID && db->db_level == 0 &&
972 		    db->db.db_object != DMU_META_DNODE_OBJECT)
973 			arc_buf_thaw(db->db_buf);
974 
975 		mutex_exit(&db->db_mtx);
976 		return;
977 	}
978 
979 	/*
980 	 * Only valid if not already dirty.
981 	 */
982 	ASSERT(dn->dn_dirtyctx == DN_UNDIRTIED || dn->dn_dirtyctx ==
983 	    (dmu_tx_is_syncing(tx) ? DN_DIRTY_SYNC : DN_DIRTY_OPEN));
984 
985 	ASSERT3U(dn->dn_nlevels, >, db->db_level);
986 	ASSERT((dn->dn_phys->dn_nlevels == 0 && db->db_level == 0) ||
987 	    dn->dn_phys->dn_nlevels > db->db_level ||
988 	    dn->dn_next_nlevels[txgoff] > db->db_level ||
989 	    dn->dn_next_nlevels[(tx->tx_txg-1) & TXG_MASK] > db->db_level ||
990 	    dn->dn_next_nlevels[(tx->tx_txg-2) & TXG_MASK] > db->db_level);
991 
992 	/*
993 	 * We should only be dirtying in syncing context if it's the
994 	 * mos, a spa os, or we're initializing the os.  However, we are
995 	 * allowed to dirty in syncing context provided we already
996 	 * dirtied it in open context.  Hence we must make this
997 	 * assertion only if we're not already dirty.
998 	 */
999 	ASSERT(!dmu_tx_is_syncing(tx) ||
1000 	    os->os_dsl_dataset == NULL ||
1001 	    !dsl_dir_is_private(os->os_dsl_dataset->ds_dir) ||
1002 	    !BP_IS_HOLE(&os->os_rootbp));
1003 	ASSERT(db->db.db_size != 0);
1004 
1005 	dprintf_dbuf(db, "size=%llx\n", (u_longlong_t)db->db.db_size);
1006 
1007 	/*
1008 	 * If this buffer is dirty in an old transaction group we need
1009 	 * to make a copy of it so that the changes we make in this
1010 	 * transaction group won't leak out when we sync the older txg.
1011 	 */
1012 	if (db->db_blkid == DB_BONUS_BLKID) {
1013 		ASSERT(db->db.db_data != NULL);
1014 		ASSERT(db->db_d.db_data_old[txgoff] == NULL);
1015 		dbuf_fix_old_bonus_data(db, tx->tx_txg);
1016 		db->db_d.db_data_old[txgoff] = db->db.db_data;
1017 	} else if (db->db_level == 0) {
1018 		/*
1019 		 * Release the data buffer from the cache so that we
1020 		 * can modify it without impacting possible other users
1021 		 * of this cached data block.  Note that indirect blocks
1022 		 * and private objects are not released until the syncing
1023 		 * state (since they are only modified then).
1024 		 */
1025 		ASSERT(db->db_buf != NULL);
1026 		ASSERT(db->db_d.db_data_old[txgoff] == NULL);
1027 		if (db->db.db_object != DMU_META_DNODE_OBJECT) {
1028 			arc_release(db->db_buf, db);
1029 			dbuf_fix_old_data(db, tx->tx_txg);
1030 			ASSERT(db->db_buf != NULL);
1031 		}
1032 		db->db_d.db_data_old[txgoff] = db->db_buf;
1033 	}
1034 
1035 	mutex_enter(&dn->dn_mtx);
1036 	/*
1037 	 * We could have been freed_in_flight between the dbuf_noread
1038 	 * and dbuf_dirty.  We win, as though the dbuf_noread() had
1039 	 * happened after the free.
1040 	 */
1041 	if (db->db_level == 0 && db->db_blkid != DB_BONUS_BLKID) {
1042 		dnode_clear_range(dn, db->db_blkid, 1, tx);
1043 		db->db_d.db_freed_in_flight = FALSE;
1044 	}
1045 
1046 	db->db_dirtied = tx->tx_txg;
1047 	list_insert_tail(&dn->dn_dirty_dbufs[txgoff], db);
1048 	mutex_exit(&dn->dn_mtx);
1049 
1050 	if (db->db_blkid != DB_BONUS_BLKID) {
1051 		/*
1052 		 * Update the accounting.
1053 		 */
1054 		if (!dbuf_new_block(db) && db->db_blkptr) {
1055 			/*
1056 			 * This is only a guess -- if the dbuf is dirty
1057 			 * in a previous txg, we don't know how much
1058 			 * space it will use on disk yet.  We should
1059 			 * really have the struct_rwlock to access
1060 			 * db_blkptr, but since this is just a guess,
1061 			 * it's OK if we get an odd answer.
1062 			 */
1063 			dnode_willuse_space(dn,
1064 			    -bp_get_dasize(os->os_spa, db->db_blkptr), tx);
1065 		}
1066 		dnode_willuse_space(dn, db->db.db_size, tx);
1067 	}
1068 
1069 	/*
1070 	 * This buffer is now part of this txg
1071 	 */
1072 	dbuf_add_ref(db, (void *)(uintptr_t)tx->tx_txg);
1073 	db->db_dirtycnt += 1;
1074 	ASSERT3U(db->db_dirtycnt, <=, 3);
1075 
1076 	mutex_exit(&db->db_mtx);
1077 
1078 	if (db->db_blkid == DB_BONUS_BLKID) {
1079 		dnode_setdirty(dn, tx);
1080 		return;
1081 	}
1082 
1083 	if (db->db_level == 0) {
1084 		dnode_new_blkid(dn, db->db_blkid, tx);
1085 		ASSERT(dn->dn_maxblkid >= db->db_blkid);
1086 	}
1087 
1088 	if (!RW_WRITE_HELD(&dn->dn_struct_rwlock)) {
1089 		rw_enter(&dn->dn_struct_rwlock, RW_READER);
1090 		drop_struct_lock = TRUE;
1091 	}
1092 
1093 	if (db->db_level+1 < dn->dn_nlevels) {
1094 		int epbs = dn->dn_indblkshift - SPA_BLKPTRSHIFT;
1095 		dmu_buf_impl_t *parent;
1096 		parent = dbuf_hold_level(dn, db->db_level+1,
1097 		    db->db_blkid >> epbs, FTAG);
1098 		if (drop_struct_lock)
1099 			rw_exit(&dn->dn_struct_rwlock);
1100 		dbuf_dirty(parent, tx);
1101 		dbuf_rele(parent, FTAG);
1102 	} else {
1103 		if (drop_struct_lock)
1104 			rw_exit(&dn->dn_struct_rwlock);
1105 	}
1106 
1107 	dnode_setdirty(dn, tx);
1108 }
1109 
1110 static int
1111 dbuf_undirty(dmu_buf_impl_t *db, dmu_tx_t *tx)
1112 {
1113 	dnode_t *dn = db->db_dnode;
1114 	int txgoff = tx->tx_txg & TXG_MASK;
1115 	int64_t holds;
1116 
1117 	ASSERT(tx->tx_txg != 0);
1118 	ASSERT(db->db_blkid != DB_BONUS_BLKID);
1119 
1120 	mutex_enter(&db->db_mtx);
1121 
1122 	/*
1123 	 * If this buffer is not dirty, we're done.
1124 	 */
1125 	if (!list_link_active(&db->db_dirty_node[txgoff])) {
1126 		mutex_exit(&db->db_mtx);
1127 		return (0);
1128 	}
1129 
1130 	/*
1131 	 * If this buffer is currently held, we cannot undirty
1132 	 * it, since one of the current holders may be in the
1133 	 * middle of an update.  Note that users of dbuf_undirty()
1134 	 * should not place a hold on the dbuf before the call.
1135 	 */
1136 	if (refcount_count(&db->db_holds) > db->db_dirtycnt) {
1137 		mutex_exit(&db->db_mtx);
1138 		/* Make sure we don't toss this buffer at sync phase */
1139 		mutex_enter(&dn->dn_mtx);
1140 		dnode_clear_range(dn, db->db_blkid, 1, tx);
1141 		mutex_exit(&dn->dn_mtx);
1142 		return (0);
1143 	}
1144 
1145 	dprintf_dbuf(db, "size=%llx\n", (u_longlong_t)db->db.db_size);
1146 
1147 	dbuf_unoverride(db, tx->tx_txg);
1148 
1149 	ASSERT(db->db.db_size != 0);
1150 	if (db->db_level == 0) {
1151 		ASSERT(db->db_buf != NULL);
1152 		ASSERT(db->db_d.db_data_old[txgoff] != NULL);
1153 		if (db->db_d.db_data_old[txgoff] != db->db_buf)
1154 			VERIFY(arc_buf_remove_ref(
1155 			    db->db_d.db_data_old[txgoff], db) == 1);
1156 		db->db_d.db_data_old[txgoff] = NULL;
1157 	}
1158 
1159 	/* XXX would be nice to fix up dn_towrite_space[] */
1160 	/* XXX undo db_dirtied? but how? */
1161 	/* db->db_dirtied = tx->tx_txg; */
1162 
1163 	mutex_enter(&dn->dn_mtx);
1164 	list_remove(&dn->dn_dirty_dbufs[txgoff], db);
1165 	mutex_exit(&dn->dn_mtx);
1166 
1167 	ASSERT(db->db_dirtycnt > 0);
1168 	db->db_dirtycnt -= 1;
1169 
1170 	if ((holds = refcount_remove(&db->db_holds,
1171 	    (void *)(uintptr_t)tx->tx_txg)) == 0) {
1172 		arc_buf_t *buf = db->db_buf;
1173 
1174 		ASSERT(arc_released(buf));
1175 		dbuf_set_data(db, NULL);
1176 		VERIFY(arc_buf_remove_ref(buf, db) == 1);
1177 		dbuf_evict(db);
1178 		return (1);
1179 	}
1180 	ASSERT(holds > 0);
1181 
1182 	mutex_exit(&db->db_mtx);
1183 	return (0);
1184 }
1185 
1186 #pragma weak dmu_buf_will_dirty = dbuf_will_dirty
1187 void
1188 dbuf_will_dirty(dmu_buf_impl_t *db, dmu_tx_t *tx)
1189 {
1190 	int rf = DB_RF_MUST_SUCCEED;
1191 
1192 	ASSERT(tx->tx_txg != 0);
1193 	ASSERT(!refcount_is_zero(&db->db_holds));
1194 
1195 	if (RW_WRITE_HELD(&db->db_dnode->dn_struct_rwlock))
1196 		rf |= DB_RF_HAVESTRUCT;
1197 	(void) dbuf_read(db, NULL, rf);
1198 	dbuf_dirty(db, tx);
1199 }
1200 
1201 void
1202 dmu_buf_will_fill(dmu_buf_t *db_fake, dmu_tx_t *tx)
1203 {
1204 	dmu_buf_impl_t *db = (dmu_buf_impl_t *)db_fake;
1205 
1206 	ASSERT(db->db_blkid != DB_BONUS_BLKID);
1207 	ASSERT(tx->tx_txg != 0);
1208 	ASSERT(db->db_level == 0);
1209 	ASSERT(!refcount_is_zero(&db->db_holds));
1210 
1211 	ASSERT(db->db.db_object != DMU_META_DNODE_OBJECT ||
1212 	    dmu_tx_private_ok(tx));
1213 
1214 	dbuf_noread(db);
1215 	dbuf_dirty(db, tx);
1216 }
1217 
1218 #pragma weak dmu_buf_fill_done = dbuf_fill_done
1219 /* ARGSUSED */
1220 void
1221 dbuf_fill_done(dmu_buf_impl_t *db, dmu_tx_t *tx)
1222 {
1223 	mutex_enter(&db->db_mtx);
1224 	DBUF_VERIFY(db);
1225 
1226 	if (db->db_state == DB_FILL) {
1227 		if (db->db_level == 0 && db->db_d.db_freed_in_flight) {
1228 			ASSERT(db->db_blkid != DB_BONUS_BLKID);
1229 			/* we were freed while filling */
1230 			/* XXX dbuf_undirty? */
1231 			bzero(db->db.db_data, db->db.db_size);
1232 			db->db_d.db_freed_in_flight = FALSE;
1233 		}
1234 		db->db_state = DB_CACHED;
1235 		cv_broadcast(&db->db_changed);
1236 	}
1237 	mutex_exit(&db->db_mtx);
1238 }
1239 
1240 /*
1241  * "Clear" the contents of this dbuf.  This will mark the dbuf
1242  * EVICTING and clear *most* of its references.  Unfortunetely,
1243  * when we are not holding the dn_dbufs_mtx, we can't clear the
1244  * entry in the dn_dbufs list.  We have to wait until dbuf_destroy()
1245  * in this case.  For callers from the DMU we will usually see:
1246  *	dbuf_clear()->arc_buf_evict()->dbuf_do_evict()->dbuf_destroy()
1247  * For the arc callback, we will usually see:
1248  * 	dbuf_do_evict()->dbuf_clear();dbuf_destroy()
1249  * Sometimes, though, we will get a mix of these two:
1250  *	DMU: dbuf_clear()->arc_buf_evict()
1251  *	ARC: dbuf_do_evict()->dbuf_destroy()
1252  */
1253 void
1254 dbuf_clear(dmu_buf_impl_t *db)
1255 {
1256 	dnode_t *dn = db->db_dnode;
1257 	dmu_buf_impl_t *parent = db->db_parent;
1258 	dmu_buf_impl_t *dndb = dn->dn_dbuf;
1259 	int dbuf_gone = FALSE;
1260 
1261 	ASSERT(MUTEX_HELD(&db->db_mtx));
1262 	ASSERT(refcount_is_zero(&db->db_holds));
1263 
1264 	dbuf_evict_user(db);
1265 
1266 	if (db->db_state == DB_CACHED) {
1267 		ASSERT(db->db.db_data != NULL);
1268 		if (db->db_blkid == DB_BONUS_BLKID)
1269 			zio_buf_free(db->db.db_data, DN_MAX_BONUSLEN);
1270 		db->db.db_data = NULL;
1271 		db->db_state = DB_UNCACHED;
1272 	}
1273 
1274 	ASSERT3U(db->db_state, ==, DB_UNCACHED);
1275 	ASSERT(db->db_data_pending == NULL);
1276 
1277 	db->db_state = DB_EVICTING;
1278 	db->db_blkptr = NULL;
1279 
1280 	if (db->db_blkid != DB_BONUS_BLKID && MUTEX_HELD(&dn->dn_dbufs_mtx)) {
1281 		list_remove(&dn->dn_dbufs, db);
1282 		dnode_rele(dn, db);
1283 	}
1284 
1285 	if (db->db_buf)
1286 		dbuf_gone = arc_buf_evict(db->db_buf);
1287 
1288 	if (!dbuf_gone)
1289 		mutex_exit(&db->db_mtx);
1290 
1291 	/*
1292 	 * If this dbuf is referened from an indirect dbuf,
1293 	 * decrement the ref count on the indirect dbuf.
1294 	 */
1295 	if (parent && parent != dndb)
1296 		dbuf_rele(parent, db);
1297 }
1298 
1299 static int
1300 dbuf_findbp(dnode_t *dn, int level, uint64_t blkid, int fail_sparse,
1301     dmu_buf_impl_t **parentp, blkptr_t **bpp)
1302 {
1303 	int nlevels, epbs;
1304 
1305 	*parentp = NULL;
1306 	*bpp = NULL;
1307 
1308 	ASSERT(blkid != DB_BONUS_BLKID);
1309 
1310 	if (dn->dn_phys->dn_nlevels == 0)
1311 		nlevels = 1;
1312 	else
1313 		nlevels = dn->dn_phys->dn_nlevels;
1314 
1315 	epbs = dn->dn_indblkshift - SPA_BLKPTRSHIFT;
1316 
1317 	ASSERT3U(level * epbs, <, 64);
1318 	ASSERT(RW_LOCK_HELD(&dn->dn_struct_rwlock));
1319 	if (level >= nlevels ||
1320 	    (blkid > (dn->dn_phys->dn_maxblkid >> (level * epbs)))) {
1321 		/* the buffer has no parent yet */
1322 		return (ENOENT);
1323 	} else if (level < nlevels-1) {
1324 		/* this block is referenced from an indirect block */
1325 		int err = dbuf_hold_impl(dn, level+1,
1326 		    blkid >> epbs, fail_sparse, NULL, parentp);
1327 		if (err)
1328 			return (err);
1329 		err = dbuf_read(*parentp, NULL,
1330 		    (DB_RF_HAVESTRUCT | DB_RF_NOPREFETCH | DB_RF_CANFAIL));
1331 		if (err) {
1332 			dbuf_rele(*parentp, NULL);
1333 			*parentp = NULL;
1334 			return (err);
1335 		}
1336 		*bpp = ((blkptr_t *)(*parentp)->db.db_data) +
1337 		    (blkid & ((1ULL << epbs) - 1));
1338 		return (0);
1339 	} else {
1340 		/* the block is referenced from the dnode */
1341 		ASSERT3U(level, ==, nlevels-1);
1342 		ASSERT(dn->dn_phys->dn_nblkptr == 0 ||
1343 		    blkid < dn->dn_phys->dn_nblkptr);
1344 		if (dn->dn_dbuf) {
1345 			dbuf_add_ref(dn->dn_dbuf, NULL);
1346 			*parentp = dn->dn_dbuf;
1347 		}
1348 		*bpp = &dn->dn_phys->dn_blkptr[blkid];
1349 		return (0);
1350 	}
1351 }
1352 
1353 static dmu_buf_impl_t *
1354 dbuf_create(dnode_t *dn, uint8_t level, uint64_t blkid,
1355     dmu_buf_impl_t *parent, blkptr_t *blkptr)
1356 {
1357 	objset_impl_t *os = dn->dn_objset;
1358 	dmu_buf_impl_t *db, *odb;
1359 
1360 	ASSERT(RW_LOCK_HELD(&dn->dn_struct_rwlock));
1361 	ASSERT(dn->dn_type != DMU_OT_NONE);
1362 
1363 	db = kmem_cache_alloc(dbuf_cache, KM_SLEEP);
1364 
1365 	db->db_objset = os;
1366 	db->db.db_object = dn->dn_object;
1367 	db->db_level = level;
1368 	db->db_blkid = blkid;
1369 	db->db_dirtied = 0;
1370 	db->db_dirtycnt = 0;
1371 	db->db_dnode = dn;
1372 	db->db_parent = parent;
1373 	db->db_blkptr = blkptr;
1374 
1375 	bzero(&db->db_d, sizeof (db->db_d));
1376 
1377 	if (blkid == DB_BONUS_BLKID) {
1378 		ASSERT3P(parent, ==, dn->dn_dbuf);
1379 		db->db.db_size = dn->dn_bonuslen;
1380 		db->db.db_offset = DB_BONUS_BLKID;
1381 		db->db_state = DB_UNCACHED;
1382 		/* the bonus dbuf is not placed in the hash table */
1383 		return (db);
1384 	} else {
1385 		int blocksize =
1386 		    db->db_level ? 1<<dn->dn_indblkshift :  dn->dn_datablksz;
1387 		db->db.db_size = blocksize;
1388 		db->db.db_offset = db->db_blkid * blocksize;
1389 	}
1390 
1391 	/*
1392 	 * Hold the dn_dbufs_mtx while we get the new dbuf
1393 	 * in the hash table *and* added to the dbufs list.
1394 	 * This prevents a possible deadlock with someone
1395 	 * trying to look up this dbuf before its added to the
1396 	 * dn_dbufs list.
1397 	 */
1398 	mutex_enter(&dn->dn_dbufs_mtx);
1399 	db->db_state = DB_EVICTING;
1400 	if ((odb = dbuf_hash_insert(db)) != NULL) {
1401 		/* someone else inserted it first */
1402 		kmem_cache_free(dbuf_cache, db);
1403 		mutex_exit(&dn->dn_dbufs_mtx);
1404 		return (odb);
1405 	}
1406 	list_insert_head(&dn->dn_dbufs, db);
1407 	db->db_state = DB_UNCACHED;
1408 	mutex_exit(&dn->dn_dbufs_mtx);
1409 
1410 	if (parent && parent != dn->dn_dbuf)
1411 		dbuf_add_ref(parent, db);
1412 
1413 	ASSERT(dn->dn_object == DMU_META_DNODE_OBJECT ||
1414 	    refcount_count(&dn->dn_holds) > 0);
1415 	(void) refcount_add(&dn->dn_holds, db);
1416 
1417 	dprintf_dbuf(db, "db=%p\n", db);
1418 
1419 	return (db);
1420 }
1421 
1422 static int
1423 dbuf_do_evict(void *private)
1424 {
1425 	arc_buf_t *buf = private;
1426 	dmu_buf_impl_t *db = buf->b_private;
1427 
1428 	if (!MUTEX_HELD(&db->db_mtx))
1429 		mutex_enter(&db->db_mtx);
1430 
1431 	ASSERT(refcount_is_zero(&db->db_holds));
1432 
1433 	if (db->db_state != DB_EVICTING) {
1434 		ASSERT(db->db_state == DB_CACHED);
1435 		DBUF_VERIFY(db);
1436 		db->db_buf = NULL;
1437 		dbuf_evict(db);
1438 	} else {
1439 		mutex_exit(&db->db_mtx);
1440 		dbuf_destroy(db);
1441 	}
1442 	return (0);
1443 }
1444 
1445 static void
1446 dbuf_destroy(dmu_buf_impl_t *db)
1447 {
1448 	ASSERT(refcount_is_zero(&db->db_holds));
1449 
1450 	if (db->db_blkid != DB_BONUS_BLKID) {
1451 		dnode_t *dn = db->db_dnode;
1452 
1453 		/*
1454 		 * If this dbuf is still on the dn_dbufs list,
1455 		 * remove it from that list.
1456 		 */
1457 		if (list_link_active(&db->db_link)) {
1458 			mutex_enter(&dn->dn_dbufs_mtx);
1459 			list_remove(&dn->dn_dbufs, db);
1460 			mutex_exit(&dn->dn_dbufs_mtx);
1461 
1462 			dnode_rele(dn, db);
1463 		}
1464 		dbuf_hash_remove(db);
1465 	}
1466 	db->db_parent = NULL;
1467 	db->db_dnode = NULL;
1468 	db->db_buf = NULL;
1469 
1470 	ASSERT(db->db.db_data == NULL);
1471 	ASSERT(db->db_hash_next == NULL);
1472 	ASSERT(db->db_blkptr == NULL);
1473 	ASSERT(db->db_data_pending == NULL);
1474 
1475 	kmem_cache_free(dbuf_cache, db);
1476 }
1477 
1478 void
1479 dbuf_prefetch(dnode_t *dn, uint64_t blkid)
1480 {
1481 	dmu_buf_impl_t *db = NULL;
1482 	blkptr_t *bp = NULL;
1483 
1484 	ASSERT(blkid != DB_BONUS_BLKID);
1485 	ASSERT(RW_LOCK_HELD(&dn->dn_struct_rwlock));
1486 
1487 	if (dnode_block_freed(dn, blkid))
1488 		return;
1489 
1490 	/* dbuf_find() returns with db_mtx held */
1491 	if (db = dbuf_find(dn, 0, blkid)) {
1492 		if (refcount_count(&db->db_holds) > 0) {
1493 			/*
1494 			 * This dbuf is active.  We assume that it is
1495 			 * already CACHED, or else about to be either
1496 			 * read or filled.
1497 			 */
1498 			mutex_exit(&db->db_mtx);
1499 			return;
1500 		}
1501 		mutex_exit(&db->db_mtx);
1502 		db = NULL;
1503 	}
1504 
1505 	if (dbuf_findbp(dn, 0, blkid, TRUE, &db, &bp) == 0) {
1506 		if (bp && !BP_IS_HOLE(bp)) {
1507 			uint32_t aflags = ARC_NOWAIT | ARC_PREFETCH;
1508 			zbookmark_t zb;
1509 			zb.zb_objset = dn->dn_objset->os_dsl_dataset ?
1510 			    dn->dn_objset->os_dsl_dataset->ds_object : 0;
1511 			zb.zb_object = dn->dn_object;
1512 			zb.zb_level = 0;
1513 			zb.zb_blkid = blkid;
1514 
1515 			(void) arc_read(NULL, dn->dn_objset->os_spa, bp,
1516 			    dmu_ot[dn->dn_type].ot_byteswap,
1517 			    NULL, NULL, ZIO_PRIORITY_ASYNC_READ,
1518 			    ZIO_FLAG_CANFAIL | ZIO_FLAG_SPECULATIVE,
1519 			    &aflags, &zb);
1520 		}
1521 		if (db)
1522 			dbuf_rele(db, NULL);
1523 	}
1524 }
1525 
1526 /*
1527  * Returns with db_holds incremented, and db_mtx not held.
1528  * Note: dn_struct_rwlock must be held.
1529  */
1530 int
1531 dbuf_hold_impl(dnode_t *dn, uint8_t level, uint64_t blkid, int fail_sparse,
1532     void *tag, dmu_buf_impl_t **dbp)
1533 {
1534 	dmu_buf_impl_t *db, *parent = NULL;
1535 
1536 	ASSERT(blkid != DB_BONUS_BLKID);
1537 	ASSERT(RW_LOCK_HELD(&dn->dn_struct_rwlock));
1538 	ASSERT3U(dn->dn_nlevels, >, level);
1539 
1540 	*dbp = NULL;
1541 top:
1542 	/* dbuf_find() returns with db_mtx held */
1543 	db = dbuf_find(dn, level, blkid);
1544 
1545 	if (db == NULL) {
1546 		blkptr_t *bp = NULL;
1547 		int err;
1548 
1549 		ASSERT3P(parent, ==, NULL);
1550 		err = dbuf_findbp(dn, level, blkid, fail_sparse, &parent, &bp);
1551 		if (fail_sparse) {
1552 			if (err == 0 && bp && BP_IS_HOLE(bp))
1553 				err = ENOENT;
1554 			if (err) {
1555 				if (parent)
1556 					dbuf_rele(parent, NULL);
1557 				return (err);
1558 			}
1559 		}
1560 		if (err && err != ENOENT)
1561 			return (err);
1562 		db = dbuf_create(dn, level, blkid, parent, bp);
1563 	}
1564 
1565 	if (db->db_buf && refcount_is_zero(&db->db_holds)) {
1566 		arc_buf_add_ref(db->db_buf, db);
1567 		if (db->db_buf->b_data == NULL) {
1568 			dbuf_clear(db);
1569 			if (parent) {
1570 				dbuf_rele(parent, NULL);
1571 				parent = NULL;
1572 			}
1573 			goto top;
1574 		}
1575 		ASSERT3P(db->db.db_data, ==, db->db_buf->b_data);
1576 	}
1577 
1578 	ASSERT(db->db_buf == NULL || arc_referenced(db->db_buf));
1579 
1580 	/*
1581 	 * If this buffer is currently syncing out, and we are
1582 	 * are still referencing it from db_data, we need to make
1583 	 * a copy of it in case we decide we want to dirty it
1584 	 * again in this txg.
1585 	 */
1586 	if (db->db_level == 0 && db->db_state == DB_CACHED &&
1587 	    dn->dn_object != DMU_META_DNODE_OBJECT &&
1588 	    db->db_data_pending == db->db_buf) {
1589 		int size = (db->db_blkid == DB_BONUS_BLKID) ?
1590 		    DN_MAX_BONUSLEN : db->db.db_size;
1591 
1592 		dbuf_set_data(db, arc_buf_alloc(db->db_dnode->dn_objset->os_spa,
1593 		    size, db));
1594 		bcopy(db->db_data_pending->b_data, db->db.db_data,
1595 		    db->db.db_size);
1596 	}
1597 
1598 	(void) refcount_add(&db->db_holds, tag);
1599 	dbuf_update_data(db);
1600 	DBUF_VERIFY(db);
1601 	mutex_exit(&db->db_mtx);
1602 
1603 	/* NOTE: we can't rele the parent until after we drop the db_mtx */
1604 	if (parent)
1605 		dbuf_rele(parent, NULL);
1606 
1607 	ASSERT3P(db->db_dnode, ==, dn);
1608 	ASSERT3U(db->db_blkid, ==, blkid);
1609 	ASSERT3U(db->db_level, ==, level);
1610 	*dbp = db;
1611 
1612 	return (0);
1613 }
1614 
1615 dmu_buf_impl_t *
1616 dbuf_hold(dnode_t *dn, uint64_t blkid, void *tag)
1617 {
1618 	dmu_buf_impl_t *db;
1619 	int err = dbuf_hold_impl(dn, 0, blkid, FALSE, tag, &db);
1620 	return (err ? NULL : db);
1621 }
1622 
1623 dmu_buf_impl_t *
1624 dbuf_hold_level(dnode_t *dn, int level, uint64_t blkid, void *tag)
1625 {
1626 	dmu_buf_impl_t *db;
1627 	int err = dbuf_hold_impl(dn, level, blkid, FALSE, tag, &db);
1628 	return (err ? NULL : db);
1629 }
1630 
1631 dmu_buf_impl_t *
1632 dbuf_create_bonus(dnode_t *dn)
1633 {
1634 	dmu_buf_impl_t *db = dn->dn_bonus;
1635 
1636 	ASSERT(RW_WRITE_HELD(&dn->dn_struct_rwlock));
1637 
1638 	ASSERT(dn->dn_bonus == NULL);
1639 	db = dbuf_create(dn, 0, DB_BONUS_BLKID, dn->dn_dbuf, NULL);
1640 	return (db);
1641 }
1642 
1643 #pragma weak dmu_buf_add_ref = dbuf_add_ref
1644 void
1645 dbuf_add_ref(dmu_buf_impl_t *db, void *tag)
1646 {
1647 	int64_t holds = refcount_add(&db->db_holds, tag);
1648 	ASSERT(holds > 1);
1649 }
1650 
1651 #pragma weak dmu_buf_rele = dbuf_rele
1652 void
1653 dbuf_rele(dmu_buf_impl_t *db, void *tag)
1654 {
1655 	int64_t holds;
1656 
1657 	mutex_enter(&db->db_mtx);
1658 	DBUF_VERIFY(db);
1659 
1660 	holds = refcount_remove(&db->db_holds, tag);
1661 	ASSERT(holds >= 0);
1662 
1663 	if (db->db_buf && holds == db->db_dirtycnt)
1664 		arc_buf_freeze(db->db_buf);
1665 
1666 	if (holds == db->db_dirtycnt &&
1667 	    db->db_level == 0 && db->db_d.db_immediate_evict)
1668 		dbuf_evict_user(db);
1669 
1670 	if (holds == 0) {
1671 		if (db->db_blkid == DB_BONUS_BLKID) {
1672 			mutex_exit(&db->db_mtx);
1673 			dnode_rele(db->db_dnode, db);
1674 		} else if (db->db_buf == NULL) {
1675 			/*
1676 			 * This is a special case: we never associated this
1677 			 * dbuf with any data allocated from the ARC.
1678 			 */
1679 			ASSERT3U(db->db_state, ==, DB_UNCACHED);
1680 			dbuf_evict(db);
1681 		} else if (arc_released(db->db_buf)) {
1682 			arc_buf_t *buf = db->db_buf;
1683 			/*
1684 			 * This dbuf has anonymous data associated with it.
1685 			 */
1686 			dbuf_set_data(db, NULL);
1687 			VERIFY(arc_buf_remove_ref(buf, db) == 1);
1688 			dbuf_evict(db);
1689 		} else {
1690 			VERIFY(arc_buf_remove_ref(db->db_buf, db) == 0);
1691 			mutex_exit(&db->db_mtx);
1692 		}
1693 	} else {
1694 		mutex_exit(&db->db_mtx);
1695 	}
1696 }
1697 
1698 #pragma weak dmu_buf_refcount = dbuf_refcount
1699 uint64_t
1700 dbuf_refcount(dmu_buf_impl_t *db)
1701 {
1702 	return (refcount_count(&db->db_holds));
1703 }
1704 
1705 void *
1706 dmu_buf_set_user(dmu_buf_t *db_fake, void *user_ptr, void *user_data_ptr_ptr,
1707     dmu_buf_evict_func_t *evict_func)
1708 {
1709 	return (dmu_buf_update_user(db_fake, NULL, user_ptr,
1710 	    user_data_ptr_ptr, evict_func));
1711 }
1712 
1713 void *
1714 dmu_buf_set_user_ie(dmu_buf_t *db_fake, void *user_ptr, void *user_data_ptr_ptr,
1715     dmu_buf_evict_func_t *evict_func)
1716 {
1717 	dmu_buf_impl_t *db = (dmu_buf_impl_t *)db_fake;
1718 
1719 	db->db_d.db_immediate_evict = TRUE;
1720 	return (dmu_buf_update_user(db_fake, NULL, user_ptr,
1721 	    user_data_ptr_ptr, evict_func));
1722 }
1723 
1724 void *
1725 dmu_buf_update_user(dmu_buf_t *db_fake, void *old_user_ptr, void *user_ptr,
1726     void *user_data_ptr_ptr, dmu_buf_evict_func_t *evict_func)
1727 {
1728 	dmu_buf_impl_t *db = (dmu_buf_impl_t *)db_fake;
1729 	ASSERT(db->db_level == 0);
1730 
1731 	ASSERT((user_ptr == NULL) == (evict_func == NULL));
1732 
1733 	mutex_enter(&db->db_mtx);
1734 
1735 	if (db->db_d.db_user_ptr == old_user_ptr) {
1736 		db->db_d.db_user_ptr = user_ptr;
1737 		db->db_d.db_user_data_ptr_ptr = user_data_ptr_ptr;
1738 		db->db_d.db_evict_func = evict_func;
1739 
1740 		dbuf_update_data(db);
1741 	} else {
1742 		old_user_ptr = db->db_d.db_user_ptr;
1743 	}
1744 
1745 	mutex_exit(&db->db_mtx);
1746 	return (old_user_ptr);
1747 }
1748 
1749 void *
1750 dmu_buf_get_user(dmu_buf_t *db_fake)
1751 {
1752 	dmu_buf_impl_t *db = (dmu_buf_impl_t *)db_fake;
1753 	ASSERT(!refcount_is_zero(&db->db_holds));
1754 
1755 	return (db->db_d.db_user_ptr);
1756 }
1757 
1758 void
1759 dbuf_sync(dmu_buf_impl_t *db, zio_t *zio, dmu_tx_t *tx)
1760 {
1761 	arc_buf_t **data;
1762 	uint64_t txg = tx->tx_txg;
1763 	dnode_t *dn = db->db_dnode;
1764 	objset_impl_t *os = dn->dn_objset;
1765 	int epbs = dn->dn_phys->dn_indblkshift - SPA_BLKPTRSHIFT;
1766 	int checksum, compress;
1767 	zbookmark_t zb;
1768 	int blksz;
1769 
1770 	ASSERT(dmu_tx_is_syncing(tx));
1771 
1772 	dprintf_dbuf_bp(db, db->db_blkptr, "blkptr=%p", db->db_blkptr);
1773 
1774 	mutex_enter(&db->db_mtx);
1775 	/*
1776 	 * To be synced, we must be dirtied.  But we
1777 	 * might have been freed after the dirty.
1778 	 */
1779 	if (db->db_state == DB_UNCACHED) {
1780 		/* This buffer has been freed since it was dirtied */
1781 		ASSERT(db->db.db_data == NULL);
1782 	} else if (db->db_state == DB_FILL) {
1783 		/* This buffer was freed and is now being re-filled */
1784 		ASSERT(db->db.db_data != db->db_d.db_data_old[txg&TXG_MASK]);
1785 	} else {
1786 		ASSERT3U(db->db_state, ==, DB_CACHED);
1787 	}
1788 	DBUF_VERIFY(db);
1789 
1790 	/*
1791 	 * Don't need a lock on db_dirty (dn_mtx), because it can't
1792 	 * be modified yet.
1793 	 */
1794 
1795 	if (db->db_blkid == DB_BONUS_BLKID) {
1796 		arc_buf_t **datap = &db->db_d.db_data_old[txg&TXG_MASK];
1797 		/*
1798 		 * Simply copy the bonus data into the dnode.  It will
1799 		 * be written out when the dnode is synced (and it will
1800 		 * be synced, since it must have been dirty for dbuf_sync
1801 		 * to be called).
1802 		 */
1803 		/*
1804 		 * Use dn_phys->dn_bonuslen since db.db_size is the length
1805 		 * of the bonus buffer in the open transaction rather than
1806 		 * the syncing transaction.
1807 		 */
1808 		ASSERT(*datap != NULL);
1809 		ASSERT3U(db->db_level, ==, 0);
1810 		ASSERT3U(dn->dn_phys->dn_bonuslen, <=, DN_MAX_BONUSLEN);
1811 		bcopy(*datap, DN_BONUS(dn->dn_phys), dn->dn_phys->dn_bonuslen);
1812 		if (*datap != db->db.db_data)
1813 			zio_buf_free(*datap, DN_MAX_BONUSLEN);
1814 		db->db_d.db_data_old[txg&TXG_MASK] = NULL;
1815 		db->db_data_pending = NULL;
1816 		if (db->db_dirtied == txg)
1817 			db->db_dirtied = 0;
1818 		ASSERT(db->db_dirtycnt > 0);
1819 		db->db_dirtycnt -= 1;
1820 		mutex_exit(&db->db_mtx);
1821 		dbuf_rele(db, (void *)(uintptr_t)txg);
1822 		return;
1823 	}
1824 
1825 	if (db->db_level == 0) {
1826 		data = &db->db_d.db_data_old[txg&TXG_MASK];
1827 		blksz = arc_buf_size(*data);
1828 
1829 		/*
1830 		 * This buffer is in the middle of an immdiate write.
1831 		 * Wait for the synchronous IO to complete.
1832 		 */
1833 		while (db->db_d.db_overridden_by[txg&TXG_MASK] == IN_DMU_SYNC) {
1834 			ASSERT(dn->dn_object != DMU_META_DNODE_OBJECT);
1835 			cv_wait(&db->db_changed, &db->db_mtx);
1836 			ASSERT(db->db_d.db_overridden_by[txg&TXG_MASK]);
1837 		}
1838 		/*
1839 		 * If this buffer is currently "in use" (i.e., there are
1840 		 * active holds and db_data still references it), then make
1841 		 * a copy before we start the write so that any modifications
1842 		 * from the open txg will not leak into this write.
1843 		 *
1844 		 * NOTE: this copy does not need to be made for objects only
1845 		 * modified in the syncing context (e.g. DNONE_DNODE blocks)
1846 		 * or if there is no actual write involved (bonus blocks).
1847 		 */
1848 		if (dn->dn_object != DMU_META_DNODE_OBJECT &&
1849 		    db->db_d.db_overridden_by[txg&TXG_MASK] == NULL) {
1850 			if (refcount_count(&db->db_holds) > 1 &&
1851 			    *data == db->db_buf) {
1852 				*data = arc_buf_alloc(os->os_spa, blksz, db);
1853 				bcopy(db->db.db_data, (*data)->b_data, blksz);
1854 			}
1855 			db->db_data_pending = *data;
1856 		} else if (dn->dn_object == DMU_META_DNODE_OBJECT) {
1857 			/*
1858 			 * Private object buffers are released here rather
1859 			 * than in dbuf_dirty() since they are only modified
1860 			 * in the syncing context and we don't want the
1861 			 * overhead of making multiple copies of the data.
1862 			 */
1863 			arc_release(db->db_buf, db);
1864 		}
1865 	} else {
1866 		data = &db->db_buf;
1867 		if (*data == NULL) {
1868 			/*
1869 			 * This can happen if we dirty and then free
1870 			 * the level-0 data blocks in the same txg. So
1871 			 * this indirect remains unchanged.
1872 			 */
1873 			if (db->db_dirtied == txg)
1874 				db->db_dirtied = 0;
1875 			ASSERT(db->db_dirtycnt > 0);
1876 			db->db_dirtycnt -= 1;
1877 			mutex_exit(&db->db_mtx);
1878 			dbuf_rele(db, (void *)(uintptr_t)txg);
1879 			return;
1880 		}
1881 		blksz = db->db.db_size;
1882 		ASSERT3U(blksz, ==, 1<<dn->dn_phys->dn_indblkshift);
1883 	}
1884 
1885 	ASSERT(*data != NULL);
1886 
1887 	if (db->db_level > 0 && !arc_released(db->db_buf)) {
1888 		/*
1889 		 * This indirect buffer was marked dirty, but
1890 		 * never modified (if it had been modified, then
1891 		 * we would have released the buffer).  There is
1892 		 * no reason to write anything.
1893 		 */
1894 		db->db_data_pending = NULL;
1895 		if (db->db_dirtied == txg)
1896 			db->db_dirtied = 0;
1897 		ASSERT(db->db_dirtycnt > 0);
1898 		db->db_dirtycnt -= 1;
1899 		mutex_exit(&db->db_mtx);
1900 		dbuf_rele(db, (void *)(uintptr_t)txg);
1901 		return;
1902 	} else if (db->db_blkptr == NULL &&
1903 	    db->db_level == dn->dn_phys->dn_nlevels-1 &&
1904 	    db->db_blkid < dn->dn_phys->dn_nblkptr) {
1905 		/*
1906 		 * This buffer was allocated at a time when there was
1907 		 * no available blkptrs from the dnode, or it was
1908 		 * inappropriate to hook it in (i.e., nlevels mis-match).
1909 		 */
1910 		ASSERT(db->db_blkptr == NULL);
1911 		ASSERT(db->db_parent == NULL);
1912 		db->db_parent = dn->dn_dbuf;
1913 		db->db_blkptr = &dn->dn_phys->dn_blkptr[db->db_blkid];
1914 		DBUF_VERIFY(db);
1915 		mutex_exit(&db->db_mtx);
1916 	} else if (db->db_blkptr == NULL) {
1917 		dmu_buf_impl_t *parent = db->db_parent;
1918 
1919 		mutex_exit(&db->db_mtx);
1920 		ASSERT(dn->dn_phys->dn_nlevels > 1);
1921 		if (parent == NULL) {
1922 			rw_enter(&dn->dn_struct_rwlock, RW_READER);
1923 			(void) dbuf_hold_impl(dn, db->db_level+1,
1924 			    db->db_blkid >> epbs, FALSE, FTAG, &parent);
1925 			rw_exit(&dn->dn_struct_rwlock);
1926 			dbuf_add_ref(parent, db);
1927 			db->db_parent = parent;
1928 			dbuf_rele(parent, FTAG);
1929 		}
1930 		(void) dbuf_read(parent, NULL, DB_RF_MUST_SUCCEED);
1931 	} else {
1932 		mutex_exit(&db->db_mtx);
1933 	}
1934 
1935 	ASSERT(dn->dn_object == DMU_META_DNODE_OBJECT || db->db_parent != NULL);
1936 
1937 	if (db->db_level > 0 &&
1938 	    db->db_blkid > dn->dn_phys->dn_maxblkid >> (db->db_level * epbs)) {
1939 		/*
1940 		 * Don't write indirect blocks past EOF.
1941 		 * We get these when we truncate a file *after* dirtying
1942 		 * blocks in the truncate range (we undirty the level 0
1943 		 * blocks in dbuf_free_range(), but not the indirects).
1944 		 */
1945 #ifdef ZFS_DEBUG
1946 		/*
1947 		 * Verify that this indirect block is empty.
1948 		 */
1949 		blkptr_t *bplist;
1950 		int i;
1951 
1952 		mutex_enter(&db->db_mtx);
1953 		bplist = db->db.db_data;
1954 		for (i = 0; i < (1 << epbs); i++) {
1955 			if (!BP_IS_HOLE(&bplist[i])) {
1956 				panic("data past EOF: "
1957 				    "db=%p level=%d id=%llu i=%d\n",
1958 				    db, db->db_level,
1959 				    (u_longlong_t)db->db_blkid, i);
1960 			}
1961 		}
1962 		mutex_exit(&db->db_mtx);
1963 #endif
1964 		ASSERT(db->db_blkptr == NULL || BP_IS_HOLE(db->db_blkptr));
1965 		mutex_enter(&db->db_mtx);
1966 		db->db_dirtycnt -= 1;
1967 		mutex_exit(&db->db_mtx);
1968 		dbuf_rele(db, (void *)(uintptr_t)txg);
1969 		return;
1970 	}
1971 
1972 	if (db->db_parent != dn->dn_dbuf) {
1973 		dmu_buf_impl_t *parent = db->db_parent;
1974 
1975 		mutex_enter(&db->db_mtx);
1976 		ASSERT(db->db_level == parent->db_level-1);
1977 		ASSERT(list_link_active(&parent->db_dirty_node[txg&TXG_MASK]));
1978 		/*
1979 		 * We may have read this indirect block after we dirtied it,
1980 		 * so never released it from the cache.
1981 		 */
1982 		arc_release(parent->db_buf, parent);
1983 
1984 		db->db_blkptr = (blkptr_t *)parent->db.db_data +
1985 		    (db->db_blkid & ((1ULL << epbs) - 1));
1986 		DBUF_VERIFY(db);
1987 		mutex_exit(&db->db_mtx);
1988 #ifdef ZFS_DEBUG
1989 	} else {
1990 		/*
1991 		 * We don't need to dnode_setdirty(dn) because if we got
1992 		 * here then the parent is already dirty.
1993 		 */
1994 		ASSERT(db->db_level == dn->dn_phys->dn_nlevels-1);
1995 		ASSERT3P(db->db_blkptr, ==,
1996 		    &dn->dn_phys->dn_blkptr[db->db_blkid]);
1997 #endif
1998 	}
1999 	ASSERT(db->db_parent == NULL || arc_released(db->db_parent->db_buf));
2000 
2001 	if (db->db_level == 0 &&
2002 	    db->db_d.db_overridden_by[txg&TXG_MASK] != NULL) {
2003 		arc_buf_t **old = &db->db_d.db_data_old[txg&TXG_MASK];
2004 		blkptr_t **bpp = &db->db_d.db_overridden_by[txg&TXG_MASK];
2005 		int old_size = bp_get_dasize(os->os_spa, db->db_blkptr);
2006 		int new_size = bp_get_dasize(os->os_spa, *bpp);
2007 
2008 		ASSERT(db->db_blkid != DB_BONUS_BLKID);
2009 
2010 		dnode_diduse_space(dn, new_size-old_size);
2011 		mutex_enter(&dn->dn_mtx);
2012 		if (db->db_blkid > dn->dn_phys->dn_maxblkid)
2013 			dn->dn_phys->dn_maxblkid = db->db_blkid;
2014 		mutex_exit(&dn->dn_mtx);
2015 
2016 		dsl_dataset_block_born(os->os_dsl_dataset, *bpp, tx);
2017 		if (!BP_IS_HOLE(db->db_blkptr))
2018 			dsl_dataset_block_kill(os->os_dsl_dataset,
2019 			    db->db_blkptr, os->os_synctx);
2020 
2021 		mutex_enter(&db->db_mtx);
2022 		*db->db_blkptr = **bpp;
2023 		kmem_free(*bpp, sizeof (blkptr_t));
2024 		*bpp = NULL;
2025 
2026 		if (*old != db->db_buf)
2027 			VERIFY(arc_buf_remove_ref(*old, db) == 1);
2028 		else if (!BP_IS_HOLE(db->db_blkptr))
2029 			arc_set_callback(db->db_buf, dbuf_do_evict, db);
2030 		else
2031 			ASSERT(arc_released(db->db_buf));
2032 		*old = NULL;
2033 		db->db_data_pending = NULL;
2034 
2035 		cv_broadcast(&db->db_changed);
2036 
2037 		ASSERT(db->db_dirtycnt > 0);
2038 		db->db_dirtycnt -= 1;
2039 		mutex_exit(&db->db_mtx);
2040 		dbuf_rele(db, (void *)(uintptr_t)txg);
2041 		return;
2042 	}
2043 
2044 	if (db->db_level > 0) {
2045 		/*
2046 		 * XXX -- we should design a compression algorithm
2047 		 * that specializes in arrays of bps.
2048 		 */
2049 		checksum = ZIO_CHECKSUM_FLETCHER_4;
2050 		if (zfs_mdcomp_disable)
2051 			compress = ZIO_COMPRESS_EMPTY;
2052 		else
2053 			compress = ZIO_COMPRESS_LZJB;
2054 	} else {
2055 		/*
2056 		 * Allow dnode settings to override objset settings,
2057 		 * except for metadata checksums.
2058 		 */
2059 		if (dmu_ot[dn->dn_type].ot_metadata) {
2060 			checksum = os->os_md_checksum;
2061 			compress = zio_compress_select(dn->dn_compress,
2062 			    os->os_md_compress);
2063 		} else {
2064 			checksum = zio_checksum_select(dn->dn_checksum,
2065 			    os->os_checksum);
2066 			compress = zio_compress_select(dn->dn_compress,
2067 			    os->os_compress);
2068 		}
2069 	}
2070 #ifdef ZFS_DEBUG
2071 	if (db->db_parent) {
2072 		ASSERT(list_link_active(
2073 		    &db->db_parent->db_dirty_node[txg&TXG_MASK]));
2074 		ASSERT(db->db_parent == dn->dn_dbuf ||
2075 		    db->db_parent->db_level > 0);
2076 		if (dn->dn_object == DMU_META_DNODE_OBJECT || db->db_level > 0)
2077 			ASSERT(*data == db->db_buf);
2078 	}
2079 #endif
2080 	ASSERT3U(db->db_blkptr->blk_birth, <=, tx->tx_txg);
2081 	zb.zb_objset = os->os_dsl_dataset ? os->os_dsl_dataset->ds_object : 0;
2082 	zb.zb_object = db->db.db_object;
2083 	zb.zb_level = db->db_level;
2084 	zb.zb_blkid = db->db_blkid;
2085 
2086 	(void) arc_write(zio, os->os_spa, checksum, compress,
2087 	    dmu_get_replication_level(os->os_spa, &zb, dn->dn_type), txg,
2088 	    db->db_blkptr, *data, dbuf_write_done, db,
2089 	    ZIO_PRIORITY_ASYNC_WRITE, ZIO_FLAG_MUSTSUCCEED, ARC_NOWAIT, &zb);
2090 	/*
2091 	 * We can't access db after arc_write, since it could finish
2092 	 * and be freed, and we have no locks on it.
2093 	 */
2094 }
2095 
/*
 * Argument block passed to the dbuf_do_born() and dbuf_do_kill()
 * callbacks.  It is allocated by the code that dispatches the callback
 * and freed by the callback itself.
 */
struct dbuf_arg {
	objset_impl_t *os;	/* objset supplying the dataset and synctx */
	blkptr_t bp;		/* private copy of the block pointer */
};
2100 
2101 static void
2102 dbuf_do_born(void *arg)
2103 {
2104 	struct dbuf_arg *da = arg;
2105 	dsl_dataset_block_born(da->os->os_dsl_dataset,
2106 	    &da->bp, da->os->os_synctx);
2107 	kmem_free(da, sizeof (struct dbuf_arg));
2108 }
2109 
2110 static void
2111 dbuf_do_kill(void *arg)
2112 {
2113 	struct dbuf_arg *da = arg;
2114 	dsl_dataset_block_kill(da->os->os_dsl_dataset,
2115 	    &da->bp, da->os->os_synctx);
2116 	kmem_free(da, sizeof (struct dbuf_arg));
2117 }
2118 
/*
 * Completion callback registered with arc_write() for a dbuf write.
 *
 *	zio	the completed write; io_bp_orig is the block pointer before
 *		the write and io_bp the one after it.
 *	buf	unused.
 *	vdb	the dmu_buf_impl_t that was written.
 *
 * The callback charges the dnode for the change in allocated size,
 * retires the per-txg dirty state held in the dbuf, recomputes the
 * fill count stored in the block pointer, and, if the block's identity
 * changed, queues born/kill notifications on dbuf_tq.  It finishes by
 * calling dbuf_rele() with the txg as the tag argument.
 */
/* ARGSUSED */
static void
dbuf_write_done(zio_t *zio, arc_buf_t *buf, void *vdb)
{
	dmu_buf_impl_t *db = vdb;
	dnode_t *dn = db->db_dnode;
	objset_impl_t *os = dn->dn_objset;
	uint64_t txg = zio->io_txg;
	uint64_t fill = 0;
	int i;
	int old_size, new_size;

	ASSERT3U(zio->io_error, ==, 0);

	dprintf_dbuf_bp(db, &zio->io_bp_orig, "bp_orig: %s", "");

	/* Charge the dnode for the difference between old and new bp. */
	old_size = bp_get_dasize(os->os_spa, &zio->io_bp_orig);
	new_size = bp_get_dasize(os->os_spa, zio->io_bp);

	dnode_diduse_space(dn, new_size-old_size);

	mutex_enter(&db->db_mtx);

	ASSERT(db->db_d.db_overridden_by[txg&TXG_MASK] == NULL);

	if (db->db_dirtied == txg)
		db->db_dirtied = 0;

	if (db->db_level == 0) {
		/* Slot holding the data buffer that was dirty in this txg. */
		arc_buf_t **old = &db->db_d.db_data_old[txg&TXG_MASK];

		ASSERT(db->db_blkid != DB_BONUS_BLKID);

		/*
		 * If the buffer written in this txg is not the dbuf's
		 * current buffer, drop the dbuf's reference on it.
		 * Otherwise, for a non-hole block, install dbuf_do_evict
		 * as the buffer's callback.
		 */
		if (*old != db->db_buf)
			VERIFY(arc_buf_remove_ref(*old, db) == 1);
		else if (!BP_IS_HOLE(db->db_blkptr))
			arc_set_callback(db->db_buf, dbuf_do_evict, db);
		else
			ASSERT(arc_released(db->db_buf));
		*old = NULL;
		db->db_data_pending = NULL;

		/* Grow dn_maxblkid only for blocks that were really written. */
		mutex_enter(&dn->dn_mtx);
		if (db->db_blkid > dn->dn_phys->dn_maxblkid &&
		    !BP_IS_HOLE(db->db_blkptr))
			dn->dn_phys->dn_maxblkid = db->db_blkid;
		mutex_exit(&dn->dn_mtx);

		if (dn->dn_type == DMU_OT_DNODE) {
			/*
			 * fill is the number of dnode_phys_t entries in
			 * this block whose type is not DMU_OT_NONE.
			 */
			dnode_phys_t *dnp = db->db.db_data;
			for (i = db->db.db_size >> DNODE_SHIFT; i > 0;
			    i--, dnp++) {
				if (dnp->dn_type != DMU_OT_NONE)
					fill++;
			}
		} else {
			/* Any other level-0 block counts once unless a hole. */
			if (!BP_IS_HOLE(db->db_blkptr))
				fill = 1;
		}
	} else {
		blkptr_t *bp = db->db.db_data;
		ASSERT3U(db->db.db_size, ==, 1<<dn->dn_phys->dn_indblkshift);
		if (!BP_IS_HOLE(db->db_blkptr)) {
			int epbs =
			    dn->dn_phys->dn_indblkshift - SPA_BLKPTRSHIFT;
			ASSERT3U(BP_GET_LSIZE(zio->io_bp), ==, db->db.db_size);
			ASSERT3U(BP_GET_LSIZE(db->db_blkptr), ==,
			    db->db.db_size);
			ASSERT3U(dn->dn_phys->dn_maxblkid
			    >> (db->db_level * epbs), >=, db->db_blkid);
			arc_set_callback(db->db_buf, dbuf_do_evict, db);
		}
		/*
		 * fill for an indirect block is the sum of blk_fill over
		 * every non-hole block pointer it contains.
		 */
		for (i = db->db.db_size >> SPA_BLKPTRSHIFT; i > 0; i--, bp++) {
			if (BP_IS_HOLE(bp))
				continue;
			ASSERT3U(BP_GET_LSIZE(bp), ==,
			    db->db_level == 1 ? dn->dn_datablksz :
			    (1<<dn->dn_phys->dn_indblkshift));
			fill += bp->blk_fill;
		}
	}

	/* Record fill, type and level in the bp; a hole must stay empty. */
	if (!BP_IS_HOLE(db->db_blkptr)) {
		db->db_blkptr->blk_fill = fill;
		BP_SET_TYPE(db->db_blkptr, dn->dn_type);
		BP_SET_LEVEL(db->db_blkptr, db->db_level);
	} else {
		ASSERT3U(fill, ==, 0);
		ASSERT3U(db->db_blkptr->blk_fill, ==, 0);
	}

	/*
	 * NOTE(review): zio->io_size is passed to %llu without the
	 * (u_longlong_t) cast used by other dprintf calls in this file;
	 * confirm its declared type.
	 */
	dprintf_dbuf_bp(db, db->db_blkptr,
	    "wrote %llu bytes to blkptr:", zio->io_size);

	ASSERT(db->db_parent == NULL ||
	    list_link_active(&db->db_parent->db_dirty_node[txg&TXG_MASK]));
	/* Wake waiters on db_changed and retire this txg's dirty count. */
	cv_broadcast(&db->db_changed);
	ASSERT(db->db_dirtycnt > 0);
	db->db_dirtycnt -= 1;
	mutex_exit(&db->db_mtx);

	/* We must do this after we've set the bp's type and level */
	if (!DVA_EQUAL(BP_IDENTITY(zio->io_bp),
	    BP_IDENTITY(&zio->io_bp_orig))) {
		/*
		 * The block's identity changed: queue a born notification
		 * for the new bp and, if the old bp was not a hole, a kill
		 * notification for it.  Each callback gets its own
		 * dbuf_arg with a copy of the bp and frees it when done.
		 */
		struct dbuf_arg *da;
		da = kmem_alloc(sizeof (struct dbuf_arg), KM_SLEEP);
		da->os = os;
		da->bp = *zio->io_bp;
		(void) taskq_dispatch(dbuf_tq, dbuf_do_born, da, 0);
		if (!BP_IS_HOLE(&zio->io_bp_orig)) {
			da = kmem_alloc(sizeof (struct dbuf_arg), KM_SLEEP);
			da->os = os;
			da->bp = zio->io_bp_orig;
			(void) taskq_dispatch(dbuf_tq, dbuf_do_kill, da, 0);
		}
	}

	dbuf_rele(db, (void *)(uintptr_t)txg);
}
2238