xref: /illumos-gate/usr/src/uts/common/fs/zfs/txg.c (revision 66e150d7d3c0cb2de3c45c74612784ffd3e73de6)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 
26 #pragma ident	"%Z%%M%	%I%	%E% SMI"
27 
28 #include <sys/zfs_context.h>
29 #include <sys/txg_impl.h>
30 #include <sys/dmu_impl.h>
31 #include <sys/dsl_pool.h>
32 #include <sys/callb.h>
33 
34 /*
35  * Pool-wide transaction groups.
36  */
37 
38 static void txg_sync_thread(dsl_pool_t *dp);
39 static void txg_quiesce_thread(dsl_pool_t *dp);
40 
/* Max seconds worth of delta per txg. */
int zfs_txg_timeout = 30;

/* Target seconds to sync a txg. */
int zfs_txg_synctime = 5;

/* Write limit max is physical memory shifted right by this (3 => 1/8th). */
int zfs_write_limit_shift = 3;

/* Min write limit is 32MB. */
uint64_t zfs_write_limit_min = (32 << 20);

/* Max data payload per txg; the sync thread fills this in. */
uint64_t zfs_write_limit_max = 0;

/*
 * zfs_write_limit_max after spa_get_asize() inflation, never below
 * zfs_write_limit_min; the sync thread fills this in.
 */
