/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright 2008 Sun Microsystems, Inc. All rights reserved.
 * Use is subject to license terms.
 */

#pragma ident	"%Z%%M%	%I%	%E% SMI"

#include <sys/zfs_context.h>
#include <sys/txg_impl.h>
#include <sys/dmu_impl.h>
#include <sys/dsl_pool.h>
#include <sys/callb.h>

/*
 * Pool-wide transaction groups.
 */

static void txg_sync_thread(dsl_pool_t *dp);
static void txg_quiesce_thread(dsl_pool_t *dp);

int zfs_txg_timeout = 30;	/* max seconds worth of delta per txg */
int zfs_txg_synctime = 5;	/* target seconds to sync a txg */

int zfs_write_limit_shift = 3;	/* 1/8th of physical memory */

uint64_t zfs_write_limit_min = 32 << 20;	/* min write limit is 32MB */
uint64_t zfs_write_limit_max = 0;		/* max data payload per txg */
uint64_t zfs_write_limit_inflated = 0;

/*
 * Prepare the txg subsystem.
 */
void
txg_init(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;
	int c;
	bzero(tx, sizeof (tx_state_t));

	tx->tx_cpu = kmem_zalloc(max_ncpus * sizeof (tx_cpu_t), KM_SLEEP);

	for (c = 0; c < max_ncpus; c++) {
		int i;

		mutex_init(&tx->tx_cpu[c].tc_lock, NULL, MUTEX_DEFAULT, NULL);
		for (i = 0; i < TXG_SIZE; i++) {
			cv_init(&tx->tx_cpu[c].tc_cv[i], NULL, CV_DEFAULT,
			    NULL);
		}
	}

	rw_init(&tx->tx_suspend, NULL, RW_DEFAULT, NULL);
	mutex_init(&tx->tx_sync_lock, NULL, MUTEX_DEFAULT, NULL);

	tx->tx_open_txg = txg;
}

/*
 * Close down the txg subsystem.
 */
void
txg_fini(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	int c;

	ASSERT(tx->tx_threads == 0);

	rw_destroy(&tx->tx_suspend);
	mutex_destroy(&tx->tx_sync_lock);

	for (c = 0; c < max_ncpus; c++) {
		int i;

		mutex_destroy(&tx->tx_cpu[c].tc_lock);
		for (i = 0; i < TXG_SIZE; i++)
			cv_destroy(&tx->tx_cpu[c].tc_cv[i]);
	}

	kmem_free(tx->tx_cpu, max_ncpus * sizeof (tx_cpu_t));

	bzero(tx, sizeof (tx_state_t));
}

/*
 * Start syncing transaction groups.
 */
void
txg_sync_start(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;

	mutex_enter(&tx->tx_sync_lock);

	dprintf("pool %p\n", dp);

	ASSERT(tx->tx_threads == 0);

	tx->tx_threads = 2;

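	/*
	 * The quiesce thread moves each open txg into the quiesced state;
	 * the sync thread (created below) then writes each quiesced txg
	 * out to disk via spa_sync().
	 */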
	tx->tx_quiesce_thread = thread_create(NULL, 0, txg_quiesce_thread,
	    dp, 0, &p0, TS_RUN, minclsyspri);

	/*
	 * The sync thread can need a larger-than-default stack size on
	 * 32-bit x86.  This is due in part to nested pools and
	 * scrub_visitbp() recursion.
	 */
	tx->tx_sync_thread = thread_create(NULL, 12<<10, txg_sync_thread,
	    dp, 0, &p0, TS_RUN, minclsyspri);

	mutex_exit(&tx->tx_sync_lock);
}

static void
txg_thread_enter(tx_state_t *tx, callb_cpr_t *cpr)
{
	CALLB_CPR_INIT(cpr, &tx->tx_sync_lock, callb_generic_cpr, FTAG);
	mutex_enter(&tx->tx_sync_lock);
}

static void
txg_thread_exit(tx_state_t *tx, callb_cpr_t *cpr, kthread_t **tpp)
{
	ASSERT(*tpp != NULL);
	*tpp = NULL;
	tx->tx_threads--;
	cv_broadcast(&tx->tx_exit_cv);
	CALLB_CPR_EXIT(cpr);		/* drops &tx->tx_sync_lock */
	thread_exit();
}

static void
txg_thread_wait(tx_state_t *tx, callb_cpr_t *cpr, kcondvar_t *cv, uint64_t time)
{
	CALLB_CPR_SAFE_BEGIN(cpr);

	if (time)
		(void) cv_timedwait(cv, &tx->tx_sync_lock, lbolt + time);
	else
		cv_wait(cv, &tx->tx_sync_lock);

	CALLB_CPR_SAFE_END(cpr, &tx->tx_sync_lock);
}

/*
 * Stop syncing transaction groups.
 */
void
txg_sync_stop(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;

	dprintf("pool %p\n", dp);
	/*
	 * Finish off any work in progress.
	 */
	ASSERT(tx->tx_threads == 2);
	txg_wait_synced(dp, 0);

	/*
	 * Wake all sync threads and wait for them to die.
	 */
	mutex_enter(&tx->tx_sync_lock);

	ASSERT(tx->tx_threads == 2);

	tx->tx_exiting = 1;

	cv_broadcast(&tx->tx_quiesce_more_cv);
	cv_broadcast(&tx->tx_quiesce_done_cv);
	cv_broadcast(&tx->tx_sync_more_cv);

	while (tx->tx_threads != 0)
		cv_wait(&tx->tx_exit_cv, &tx->tx_sync_lock);

	tx->tx_exiting = 0;

	mutex_exit(&tx->tx_sync_lock);
}

uint64_t
txg_hold_open(dsl_pool_t *dp, txg_handle_t *th)
{
	tx_state_t *tx = &dp->dp_tx;
	tx_cpu_t *tc = &tx->tx_cpu[CPU_SEQID];
	uint64_t txg;

	mutex_enter(&tc->tc_lock);

	txg = tx->tx_open_txg;
	tc->tc_count[txg & TXG_MASK]++;

	th->th_cpu = tc;
	th->th_txg = txg;

	return (txg);
}

void
txg_rele_to_quiesce(txg_handle_t *th)
{
	tx_cpu_t *tc = th->th_cpu;

	mutex_exit(&tc->tc_lock);
}

void
txg_rele_to_sync(txg_handle_t *th)
{
	tx_cpu_t *tc = th->th_cpu;
	int g = th->th_txg & TXG_MASK;

	mutex_enter(&tc->tc_lock);
	ASSERT(tc->tc_count[g] != 0);
	if (--tc->tc_count[g] == 0)
		cv_broadcast(&tc->tc_cv[g]);
	mutex_exit(&tc->tc_lock);

	th->th_cpu = NULL;	/* defensive */
}

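/*
 * Advance tx_open_txg past 'txg', then wait for every thread holding
 * 'txg' open to call txg_rele_to_sync() so the txg can be handed off
 * to the sync thread.
 */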
static void
txg_quiesce(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;
	int g = txg & TXG_MASK;
	int c;

	/*
	 * Grab all tx_cpu locks so nobody else can get into this txg.
	 */
	for (c = 0; c < max_ncpus; c++)
		mutex_enter(&tx->tx_cpu[c].tc_lock);

	ASSERT(txg == tx->tx_open_txg);
	tx->tx_open_txg++;

	/*
	 * Now that we've incremented tx_open_txg, we can let threads
	 * enter the next transaction group.
	 */
	for (c = 0; c < max_ncpus; c++)
		mutex_exit(&tx->tx_cpu[c].tc_lock);

	/*
	 * Quiesce the transaction group by waiting for everyone to txg_exit().
	 */
	for (c = 0; c < max_ncpus; c++) {
		tx_cpu_t *tc = &tx->tx_cpu[c];
		mutex_enter(&tc->tc_lock);
		while (tc->tc_count[g] != 0)
			cv_wait(&tc->tc_cv[g], &tc->tc_lock);
		mutex_exit(&tc->tc_lock);
	}
}

static void
txg_sync_thread(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	callb_cpr_t cpr;
	uint64_t timeout, start, delta, timer;
	int target;

	txg_thread_enter(tx, &cpr);

	start = delta = 0;
	timeout = zfs_txg_timeout * hz;
	for (;;) {
		uint64_t txg, written;

		/*
		 * We sync when there's someone waiting on us, or the
		 * quiesce thread has handed off a txg to us, or we have
		 * reached our timeout.
		 */
		timer = (delta >= timeout ? 0 : timeout - delta);
		while (!tx->tx_exiting && timer > 0 &&
		    tx->tx_synced_txg >= tx->tx_sync_txg_waiting &&
		    tx->tx_quiesced_txg == 0) {
			dprintf("waiting; tx_synced=%llu waiting=%llu dp=%p\n",
			    tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
			txg_thread_wait(tx, &cpr, &tx->tx_sync_more_cv, timer);
			delta = lbolt - start;
			timer = (delta > timeout ? 0 : timeout - delta);
		}

		/*
		 * Wait until the quiesce thread hands off a txg to us,
		 * prompting it to do so if necessary.
		 */
		while (!tx->tx_exiting && tx->tx_quiesced_txg == 0) {
			if (tx->tx_quiesce_txg_waiting < tx->tx_open_txg+1)
				tx->tx_quiesce_txg_waiting = tx->tx_open_txg+1;
			cv_broadcast(&tx->tx_quiesce_more_cv);
			txg_thread_wait(tx, &cpr, &tx->tx_quiesce_done_cv, 0);
		}

		if (tx->tx_exiting)
			txg_thread_exit(tx, &cpr, &tx->tx_sync_thread);

		rw_enter(&tx->tx_suspend, RW_WRITER);

		/*
		 * Consume the quiesced txg which has been handed off to
		 * us.  This may cause the quiescing thread to now be
		 * able to quiesce another txg, so we must signal it.
		 */
		txg = tx->tx_quiesced_txg;
		tx->tx_quiesced_txg = 0;
		tx->tx_syncing_txg = txg;
		cv_broadcast(&tx->tx_quiesce_more_cv);
		rw_exit(&tx->tx_suspend);

		dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
		    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
		mutex_exit(&tx->tx_sync_lock);
		start = lbolt;
		spa_sync(dp->dp_spa, txg);
		delta = (lbolt - start) + 1;

		written = dp->dp_space_towrite[txg & TXG_MASK];
		dp->dp_space_towrite[txg & TXG_MASK] = 0;
		ASSERT(dp->dp_tempreserved[txg & TXG_MASK] == 0);

		/*
		 * If the write limit max has not been explicitly set, set it
		 * to a fraction of available physical memory (default 1/8th).
		 * Note that we must inflate the limit because the spa
		 * inflates write sizes to account for data replication.
		 * Check this each sync phase to catch changing memory size.
		 */
		if (zfs_write_limit_inflated == 0 ||
		    (zfs_write_limit_shift && zfs_write_limit_max !=
		    physmem * PAGESIZE >> zfs_write_limit_shift)) {
			zfs_write_limit_max =
			    physmem * PAGESIZE >> zfs_write_limit_shift;
			zfs_write_limit_inflated =
			    spa_get_asize(dp->dp_spa, zfs_write_limit_max);
			if (zfs_write_limit_min > zfs_write_limit_inflated)
				zfs_write_limit_inflated = zfs_write_limit_min;
		}

		/*
		 * Attempt to keep the sync time consistent by adjusting the
		 * amount of write traffic allowed into each transaction group.
		 */
		target = zfs_txg_synctime * hz;
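		/*
		 * If the last sync overshot the target, scale the write
		 * limit down proportionally (but never below
		 * zfs_write_limit_min).  If the pool wrote at least its
		 * current limit and still finished under the target,
		 * scale the limit up by the same ratio, capped at 2x per
		 * sync and at zfs_write_limit_inflated.
		 */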
		if (delta > target) {
			uint64_t old = MIN(dp->dp_write_limit, written);

			dp->dp_write_limit = MAX(zfs_write_limit_min,
			    old * target / delta);
		} else if (written >= dp->dp_write_limit &&
		    delta >> 3 < target >> 3) {
			uint64_t rescale =
			    MIN((100 * target) / delta, 200);

			dp->dp_write_limit = MIN(zfs_write_limit_inflated,
			    written * rescale / 100);
		}

		mutex_enter(&tx->tx_sync_lock);
		rw_enter(&tx->tx_suspend, RW_WRITER);
		tx->tx_synced_txg = txg;
		tx->tx_syncing_txg = 0;
		rw_exit(&tx->tx_suspend);
		cv_broadcast(&tx->tx_sync_done_cv);
	}
}

static void
txg_quiesce_thread(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	callb_cpr_t cpr;

	txg_thread_enter(tx, &cpr);

	for (;;) {
		uint64_t txg;

		/*
		 * We quiesce when there's someone waiting on us.
		 * However, we can only have one txg in "quiescing" or
		 * "quiesced, waiting to sync" state.  So we wait until
		 * the "quiesced, waiting to sync" txg has been consumed
		 * by the sync thread.
		 */
		while (!tx->tx_exiting &&
		    (tx->tx_open_txg >= tx->tx_quiesce_txg_waiting ||
		    tx->tx_quiesced_txg != 0))
			txg_thread_wait(tx, &cpr, &tx->tx_quiesce_more_cv, 0);

		if (tx->tx_exiting)
			txg_thread_exit(tx, &cpr, &tx->tx_quiesce_thread);

		txg = tx->tx_open_txg;
		dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
		    txg, tx->tx_quiesce_txg_waiting,
		    tx->tx_sync_txg_waiting);
		mutex_exit(&tx->tx_sync_lock);
		txg_quiesce(dp, txg);
		mutex_enter(&tx->tx_sync_lock);

		/*
		 * Hand this txg off to the sync thread.
		 */
		dprintf("quiesce done, handing off txg %llu\n", txg);
		tx->tx_quiesced_txg = txg;
		cv_broadcast(&tx->tx_sync_more_cv);
		cv_broadcast(&tx->tx_quiesce_done_cv);
	}
}

/*
 * Delay this thread by 'ticks' if we are still in the open transaction
 * group and there is already a waiting txg quiescing or quiesced.  Abort
 * the delay if this txg stalls or enters the quiescing state.
 */
void
txg_delay(dsl_pool_t *dp, uint64_t txg, int ticks)
{
	tx_state_t *tx = &dp->dp_tx;
	int timeout = lbolt + ticks;

	/* don't delay if this txg could transition to quiescing immediately */
	if (tx->tx_open_txg > txg ||
	    tx->tx_syncing_txg == txg-1 || tx->tx_synced_txg == txg-1)
		return;

	mutex_enter(&tx->tx_sync_lock);
	if (tx->tx_open_txg > txg || tx->tx_synced_txg == txg-1) {
		mutex_exit(&tx->tx_sync_lock);
		return;
	}

	while (lbolt < timeout &&
	    tx->tx_syncing_txg < txg-1 && !txg_stalled(dp))
		(void) cv_timedwait(&tx->tx_quiesce_more_cv, &tx->tx_sync_lock,
		    timeout);

	mutex_exit(&tx->tx_sync_lock);
}

void
txg_wait_synced(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;

	mutex_enter(&tx->tx_sync_lock);
	ASSERT(tx->tx_threads == 2);
	if (txg == 0)
		txg = tx->tx_open_txg;
	if (tx->tx_sync_txg_waiting < txg)
		tx->tx_sync_txg_waiting = txg;
	dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
	    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
	while (tx->tx_synced_txg < txg) {
		dprintf("broadcasting sync more "
		    "tx_synced=%llu waiting=%llu dp=%p\n",
		    tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
		cv_broadcast(&tx->tx_sync_more_cv);
		cv_wait(&tx->tx_sync_done_cv, &tx->tx_sync_lock);
	}
	mutex_exit(&tx->tx_sync_lock);
}

void
txg_wait_open(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;

	mutex_enter(&tx->tx_sync_lock);
	ASSERT(tx->tx_threads == 2);
	if (txg == 0)
		txg = tx->tx_open_txg + 1;
	if (tx->tx_quiesce_txg_waiting < txg)
		tx->tx_quiesce_txg_waiting = txg;
	dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
	    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
	while (tx->tx_open_txg < txg) {
		cv_broadcast(&tx->tx_quiesce_more_cv);
		cv_wait(&tx->tx_quiesce_done_cv, &tx->tx_sync_lock);
	}
	mutex_exit(&tx->tx_sync_lock);
}

boolean_t
txg_stalled(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	return (tx->tx_quiesce_txg_waiting > tx->tx_open_txg);
}

boolean_t
txg_sync_waiting(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;

	return (tx->tx_syncing_txg <= tx->tx_sync_txg_waiting ||
	    tx->tx_quiesced_txg != 0);
}

void
txg_suspend(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	/* XXX some code paths suspend when they are already suspended! */
	rw_enter(&tx->tx_suspend, RW_READER);
}

void
txg_resume(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	rw_exit(&tx->tx_suspend);
}

/*
 * Per-txg object lists.
 */
void
txg_list_create(txg_list_t *tl, size_t offset)
{
	int t;

	mutex_init(&tl->tl_lock, NULL, MUTEX_DEFAULT, NULL);

	tl->tl_offset = offset;

	for (t = 0; t < TXG_SIZE; t++)
		tl->tl_head[t] = NULL;
}

void
txg_list_destroy(txg_list_t *tl)
{
	int t;

	for (t = 0; t < TXG_SIZE; t++)
		ASSERT(txg_list_empty(tl, t));

	mutex_destroy(&tl->tl_lock);
}

int
txg_list_empty(txg_list_t *tl, uint64_t txg)
{
	return (tl->tl_head[txg & TXG_MASK] == NULL);
}

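/*
 * Each object that can appear on these lists embeds a txg_node_t at
 * tl_offset bytes from its start.  The tn_next[] and tn_member[] arrays
 * are indexed by txg & TXG_MASK, so an object can be a member of the
 * same list in several transaction groups at once.
 */
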
/*
 * Add an entry to the list.
 * Returns 0 if it's a new entry, 1 if it's already there.
 */
int
txg_list_add(txg_list_t *tl, void *p, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
	int already_on_list;

	mutex_enter(&tl->tl_lock);
	already_on_list = tn->tn_member[t];
	if (!already_on_list) {
		tn->tn_member[t] = 1;
		tn->tn_next[t] = tl->tl_head[t];
		tl->tl_head[t] = tn;
	}
	mutex_exit(&tl->tl_lock);

	return (already_on_list);
}

/*
 * Remove the head of the list and return it.
 */
void *
txg_list_remove(txg_list_t *tl, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn;
	void *p = NULL;

	mutex_enter(&tl->tl_lock);
	if ((tn = tl->tl_head[t]) != NULL) {
		p = (char *)tn - tl->tl_offset;
		tl->tl_head[t] = tn->tn_next[t];
		tn->tn_next[t] = NULL;
		tn->tn_member[t] = 0;
	}
	mutex_exit(&tl->tl_lock);

	return (p);
}

/*
 * Remove a specific item from the list and return it.
 */
void *
txg_list_remove_this(txg_list_t *tl, void *p, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn, **tp;

	mutex_enter(&tl->tl_lock);

	for (tp = &tl->tl_head[t]; (tn = *tp) != NULL; tp = &tn->tn_next[t]) {
		if ((char *)tn - tl->tl_offset == p) {
			*tp = tn->tn_next[t];
			tn->tn_next[t] = NULL;
			tn->tn_member[t] = 0;
			mutex_exit(&tl->tl_lock);
			return (p);
		}
	}

	mutex_exit(&tl->tl_lock);

	return (NULL);
}

int
txg_list_member(txg_list_t *tl, void *p, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);

	return (tn->tn_member[t]);
}

/*
 * Walk a txg list -- only safe if you know it's not changing.
 */
void *
txg_list_head(txg_list_t *tl, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn = tl->tl_head[t];

	return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
}

void *
txg_list_next(txg_list_t *tl, void *p, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);

	tn = tn->tn_next[t];

	return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
}