// SPDX-License-Identifier: CDDL-1.0
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or https://opensource.org/licenses/CDDL-1.0.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
 * Portions Copyright 2011 Martin Matuska
 * Copyright (c) 2012, 2019 by Delphix. All rights reserved.
 */

#include <sys/zfs_context.h>
#include <sys/txg_impl.h>
#include <sys/dmu_impl.h>
#include <sys/spa_impl.h>
#include <sys/dmu_tx.h>
#include <sys/dsl_pool.h>
#include <sys/dsl_scan.h>
#include <sys/zil.h>
#include <sys/callb.h>
#include <sys/trace_zfs.h>

/*
 * ZFS Transaction Groups
 * ----------------------
 *
 * ZFS transaction groups are, as the name implies, groups of transactions
 * that act on persistent state. ZFS asserts consistency at the granularity of
 * these transaction groups. Each successive transaction group (txg) is
 * assigned a 64-bit consecutive identifier. There are three active
 * transaction group states: open, quiescing, and syncing. At any given time,
 * there may be an active txg associated with each state; each active txg may
 * either be processing, or blocked waiting to enter the next state. There may
 * be up to three active txgs, and there is always a txg in the open state
 * (though it may be blocked waiting to enter the quiescing state). In broad
 * strokes, transactions -- operations that change in-memory structures -- are
 * accepted into the txg in the open state, and are completed while the txg is
 * in the open or quiescing states. The accumulated changes are written to
 * disk in the syncing state.
 *
 * Open
 *
 * When a new txg becomes active, it first enters the open state. New
 * transactions -- updates to in-memory structures -- are assigned to the
 * currently open txg. There is always a txg in the open state so that ZFS can
 * accept new changes (though the txg may refuse new changes if it has hit
 * some limit). ZFS advances the open txg to the next state for a variety of
 * reasons, such as hitting a time or size threshold, or the execution of an
 * administrative action that must be completed in the syncing state.
 *
 * Quiescing
 *
 * After a txg exits the open state, it enters the quiescing state. The
 * quiescing state is intended to provide a buffer between accepting new
 * transactions in the open state and writing them out to stable storage in
 * the syncing state. While quiescing, transactions can continue their
 * operation without delaying either of the other states. Typically, a txg is
 * in the quiescing state very briefly since the operations are bounded by
 * software latencies rather than, say, slower I/O latencies.
 * After all transactions complete, the txg is ready to enter the next state.
 *
 * Syncing
 *
 * In the syncing state, the in-memory state built up during the open and (to
 * a lesser degree) the quiescing states is written to stable storage. The
 * process of writing out modified data can, in turn, modify more data. For
 * example, when we write new blocks, we need to allocate space for them;
 * those allocations modify metadata (space maps)... which themselves must be
 * written to stable storage. During the sync state, ZFS iterates, writing out
 * data until it converges and all in-memory changes have been written out.
 * The first such pass is the largest as it encompasses all the modified user
 * data (as opposed to filesystem metadata). Subsequent passes typically have
 * far less data to write as they consist exclusively of filesystem metadata.
 *
 * To ensure convergence, after a certain number of passes ZFS begins
 * overwriting locations on stable storage that had been allocated earlier in
 * the syncing state (and subsequently freed). ZFS usually allocates new
 * blocks to optimize for large, continuous writes. For the syncing state to
 * converge, however, it must complete a pass where no new blocks are
 * allocated, since each allocation requires a modification of persistent
 * metadata. Further, to hasten convergence, after a prescribed number of
 * passes, ZFS also defers frees, and stops compressing.
 *
 * In addition to writing out user data, we must also execute synctasks during
 * the syncing context. A synctask is the mechanism by which some
 * administrative activities work, such as creating and destroying snapshots
 * or datasets. Note that when a synctask is initiated it enters the open txg,
 * and ZFS then pushes that txg as quickly as possible to completion of the
 * syncing state in order to reduce the latency of the administrative
 * activity. To complete the syncing state, ZFS writes out a new uberblock,
 * the root of the tree of blocks that comprise all state stored on the ZFS
 * pool. Finally, if there is a quiesced txg waiting, we signal that it can
 * now transition to the syncing state.
 */

static __attribute__((noreturn)) void txg_sync_thread(void *arg);
static __attribute__((noreturn)) void txg_quiesce_thread(void *arg);

uint_t zfs_txg_timeout = 5;	/* max seconds worth of delta per txg */
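
/*
 * For orientation, the lifecycle of a txg handle defined below is roughly as
 * follows (in practice the dmu_tx layer, e.g. dmu_tx_assign() and
 * dmu_tx_commit(), makes these calls on the caller's behalf; this is only an
 * illustrative sketch, not an additional API):
 *
 *	txg = txg_hold_open(dp, &th);	 join the currently open txg
 *	    ... record in-memory changes against txg ...
 *	txg_rele_to_quiesce(&th);	 allow the txg to begin quiescing
 *	    ... finish applying the in-memory changes ...
 *	txg_rele_to_sync(&th);		 allow the txg to be handed to sync
 */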

/*
 * Prepare the txg subsystem.
 */
void
txg_init(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;
	int c;
	memset(tx, 0, sizeof (tx_state_t));

	tx->tx_cpu = vmem_zalloc(max_ncpus * sizeof (tx_cpu_t), KM_SLEEP);

	for (c = 0; c < max_ncpus; c++) {
		int i;

		mutex_init(&tx->tx_cpu[c].tc_lock, NULL, MUTEX_DEFAULT, NULL);
		mutex_init(&tx->tx_cpu[c].tc_open_lock, NULL, MUTEX_NOLOCKDEP,
		    NULL);
		for (i = 0; i < TXG_SIZE; i++) {
			cv_init(&tx->tx_cpu[c].tc_cv[i], NULL, CV_DEFAULT,
			    NULL);
			list_create(&tx->tx_cpu[c].tc_callbacks[i],
			    sizeof (dmu_tx_callback_t),
			    offsetof(dmu_tx_callback_t, dcb_node));
		}
	}

	mutex_init(&tx->tx_sync_lock, NULL, MUTEX_DEFAULT, NULL);

	cv_init(&tx->tx_sync_more_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&tx->tx_sync_done_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&tx->tx_quiesce_more_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&tx->tx_quiesce_done_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&tx->tx_exit_cv, NULL, CV_DEFAULT, NULL);

	tx->tx_open_txg = txg;
}

/*
 * Close down the txg subsystem.
 */
void
txg_fini(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	int c;

	ASSERT0(tx->tx_threads);

	mutex_destroy(&tx->tx_sync_lock);

	cv_destroy(&tx->tx_sync_more_cv);
	cv_destroy(&tx->tx_sync_done_cv);
	cv_destroy(&tx->tx_quiesce_more_cv);
	cv_destroy(&tx->tx_quiesce_done_cv);
	cv_destroy(&tx->tx_exit_cv);

	for (c = 0; c < max_ncpus; c++) {
		int i;

		mutex_destroy(&tx->tx_cpu[c].tc_open_lock);
		mutex_destroy(&tx->tx_cpu[c].tc_lock);
		for (i = 0; i < TXG_SIZE; i++) {
			cv_destroy(&tx->tx_cpu[c].tc_cv[i]);
			list_destroy(&tx->tx_cpu[c].tc_callbacks[i]);
		}
	}

	if (tx->tx_commit_cb_taskq != NULL)
		taskq_destroy(tx->tx_commit_cb_taskq);

	vmem_free(tx->tx_cpu, max_ncpus * sizeof (tx_cpu_t));

	memset(tx, 0, sizeof (tx_state_t));
}

/*
 * Start syncing transaction groups.
 */
void
txg_sync_start(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;

	mutex_enter(&tx->tx_sync_lock);

	dprintf("pool %p\n", dp);

	ASSERT0(tx->tx_threads);

	tx->tx_threads = 2;

	tx->tx_quiesce_thread = thread_create(NULL, 0, txg_quiesce_thread,
	    dp, 0, &p0, TS_RUN, defclsyspri);

	/*
	 * The sync thread can need a larger-than-default stack size on
	 * 32-bit x86. This is due in part to nested pools and
	 * scrub_visitbp() recursion.
	 */
	tx->tx_sync_thread = thread_create(NULL, 0, txg_sync_thread,
	    dp, 0, &p0, TS_RUN, defclsyspri);

	mutex_exit(&tx->tx_sync_lock);
}

static void
txg_thread_enter(tx_state_t *tx, callb_cpr_t *cpr)
{
	CALLB_CPR_INIT(cpr, &tx->tx_sync_lock, callb_generic_cpr, FTAG);
	mutex_enter(&tx->tx_sync_lock);
}

static void
txg_thread_exit(tx_state_t *tx, callb_cpr_t *cpr, kthread_t **tpp)
{
	ASSERT(*tpp != NULL);
	*tpp = NULL;
	tx->tx_threads--;
	cv_broadcast(&tx->tx_exit_cv);
	CALLB_CPR_EXIT(cpr);		/* drops &tx->tx_sync_lock */
	thread_exit();
}

static void
txg_thread_wait(tx_state_t *tx, callb_cpr_t *cpr, kcondvar_t *cv, clock_t time)
{
	CALLB_CPR_SAFE_BEGIN(cpr);

	if (time) {
		(void) cv_timedwait_idle(cv, &tx->tx_sync_lock,
		    ddi_get_lbolt() + time);
	} else {
		cv_wait_idle(cv, &tx->tx_sync_lock);
	}

	CALLB_CPR_SAFE_END(cpr, &tx->tx_sync_lock);
}

/*
 * Stop syncing transaction groups.
 */
void
txg_sync_stop(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;

	dprintf("pool %p\n", dp);
	/*
	 * Finish off any work in progress.
	 */
	ASSERT3U(tx->tx_threads, ==, 2);

	/*
	 * We need to ensure that we've vacated the deferred metaslab trees.
	 */
	txg_wait_synced(dp, tx->tx_open_txg + TXG_DEFER_SIZE);

	/*
	 * Wake all sync threads and wait for them to die.
	 */
	mutex_enter(&tx->tx_sync_lock);

	ASSERT3U(tx->tx_threads, ==, 2);

	tx->tx_exiting = 1;

	cv_broadcast(&tx->tx_quiesce_more_cv);
	cv_broadcast(&tx->tx_quiesce_done_cv);
	cv_broadcast(&tx->tx_sync_more_cv);

	while (tx->tx_threads != 0)
		cv_wait(&tx->tx_exit_cv, &tx->tx_sync_lock);

	tx->tx_exiting = 0;

	mutex_exit(&tx->tx_sync_lock);
}

/*
 * Get a handle on the currently open txg and keep it open.
 *
 * The txg is guaranteed to stay open until txg_rele_to_quiesce() is called for
 * the handle. Once txg_rele_to_quiesce() has been called, the txg stays
 * in quiescing state until txg_rele_to_sync() is called for the handle.
 *
 * It is guaranteed that subsequent calls return monotonically increasing
 * txgs for the same dsl_pool_t. Of course this is not strong monotonicity,
 * because the same txg can be returned multiple times in a row. This
 * guarantee holds both for subsequent calls from one thread and for multiple
 * threads. For example, it is impossible to observe the following sequence
 * of events:
 *
 *	    Thread 1			    Thread 2
 *
 *	1 <- txg_hold_open(P, ...)
 *					2 <- txg_hold_open(P, ...)
 *	1 <- txg_hold_open(P, ...)
 *
 */
uint64_t
txg_hold_open(dsl_pool_t *dp, txg_handle_t *th)
{
	tx_state_t *tx = &dp->dp_tx;
	tx_cpu_t *tc;
	uint64_t txg;

	/*
	 * The processor id is simply used as a "random" number to index
	 * into the tx_cpu array; there is no other significance to the
	 * chosen tx_cpu. The current cpu is as good a choice as any.
	 */
	tc = &tx->tx_cpu[CPU_SEQID_UNSTABLE];

	mutex_enter(&tc->tc_open_lock);
	txg = tx->tx_open_txg;

	mutex_enter(&tc->tc_lock);
	tc->tc_count[txg & TXG_MASK]++;
	mutex_exit(&tc->tc_lock);

	th->th_cpu = tc;
	th->th_txg = txg;

	return (txg);
}

void
txg_rele_to_quiesce(txg_handle_t *th)
{
	tx_cpu_t *tc = th->th_cpu;

	ASSERT(!MUTEX_HELD(&tc->tc_lock));
	mutex_exit(&tc->tc_open_lock);
}

void
txg_register_callbacks(txg_handle_t *th, list_t *tx_callbacks)
{
	tx_cpu_t *tc = th->th_cpu;
	int g = th->th_txg & TXG_MASK;

	mutex_enter(&tc->tc_lock);
	list_move_tail(&tc->tc_callbacks[g], tx_callbacks);
	mutex_exit(&tc->tc_lock);
}

void
txg_rele_to_sync(txg_handle_t *th)
{
	tx_cpu_t *tc = th->th_cpu;
	int g = th->th_txg & TXG_MASK;

	mutex_enter(&tc->tc_lock);
	ASSERT(tc->tc_count[g] != 0);
	if (--tc->tc_count[g] == 0)
		cv_broadcast(&tc->tc_cv[g]);
	mutex_exit(&tc->tc_lock);

	th->th_cpu = NULL;	/* defensive */
}

/*
 * Blocks until all transactions in the group are committed.
 *
 * On return, the transaction group has reached a stable state in which it can
 * then be passed off to the syncing context.
 */
static void
txg_quiesce(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;
	uint64_t tx_open_time;
	int g = txg & TXG_MASK;
	int c;

	/*
	 * Grab all tc_open_locks so nobody else can get into this txg.
	 */
	for (c = 0; c < max_ncpus; c++)
		mutex_enter(&tx->tx_cpu[c].tc_open_lock);

	ASSERT(txg == tx->tx_open_txg);
	tx->tx_open_txg++;
	tx->tx_open_time = tx_open_time = gethrtime();

	DTRACE_PROBE2(txg__quiescing, dsl_pool_t *, dp, uint64_t, txg);
	DTRACE_PROBE2(txg__opened, dsl_pool_t *, dp, uint64_t, tx->tx_open_txg);

	/*
	 * Now that we've incremented tx_open_txg, we can let threads
	 * enter the next transaction group.
	 */
	for (c = 0; c < max_ncpus; c++)
		mutex_exit(&tx->tx_cpu[c].tc_open_lock);

	spa_txg_history_set(dp->dp_spa, txg, TXG_STATE_OPEN, tx_open_time);
	spa_txg_history_add(dp->dp_spa, txg + 1, tx_open_time);

	/*
	 * Quiesce the transaction group by waiting for everyone to
	 * call txg_rele_to_sync() for their open transaction handles.
	 */
	for (c = 0; c < max_ncpus; c++) {
		tx_cpu_t *tc = &tx->tx_cpu[c];
		mutex_enter(&tc->tc_lock);
		while (tc->tc_count[g] != 0)
			cv_wait(&tc->tc_cv[g], &tc->tc_lock);
		mutex_exit(&tc->tc_lock);
	}

	spa_txg_history_set(dp->dp_spa, txg, TXG_STATE_QUIESCED, gethrtime());
}

static void
txg_do_callbacks(void *cb_list)
{
	dmu_tx_do_callbacks(cb_list, 0);

	list_destroy(cb_list);

	kmem_free(cb_list, sizeof (list_t));
}
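
/*
 * Illustrative sketch (hypothetical names, not part of this file): commit
 * callbacks are typically registered through the dmu_tx layer, e.g.
 *
 *	static void my_commit_cb(void *arg, int error);
 *	...
 *	dmu_tx_callback_register(tx, my_commit_cb, arg);
 *
 * The dmu_tx layer hands the registered callbacks to txg_register_callbacks()
 * above, and once the enclosing txg has synced, txg_dispatch_callbacks()
 * below moves them to the tx_commit_cb taskq, which runs them via
 * txg_do_callbacks() / dmu_tx_do_callbacks() with an error of 0.
 */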

/*
 * Dispatch the commit callbacks registered on this txg to worker threads.
 *
 * If no callbacks are registered for a given TXG, nothing happens.
 * This function creates a taskq for the associated pool, if needed.
 */
static void
txg_dispatch_callbacks(dsl_pool_t *dp, uint64_t txg)
{
	int c;
	tx_state_t *tx = &dp->dp_tx;
	list_t *cb_list;

	for (c = 0; c < max_ncpus; c++) {
		tx_cpu_t *tc = &tx->tx_cpu[c];
		/*
		 * No need to lock tx_cpu_t at this point, since this can
		 * only be called once a txg has been synced.
		 */

		int g = txg & TXG_MASK;

		if (list_is_empty(&tc->tc_callbacks[g]))
			continue;

		if (tx->tx_commit_cb_taskq == NULL) {
			/*
			 * Commit callback taskq hasn't been created yet.
			 */
			tx->tx_commit_cb_taskq = taskq_create("tx_commit_cb",
			    100, defclsyspri, boot_ncpus, boot_ncpus * 2,
			    TASKQ_PREPOPULATE | TASKQ_DYNAMIC |
			    TASKQ_THREADS_CPU_PCT);
		}

		cb_list = kmem_alloc(sizeof (list_t), KM_SLEEP);
		list_create(cb_list, sizeof (dmu_tx_callback_t),
		    offsetof(dmu_tx_callback_t, dcb_node));

		list_move_tail(cb_list, &tc->tc_callbacks[g]);

		(void) taskq_dispatch(tx->tx_commit_cb_taskq,
		    txg_do_callbacks, cb_list, TQ_SLEEP);
	}
}

/*
 * Wait for pending commit callbacks of already-synced transactions to finish
 * processing.
 * Calling this function from within a commit callback will deadlock.
 */
void
txg_wait_callbacks(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;

	if (tx->tx_commit_cb_taskq != NULL)
		taskq_wait_outstanding(tx->tx_commit_cb_taskq, 0);
}

static boolean_t
txg_is_quiescing(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	ASSERT(MUTEX_HELD(&tx->tx_sync_lock));
	return (tx->tx_quiescing_txg != 0);
}

static boolean_t
txg_has_quiesced_to_sync(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	ASSERT(MUTEX_HELD(&tx->tx_sync_lock));
	return (tx->tx_quiesced_txg != 0);
}

static __attribute__((noreturn)) void
txg_sync_thread(void *arg)
{
	dsl_pool_t *dp = arg;
	spa_t *spa = dp->dp_spa;
	tx_state_t *tx = &dp->dp_tx;
	callb_cpr_t cpr;
	clock_t start, delta;

	(void) spl_fstrans_mark();
	txg_thread_enter(tx, &cpr);

	start = delta = 0;
	for (;;) {
		clock_t timeout = zfs_txg_timeout * hz;
		clock_t timer;
		uint64_t txg;

		/*
		 * We sync when we're scanning, there's someone waiting
		 * on us, or the quiesce thread has handed off a txg to
		 * us, or we have reached our timeout.
		 */
		timer = (delta >= timeout ? 0 : timeout - delta);
		while (!dsl_scan_active(dp->dp_scan) &&
		    !tx->tx_exiting && timer > 0 &&
		    tx->tx_synced_txg >= tx->tx_sync_txg_waiting &&
		    !txg_has_quiesced_to_sync(dp)) {
			dprintf("waiting; tx_synced=%llu waiting=%llu dp=%p\n",
			    (u_longlong_t)tx->tx_synced_txg,
			    (u_longlong_t)tx->tx_sync_txg_waiting, dp);
			txg_thread_wait(tx, &cpr, &tx->tx_sync_more_cv, timer);
			delta = ddi_get_lbolt() - start;
			timer = (delta > timeout ? 0 : timeout - delta);
		}

		/*
		 * When we're suspended, nothing should be changing and for
		 * MMP we don't want to bump anything that would make it
		 * harder to detect if another host is changing it when
		 * resuming after a MMP suspend.
		 */
		if (spa_suspended(spa))
			continue;

		/*
		 * Wait until the quiesce thread hands off a txg to us,
		 * prompting it to do so if necessary.
		 */
		while (!tx->tx_exiting && !txg_has_quiesced_to_sync(dp)) {
			if (txg_is_quiescing(dp)) {
				txg_thread_wait(tx, &cpr,
				    &tx->tx_quiesce_done_cv, 0);
				continue;
			}
			if (tx->tx_quiesce_txg_waiting < tx->tx_open_txg+1)
				tx->tx_quiesce_txg_waiting = tx->tx_open_txg+1;
			cv_broadcast(&tx->tx_quiesce_more_cv);
			txg_thread_wait(tx, &cpr, &tx->tx_quiesce_done_cv, 0);
		}

		if (tx->tx_exiting)
			txg_thread_exit(tx, &cpr, &tx->tx_sync_thread);

		/*
		 * Consume the quiesced txg which has been handed off to
		 * us.
		 * This may cause the quiescing thread to now be
		 * able to quiesce another txg, so we must signal it.
		 */
		ASSERT(tx->tx_quiesced_txg != 0);
		txg = tx->tx_quiesced_txg;
		tx->tx_quiesced_txg = 0;
		tx->tx_syncing_txg = txg;
		DTRACE_PROBE2(txg__syncing, dsl_pool_t *, dp, uint64_t, txg);
		cv_broadcast(&tx->tx_quiesce_more_cv);

		dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
		    (u_longlong_t)txg, (u_longlong_t)tx->tx_quiesce_txg_waiting,
		    (u_longlong_t)tx->tx_sync_txg_waiting);
		mutex_exit(&tx->tx_sync_lock);

		txg_stat_t *ts = spa_txg_history_init_io(spa, txg, dp);
		start = ddi_get_lbolt();
		spa_sync(spa, txg);
		delta = ddi_get_lbolt() - start;
		spa_txg_history_fini_io(spa, ts);

		mutex_enter(&tx->tx_sync_lock);
		tx->tx_synced_txg = txg;
		tx->tx_syncing_txg = 0;
		DTRACE_PROBE2(txg__synced, dsl_pool_t *, dp, uint64_t, txg);
		cv_broadcast(&tx->tx_sync_done_cv);

		/*
		 * Dispatch commit callbacks to worker threads.
		 */
		txg_dispatch_callbacks(dp, txg);
	}
}

static __attribute__((noreturn)) void
txg_quiesce_thread(void *arg)
{
	dsl_pool_t *dp = arg;
	tx_state_t *tx = &dp->dp_tx;
	callb_cpr_t cpr;

	txg_thread_enter(tx, &cpr);

	for (;;) {
		uint64_t txg;

		/*
		 * We quiesce when there's someone waiting on us.
		 * However, we can only have one txg in "quiescing" or
		 * "quiesced, waiting to sync" state. So we wait until
		 * the "quiesced, waiting to sync" txg has been consumed
		 * by the sync thread.
		 */
		while (!tx->tx_exiting &&
		    (tx->tx_open_txg >= tx->tx_quiesce_txg_waiting ||
		    txg_has_quiesced_to_sync(dp)))
			txg_thread_wait(tx, &cpr, &tx->tx_quiesce_more_cv, 0);

		if (tx->tx_exiting)
			txg_thread_exit(tx, &cpr, &tx->tx_quiesce_thread);

		txg = tx->tx_open_txg;
		dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
		    (u_longlong_t)txg,
		    (u_longlong_t)tx->tx_quiesce_txg_waiting,
		    (u_longlong_t)tx->tx_sync_txg_waiting);
		tx->tx_quiescing_txg = txg;

		mutex_exit(&tx->tx_sync_lock);
		txg_quiesce(dp, txg);
		mutex_enter(&tx->tx_sync_lock);

		/*
		 * Hand this txg off to the sync thread.
		 */
		dprintf("quiesce done, handing off txg %llu\n",
		    (u_longlong_t)txg);
		tx->tx_quiescing_txg = 0;
		tx->tx_quiesced_txg = txg;
		DTRACE_PROBE2(txg__quiesced, dsl_pool_t *, dp, uint64_t, txg);
		cv_broadcast(&tx->tx_sync_more_cv);
		cv_broadcast(&tx->tx_quiesce_done_cv);
	}
}
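
/*
 * Taken together, the two threads above form a pipeline. An illustrative
 * steady-state snapshot (txg numbers are made up):
 *
 *	txg 102   open        accepting new transactions (txg_hold_open())
 *	txg 101   quiescing   waiting for holders to call txg_rele_to_sync()
 *	txg 100   syncing     being written out by spa_sync()
 *
 * This is why up to TXG_CONCURRENT_STATES txgs can be active at once.
 */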

/*
 * Delay this thread by delay nanoseconds if we are still in the open
 * transaction group and there is already a waiting txg quiescing or quiesced.
 * Abort the delay if this txg stalls or enters the quiescing state.
 */
void
txg_delay(dsl_pool_t *dp, uint64_t txg, hrtime_t delay, hrtime_t resolution)
{
	tx_state_t *tx = &dp->dp_tx;
	hrtime_t start = gethrtime();

	/* don't delay if this txg could transition to quiescing immediately */
	if (tx->tx_open_txg > txg ||
	    tx->tx_syncing_txg == txg-1 || tx->tx_synced_txg == txg-1)
		return;

	mutex_enter(&tx->tx_sync_lock);
	if (tx->tx_open_txg > txg || tx->tx_synced_txg == txg-1) {
		mutex_exit(&tx->tx_sync_lock);
		return;
	}

	while (gethrtime() - start < delay &&
	    tx->tx_syncing_txg < txg-1 && !txg_stalled(dp)) {
		(void) cv_timedwait_hires(&tx->tx_quiesce_more_cv,
		    &tx->tx_sync_lock, delay, resolution, 0);
	}

	DMU_TX_STAT_BUMP(dmu_tx_delay);

	mutex_exit(&tx->tx_sync_lock);
}

static boolean_t
txg_wait_synced_impl(dsl_pool_t *dp, uint64_t txg, boolean_t wait_sig)
{
	tx_state_t *tx = &dp->dp_tx;

	ASSERT(!dsl_pool_config_held(dp));

	mutex_enter(&tx->tx_sync_lock);
	ASSERT3U(tx->tx_threads, ==, 2);
	if (txg == 0)
		txg = tx->tx_open_txg + TXG_DEFER_SIZE;
	if (tx->tx_sync_txg_waiting < txg)
		tx->tx_sync_txg_waiting = txg;
	dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
	    (u_longlong_t)txg, (u_longlong_t)tx->tx_quiesce_txg_waiting,
	    (u_longlong_t)tx->tx_sync_txg_waiting);
	while (tx->tx_synced_txg < txg) {
		dprintf("broadcasting sync more "
		    "tx_synced=%llu waiting=%llu dp=%px\n",
		    (u_longlong_t)tx->tx_synced_txg,
		    (u_longlong_t)tx->tx_sync_txg_waiting, dp);
		cv_broadcast(&tx->tx_sync_more_cv);
		if (wait_sig) {
			/*
			 * Condition wait here but stop if the thread receives
			 * a signal. The caller may call txg_wait_synced*()
			 * again to resume waiting for this txg.
			 */
			if (cv_wait_io_sig(&tx->tx_sync_done_cv,
			    &tx->tx_sync_lock) == 0) {
				mutex_exit(&tx->tx_sync_lock);
				return (B_TRUE);
			}
		} else {
			cv_wait_io(&tx->tx_sync_done_cv, &tx->tx_sync_lock);
		}
	}
	mutex_exit(&tx->tx_sync_lock);
	return (B_FALSE);
}

void
txg_wait_synced(dsl_pool_t *dp, uint64_t txg)
{
	VERIFY0(txg_wait_synced_impl(dp, txg, B_FALSE));
}

/*
 * Similar to txg_wait_synced(), but it can be interrupted by a signal.
 * Returns B_TRUE if the thread was signaled while waiting.
 */
boolean_t
txg_wait_synced_sig(dsl_pool_t *dp, uint64_t txg)
{
	return (txg_wait_synced_impl(dp, txg, B_TRUE));
}

/*
 * Wait for the specified open transaction group. Set should_quiesce
 * when the current open txg should be quiesced immediately.
 */
void
txg_wait_open(dsl_pool_t *dp, uint64_t txg, boolean_t should_quiesce)
{
	tx_state_t *tx = &dp->dp_tx;

	ASSERT(!dsl_pool_config_held(dp));

	mutex_enter(&tx->tx_sync_lock);
	ASSERT3U(tx->tx_threads, ==, 2);
	if (txg == 0)
		txg = tx->tx_open_txg + 1;
	if (tx->tx_quiesce_txg_waiting < txg && should_quiesce)
		tx->tx_quiesce_txg_waiting = txg;
	dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
	    (u_longlong_t)txg, (u_longlong_t)tx->tx_quiesce_txg_waiting,
	    (u_longlong_t)tx->tx_sync_txg_waiting);
	while (tx->tx_open_txg < txg) {
		cv_broadcast(&tx->tx_quiesce_more_cv);
		/*
		 * Callers setting should_quiesce will use cv_wait_io() and
		 * be accounted for as iowait time. Otherwise, the caller is
		 * understood to be idle and cv_wait_idle() is used to prevent
		 * incorrectly inflating the system load average.
		 */
		if (should_quiesce == B_TRUE) {
			cv_wait_io(&tx->tx_quiesce_done_cv, &tx->tx_sync_lock);
		} else {
			cv_wait_idle(&tx->tx_quiesce_done_cv,
			    &tx->tx_sync_lock);
		}
	}
	mutex_exit(&tx->tx_sync_lock);
}

/*
 * Pass in the txg number that should be synced.
 */
void
txg_kick(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;

	ASSERT(!dsl_pool_config_held(dp));

	if (tx->tx_sync_txg_waiting >= txg)
		return;

	mutex_enter(&tx->tx_sync_lock);
	if (tx->tx_sync_txg_waiting < txg) {
		tx->tx_sync_txg_waiting = txg;
		cv_broadcast(&tx->tx_sync_more_cv);
	}
	mutex_exit(&tx->tx_sync_lock);
}

boolean_t
txg_stalled(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	return (tx->tx_quiesce_txg_waiting > tx->tx_open_txg);
}

boolean_t
txg_sync_waiting(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;

	return (tx->tx_syncing_txg <= tx->tx_sync_txg_waiting ||
	    tx->tx_quiesced_txg != 0);
}

/*
 * Verify that this txg is active (open, quiescing, syncing). Non-active
 * txg's should not be manipulated.
 */
#ifdef ZFS_DEBUG
void
txg_verify(spa_t *spa, uint64_t txg)
{
	dsl_pool_t *dp __maybe_unused = spa_get_dsl(spa);
	if (txg <= TXG_INITIAL || txg == ZILTEST_TXG)
		return;
	ASSERT3U(txg, <=, dp->dp_tx.tx_open_txg);
	ASSERT3U(txg, >=, dp->dp_tx.tx_synced_txg);
	ASSERT3U(txg, >=, dp->dp_tx.tx_open_txg - TXG_CONCURRENT_STATES);
}
#endif

/*
 * Per-txg object lists.
 */
void
txg_list_create(txg_list_t *tl, spa_t *spa, size_t offset)
{
	int t;

	mutex_init(&tl->tl_lock, NULL, MUTEX_DEFAULT, NULL);

	tl->tl_offset = offset;
	tl->tl_spa = spa;

	for (t = 0; t < TXG_SIZE; t++)
		tl->tl_head[t] = NULL;
}

static boolean_t
txg_list_empty_impl(txg_list_t *tl, uint64_t txg)
{
	ASSERT(MUTEX_HELD(&tl->tl_lock));
	TXG_VERIFY(tl->tl_spa, txg);
	return (tl->tl_head[txg & TXG_MASK] == NULL);
}

boolean_t
txg_list_empty(txg_list_t *tl, uint64_t txg)
{
	mutex_enter(&tl->tl_lock);
	boolean_t ret = txg_list_empty_impl(tl, txg);
	mutex_exit(&tl->tl_lock);

	return (ret);
}

void
txg_list_destroy(txg_list_t *tl)
{
	int t;

	mutex_enter(&tl->tl_lock);
	for (t = 0; t < TXG_SIZE; t++)
		ASSERT(txg_list_empty_impl(tl, t));
	mutex_exit(&tl->tl_lock);

	mutex_destroy(&tl->tl_lock);
}

/*
 * Returns true if all txg lists are empty.
 *
 * Warning: this is inherently racy (an item could be added immediately
 * after this function returns).
 */
boolean_t
txg_all_lists_empty(txg_list_t *tl)
{
	boolean_t res = B_TRUE;
	for (int i = 0; i < TXG_SIZE; i++)
		res &= (tl->tl_head[i] == NULL);
	return (res);
}
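
/*
 * Illustrative sketch of how the per-txg lists below are used (names are
 * hypothetical; see the txg_list_t members of dsl_pool_t for real users):
 *
 *	typedef struct my_obj {
 *		...
 *		txg_node_t	mo_node;	embedded per-txg linkage
 *	} my_obj_t;
 *
 *	txg_list_create(&list, spa, offsetof(my_obj_t, mo_node));
 *
 *	In open/quiescing context, mark an object dirty in its txg:
 *		(void) txg_list_add(&list, obj, txg);
 *
 *	In syncing context, drain everything dirtied in the syncing txg:
 *		while ((obj = txg_list_remove(&list, txg)) != NULL)
 *			... write obj out ...
 *
 * The embedded txg_node_t carries TXG_SIZE link/membership slots, one per
 * possible in-flight txg, which is why entries are indexed by txg & TXG_MASK.
 */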

/*
 * Add an entry to the list (unless it's already on the list).
 * Returns B_TRUE if it was actually added.
 */
boolean_t
txg_list_add(txg_list_t *tl, void *p, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
	boolean_t add;

	TXG_VERIFY(tl->tl_spa, txg);
	mutex_enter(&tl->tl_lock);
	add = (tn->tn_member[t] == 0);
	if (add) {
		tn->tn_member[t] = 1;
		tn->tn_next[t] = tl->tl_head[t];
		tl->tl_head[t] = tn;
	}
	mutex_exit(&tl->tl_lock);

	return (add);
}

/*
 * Add an entry to the end of the list, unless it's already on the list.
 * (walks list to find end)
 * Returns B_TRUE if it was actually added.
 */
boolean_t
txg_list_add_tail(txg_list_t *tl, void *p, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
	boolean_t add;

	TXG_VERIFY(tl->tl_spa, txg);
	mutex_enter(&tl->tl_lock);
	add = (tn->tn_member[t] == 0);
	if (add) {
		txg_node_t **tp;

		for (tp = &tl->tl_head[t]; *tp != NULL; tp = &(*tp)->tn_next[t])
			continue;

		tn->tn_member[t] = 1;
		tn->tn_next[t] = NULL;
		*tp = tn;
	}
	mutex_exit(&tl->tl_lock);

	return (add);
}

/*
 * Remove the head of the list and return it.
 */
void *
txg_list_remove(txg_list_t *tl, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn;
	void *p = NULL;

	TXG_VERIFY(tl->tl_spa, txg);
	mutex_enter(&tl->tl_lock);
	if ((tn = tl->tl_head[t]) != NULL) {
		ASSERT(tn->tn_member[t]);
		ASSERT(tn->tn_next[t] == NULL || tn->tn_next[t]->tn_member[t]);
		p = (char *)tn - tl->tl_offset;
		tl->tl_head[t] = tn->tn_next[t];
		tn->tn_next[t] = NULL;
		tn->tn_member[t] = 0;
	}
	mutex_exit(&tl->tl_lock);

	return (p);
}

/*
 * Remove a specific item from the list and return it.
 */
void *
txg_list_remove_this(txg_list_t *tl, void *p, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn, **tp;

	TXG_VERIFY(tl->tl_spa, txg);
	mutex_enter(&tl->tl_lock);

	for (tp = &tl->tl_head[t]; (tn = *tp) != NULL; tp = &tn->tn_next[t]) {
		if ((char *)tn - tl->tl_offset == p) {
			*tp = tn->tn_next[t];
			tn->tn_next[t] = NULL;
			tn->tn_member[t] = 0;
			mutex_exit(&tl->tl_lock);
			return (p);
		}
	}

	mutex_exit(&tl->tl_lock);

	return (NULL);
}

boolean_t
txg_list_member(txg_list_t *tl, void *p, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);

	TXG_VERIFY(tl->tl_spa, txg);
	return (tn->tn_member[t] != 0);
}

/*
 * Walk a txg list
 */
void *
txg_list_head(txg_list_t *tl, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn;

	mutex_enter(&tl->tl_lock);
	tn = tl->tl_head[t];
	mutex_exit(&tl->tl_lock);

	TXG_VERIFY(tl->tl_spa, txg);
	return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
}

void *
txg_list_next(txg_list_t *tl, void *p, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);

	TXG_VERIFY(tl->tl_spa, txg);

	mutex_enter(&tl->tl_lock);
	tn = tn->tn_next[t];
	mutex_exit(&tl->tl_lock);

	return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
}

EXPORT_SYMBOL(txg_init);
EXPORT_SYMBOL(txg_fini);
EXPORT_SYMBOL(txg_sync_start);
EXPORT_SYMBOL(txg_sync_stop);
EXPORT_SYMBOL(txg_hold_open);
EXPORT_SYMBOL(txg_rele_to_quiesce);
EXPORT_SYMBOL(txg_rele_to_sync);
EXPORT_SYMBOL(txg_register_callbacks);
EXPORT_SYMBOL(txg_delay);
EXPORT_SYMBOL(txg_wait_synced);
EXPORT_SYMBOL(txg_wait_open);
EXPORT_SYMBOL(txg_wait_callbacks);
EXPORT_SYMBOL(txg_stalled);
EXPORT_SYMBOL(txg_sync_waiting);

ZFS_MODULE_PARAM(zfs_txg, zfs_txg_, timeout, UINT, ZMOD_RW,
	"Max seconds worth of delta per txg");