uint64_t zfs_write_limit_inflated = 0;
49 
50 /*
51  * Prepare the txg subsystem.
52  */
53 void
54 txg_init(dsl_pool_t *dp, uint64_t txg)
55 {
56 	tx_state_t *tx = &dp->dp_tx;
57 	int c;
58 	bzero(tx, sizeof (tx_state_t));
59 
60 	tx->tx_cpu = kmem_zalloc(max_ncpus * sizeof (tx_cpu_t), KM_SLEEP);
61 
62 	for (c = 0; c < max_ncpus; c++) {
63 		int i;
64 
65 		mutex_init(&tx->tx_cpu[c].tc_lock, NULL, MUTEX_DEFAULT, NULL);
66 		for (i = 0; i < TXG_SIZE; i++) {
67 			cv_init(&tx->tx_cpu[c].tc_cv[i], NULL, CV_DEFAULT,
68 			    NULL);
69 		}
70 	}
71 
72 	rw_init(&tx->tx_suspend, NULL, RW_DEFAULT, NULL);
73 	mutex_init(&tx->tx_sync_lock, NULL, MUTEX_DEFAULT, NULL);
74 
75 	tx->tx_open_txg = txg;
76 }
77 
78 /*
79  * Close down the txg subsystem.
80  */
81 void
82 txg_fini(dsl_pool_t *dp)
83 {
84 	tx_state_t *tx = &dp->dp_tx;
85 	int c;
86 
87 	ASSERT(tx->tx_threads == 0);
88 
89 	rw_destroy(&tx->tx_suspend);
90 	mutex_destroy(&tx->tx_sync_lock);
91 
92 	for (c = 0; c < max_ncpus; c++) {
93 		int i;
94 
95 		mutex_destroy(&tx->tx_cpu[c].tc_lock);
96 		for (i = 0; i < TXG_SIZE; i++)
97 			cv_destroy(&tx->tx_cpu[c].tc_cv[i]);
98 	}
99 
100 	kmem_free(tx->tx_cpu, max_ncpus * sizeof (tx_cpu_t));
101 
102 	bzero(tx, sizeof (tx_state_t));
103 }
104 
105 /*
106  * Start syncing transaction groups.
107  */
108 void
109 txg_sync_start(dsl_pool_t *dp)
110 {
111 	tx_state_t *tx = &dp->dp_tx;
112 
113 	mutex_enter(&tx->tx_sync_lock);
114 
115 	dprintf("pool %p\n", dp);
116 
117 	ASSERT(tx->tx_threads == 0);
118 
119 	tx->tx_threads = 2;
120 
121 	tx->tx_quiesce_thread = thread_create(NULL, 0, txg_quiesce_thread,
122 	    dp, 0, &p0, TS_RUN, minclsyspri);
123 
124 	/*
125 	 * The sync thread can need a larger-than-default stack size on
126 	 * 32-bit x86.  This is due in part to nested pools and
127 	 * scrub_visitbp() recursion.
128 	 */
129 	tx->tx_sync_thread = thread_create(NULL, 12<<10, txg_sync_thread,
130 	    dp, 0, &p0, TS_RUN, minclsyspri);
131 
132 	mutex_exit(&tx->tx_sync_lock);
133 }
134 
135 static void
136 txg_thread_enter(tx_state_t *tx, callb_cpr_t *cpr)
137 {
138 	CALLB_CPR_INIT(cpr, &tx->tx_sync_lock, callb_generic_cpr, FTAG);
139 	mutex_enter(&tx->tx_sync_lock);
140 }
141 
142 static void
143 txg_thread_exit(tx_state_t *tx, callb_cpr_t *cpr, kthread_t **tpp)
144 {
145 	ASSERT(*tpp != NULL);
146 	*tpp = NULL;
147 	tx->tx_threads--;
148 	cv_broadcast(&tx->tx_exit_cv);
149 	CALLB_CPR_EXIT(cpr);		/* drops &tx->tx_sync_lock */
150 	thread_exit();
151 }
152 
153 static void
154 txg_thread_wait(tx_state_t *tx, callb_cpr_t *cpr, kcondvar_t *cv, uint64_t time)
155 {
156 	CALLB_CPR_SAFE_BEGIN(cpr);
157 
158 	if (time)
159 		(void) cv_timedwait(cv, &tx->tx_sync_lock, lbolt + time);
160 	else
161 		cv_wait(cv, &tx->tx_sync_lock);
162 
163 	CALLB_CPR_SAFE_END(cpr, &tx->tx_sync_lock);
164 }
165 
166 /*
167  * Stop syncing transaction groups.
168  */
169 void
170 txg_sync_stop(dsl_pool_t *dp)
171 {
172 	tx_state_t *tx = &dp->dp_tx;
173 
174 	dprintf("pool %p\n", dp);
175 	/*
176 	 * Finish off any work in progress.
177 	 */
178 	ASSERT(tx->tx_threads == 2);
179 	txg_wait_synced(dp, 0);
180 
181 	/*
182 	 * Wake all sync threads and wait for them to die.
183 	 */
184 	mutex_enter(&tx->tx_sync_lock);
185 
186 	ASSERT(tx->tx_threads == 2);
187 
188 	tx->tx_exiting = 1;
189 
190 	cv_broadcast(&tx->tx_quiesce_more_cv);
191 	cv_broadcast(&tx->tx_quiesce_done_cv);
192 	cv_broadcast(&tx->tx_sync_more_cv);
193 
194 	while (tx->tx_threads != 0)
195 		cv_wait(&tx->tx_exit_cv, &tx->tx_sync_lock);
196 
197 	tx->tx_exiting = 0;
198 
199 	mutex_exit(&tx->tx_sync_lock);
200 }
201 
202 uint64_t
203 txg_hold_open(dsl_pool_t *dp, txg_handle_t *th)
204 {
205 	tx_state_t *tx = &dp->dp_tx;
206 	tx_cpu_t *tc = &tx->tx_cpu[CPU_SEQID];
207 	uint64_t txg;
208 
209 	mutex_enter(&tc->tc_lock);
210 
211 	txg = tx->tx_open_txg;
212 	tc->tc_count[txg & TXG_MASK]++;
213 
214 	th->th_cpu = tc;
215 	th->th_txg = txg;
216 
217 	return (txg);
218 }
219 
220 void
221 txg_rele_to_quiesce(txg_handle_t *th)
222 {
223 	tx_cpu_t *tc = th->th_cpu;
224 
225 	mutex_exit(&tc->tc_lock);
226 }
227 
228 void
229 txg_rele_to_sync(txg_handle_t *th)
230 {
231 	tx_cpu_t *tc = th->th_cpu;
232 	int g = th->th_txg & TXG_MASK;
233 
234 	mutex_enter(&tc->tc_lock);
235 	ASSERT(tc->tc_count[g] != 0);
236 	if (--tc->tc_count[g] == 0)
237 		cv_broadcast(&tc->tc_cv[g]);
238 	mutex_exit(&tc->tc_lock);
239 
240 	th->th_cpu = NULL;	/* defensive */
241 }
242 
243 static void
244 txg_quiesce(dsl_pool_t *dp, uint64_t txg)
245 {
246 	tx_state_t *tx = &dp->dp_tx;
247 	int g = txg & TXG_MASK;
248 	int c;
249 
250 	/*
251 	 * Grab all tx_cpu locks so nobody else can get into this txg.
252 	 */
253 	for (c = 0; c < max_ncpus; c++)
254 		mutex_enter(&tx->tx_cpu[c].tc_lock);
255 
256 	ASSERT(txg == tx->tx_open_txg);
257 	tx->tx_open_txg++;
258 
259 	/*
260 	 * Now that we've incremented tx_open_txg, we can let threads
261 	 * enter the next transaction group.
262 	 */
263 	for (c = 0; c < max_ncpus; c++)
264 		mutex_exit(&tx->tx_cpu[c].tc_lock);
265 
266 	/*
267 	 * Quiesce the transaction group by waiting for everyone to txg_exit().
268 	 */
269 	for (c = 0; c < max_ncpus; c++) {
270 		tx_cpu_t *tc = &tx->tx_cpu[c];
271 		mutex_enter(&tc->tc_lock);
272 		while (tc->tc_count[g] != 0)
273 			cv_wait(&tc->tc_cv[g], &tc->tc_lock);
274 		mutex_exit(&tc->tc_lock);
275 	}
276 }
277 
278 static void
279 txg_sync_thread(dsl_pool_t *dp)
280 {
281 	tx_state_t *tx = &dp->dp_tx;
282 	callb_cpr_t cpr;
283 	uint64_t timeout, start, delta, timer;
284 	int target;
285 
286 	txg_thread_enter(tx, &cpr);
287 
288 	start = delta = 0;
289 	timeout = zfs_txg_timeout * hz;
290 	for (;;) {
291 		uint64_t txg, written;
292 
293 		/*
294 		 * We sync when there's someone waiting on us, or the
295 		 * quiesce thread has handed off a txg to us, or we have
296 		 * reached our timeout.
297 		 */
298 		timer = (delta >= timeout ? 0 : timeout - delta);
299 		while (!tx->tx_exiting && timer > 0 &&
300 		    tx->tx_synced_txg >= tx->tx_sync_txg_waiting &&
301 		    tx->tx_quiesced_txg == 0) {
302 			dprintf("waiting; tx_synced=%llu waiting=%llu dp=%p\n",
303 			    tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
304 			txg_thread_wait(tx, &cpr, &tx->tx_sync_more_cv, timer);
305 			delta = lbolt - start;
306 			timer = (delta > timeout ? 0 : timeout - delta);
307 		}
308 
309 		/*
310 		 * Wait until the quiesce thread hands off a txg to us,
311 		 * prompting it to do so if necessary.
312 		 */
313 		while (!tx->tx_exiting && tx->tx_quiesced_txg == 0) {
314 			if (tx->tx_quiesce_txg_waiting < tx->tx_open_txg+1)
315 				tx->tx_quiesce_txg_waiting = tx->tx_open_txg+1;
316 			cv_broadcast(&tx->tx_quiesce_more_cv);
317 			txg_thread_wait(tx, &cpr, &tx->tx_quiesce_done_cv, 0);
318 		}
319 
320 		if (tx->tx_exiting)
321 			txg_thread_exit(tx, &cpr, &tx->tx_sync_thread);
322 
323 		rw_enter(&tx->tx_suspend, RW_WRITER);
324 
325 		/*
326 		 * Consume the quiesced txg which has been handed off to
327 		 * us.  This may cause the quiescing thread to now be
328 		 * able to quiesce another txg, so we must signal it.
329 		 */
330 		txg = tx->tx_quiesced_txg;
331 		tx->tx_quiesced_txg = 0;
332 		tx->tx_syncing_txg = txg;
333 		cv_broadcast(&tx->tx_quiesce_more_cv);
334 		rw_exit(&tx->tx_suspend);
335 
336 		dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
337 		    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
338 		mutex_exit(&tx->tx_sync_lock);
339 		start = lbolt;
340 		spa_sync(dp->dp_spa, txg);
341 		delta = (lbolt - start) + 1;
342 
343 		written = dp->dp_space_towrite[txg & TXG_MASK];
344 		dp->dp_space_towrite[txg & TXG_MASK] = 0;
345 		ASSERT(dp->dp_tempreserved[txg & TXG_MASK] == 0);
346 
347 		/*
348 		 * If the write limit max has not been explicitly set, set it
349 		 * to a fraction of available phisical memory (default 1/8th).
350 		 * Note that we must inflate the limit because the spa
351 		 * inflates write sizes to account for data replication.
352 		 * Check this each sync phase to catch changing memory size.
353 		 */
354 		if (zfs_write_limit_inflated == 0 ||
355 		    (zfs_write_limit_shift && zfs_write_limit_max !=
356 		    physmem * PAGESIZE >> zfs_write_limit_shift)) {
357 			zfs_write_limit_max =
358 			    physmem * PAGESIZE >> zfs_write_limit_shift;
359 			zfs_write_limit_inflated =
360 			    spa_get_asize(dp->dp_spa, zfs_write_limit_max);
361 			if (zfs_write_limit_min > zfs_write_limit_inflated)
362 				zfs_write_limit_inflated = zfs_write_limit_min;
363 		}
364 
365 		/*
366 		 * Attempt to keep the sync time consistant by adjusting the
367 		 * amount of write traffic allowed into each transaction group.
368 		 */
369 		target = zfs_txg_synctime * hz;
370 		if (delta > target) {
371 			uint64_t old = MIN(dp->dp_write_limit, written);
372 
373 			dp->dp_write_limit = MAX(zfs_write_limit_min,
374 			    old * target / delta);
375 		} else if (written >= dp->dp_write_limit &&
376 		    delta >> 3 < target >> 3) {
377 			uint64_t rescale =
378 			    MIN((100 * target) / delta, 200);
379 
380 			dp->dp_write_limit = MIN(zfs_write_limit_inflated,
381 			    written * rescale / 100);
382 		}
383 
384 		mutex_enter(&tx->tx_sync_lock);
385 		rw_enter(&tx->tx_suspend, RW_WRITER);
386 		tx->tx_synced_txg = txg;
387 		tx->tx_syncing_txg = 0;
388 		rw_exit(&tx->tx_suspend);
389 		cv_broadcast(&tx->tx_sync_done_cv);
390 	}
391 }
392 
393 static void
394 txg_quiesce_thread(dsl_pool_t *dp)
395 {
396 	tx_state_t *tx = &dp->dp_tx;
397 	callb_cpr_t cpr;
398 
399 	txg_thread_enter(tx, &cpr);
400 
401 	for (;;) {
402 		uint64_t txg;
403 
404 		/*
405 		 * We quiesce when there's someone waiting on us.
406 		 * However, we can only have one txg in "quiescing" or
407 		 * "quiesced, waiting to sync" state.  So we wait until
408 		 * the "quiesced, waiting to sync" txg has been consumed
409 		 * by the sync thread.
410 		 */
411 		while (!tx->tx_exiting &&
412 		    (tx->tx_open_txg >= tx->tx_quiesce_txg_waiting ||
413 		    tx->tx_quiesced_txg != 0))
414 			txg_thread_wait(tx, &cpr, &tx->tx_quiesce_more_cv, 0);
415 
416 		if (tx->tx_exiting)
417 			txg_thread_exit(tx, &cpr, &tx->tx_quiesce_thread);
418 
419 		txg = tx->tx_open_txg;
420 		dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
421 		    txg, tx->tx_quiesce_txg_waiting,
422 		    tx->tx_sync_txg_waiting);
423 		mutex_exit(&tx->tx_sync_lock);
424 		txg_quiesce(dp, txg);
425 		mutex_enter(&tx->tx_sync_lock);
426 
427 		/*
428 		 * Hand this txg off to the sync thread.
429 		 */
430 		dprintf("quiesce done, handing off txg %llu\n", txg);
431 		tx->tx_quiesced_txg = txg;
432 		cv_broadcast(&tx->tx_sync_more_cv);
433 		cv_broadcast(&tx->tx_quiesce_done_cv);
434 	}
435 }
436 
437 /*
438  * Delay this thread by 'ticks' if we are still in the open transaction
439  * group and there is already a waiting txg quiesing or quiesced.  Abort
440  * the delay if this txg stalls or enters the quiesing state.
441  */
442 void
443 txg_delay(dsl_pool_t *dp, uint64_t txg, int ticks)
444 {
445 	tx_state_t *tx = &dp->dp_tx;
446 	int timeout = lbolt + ticks;
447 
448 	/* don't delay if this txg could transition to quiesing immediately */
449 	if (tx->tx_open_txg > txg ||
450 	    tx->tx_syncing_txg == txg-1 || tx->tx_synced_txg == txg-1)
451 		return;
452 
453 	mutex_enter(&tx->tx_sync_lock);
454 	if (tx->tx_open_txg > txg || tx->tx_synced_txg == txg-1) {
455 		mutex_exit(&tx->tx_sync_lock);
456 		return;
457 	}
458 
459 	while (lbolt < timeout &&
460 	    tx->tx_syncing_txg < txg-1 && !txg_stalled(dp))
461 		(void) cv_timedwait(&tx->tx_quiesce_more_cv, &tx->tx_sync_lock,
462 		    timeout);
463 
464 	mutex_exit(&tx->tx_sync_lock);
465 }
466 
467 void
468 txg_wait_synced(dsl_pool_t *dp, uint64_t txg)
469 {
470 	tx_state_t *tx = &dp->dp_tx;
471 
472 	mutex_enter(&tx->tx_sync_lock);
473 	ASSERT(tx->tx_threads == 2);
474 	if (txg == 0)
475 		txg = tx->tx_open_txg;
476 	if (tx->tx_sync_txg_waiting < txg)
477 		tx->tx_sync_txg_waiting = txg;
478 	dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
479 	    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
480 	while (tx->tx_synced_txg < txg) {
481 		dprintf("broadcasting sync more "
482 		    "tx_synced=%llu waiting=%llu dp=%p\n",
483 		    tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
484 		cv_broadcast(&tx->tx_sync_more_cv);
485 		cv_wait(&tx->tx_sync_done_cv, &tx->tx_sync_lock);
486 	}
487 	mutex_exit(&tx->tx_sync_lock);
488 }
489 
490 void
491 txg_wait_open(dsl_pool_t *dp, uint64_t txg)
492 {
493 	tx_state_t *tx = &dp->dp_tx;
494 
495 	mutex_enter(&tx->tx_sync_lock);
496 	ASSERT(tx->tx_threads == 2);
497 	if (txg == 0)
498 		txg = tx->tx_open_txg + 1;
499 	if (tx->tx_quiesce_txg_waiting < txg)
500 		tx->tx_quiesce_txg_waiting = txg;
501 	dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
502 	    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
503 	while (tx->tx_open_txg < txg) {
504 		cv_broadcast(&tx->tx_quiesce_more_cv);
505 		cv_wait(&tx->tx_quiesce_done_cv, &tx->tx_sync_lock);
506 	}
507 	mutex_exit(&tx->tx_sync_lock);
508 }
509 
510 boolean_t
511 txg_stalled(dsl_pool_t *dp)
512 {
513 	tx_state_t *tx = &dp->dp_tx;
514 	return (tx->tx_quiesce_txg_waiting > tx->tx_open_txg);
515 }
516 
517 boolean_t
518 txg_sync_waiting(dsl_pool_t *dp)
519 {
520 	tx_state_t *tx = &dp->dp_tx;
521 
522 	return (tx->tx_syncing_txg <= tx->tx_sync_txg_waiting ||
523 	    tx->tx_quiesced_txg != 0);
524 }
525 
526 void
527 txg_suspend(dsl_pool_t *dp)
528 {
529 	tx_state_t *tx = &dp->dp_tx;
530 	/* XXX some code paths suspend when they are already suspended! */
531 	rw_enter(&tx->tx_suspend, RW_READER);
532 }
533 
534 void
535 txg_resume(dsl_pool_t *dp)
536 {
537 	tx_state_t *tx = &dp->dp_tx;
538 	rw_exit(&tx->tx_suspend);
539 }
540 
541 /*
542  * Per-txg object lists.
543  */
544 void
545 txg_list_create(txg_list_t *tl, size_t offset)
546 {
547 	int t;
548 
549 	mutex_init(&tl->tl_lock, NULL, MUTEX_DEFAULT, NULL);
550 
551 	tl->tl_offset = offset;
552 
553 	for (t = 0; t < TXG_SIZE; t++)
554 		tl->tl_head[t] = NULL;
555 }
556 
557 void
558 txg_list_destroy(txg_list_t *tl)
559 {
560 	int t;
561 
562 	for (t = 0; t < TXG_SIZE; t++)
563 		ASSERT(txg_list_empty(tl, t));
564 
565 	mutex_destroy(&tl->tl_lock);
566 }
567 
568 int
569 txg_list_empty(txg_list_t *tl, uint64_t txg)
570 {
571 	return (tl->tl_head[txg & TXG_MASK] == NULL);
572 }
573 
574 /*
575  * Add an entry to the list.
576  * Returns 0 if it's a new entry, 1 if it's already there.
577  */
578 int
579 txg_list_add(txg_list_t *tl, void *p, uint64_t txg)
580 {
581 	int t = txg & TXG_MASK;
582 	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
583 	int already_on_list;
584 
585 	mutex_enter(&tl->tl_lock);
586 	already_on_list = tn->tn_member[t];
587 	if (!already_on_list) {
588 		tn->tn_member[t] = 1;
589 		tn->tn_next[t] = tl->tl_head[t];
590 		tl->tl_head[t] = tn;
591 	}
592 	mutex_exit(&tl->tl_lock);
593 
594 	return (already_on_list);
595 }
596 
597 /*
598  * Remove the head of the list and return it.
599  */
600 void *
601 txg_list_remove(txg_list_t *tl, uint64_t txg)
602 {
603 	int t = txg & TXG_MASK;
604 	txg_node_t *tn;
605 	void *p = NULL;
606 
607 	mutex_enter(&tl->tl_lock);
608 	if ((tn = tl->tl_head[t]) != NULL) {
609 		p = (char *)tn - tl->tl_offset;
610 		tl->tl_head[t] = tn->tn_next[t];
611 		tn->tn_next[t] = NULL;
612 		tn->tn_member[t] = 0;
613 	}
614 	mutex_exit(&tl->tl_lock);
615 
616 	return (p);
617 }
618 
619 /*
620  * Remove a specific item from the list and return it.
621  */
622 void *
623 txg_list_remove_this(txg_list_t *tl, void *p, uint64_t txg)
624 {
625 	int t = txg & TXG_MASK;
626 	txg_node_t *tn, **tp;
627 
628 	mutex_enter(&tl->tl_lock);
629 
630 	for (tp = &tl->tl_head[t]; (tn = *tp) != NULL; tp = &tn->tn_next[t]) {
631 		if ((char *)tn - tl->tl_offset == p) {
632 			*tp = tn->tn_next[t];
633 			tn->tn_next[t] = NULL;
634 			tn->tn_member[t] = 0;
635 			mutex_exit(&tl->tl_lock);
636 			return (p);
637 		}
638 	}
639 
640 	mutex_exit(&tl->tl_lock);
641 
642 	return (NULL);
643 }
644 
645 int
646 txg_list_member(txg_list_t *tl, void *p, uint64_t txg)
647 {
648 	int t = txg & TXG_MASK;
649 	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
650 
651 	return (tn->tn_member[t]);
652 }
653 
654 /*
655  * Walk a txg list -- only safe if you know it's not changing.
656  */
657 void *
658 txg_list_head(txg_list_t *tl, uint64_t txg)
659 {
660 	int t = txg & TXG_MASK;
661 	txg_node_t *tn = tl->tl_head[t];
662 
663 	return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
664 }
665 
666 void *
667 txg_list_next(txg_list_t *tl, void *p, uint64_t txg)
668 {
669 	int t = txg & TXG_MASK;
670 	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
671 
672 	tn = tn->tn_next[t];
673 
674 	return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
675 }
676