// SPDX-License-Identifier: CDDL-1.0
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or https://opensource.org/licenses/CDDL-1.0.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
 * Portions Copyright 2011 Martin Matuska
 * Copyright (c) 2012, 2019 by Delphix. All rights reserved.
 * Copyright (c) 2025, Klara, Inc.
 */

#include <sys/zfs_context.h>
#include <sys/txg_impl.h>
#include <sys/dmu_impl.h>
#include <sys/spa_impl.h>
#include <sys/dmu_tx.h>
#include <sys/dsl_pool.h>
#include <sys/dsl_scan.h>
#include <sys/zil.h>
#include <sys/callb.h>
#include <sys/trace_zfs.h>

/*
 * ZFS Transaction Groups
 * ----------------------
 *
 * ZFS transaction groups are, as the name implies, groups of transactions
 * that act on persistent state. ZFS asserts consistency at the granularity of
 * these transaction groups. Each successive transaction group (txg) is
 * assigned a 64-bit consecutive identifier. There are three active
 * transaction group states: open, quiescing, or syncing. At any given time,
 * there may be an active txg associated with each state; each active txg may
 * either be processing, or blocked waiting to enter the next state. There may
 * be up to three active txgs, and there is always a txg in the open state
 * (though it may be blocked waiting to enter the quiescing state). In broad
 * strokes, transactions -- operations that change in-memory structures -- are
 * accepted into the txg in the open state, and are completed while the txg is
 * in the open or quiescing states. The accumulated changes are written to
 * disk in the syncing state.
 *
 * Open
 *
 * When a new txg becomes active, it first enters the open state. New
 * transactions -- updates to in-memory structures -- are assigned to the
 * currently open txg. There is always a txg in the open state so that ZFS can
 * accept new changes (though the txg may refuse new changes if it has hit
 * some limit). ZFS advances the open txg to the next state for a variety of
 * reasons such as it hitting a time or size threshold, or the execution of an
 * administrative action that must be completed in the syncing state.
 *
 * Quiescing
 *
 * After a txg exits the open state, it enters the quiescing state. The
 * quiescing state is intended to provide a buffer between accepting new
 * transactions in the open state and writing them out to stable storage in
 * the syncing state. While quiescing, transactions can continue their
 * operation without delaying either of the other states. Typically, a txg is
 * in the quiescing state very briefly since the operations are bounded by
 * software latencies rather than, say, slower I/O latencies. After all
 * transactions complete, the txg is ready to enter the next state.
 *
 * Syncing
 *
 * In the syncing state, the in-memory state built up during the open and (to
 * a lesser degree) the quiescing states is written to stable storage. The
 * process of writing out modified data can, in turn, modify more data. For
 * example, when we write new blocks, we need to allocate space for them;
 * those allocations modify metadata (space maps)... which themselves must be
 * written to stable storage. During the sync state, ZFS iterates, writing out
 * data until it converges and all in-memory changes have been written out.
 * The first such pass is the largest as it encompasses all the modified user
 * data (as opposed to filesystem metadata). Subsequent passes typically have
 * far less data to write as they consist exclusively of filesystem metadata.
 *
 * To ensure convergence, after a certain number of passes ZFS begins
 * overwriting locations on stable storage that had been allocated earlier in
 * the syncing state (and subsequently freed). ZFS usually allocates new
 * blocks to optimize for large, continuous writes. For the syncing state to
 * converge, however, it must complete a pass where no new blocks are
 * allocated, since each allocation requires a modification of persistent
 * metadata. Further, to hasten convergence, after a prescribed number of
 * passes, ZFS also defers frees and stops compressing.
 *
 * In addition to writing out user data, we must also execute synctasks during
 * the syncing context. A synctask is the mechanism by which some
 * administrative activities work such as creating and destroying snapshots or
 * datasets. Note that when a synctask is initiated it enters the open txg,
 * and ZFS then pushes that txg as quickly as possible to completion of the
 * syncing state in order to reduce the latency of the administrative
 * activity. To complete the syncing state, ZFS writes out a new uberblock,
 * the root of the tree of blocks that comprise all state stored on the ZFS
 * pool. Finally, if there is a quiesced txg waiting, we signal that it can
 * now transition to the syncing state.
 */
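
/*
 * As a rough, illustrative sketch of the lifecycle described above, a pool
 * drives this subsystem approximately as follows (a minimal sketch using only
 * the functions defined in this file; error handling and the surrounding
 * dsl_pool/spa plumbing are omitted):
 *
 *	txg_init(dp, txg);		// set up per-CPU state, tx_open_txg
 *	txg_sync_start(dp);		// spawn the quiesce and sync threads
 *	...				// txgs now cycle open->quiesce->sync
 *	txg_wait_synced(dp, 0);		// e.g. force everything dirty to disk
 *	txg_sync_stop(dp);		// wait for both threads to exit
 *	txg_fini(dp);			// tear down locks, cvs and callbacks
 */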

static __attribute__((noreturn)) void txg_sync_thread(void *arg);
static __attribute__((noreturn)) void txg_quiesce_thread(void *arg);

uint_t zfs_txg_timeout = 5;	/* max seconds worth of delta per txg */

/*
 * Prepare the txg subsystem.
 */
void
txg_init(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;
	int c;
	memset(tx, 0, sizeof (tx_state_t));

	tx->tx_cpu = vmem_zalloc(max_ncpus * sizeof (tx_cpu_t), KM_SLEEP);

	for (c = 0; c < max_ncpus; c++) {
		int i;

		mutex_init(&tx->tx_cpu[c].tc_lock, NULL, MUTEX_DEFAULT, NULL);
		mutex_init(&tx->tx_cpu[c].tc_open_lock, NULL, MUTEX_NOLOCKDEP,
		    NULL);
		for (i = 0; i < TXG_SIZE; i++) {
			cv_init(&tx->tx_cpu[c].tc_cv[i], NULL, CV_DEFAULT,
			    NULL);
			list_create(&tx->tx_cpu[c].tc_callbacks[i],
			    sizeof (dmu_tx_callback_t),
			    offsetof(dmu_tx_callback_t, dcb_node));
		}
	}

	mutex_init(&tx->tx_sync_lock, NULL, MUTEX_DEFAULT, NULL);

	cv_init(&tx->tx_sync_more_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&tx->tx_sync_done_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&tx->tx_quiesce_more_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&tx->tx_quiesce_done_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&tx->tx_exit_cv, NULL, CV_DEFAULT, NULL);

	tx->tx_open_txg = txg;
}

/*
 * Close down the txg subsystem.
 */
void
txg_fini(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	int c;

	ASSERT0(tx->tx_threads);

	mutex_destroy(&tx->tx_sync_lock);

	cv_destroy(&tx->tx_sync_more_cv);
	cv_destroy(&tx->tx_sync_done_cv);
	cv_destroy(&tx->tx_quiesce_more_cv);
	cv_destroy(&tx->tx_quiesce_done_cv);
	cv_destroy(&tx->tx_exit_cv);

	for (c = 0; c < max_ncpus; c++) {
		int i;

		mutex_destroy(&tx->tx_cpu[c].tc_open_lock);
		mutex_destroy(&tx->tx_cpu[c].tc_lock);
		for (i = 0; i < TXG_SIZE; i++) {
			cv_destroy(&tx->tx_cpu[c].tc_cv[i]);
			list_destroy(&tx->tx_cpu[c].tc_callbacks[i]);
		}
	}

	if (tx->tx_commit_cb_taskq != NULL)
		taskq_destroy(tx->tx_commit_cb_taskq);

	vmem_free(tx->tx_cpu, max_ncpus * sizeof (tx_cpu_t));

	memset(tx, 0, sizeof (tx_state_t));
}

/*
 * Start syncing transaction groups.
 */
void
txg_sync_start(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;

	mutex_enter(&tx->tx_sync_lock);

	dprintf("pool %p\n", dp);

	ASSERT0(tx->tx_threads);

	tx->tx_threads = 2;

	tx->tx_quiesce_thread = thread_create(NULL, 0, txg_quiesce_thread,
	    dp, 0, &p0, TS_RUN, defclsyspri);

	/*
	 * The sync thread can need a larger-than-default stack size on
	 * 32-bit x86.  This is due in part to nested pools and
	 * scrub_visitbp() recursion.
	 */
	tx->tx_sync_thread = thread_create(NULL, 0, txg_sync_thread,
	    dp, 0, &p0, TS_RUN, defclsyspri);

	mutex_exit(&tx->tx_sync_lock);
}

static void
txg_thread_enter(tx_state_t *tx, callb_cpr_t *cpr)
{
	CALLB_CPR_INIT(cpr, &tx->tx_sync_lock, callb_generic_cpr, FTAG);
	mutex_enter(&tx->tx_sync_lock);
}

static void
txg_thread_exit(tx_state_t *tx, callb_cpr_t *cpr, kthread_t **tpp)
{
	ASSERT(*tpp != NULL);
	*tpp = NULL;
	tx->tx_threads--;
	cv_broadcast(&tx->tx_exit_cv);
	CALLB_CPR_EXIT(cpr);		/* drops &tx->tx_sync_lock */
	thread_exit();
}

static void
txg_thread_wait(tx_state_t *tx, callb_cpr_t *cpr, kcondvar_t *cv, clock_t time)
{
	CALLB_CPR_SAFE_BEGIN(cpr);

	if (time) {
		(void) cv_timedwait_idle(cv, &tx->tx_sync_lock,
		    ddi_get_lbolt() + time);
	} else {
		cv_wait_idle(cv, &tx->tx_sync_lock);
	}

	CALLB_CPR_SAFE_END(cpr, &tx->tx_sync_lock);
}

/*
 * Stop syncing transaction groups.
 */
void
txg_sync_stop(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;

	dprintf("pool %p\n", dp);
	/*
	 * Finish off any work in progress.
	 */
	ASSERT3U(tx->tx_threads, ==, 2);

	/*
	 * We need to ensure that we've vacated the deferred metaslab trees.
	 */
	txg_wait_synced(dp, tx->tx_open_txg + TXG_DEFER_SIZE);

	/*
	 * Wake all sync threads and wait for them to die.
	 */
	mutex_enter(&tx->tx_sync_lock);

	ASSERT3U(tx->tx_threads, ==, 2);

	tx->tx_exiting = 1;

	cv_broadcast(&tx->tx_quiesce_more_cv);
	cv_broadcast(&tx->tx_quiesce_done_cv);
	cv_broadcast(&tx->tx_sync_more_cv);

	while (tx->tx_threads != 0)
		cv_wait(&tx->tx_exit_cv, &tx->tx_sync_lock);

	tx->tx_exiting = 0;

	mutex_exit(&tx->tx_sync_lock);
}

/*
 * Get a handle on the currently open txg and keep it open.
 *
 * The txg is guaranteed to stay open until txg_rele_to_quiesce() is called for
 * the handle. Once txg_rele_to_quiesce() has been called, the txg stays
 * in quiescing state until txg_rele_to_sync() is called for the handle.
 *
 * It is guaranteed that subsequent calls return monotonically increasing
 * txgs for the same dsl_pool_t. Of course this is not strong monotonicity,
 * because the same txg can be returned multiple times in a row. This
 * guarantee holds both for subsequent calls from one thread and for multiple
 * threads. For example, it is impossible to observe the following sequence
 * of events:
 *
 *          Thread 1                            Thread 2
 *
 *   1 <- txg_hold_open(P, ...)
 *                                       2 <- txg_hold_open(P, ...)
 *   1 <- txg_hold_open(P, ...)
 *
 */
uint64_t
txg_hold_open(dsl_pool_t *dp, txg_handle_t *th)
{
	tx_state_t *tx = &dp->dp_tx;
	tx_cpu_t *tc;
	uint64_t txg;

	/*
	 * The processor id is simply used as a "random" index into the
	 * tx_cpu array; there is no other significance to the chosen tx_cpu,
	 * so the current CPU is as good a choice as any.
	 */
	tc = &tx->tx_cpu[CPU_SEQID_UNSTABLE];

	mutex_enter(&tc->tc_open_lock);
	txg = tx->tx_open_txg;

	mutex_enter(&tc->tc_lock);
	tc->tc_count[txg & TXG_MASK]++;
	mutex_exit(&tc->tc_lock);

	th->th_cpu = tc;
	th->th_txg = txg;

	return (txg);
}

void
txg_rele_to_quiesce(txg_handle_t *th)
{
	tx_cpu_t *tc = th->th_cpu;

	ASSERT(!MUTEX_HELD(&tc->tc_lock));
	mutex_exit(&tc->tc_open_lock);
}

void
txg_register_callbacks(txg_handle_t *th, list_t *tx_callbacks)
{
	tx_cpu_t *tc = th->th_cpu;
	int g = th->th_txg & TXG_MASK;

	mutex_enter(&tc->tc_lock);
	list_move_tail(&tc->tc_callbacks[g], tx_callbacks);
	mutex_exit(&tc->tc_lock);
}

void
txg_rele_to_sync(txg_handle_t *th)
{
	tx_cpu_t *tc = th->th_cpu;
	int g = th->th_txg & TXG_MASK;

	mutex_enter(&tc->tc_lock);
	ASSERT(tc->tc_count[g] != 0);
	if (--tc->tc_count[g] == 0)
		cv_broadcast(&tc->tc_cv[g]);
	mutex_exit(&tc->tc_lock);

	th->th_cpu = NULL;	/* defensive */
}
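
/*
 * Putting the handle API together: a minimal, illustrative sketch of how a
 * caller (e.g. the DMU when assigning and committing a transaction) typically
 * uses these functions. The work shown between the two releases is
 * hypothetical; only the txg_*() calls are from this file.
 *
 *	txg_handle_t th;
 *	uint64_t txg = txg_hold_open(dp, &th);
 *	txg_rele_to_quiesce(&th);	// the txg may now begin quiescing,
 *					// but cannot finish while we hold it
 *	...				// apply in-memory changes for txg
 *	txg_rele_to_sync(&th);		// drop the hold; quiescing may finish
 *	txg_wait_synced(dp, txg);	// optionally wait for it to hit disk
 */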

/*
 * Blocks until all transactions in the group are committed.
 *
 * On return, the transaction group has reached a stable state in which it can
 * then be passed off to the syncing context.
 */
static void
txg_quiesce(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;
	uint64_t tx_open_time;
	int g = txg & TXG_MASK;
	int c;

	/*
	 * Grab all tc_open_locks so nobody else can get into this txg.
	 */
	for (c = 0; c < max_ncpus; c++)
		mutex_enter(&tx->tx_cpu[c].tc_open_lock);

	ASSERT(txg == tx->tx_open_txg);
	tx->tx_open_txg++;
	tx->tx_open_time = tx_open_time = gethrtime();

	DTRACE_PROBE2(txg__quiescing, dsl_pool_t *, dp, uint64_t, txg);
	DTRACE_PROBE2(txg__opened, dsl_pool_t *, dp, uint64_t, tx->tx_open_txg);

	/*
	 * Now that we've incremented tx_open_txg, we can let threads
	 * enter the next transaction group.
	 */
	for (c = 0; c < max_ncpus; c++)
		mutex_exit(&tx->tx_cpu[c].tc_open_lock);

	spa_txg_history_set(dp->dp_spa, txg, TXG_STATE_OPEN, tx_open_time);
	spa_txg_history_add(dp->dp_spa, txg + 1, tx_open_time);

	/*
	 * Quiesce the transaction group by waiting for everyone to
	 * call txg_rele_to_sync() for their open transaction handles.
	 */
	for (c = 0; c < max_ncpus; c++) {
		tx_cpu_t *tc = &tx->tx_cpu[c];
		mutex_enter(&tc->tc_lock);
		while (tc->tc_count[g] != 0)
			cv_wait(&tc->tc_cv[g], &tc->tc_lock);
		mutex_exit(&tc->tc_lock);
	}

	spa_txg_history_set(dp->dp_spa, txg, TXG_STATE_QUIESCED, gethrtime());
}

static void
txg_do_callbacks(void *cb_list)
{
	dmu_tx_do_callbacks(cb_list, 0);

	list_destroy(cb_list);

	kmem_free(cb_list, sizeof (list_t));
}

/*
 * Dispatch the commit callbacks registered on this txg to worker threads.
 *
 * If no callbacks are registered for a given TXG, nothing happens.
 * This function creates a taskq for the associated pool, if needed.
 */
static void
txg_dispatch_callbacks(dsl_pool_t *dp, uint64_t txg)
{
	int c;
	tx_state_t *tx = &dp->dp_tx;
	list_t *cb_list;

	for (c = 0; c < max_ncpus; c++) {
		tx_cpu_t *tc = &tx->tx_cpu[c];
		/*
		 * No need to lock tx_cpu_t at this point, since this can
		 * only be called once a txg has been synced.
		 */

		int g = txg & TXG_MASK;

		if (list_is_empty(&tc->tc_callbacks[g]))
			continue;

		if (tx->tx_commit_cb_taskq == NULL) {
			/*
			 * Commit callback taskq hasn't been created yet.
			 */
			tx->tx_commit_cb_taskq = taskq_create("tx_commit_cb",
			    100, defclsyspri, boot_ncpus, boot_ncpus * 2,
			    TASKQ_PREPOPULATE | TASKQ_DYNAMIC |
			    TASKQ_THREADS_CPU_PCT);
		}

		cb_list = kmem_alloc(sizeof (list_t), KM_SLEEP);
		list_create(cb_list, sizeof (dmu_tx_callback_t),
		    offsetof(dmu_tx_callback_t, dcb_node));

		list_move_tail(cb_list, &tc->tc_callbacks[g]);

		(void) taskq_dispatch(tx->tx_commit_cb_taskq,
		    txg_do_callbacks, cb_list, TQ_SLEEP);
	}
}

/*
 * Wait for pending commit callbacks of already-synced transactions to finish
 * processing.
 * Calling this function from within a commit callback will deadlock.
 */
void
txg_wait_callbacks(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;

	if (tx->tx_commit_cb_taskq != NULL)
		taskq_wait_outstanding(tx->tx_commit_cb_taskq, 0);
}
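
/*
 * For reference, a commit callback reaches this machinery roughly as follows
 * (an illustrative sketch; dmu_tx_callback_register() and the callback
 * signature live in the DMU, not in this file):
 *
 *	static void
 *	my_commit_cb(void *arg, int error)	// hypothetical callback
 *	{
 *		// error is 0 when the txg synced successfully
 *	}
 *
 *	dmu_tx_callback_register(tx, my_commit_cb, arg);
 *	dmu_tx_commit(tx);
 *	...
 *	txg_wait_callbacks(dp);		// from outside any commit callback
 *
 * The callbacks are handed to this file via txg_register_callbacks() when the
 * transaction releases its hold, and are dispatched by
 * txg_dispatch_callbacks() after the txg has synced.
 */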

static boolean_t
txg_is_quiescing(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	ASSERT(MUTEX_HELD(&tx->tx_sync_lock));
	return (tx->tx_quiescing_txg != 0);
}

static boolean_t
txg_has_quiesced_to_sync(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	ASSERT(MUTEX_HELD(&tx->tx_sync_lock));
	return (tx->tx_quiesced_txg != 0);
}

static __attribute__((noreturn)) void
txg_sync_thread(void *arg)
{
	dsl_pool_t *dp = arg;
	spa_t *spa = dp->dp_spa;
	tx_state_t *tx = &dp->dp_tx;
	callb_cpr_t cpr;
	clock_t start, delta;

	(void) spl_fstrans_mark();
	txg_thread_enter(tx, &cpr);

	start = delta = 0;
	for (;;) {
		clock_t timeout = zfs_txg_timeout * hz;
		clock_t timer;
		uint64_t txg;

		/*
		 * We sync when we're scanning, there's someone waiting
		 * on us, or the quiesce thread has handed off a txg to
		 * us, or we have reached our timeout.
		 */
		timer = (delta >= timeout ? 0 : timeout - delta);
		while (!dsl_scan_active(dp->dp_scan) &&
		    !tx->tx_exiting && timer > 0 &&
		    tx->tx_synced_txg >= tx->tx_sync_txg_waiting &&
		    !txg_has_quiesced_to_sync(dp)) {
			dprintf("waiting; tx_synced=%llu waiting=%llu dp=%p\n",
			    (u_longlong_t)tx->tx_synced_txg,
			    (u_longlong_t)tx->tx_sync_txg_waiting, dp);
			txg_thread_wait(tx, &cpr, &tx->tx_sync_more_cv, timer);
			delta = ddi_get_lbolt() - start;
			timer = (delta > timeout ? 0 : timeout - delta);
		}

		/*
		 * When we're suspended, nothing should be changing, and for
		 * MMP we don't want to bump anything that would make it
		 * harder to detect if another host is changing it when
		 * resuming after an MMP suspend.
		 */
		if (spa_suspended(spa))
			continue;

		/*
		 * Wait until the quiesce thread hands off a txg to us,
		 * prompting it to do so if necessary.
		 */
		while (!tx->tx_exiting && !txg_has_quiesced_to_sync(dp)) {
			if (txg_is_quiescing(dp)) {
				txg_thread_wait(tx, &cpr,
				    &tx->tx_quiesce_done_cv, 0);
				continue;
			}
			if (tx->tx_quiesce_txg_waiting < tx->tx_open_txg+1)
				tx->tx_quiesce_txg_waiting = tx->tx_open_txg+1;
			cv_broadcast(&tx->tx_quiesce_more_cv);
			txg_thread_wait(tx, &cpr, &tx->tx_quiesce_done_cv, 0);
		}

		if (tx->tx_exiting)
			txg_thread_exit(tx, &cpr, &tx->tx_sync_thread);

		/*
		 * Consume the quiesced txg which has been handed off to
		 * us.  This may cause the quiescing thread to now be
		 * able to quiesce another txg, so we must signal it.
		 */
		ASSERT(tx->tx_quiesced_txg != 0);
		txg = tx->tx_quiesced_txg;
		tx->tx_quiesced_txg = 0;
		tx->tx_syncing_txg = txg;
		DTRACE_PROBE2(txg__syncing, dsl_pool_t *, dp, uint64_t, txg);
		cv_broadcast(&tx->tx_quiesce_more_cv);

		dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
		    (u_longlong_t)txg, (u_longlong_t)tx->tx_quiesce_txg_waiting,
		    (u_longlong_t)tx->tx_sync_txg_waiting);
		mutex_exit(&tx->tx_sync_lock);

		txg_stat_t *ts = spa_txg_history_init_io(spa, txg, dp);
		start = ddi_get_lbolt();
		spa_sync(spa, txg);
		delta = ddi_get_lbolt() - start;
		spa_txg_history_fini_io(spa, ts);

		mutex_enter(&tx->tx_sync_lock);
		tx->tx_synced_txg = txg;
		tx->tx_syncing_txg = 0;
		DTRACE_PROBE2(txg__synced, dsl_pool_t *, dp, uint64_t, txg);
		cv_broadcast(&tx->tx_sync_done_cv);

		/*
		 * Dispatch commit callbacks to worker threads.
		 */
		txg_dispatch_callbacks(dp, txg);
	}
}

static __attribute__((noreturn)) void
txg_quiesce_thread(void *arg)
{
	dsl_pool_t *dp = arg;
	tx_state_t *tx = &dp->dp_tx;
	callb_cpr_t cpr;

	txg_thread_enter(tx, &cpr);

	for (;;) {
		uint64_t txg;

		/*
		 * We quiesce when there's someone waiting on us.
		 * However, we can only have one txg in "quiescing" or
		 * "quiesced, waiting to sync" state. So we wait until
		 * the "quiesced, waiting to sync" txg has been consumed
		 * by the sync thread.
		 */
		while (!tx->tx_exiting &&
		    (tx->tx_open_txg >= tx->tx_quiesce_txg_waiting ||
		    txg_has_quiesced_to_sync(dp)))
			txg_thread_wait(tx, &cpr, &tx->tx_quiesce_more_cv, 0);

		if (tx->tx_exiting)
			txg_thread_exit(tx, &cpr, &tx->tx_quiesce_thread);

		txg = tx->tx_open_txg;
		dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
		    (u_longlong_t)txg,
		    (u_longlong_t)tx->tx_quiesce_txg_waiting,
		    (u_longlong_t)tx->tx_sync_txg_waiting);
		tx->tx_quiescing_txg = txg;

		mutex_exit(&tx->tx_sync_lock);
		txg_quiesce(dp, txg);
		mutex_enter(&tx->tx_sync_lock);

		/*
		 * Hand this txg off to the sync thread.
		 */
		dprintf("quiesce done, handing off txg %llu\n",
		    (u_longlong_t)txg);
		tx->tx_quiescing_txg = 0;
		tx->tx_quiesced_txg = txg;
		DTRACE_PROBE2(txg__quiesced, dsl_pool_t *, dp, uint64_t, txg);
		cv_broadcast(&tx->tx_sync_more_cv);
		cv_broadcast(&tx->tx_quiesce_done_cv);
	}
}

/*
 * Delay this thread by delay nanoseconds if we are still in the open
 * transaction group and there is already a waiting txg quiescing or quiesced.
 * Abort the delay if this txg stalls or enters the quiescing state.
 */
void
txg_delay(dsl_pool_t *dp, uint64_t txg, hrtime_t delay, hrtime_t resolution)
{
	tx_state_t *tx = &dp->dp_tx;
	hrtime_t start = gethrtime();

	/* don't delay if this txg could transition to quiescing immediately */
	if (tx->tx_open_txg > txg ||
	    tx->tx_syncing_txg == txg-1 || tx->tx_synced_txg == txg-1)
		return;

	mutex_enter(&tx->tx_sync_lock);
	if (tx->tx_open_txg > txg || tx->tx_synced_txg == txg-1) {
		mutex_exit(&tx->tx_sync_lock);
		return;
	}

	while (gethrtime() - start < delay &&
	    tx->tx_syncing_txg < txg-1 && !txg_stalled(dp)) {
		(void) cv_timedwait_hires(&tx->tx_quiesce_more_cv,
		    &tx->tx_sync_lock, delay, resolution, 0);
	}

	DMU_TX_STAT_BUMP(dmu_tx_delay);

	mutex_exit(&tx->tx_sync_lock);
}
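
/*
 * Example (a hypothetical throttling site, not a call made from this file;
 * MSEC2NSEC() is assumed from the common time headers):
 *
 *	txg_delay(dp, txg, MSEC2NSEC(10), MSEC2NSEC(1));
 *
 * stalls the caller for up to 10ms at 1ms resolution, returning early once
 * the txg can begin quiescing or the pool stalls.
 */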

int
txg_wait_synced_flags(dsl_pool_t *dp, uint64_t txg, txg_wait_flag_t flags)
{
	int error = 0;
	tx_state_t *tx = &dp->dp_tx;

	ASSERT0(flags & ~(TXG_WAIT_SIGNAL | TXG_WAIT_SUSPEND));
	ASSERT(!dsl_pool_config_held(dp));

	mutex_enter(&tx->tx_sync_lock);
	ASSERT3U(tx->tx_threads, ==, 2);
	if (txg == 0)
		txg = tx->tx_open_txg + TXG_DEFER_SIZE;
	if (tx->tx_sync_txg_waiting < txg)
		tx->tx_sync_txg_waiting = txg;
	dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
	    (u_longlong_t)txg, (u_longlong_t)tx->tx_quiesce_txg_waiting,
	    (u_longlong_t)tx->tx_sync_txg_waiting);

	/*
	 * Keep pushing until the pool gets to the wanted txg. If something
	 * else interesting happens, we'll set an error and break out.
	 */
	while (tx->tx_synced_txg < txg) {
		if ((flags & TXG_WAIT_SUSPEND) && spa_suspended(dp->dp_spa)) {
			/*
			 * Pool suspended and the caller does not want to
			 * block; inform them immediately.
			 */
			error = SET_ERROR(ESHUTDOWN);
			break;
		}

		dprintf("broadcasting sync more "
		    "tx_synced=%llu waiting=%llu dp=%px\n",
		    (u_longlong_t)tx->tx_synced_txg,
		    (u_longlong_t)tx->tx_sync_txg_waiting, dp);
		cv_broadcast(&tx->tx_sync_more_cv);

		if (flags & TXG_WAIT_SIGNAL) {
			/*
			 * Condition wait here but stop if the thread receives a
			 * signal. The caller may call txg_wait_synced*() again
			 * to resume waiting for this txg.
			 */
			if (cv_wait_io_sig(&tx->tx_sync_done_cv,
			    &tx->tx_sync_lock) == 0) {
				error = SET_ERROR(EINTR);
				break;
			}
		} else {
			/* Uninterruptible wait, until the condvar fires */
			cv_wait_io(&tx->tx_sync_done_cv, &tx->tx_sync_lock);
		}
	}

	mutex_exit(&tx->tx_sync_lock);
	return (error);
}

void
txg_wait_synced(dsl_pool_t *dp, uint64_t txg)
{
	VERIFY0(txg_wait_synced_flags(dp, txg, TXG_WAIT_NONE));
}

void
txg_wait_kick(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	mutex_enter(&tx->tx_sync_lock);
	cv_broadcast(&tx->tx_sync_done_cv);
	mutex_exit(&tx->tx_sync_lock);
}
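
/*
 * A minimal sketch of how a caller might use txg_wait_synced_flags() when it
 * wants to survive signals and pool suspension (the surrounding retry policy
 * is hypothetical; the flags and error values are the ones used above):
 *
 *	error = txg_wait_synced_flags(dp, txg,
 *	    TXG_WAIT_SIGNAL | TXG_WAIT_SUSPEND);
 *	if (error == EINTR) {
 *		// interrupted by a signal; the caller may retry later
 *	} else if (error == ESHUTDOWN) {
 *		// pool suspended; don't block, report failure upward
 *	}
 */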

/*
 * Wait for the specified open transaction group. Set should_quiesce
 * when the current open txg should be quiesced immediately.
 */
void
txg_wait_open(dsl_pool_t *dp, uint64_t txg, boolean_t should_quiesce)
{
	tx_state_t *tx = &dp->dp_tx;

	ASSERT(!dsl_pool_config_held(dp));

	mutex_enter(&tx->tx_sync_lock);
	ASSERT3U(tx->tx_threads, ==, 2);
	if (txg == 0)
		txg = tx->tx_open_txg + 1;
	if (tx->tx_quiesce_txg_waiting < txg && should_quiesce)
		tx->tx_quiesce_txg_waiting = txg;
	dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
	    (u_longlong_t)txg, (u_longlong_t)tx->tx_quiesce_txg_waiting,
	    (u_longlong_t)tx->tx_sync_txg_waiting);
	while (tx->tx_open_txg < txg) {
		cv_broadcast(&tx->tx_quiesce_more_cv);
		/*
		 * Callers setting should_quiesce will use cv_wait_io() and
		 * be accounted for as iowait time. Otherwise, the caller is
		 * understood to be idle and cv_wait_idle() is used to prevent
		 * incorrectly inflating the system load average.
		 */
		if (should_quiesce == B_TRUE) {
			cv_wait_io(&tx->tx_quiesce_done_cv, &tx->tx_sync_lock);
		} else {
			cv_wait_idle(&tx->tx_quiesce_done_cv,
			    &tx->tx_sync_lock);
		}
	}
	mutex_exit(&tx->tx_sync_lock);
}

/*
 * Pass in the txg number that should be synced.
 */
void
txg_kick(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;

	ASSERT(!dsl_pool_config_held(dp));

	if (tx->tx_sync_txg_waiting >= txg)
		return;

	mutex_enter(&tx->tx_sync_lock);
	if (tx->tx_sync_txg_waiting < txg) {
		tx->tx_sync_txg_waiting = txg;
		cv_broadcast(&tx->tx_sync_more_cv);
	}
	mutex_exit(&tx->tx_sync_lock);
}

boolean_t
txg_stalled(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	return (tx->tx_quiesce_txg_waiting > tx->tx_open_txg);
}

boolean_t
txg_sync_waiting(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;

	return (tx->tx_syncing_txg <= tx->tx_sync_txg_waiting ||
	    tx->tx_quiesced_txg != 0);
}

/*
 * Verify that this txg is active (open, quiescing, syncing). Non-active
 * txgs should not be manipulated.
 */
#ifdef ZFS_DEBUG
void
txg_verify(spa_t *spa, uint64_t txg)
{
	dsl_pool_t *dp __maybe_unused = spa_get_dsl(spa);
	if (txg <= TXG_INITIAL || txg == ZILTEST_TXG)
		return;
	ASSERT3U(txg, <=, dp->dp_tx.tx_open_txg);
	ASSERT3U(txg, >=, dp->dp_tx.tx_synced_txg);
	ASSERT3U(txg, >=, dp->dp_tx.tx_open_txg - TXG_CONCURRENT_STATES);
}
#endif

/*
 * Per-txg object lists.
 */
void
txg_list_create(txg_list_t *tl, spa_t *spa, size_t offset)
{
	int t;

	mutex_init(&tl->tl_lock, NULL, MUTEX_DEFAULT, NULL);

	tl->tl_offset = offset;
	tl->tl_spa = spa;

	for (t = 0; t < TXG_SIZE; t++)
		tl->tl_head[t] = NULL;
}

static boolean_t
txg_list_empty_impl(txg_list_t *tl, uint64_t txg)
{
	ASSERT(MUTEX_HELD(&tl->tl_lock));
	TXG_VERIFY(tl->tl_spa, txg);
	return (tl->tl_head[txg & TXG_MASK] == NULL);
}

boolean_t
txg_list_empty(txg_list_t *tl, uint64_t txg)
{
	mutex_enter(&tl->tl_lock);
	boolean_t ret = txg_list_empty_impl(tl, txg);
	mutex_exit(&tl->tl_lock);

	return (ret);
}

void
txg_list_destroy(txg_list_t *tl)
{
	int t;

	mutex_enter(&tl->tl_lock);
	for (t = 0; t < TXG_SIZE; t++)
		ASSERT(txg_list_empty_impl(tl, t));
	mutex_exit(&tl->tl_lock);

	mutex_destroy(&tl->tl_lock);
}
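
/*
 * Typical txg list usage, as a minimal sketch (my_obj_t and its mo_node field
 * are hypothetical; real users embed a txg_node_t in the object they track):
 *
 *	typedef struct my_obj {
 *		txg_node_t	mo_node;
 *		...
 *	} my_obj_t;
 *
 *	txg_list_create(&tl, spa, offsetof(my_obj_t, mo_node));
 *	(void) txg_list_add(&tl, obj, txg);	// while dirtying in txg
 *
 *	// later, in syncing context for that txg:
 *	while ((obj = txg_list_remove(&tl, txg)) != NULL)
 *		... write obj out ...
 */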

/*
 * Returns true if all txg lists are empty.
 *
 * Warning: this is inherently racy (an item could be added immediately
 * after this function returns).
 */
boolean_t
txg_all_lists_empty(txg_list_t *tl)
{
	boolean_t res = B_TRUE;
	for (int i = 0; i < TXG_SIZE; i++)
		res &= (tl->tl_head[i] == NULL);
	return (res);
}

/*
 * Add an entry to the list (unless it's already on the list).
 * Returns B_TRUE if it was actually added.
 */
boolean_t
txg_list_add(txg_list_t *tl, void *p, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
	boolean_t add;

	TXG_VERIFY(tl->tl_spa, txg);
	mutex_enter(&tl->tl_lock);
	add = (tn->tn_member[t] == 0);
	if (add) {
		tn->tn_member[t] = 1;
		tn->tn_next[t] = tl->tl_head[t];
		tl->tl_head[t] = tn;
	}
	mutex_exit(&tl->tl_lock);

	return (add);
}

/*
 * Add an entry to the end of the list, unless it's already on the list.
 * (walks list to find end)
 * Returns B_TRUE if it was actually added.
 */
boolean_t
txg_list_add_tail(txg_list_t *tl, void *p, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
	boolean_t add;

	TXG_VERIFY(tl->tl_spa, txg);
	mutex_enter(&tl->tl_lock);
	add = (tn->tn_member[t] == 0);
	if (add) {
		txg_node_t **tp;

		for (tp = &tl->tl_head[t]; *tp != NULL; tp = &(*tp)->tn_next[t])
			continue;

		tn->tn_member[t] = 1;
		tn->tn_next[t] = NULL;
		*tp = tn;
	}
	mutex_exit(&tl->tl_lock);

	return (add);
}

/*
 * Remove the head of the list and return it.
 */
void *
txg_list_remove(txg_list_t *tl, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn;
	void *p = NULL;

	TXG_VERIFY(tl->tl_spa, txg);
	mutex_enter(&tl->tl_lock);
	if ((tn = tl->tl_head[t]) != NULL) {
		ASSERT(tn->tn_member[t]);
		ASSERT(tn->tn_next[t] == NULL || tn->tn_next[t]->tn_member[t]);
		p = (char *)tn - tl->tl_offset;
		tl->tl_head[t] = tn->tn_next[t];
		tn->tn_next[t] = NULL;
		tn->tn_member[t] = 0;
	}
	mutex_exit(&tl->tl_lock);

	return (p);
}

/*
 * Remove a specific item from the list and return it.
 */
void *
txg_list_remove_this(txg_list_t *tl, void *p, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn, **tp;

	TXG_VERIFY(tl->tl_spa, txg);
	mutex_enter(&tl->tl_lock);

	for (tp = &tl->tl_head[t]; (tn = *tp) != NULL; tp = &tn->tn_next[t]) {
		if ((char *)tn - tl->tl_offset == p) {
			*tp = tn->tn_next[t];
			tn->tn_next[t] = NULL;
			tn->tn_member[t] = 0;
			mutex_exit(&tl->tl_lock);
			return (p);
		}
	}

	mutex_exit(&tl->tl_lock);

	return (NULL);
}

boolean_t
txg_list_member(txg_list_t *tl, void *p, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);

	TXG_VERIFY(tl->tl_spa, txg);
	return (tn->tn_member[t] != 0);
}

/*
 * Walk a txg list
 */
void *
txg_list_head(txg_list_t *tl, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn;

	mutex_enter(&tl->tl_lock);
	tn = tl->tl_head[t];
	mutex_exit(&tl->tl_lock);

	TXG_VERIFY(tl->tl_spa, txg);
	return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
}

void *
txg_list_next(txg_list_t *tl, void *p, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);

	TXG_VERIFY(tl->tl_spa, txg);

	mutex_enter(&tl->tl_lock);
	tn = tn->tn_next[t];
	mutex_exit(&tl->tl_lock);

	return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
}
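
/*
 * Walking a txg list, as a minimal sketch (obj and tl are whatever the caller
 * set up with txg_list_create(); see the sketch above):
 *
 *	for (obj = txg_list_head(&tl, txg); obj != NULL;
 *	    obj = txg_list_next(&tl, obj, txg))
 *		... inspect obj ...
 *
 * Each call takes and drops tl_lock, so the walk as a whole is not atomic
 * with respect to concurrent adds and removes. Note that removing the current
 * entry ends the walk early, since removal clears its tn_next pointer.
 */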

EXPORT_SYMBOL(txg_init);
EXPORT_SYMBOL(txg_fini);
EXPORT_SYMBOL(txg_sync_start);
EXPORT_SYMBOL(txg_sync_stop);
EXPORT_SYMBOL(txg_hold_open);
EXPORT_SYMBOL(txg_rele_to_quiesce);
EXPORT_SYMBOL(txg_rele_to_sync);
EXPORT_SYMBOL(txg_register_callbacks);
EXPORT_SYMBOL(txg_delay);
EXPORT_SYMBOL(txg_wait_synced);
EXPORT_SYMBOL(txg_wait_open);
EXPORT_SYMBOL(txg_wait_callbacks);
EXPORT_SYMBOL(txg_stalled);
EXPORT_SYMBOL(txg_sync_waiting);

ZFS_MODULE_PARAM(zfs_txg, zfs_txg_, timeout, UINT, ZMOD_RW,
	"Max seconds worth of delta per txg");