1 /* 2 * CDDL HEADER START 3 * 4 * The contents of this file are subject to the terms of the 5 * Common Development and Distribution License (the "License"). 6 * You may not use this file except in compliance with the License. 7 * 8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 9 * or http://www.opensolaris.org/os/licensing. 10 * See the License for the specific language governing permissions 11 * and limitations under the License. 12 * 13 * When distributing Covered Code, include this CDDL HEADER in each 14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 15 * If applicable, add the following below this CDDL HEADER, with the 16 * fields enclosed by brackets "[]" replaced with your own identifying 17 * information: Portions Copyright [yyyy] [name of copyright owner] 18 * 19 * CDDL HEADER END 20 */ 21 /* 22 * Copyright 2009 Sun Microsystems, Inc. All rights reserved. 23 * Use is subject to license terms. 24 */ 25 26 #include <sys/zfs_context.h> 27 #include <sys/txg_impl.h> 28 #include <sys/dmu_impl.h> 29 #include <sys/dmu_tx.h> 30 #include <sys/dsl_pool.h> 31 #include <sys/callb.h> 32 33 /* 34 * Pool-wide transaction groups. 35 */ 36 37 static void txg_sync_thread(dsl_pool_t *dp); 38 static void txg_quiesce_thread(dsl_pool_t *dp); 39 40 int zfs_txg_timeout = 30; /* max seconds worth of delta per txg */ 41 42 /* 43 * Prepare the txg subsystem. 44 */ 45 void 46 txg_init(dsl_pool_t *dp, uint64_t txg) 47 { 48 tx_state_t *tx = &dp->dp_tx; 49 int c; 50 bzero(tx, sizeof (tx_state_t)); 51 52 tx->tx_cpu = kmem_zalloc(max_ncpus * sizeof (tx_cpu_t), KM_SLEEP); 53 54 for (c = 0; c < max_ncpus; c++) { 55 int i; 56 57 mutex_init(&tx->tx_cpu[c].tc_lock, NULL, MUTEX_DEFAULT, NULL); 58 for (i = 0; i < TXG_SIZE; i++) { 59 cv_init(&tx->tx_cpu[c].tc_cv[i], NULL, CV_DEFAULT, 60 NULL); 61 list_create(&tx->tx_cpu[c].tc_callbacks[i], 62 sizeof (dmu_tx_callback_t), 63 offsetof(dmu_tx_callback_t, dcb_node)); 64 } 65 } 66 67 rw_init(&tx->tx_suspend, NULL, RW_DEFAULT, NULL); 68 mutex_init(&tx->tx_sync_lock, NULL, MUTEX_DEFAULT, NULL); 69 70 cv_init(&tx->tx_sync_more_cv, NULL, CV_DEFAULT, NULL); 71 cv_init(&tx->tx_sync_done_cv, NULL, CV_DEFAULT, NULL); 72 cv_init(&tx->tx_quiesce_more_cv, NULL, CV_DEFAULT, NULL); 73 cv_init(&tx->tx_quiesce_done_cv, NULL, CV_DEFAULT, NULL); 74 cv_init(&tx->tx_exit_cv, NULL, CV_DEFAULT, NULL); 75 76 tx->tx_open_txg = txg; 77 } 78 79 /* 80 * Close down the txg subsystem. 81 */ 82 void 83 txg_fini(dsl_pool_t *dp) 84 { 85 tx_state_t *tx = &dp->dp_tx; 86 int c; 87 88 ASSERT(tx->tx_threads == 0); 89 90 rw_destroy(&tx->tx_suspend); 91 mutex_destroy(&tx->tx_sync_lock); 92 93 cv_destroy(&tx->tx_sync_more_cv); 94 cv_destroy(&tx->tx_sync_done_cv); 95 cv_destroy(&tx->tx_quiesce_more_cv); 96 cv_destroy(&tx->tx_quiesce_done_cv); 97 cv_destroy(&tx->tx_exit_cv); 98 99 for (c = 0; c < max_ncpus; c++) { 100 int i; 101 102 mutex_destroy(&tx->tx_cpu[c].tc_lock); 103 for (i = 0; i < TXG_SIZE; i++) { 104 cv_destroy(&tx->tx_cpu[c].tc_cv[i]); 105 list_destroy(&tx->tx_cpu[c].tc_callbacks[i]); 106 } 107 } 108 109 if (tx->tx_commit_cb_taskq != NULL) 110 taskq_destroy(tx->tx_commit_cb_taskq); 111 112 kmem_free(tx->tx_cpu, max_ncpus * sizeof (tx_cpu_t)); 113 114 bzero(tx, sizeof (tx_state_t)); 115 } 116 117 /* 118 * Start syncing transaction groups. 119 */ 120 void 121 txg_sync_start(dsl_pool_t *dp) 122 { 123 tx_state_t *tx = &dp->dp_tx; 124 125 mutex_enter(&tx->tx_sync_lock); 126 127 dprintf("pool %p\n", dp); 128 129 ASSERT(tx->tx_threads == 0); 130 131 tx->tx_threads = 2; 132 133 tx->tx_quiesce_thread = thread_create(NULL, 0, txg_quiesce_thread, 134 dp, 0, &p0, TS_RUN, minclsyspri); 135 136 /* 137 * The sync thread can need a larger-than-default stack size on 138 * 32-bit x86. This is due in part to nested pools and 139 * scrub_visitbp() recursion. 140 */ 141 tx->tx_sync_thread = thread_create(NULL, 12<<10, txg_sync_thread, 142 dp, 0, &p0, TS_RUN, minclsyspri); 143 144 mutex_exit(&tx->tx_sync_lock); 145 } 146 147 static void 148 txg_thread_enter(tx_state_t *tx, callb_cpr_t *cpr) 149 { 150 CALLB_CPR_INIT(cpr, &tx->tx_sync_lock, callb_generic_cpr, FTAG); 151 mutex_enter(&tx->tx_sync_lock); 152 } 153 154 static void 155 txg_thread_exit(tx_state_t *tx, callb_cpr_t *cpr, kthread_t **tpp) 156 { 157 ASSERT(*tpp != NULL); 158 *tpp = NULL; 159 tx->tx_threads--; 160 cv_broadcast(&tx->tx_exit_cv); 161 CALLB_CPR_EXIT(cpr); /* drops &tx->tx_sync_lock */ 162 thread_exit(); 163 } 164 165 static void 166 txg_thread_wait(tx_state_t *tx, callb_cpr_t *cpr, kcondvar_t *cv, uint64_t time) 167 { 168 CALLB_CPR_SAFE_BEGIN(cpr); 169 170 if (time) 171 (void) cv_timedwait(cv, &tx->tx_sync_lock, lbolt + time); 172 else 173 cv_wait(cv, &tx->tx_sync_lock); 174 175 CALLB_CPR_SAFE_END(cpr, &tx->tx_sync_lock); 176 } 177 178 /* 179 * Stop syncing transaction groups. 180 */ 181 void 182 txg_sync_stop(dsl_pool_t *dp) 183 { 184 tx_state_t *tx = &dp->dp_tx; 185 186 dprintf("pool %p\n", dp); 187 /* 188 * Finish off any work in progress. 189 */ 190 ASSERT(tx->tx_threads == 2); 191 txg_wait_synced(dp, 0); 192 193 /* 194 * Wake all sync threads and wait for them to die. 195 */ 196 mutex_enter(&tx->tx_sync_lock); 197 198 ASSERT(tx->tx_threads == 2); 199 200 tx->tx_exiting = 1; 201 202 cv_broadcast(&tx->tx_quiesce_more_cv); 203 cv_broadcast(&tx->tx_quiesce_done_cv); 204 cv_broadcast(&tx->tx_sync_more_cv); 205 206 while (tx->tx_threads != 0) 207 cv_wait(&tx->tx_exit_cv, &tx->tx_sync_lock); 208 209 tx->tx_exiting = 0; 210 211 mutex_exit(&tx->tx_sync_lock); 212 } 213 214 uint64_t 215 txg_hold_open(dsl_pool_t *dp, txg_handle_t *th) 216 { 217 tx_state_t *tx = &dp->dp_tx; 218 tx_cpu_t *tc = &tx->tx_cpu[CPU_SEQID]; 219 uint64_t txg; 220 221 mutex_enter(&tc->tc_lock); 222 223 txg = tx->tx_open_txg; 224 tc->tc_count[txg & TXG_MASK]++; 225 226 th->th_cpu = tc; 227 th->th_txg = txg; 228 229 return (txg); 230 } 231 232 void 233 txg_rele_to_quiesce(txg_handle_t *th) 234 { 235 tx_cpu_t *tc = th->th_cpu; 236 237 mutex_exit(&tc->tc_lock); 238 } 239 240 void 241 txg_register_callbacks(txg_handle_t *th, list_t *tx_callbacks) 242 { 243 tx_cpu_t *tc = th->th_cpu; 244 int g = th->th_txg & TXG_MASK; 245 246 mutex_enter(&tc->tc_lock); 247 list_move_tail(&tc->tc_callbacks[g], tx_callbacks); 248 mutex_exit(&tc->tc_lock); 249 } 250 251 void 252 txg_rele_to_sync(txg_handle_t *th) 253 { 254 tx_cpu_t *tc = th->th_cpu; 255 int g = th->th_txg & TXG_MASK; 256 257 mutex_enter(&tc->tc_lock); 258 ASSERT(tc->tc_count[g] != 0); 259 if (--tc->tc_count[g] == 0) 260 cv_broadcast(&tc->tc_cv[g]); 261 mutex_exit(&tc->tc_lock); 262 263 th->th_cpu = NULL; /* defensive */ 264 } 265 266 static void 267 txg_quiesce(dsl_pool_t *dp, uint64_t txg) 268 { 269 tx_state_t *tx = &dp->dp_tx; 270 int g = txg & TXG_MASK; 271 int c; 272 273 /* 274 * Grab all tx_cpu locks so nobody else can get into this txg. 275 */ 276 for (c = 0; c < max_ncpus; c++) 277 mutex_enter(&tx->tx_cpu[c].tc_lock); 278 279 ASSERT(txg == tx->tx_open_txg); 280 tx->tx_open_txg++; 281 282 /* 283 * Now that we've incremented tx_open_txg, we can let threads 284 * enter the next transaction group. 285 */ 286 for (c = 0; c < max_ncpus; c++) 287 mutex_exit(&tx->tx_cpu[c].tc_lock); 288 289 /* 290 * Quiesce the transaction group by waiting for everyone to txg_exit(). 291 */ 292 for (c = 0; c < max_ncpus; c++) { 293 tx_cpu_t *tc = &tx->tx_cpu[c]; 294 mutex_enter(&tc->tc_lock); 295 while (tc->tc_count[g] != 0) 296 cv_wait(&tc->tc_cv[g], &tc->tc_lock); 297 mutex_exit(&tc->tc_lock); 298 } 299 } 300 301 static void 302 txg_do_callbacks(list_t *cb_list) 303 { 304 dmu_tx_do_callbacks(cb_list, 0); 305 306 list_destroy(cb_list); 307 308 kmem_free(cb_list, sizeof (list_t)); 309 } 310 311 /* 312 * Dispatch the commit callbacks registered on this txg to worker threads. 313 */ 314 static void 315 txg_dispatch_callbacks(dsl_pool_t *dp, uint64_t txg) 316 { 317 int c; 318 tx_state_t *tx = &dp->dp_tx; 319 list_t *cb_list; 320 321 for (c = 0; c < max_ncpus; c++) { 322 tx_cpu_t *tc = &tx->tx_cpu[c]; 323 /* No need to lock tx_cpu_t at this point */ 324 325 int g = txg & TXG_MASK; 326 327 if (list_is_empty(&tc->tc_callbacks[g])) 328 continue; 329 330 if (tx->tx_commit_cb_taskq == NULL) { 331 /* 332 * Commit callback taskq hasn't been created yet. 333 */ 334 tx->tx_commit_cb_taskq = taskq_create("tx_commit_cb", 335 max_ncpus, minclsyspri, max_ncpus, max_ncpus * 2, 336 TASKQ_PREPOPULATE); 337 } 338 339 cb_list = kmem_alloc(sizeof (list_t), KM_SLEEP); 340 list_create(cb_list, sizeof (dmu_tx_callback_t), 341 offsetof(dmu_tx_callback_t, dcb_node)); 342 343 list_move_tail(&tc->tc_callbacks[g], cb_list); 344 345 (void) taskq_dispatch(tx->tx_commit_cb_taskq, (task_func_t *) 346 txg_do_callbacks, cb_list, TQ_SLEEP); 347 } 348 } 349 350 static void 351 txg_sync_thread(dsl_pool_t *dp) 352 { 353 tx_state_t *tx = &dp->dp_tx; 354 callb_cpr_t cpr; 355 uint64_t start, delta; 356 357 txg_thread_enter(tx, &cpr); 358 359 start = delta = 0; 360 for (;;) { 361 uint64_t timer, timeout = zfs_txg_timeout * hz; 362 uint64_t txg; 363 364 /* 365 * We sync when we're scrubbing, there's someone waiting 366 * on us, or the quiesce thread has handed off a txg to 367 * us, or we have reached our timeout. 368 */ 369 timer = (delta >= timeout ? 0 : timeout - delta); 370 while ((dp->dp_scrub_func == SCRUB_FUNC_NONE || 371 spa_shutting_down(dp->dp_spa)) && 372 !tx->tx_exiting && timer > 0 && 373 tx->tx_synced_txg >= tx->tx_sync_txg_waiting && 374 tx->tx_quiesced_txg == 0) { 375 dprintf("waiting; tx_synced=%llu waiting=%llu dp=%p\n", 376 tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp); 377 txg_thread_wait(tx, &cpr, &tx->tx_sync_more_cv, timer); 378 delta = lbolt - start; 379 timer = (delta > timeout ? 0 : timeout - delta); 380 } 381 382 /* 383 * Wait until the quiesce thread hands off a txg to us, 384 * prompting it to do so if necessary. 385 */ 386 while (!tx->tx_exiting && tx->tx_quiesced_txg == 0) { 387 if (tx->tx_quiesce_txg_waiting < tx->tx_open_txg+1) 388 tx->tx_quiesce_txg_waiting = tx->tx_open_txg+1; 389 cv_broadcast(&tx->tx_quiesce_more_cv); 390 txg_thread_wait(tx, &cpr, &tx->tx_quiesce_done_cv, 0); 391 } 392 393 if (tx->tx_exiting) 394 txg_thread_exit(tx, &cpr, &tx->tx_sync_thread); 395 396 rw_enter(&tx->tx_suspend, RW_WRITER); 397 398 /* 399 * Consume the quiesced txg which has been handed off to 400 * us. This may cause the quiescing thread to now be 401 * able to quiesce another txg, so we must signal it. 402 */ 403 txg = tx->tx_quiesced_txg; 404 tx->tx_quiesced_txg = 0; 405 tx->tx_syncing_txg = txg; 406 cv_broadcast(&tx->tx_quiesce_more_cv); 407 rw_exit(&tx->tx_suspend); 408 409 dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n", 410 txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting); 411 mutex_exit(&tx->tx_sync_lock); 412 413 start = lbolt; 414 spa_sync(dp->dp_spa, txg); 415 delta = lbolt - start; 416 417 mutex_enter(&tx->tx_sync_lock); 418 rw_enter(&tx->tx_suspend, RW_WRITER); 419 tx->tx_synced_txg = txg; 420 tx->tx_syncing_txg = 0; 421 rw_exit(&tx->tx_suspend); 422 cv_broadcast(&tx->tx_sync_done_cv); 423 424 /* 425 * Dispatch commit callbacks to worker threads. 426 */ 427 txg_dispatch_callbacks(dp, txg); 428 } 429 } 430 431 static void 432 txg_quiesce_thread(dsl_pool_t *dp) 433 { 434 tx_state_t *tx = &dp->dp_tx; 435 callb_cpr_t cpr; 436 437 txg_thread_enter(tx, &cpr); 438 439 for (;;) { 440 uint64_t txg; 441 442 /* 443 * We quiesce when there's someone waiting on us. 444 * However, we can only have one txg in "quiescing" or 445 * "quiesced, waiting to sync" state. So we wait until 446 * the "quiesced, waiting to sync" txg has been consumed 447 * by the sync thread. 448 */ 449 while (!tx->tx_exiting && 450 (tx->tx_open_txg >= tx->tx_quiesce_txg_waiting || 451 tx->tx_quiesced_txg != 0)) 452 txg_thread_wait(tx, &cpr, &tx->tx_quiesce_more_cv, 0); 453 454 if (tx->tx_exiting) 455 txg_thread_exit(tx, &cpr, &tx->tx_quiesce_thread); 456 457 txg = tx->tx_open_txg; 458 dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n", 459 txg, tx->tx_quiesce_txg_waiting, 460 tx->tx_sync_txg_waiting); 461 mutex_exit(&tx->tx_sync_lock); 462 txg_quiesce(dp, txg); 463 mutex_enter(&tx->tx_sync_lock); 464 465 /* 466 * Hand this txg off to the sync thread. 467 */ 468 dprintf("quiesce done, handing off txg %llu\n", txg); 469 tx->tx_quiesced_txg = txg; 470 cv_broadcast(&tx->tx_sync_more_cv); 471 cv_broadcast(&tx->tx_quiesce_done_cv); 472 } 473 } 474 475 /* 476 * Delay this thread by 'ticks' if we are still in the open transaction 477 * group and there is already a waiting txg quiesing or quiesced. Abort 478 * the delay if this txg stalls or enters the quiesing state. 479 */ 480 void 481 txg_delay(dsl_pool_t *dp, uint64_t txg, int ticks) 482 { 483 tx_state_t *tx = &dp->dp_tx; 484 int timeout = lbolt + ticks; 485 486 /* don't delay if this txg could transition to quiesing immediately */ 487 if (tx->tx_open_txg > txg || 488 tx->tx_syncing_txg == txg-1 || tx->tx_synced_txg == txg-1) 489 return; 490 491 mutex_enter(&tx->tx_sync_lock); 492 if (tx->tx_open_txg > txg || tx->tx_synced_txg == txg-1) { 493 mutex_exit(&tx->tx_sync_lock); 494 return; 495 } 496 497 while (lbolt < timeout && 498 tx->tx_syncing_txg < txg-1 && !txg_stalled(dp)) 499 (void) cv_timedwait(&tx->tx_quiesce_more_cv, &tx->tx_sync_lock, 500 timeout); 501 502 mutex_exit(&tx->tx_sync_lock); 503 } 504 505 void 506 txg_wait_synced(dsl_pool_t *dp, uint64_t txg) 507 { 508 tx_state_t *tx = &dp->dp_tx; 509 510 mutex_enter(&tx->tx_sync_lock); 511 ASSERT(tx->tx_threads == 2); 512 if (txg == 0) 513 txg = tx->tx_open_txg; 514 if (tx->tx_sync_txg_waiting < txg) 515 tx->tx_sync_txg_waiting = txg; 516 dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n", 517 txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting); 518 while (tx->tx_synced_txg < txg) { 519 dprintf("broadcasting sync more " 520 "tx_synced=%llu waiting=%llu dp=%p\n", 521 tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp); 522 cv_broadcast(&tx->tx_sync_more_cv); 523 cv_wait(&tx->tx_sync_done_cv, &tx->tx_sync_lock); 524 } 525 mutex_exit(&tx->tx_sync_lock); 526 } 527 528 void 529 txg_wait_open(dsl_pool_t *dp, uint64_t txg) 530 { 531 tx_state_t *tx = &dp->dp_tx; 532 533 mutex_enter(&tx->tx_sync_lock); 534 ASSERT(tx->tx_threads == 2); 535 if (txg == 0) 536 txg = tx->tx_open_txg + 1; 537 if (tx->tx_quiesce_txg_waiting < txg) 538 tx->tx_quiesce_txg_waiting = txg; 539 dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n", 540 txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting); 541 while (tx->tx_open_txg < txg) { 542 cv_broadcast(&tx->tx_quiesce_more_cv); 543 cv_wait(&tx->tx_quiesce_done_cv, &tx->tx_sync_lock); 544 } 545 mutex_exit(&tx->tx_sync_lock); 546 } 547 548 boolean_t 549 txg_stalled(dsl_pool_t *dp) 550 { 551 tx_state_t *tx = &dp->dp_tx; 552 return (tx->tx_quiesce_txg_waiting > tx->tx_open_txg); 553 } 554 555 boolean_t 556 txg_sync_waiting(dsl_pool_t *dp) 557 { 558 tx_state_t *tx = &dp->dp_tx; 559 560 return (tx->tx_syncing_txg <= tx->tx_sync_txg_waiting || 561 tx->tx_quiesced_txg != 0); 562 } 563 564 void 565 txg_suspend(dsl_pool_t *dp) 566 { 567 tx_state_t *tx = &dp->dp_tx; 568 /* XXX some code paths suspend when they are already suspended! */ 569 rw_enter(&tx->tx_suspend, RW_READER); 570 } 571 572 void 573 txg_resume(dsl_pool_t *dp) 574 { 575 tx_state_t *tx = &dp->dp_tx; 576 rw_exit(&tx->tx_suspend); 577 } 578 579 /* 580 * Per-txg object lists. 581 */ 582 void 583 txg_list_create(txg_list_t *tl, size_t offset) 584 { 585 int t; 586 587 mutex_init(&tl->tl_lock, NULL, MUTEX_DEFAULT, NULL); 588 589 tl->tl_offset = offset; 590 591 for (t = 0; t < TXG_SIZE; t++) 592 tl->tl_head[t] = NULL; 593 } 594 595 void 596 txg_list_destroy(txg_list_t *tl) 597 { 598 int t; 599 600 for (t = 0; t < TXG_SIZE; t++) 601 ASSERT(txg_list_empty(tl, t)); 602 603 mutex_destroy(&tl->tl_lock); 604 } 605 606 int 607 txg_list_empty(txg_list_t *tl, uint64_t txg) 608 { 609 return (tl->tl_head[txg & TXG_MASK] == NULL); 610 } 611 612 /* 613 * Add an entry to the list. 614 * Returns 0 if it's a new entry, 1 if it's already there. 615 */ 616 int 617 txg_list_add(txg_list_t *tl, void *p, uint64_t txg) 618 { 619 int t = txg & TXG_MASK; 620 txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset); 621 int already_on_list; 622 623 mutex_enter(&tl->tl_lock); 624 already_on_list = tn->tn_member[t]; 625 if (!already_on_list) { 626 tn->tn_member[t] = 1; 627 tn->tn_next[t] = tl->tl_head[t]; 628 tl->tl_head[t] = tn; 629 } 630 mutex_exit(&tl->tl_lock); 631 632 return (already_on_list); 633 } 634 635 /* 636 * Remove the head of the list and return it. 637 */ 638 void * 639 txg_list_remove(txg_list_t *tl, uint64_t txg) 640 { 641 int t = txg & TXG_MASK; 642 txg_node_t *tn; 643 void *p = NULL; 644 645 mutex_enter(&tl->tl_lock); 646 if ((tn = tl->tl_head[t]) != NULL) { 647 p = (char *)tn - tl->tl_offset; 648 tl->tl_head[t] = tn->tn_next[t]; 649 tn->tn_next[t] = NULL; 650 tn->tn_member[t] = 0; 651 } 652 mutex_exit(&tl->tl_lock); 653 654 return (p); 655 } 656 657 /* 658 * Remove a specific item from the list and return it. 659 */ 660 void * 661 txg_list_remove_this(txg_list_t *tl, void *p, uint64_t txg) 662 { 663 int t = txg & TXG_MASK; 664 txg_node_t *tn, **tp; 665 666 mutex_enter(&tl->tl_lock); 667 668 for (tp = &tl->tl_head[t]; (tn = *tp) != NULL; tp = &tn->tn_next[t]) { 669 if ((char *)tn - tl->tl_offset == p) { 670 *tp = tn->tn_next[t]; 671 tn->tn_next[t] = NULL; 672 tn->tn_member[t] = 0; 673 mutex_exit(&tl->tl_lock); 674 return (p); 675 } 676 } 677 678 mutex_exit(&tl->tl_lock); 679 680 return (NULL); 681 } 682 683 int 684 txg_list_member(txg_list_t *tl, void *p, uint64_t txg) 685 { 686 int t = txg & TXG_MASK; 687 txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset); 688 689 return (tn->tn_member[t]); 690 } 691 692 /* 693 * Walk a txg list -- only safe if you know it's not changing. 694 */ 695 void * 696 txg_list_head(txg_list_t *tl, uint64_t txg) 697 { 698 int t = txg & TXG_MASK; 699 txg_node_t *tn = tl->tl_head[t]; 700 701 return (tn == NULL ? NULL : (char *)tn - tl->tl_offset); 702 } 703 704 void * 705 txg_list_next(txg_list_t *tl, void *p, uint64_t txg) 706 { 707 int t = txg & TXG_MASK; 708 txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset); 709 710 tn = tn->tn_next[t]; 711 712 return (tn == NULL ? NULL : (char *)tn - tl->tl_offset); 713 } 714