/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */

#pragma ident	"%Z%%M% %I% %E% SMI"

#include <sys/zfs_context.h>
#include <sys/txg_impl.h>
#include <sys/dmu_impl.h>
#include <sys/dsl_pool.h>
#include <sys/callb.h>

/*
 * Pool-wide transaction groups.
 */

static void txg_sync_thread(dsl_pool_t *dp);
static void txg_quiesce_thread(dsl_pool_t *dp);
static void txg_timelimit_thread(dsl_pool_t *dp);

int txg_time = 5;	/* max 5 seconds worth of delta per txg */

/*
 * Prepare the txg subsystem.
 */
void
txg_init(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;
	int c;

	bzero(tx, sizeof (tx_state_t));

	tx->tx_cpu = kmem_zalloc(max_ncpus * sizeof (tx_cpu_t), KM_SLEEP);

	for (c = 0; c < max_ncpus; c++) {
		int i;

		mutex_init(&tx->tx_cpu[c].tc_lock, NULL, MUTEX_DEFAULT, NULL);
		for (i = 0; i < TXG_SIZE; i++) {
			cv_init(&tx->tx_cpu[c].tc_cv[i], NULL, CV_DEFAULT,
			    NULL);
		}
	}

	rw_init(&tx->tx_suspend, NULL, RW_DEFAULT, NULL);
	mutex_init(&tx->tx_sync_lock, NULL, MUTEX_DEFAULT, NULL);

	tx->tx_open_txg = txg;
}

/*
 * Close down the txg subsystem.
 */
void
txg_fini(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	int c;

	ASSERT(tx->tx_threads == 0);

	rw_destroy(&tx->tx_suspend);
	mutex_destroy(&tx->tx_sync_lock);

	for (c = 0; c < max_ncpus; c++) {
		int i;

		mutex_destroy(&tx->tx_cpu[c].tc_lock);
		for (i = 0; i < TXG_SIZE; i++)
			cv_destroy(&tx->tx_cpu[c].tc_cv[i]);
	}

	kmem_free(tx->tx_cpu, max_ncpus * sizeof (tx_cpu_t));

	bzero(tx, sizeof (tx_state_t));
}
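/*
 * Overview (editorial summary, not original to this file): the heart
 * of this file is a three-stage pipeline driven by the three threads
 * that txg_sync_start() creates below:
 *
 *	quiesce thread	 -- closes the open txg and waits for every
 *			    holder (txg_hold_open()) to release it
 *	sync thread	 -- writes each quiesced txg to stable storage
 *			    via spa_sync()
 *	timelimit thread -- pushes out the open txg after txg_time
 *			    seconds so that no txg grows without bound
 *
 * A rough sketch of a txg's life, using the tx_state_t field names:
 *
 *	open (tx_open_txg) --> quiescing --> quiesced (tx_quiesced_txg)
 *	    --> syncing (tx_syncing_txg) --> synced (tx_synced_txg)
 */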
/*
 * Start syncing transaction groups.
 */
void
txg_sync_start(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;

	mutex_enter(&tx->tx_sync_lock);

	dprintf("pool %p\n", dp);

	ASSERT(tx->tx_threads == 0);

	tx->tx_threads = 3;

	tx->tx_quiesce_thread = thread_create(NULL, 0, txg_quiesce_thread,
	    dp, 0, &p0, TS_RUN, minclsyspri);

	tx->tx_sync_thread = thread_create(NULL, 0, txg_sync_thread,
	    dp, 0, &p0, TS_RUN, minclsyspri);

	tx->tx_timelimit_thread = thread_create(NULL, 0, txg_timelimit_thread,
	    dp, 0, &p0, TS_RUN, minclsyspri);

	mutex_exit(&tx->tx_sync_lock);
}

static void
txg_thread_enter(tx_state_t *tx, callb_cpr_t *cpr)
{
	CALLB_CPR_INIT(cpr, &tx->tx_sync_lock, callb_generic_cpr, FTAG);
	mutex_enter(&tx->tx_sync_lock);
}

static void
txg_thread_exit(tx_state_t *tx, callb_cpr_t *cpr, kthread_t **tpp)
{
	ASSERT(*tpp != NULL);
	*tpp = NULL;
	tx->tx_threads--;
	cv_broadcast(&tx->tx_exit_cv);
	CALLB_CPR_EXIT(cpr);		/* drops &tx->tx_sync_lock */
	thread_exit();
}

static void
txg_thread_wait(tx_state_t *tx, callb_cpr_t *cpr, kcondvar_t *cv, int secmax)
{
	CALLB_CPR_SAFE_BEGIN(cpr);

	if (secmax)
		(void) cv_timedwait(cv, &tx->tx_sync_lock, lbolt + secmax * hz);
	else
		cv_wait(cv, &tx->tx_sync_lock);

	CALLB_CPR_SAFE_END(cpr, &tx->tx_sync_lock);
}

/*
 * Stop syncing transaction groups.
 */
void
txg_sync_stop(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;

	dprintf("pool %p\n", dp);
	/*
	 * Finish off any work in progress.
	 */
	ASSERT(tx->tx_threads == 3);
	txg_wait_synced(dp, 0);

	/*
	 * Wake all 3 sync threads (one per state) and wait for them to die.
	 */
	mutex_enter(&tx->tx_sync_lock);

	ASSERT(tx->tx_threads == 3);

	tx->tx_exiting = 1;

	cv_broadcast(&tx->tx_quiesce_more_cv);
	cv_broadcast(&tx->tx_quiesce_done_cv);
	cv_broadcast(&tx->tx_sync_more_cv);
	cv_broadcast(&tx->tx_timeout_exit_cv);

	while (tx->tx_threads != 0)
		cv_wait(&tx->tx_exit_cv, &tx->tx_sync_lock);

	tx->tx_exiting = 0;

	mutex_exit(&tx->tx_sync_lock);
}

uint64_t
txg_hold_open(dsl_pool_t *dp, txg_handle_t *th)
{
	tx_state_t *tx = &dp->dp_tx;
	tx_cpu_t *tc = &tx->tx_cpu[CPU_SEQID];
	uint64_t txg;

	mutex_enter(&tc->tc_lock);

	txg = tx->tx_open_txg;
	tc->tc_count[txg & TXG_MASK]++;

	th->th_cpu = tc;
	th->th_txg = txg;

	return (txg);
}

void
txg_rele_to_quiesce(txg_handle_t *th)
{
	tx_cpu_t *tc = th->th_cpu;

	mutex_exit(&tc->tc_lock);
}

void
txg_rele_to_sync(txg_handle_t *th)
{
	tx_cpu_t *tc = th->th_cpu;
	int g = th->th_txg & TXG_MASK;

	mutex_enter(&tc->tc_lock);
	ASSERT(tc->tc_count[g] != 0);
	if (--tc->tc_count[g] == 0)
		cv_broadcast(&tc->tc_cv[g]);
	mutex_exit(&tc->tc_lock);

	th->th_cpu = NULL;	/* defensive */
}
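/*
 * Illustrative only -- a sketch of how a hypothetical writer would use
 * the hold/rele protocol above (a consumer such as the DMU transaction
 * code is the real caller; "modify state" stands in for its work):
 *
 *	txg_handle_t th;
 *	uint64_t txg = txg_hold_open(dp, &th);	// join the open txg;
 *						// tc_lock is still held
 *	txg_rele_to_quiesce(&th);		// drop tc_lock; the hold
 *						// (tc_count) remains
 *	... modify in-core state bound to this txg ...
 *	txg_rele_to_sync(&th);			// drop the hold, letting
 *						// txg_quiesce() complete
 */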
static void
txg_quiesce(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;
	int g = txg & TXG_MASK;
	int c;

	/*
	 * Grab all tx_cpu locks so nobody else can get into this txg.
	 */
	for (c = 0; c < max_ncpus; c++)
		mutex_enter(&tx->tx_cpu[c].tc_lock);

	ASSERT(txg == tx->tx_open_txg);
	tx->tx_open_txg++;

	/*
	 * Now that we've incremented tx_open_txg, we can let threads
	 * enter the next transaction group.
	 */
	for (c = 0; c < max_ncpus; c++)
		mutex_exit(&tx->tx_cpu[c].tc_lock);

	/*
	 * Quiesce the transaction group by waiting for everyone
	 * to call txg_rele_to_sync().
	 */
	for (c = 0; c < max_ncpus; c++) {
		tx_cpu_t *tc = &tx->tx_cpu[c];
		mutex_enter(&tc->tc_lock);
		while (tc->tc_count[g] != 0)
			cv_wait(&tc->tc_cv[g], &tc->tc_lock);
		mutex_exit(&tc->tc_lock);
	}
}

static void
txg_sync_thread(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	callb_cpr_t cpr;

	txg_thread_enter(tx, &cpr);

	for (;;) {
		uint64_t txg;

		/*
		 * We sync when there's someone waiting on us, or the
		 * quiesce thread has handed off a txg to us.
		 */
		while (!tx->tx_exiting &&
		    tx->tx_synced_txg >= tx->tx_sync_txg_waiting &&
		    tx->tx_quiesced_txg == 0) {
			dprintf("waiting; tx_synced=%llu waiting=%llu dp=%p\n",
			    tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
			txg_thread_wait(tx, &cpr, &tx->tx_sync_more_cv, 0);
		}

		/*
		 * Wait until the quiesce thread hands off a txg to us,
		 * prompting it to do so if necessary.
		 */
		while (!tx->tx_exiting && tx->tx_quiesced_txg == 0) {
			if (tx->tx_quiesce_txg_waiting < tx->tx_open_txg+1)
				tx->tx_quiesce_txg_waiting = tx->tx_open_txg+1;
			cv_broadcast(&tx->tx_quiesce_more_cv);
			txg_thread_wait(tx, &cpr, &tx->tx_quiesce_done_cv, 0);
		}

		if (tx->tx_exiting)
			txg_thread_exit(tx, &cpr, &tx->tx_sync_thread);

		rw_enter(&tx->tx_suspend, RW_WRITER);

		/*
		 * Consume the quiesced txg which has been handed off to
		 * us.  This may cause the quiescing thread to now be
		 * able to quiesce another txg, so we must signal it.
		 */
		txg = tx->tx_quiesced_txg;
		tx->tx_quiesced_txg = 0;
		tx->tx_syncing_txg = txg;
		cv_broadcast(&tx->tx_quiesce_more_cv);
		rw_exit(&tx->tx_suspend);

		dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
		    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
		mutex_exit(&tx->tx_sync_lock);
		spa_sync(dp->dp_spa, txg);
		mutex_enter(&tx->tx_sync_lock);
		rw_enter(&tx->tx_suspend, RW_WRITER);
		tx->tx_synced_txg = txg;
		tx->tx_syncing_txg = 0;
		rw_exit(&tx->tx_suspend);
		cv_broadcast(&tx->tx_sync_done_cv);
	}
}

static void
txg_quiesce_thread(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	callb_cpr_t cpr;

	txg_thread_enter(tx, &cpr);

	for (;;) {
		uint64_t txg;

		/*
		 * We quiesce when there's someone waiting on us.
		 * However, we can only have one txg in "quiescing" or
		 * "quiesced, waiting to sync" state.  So we wait until
		 * the "quiesced, waiting to sync" txg has been consumed
		 * by the sync thread.
		 */
		while (!tx->tx_exiting &&
		    (tx->tx_open_txg >= tx->tx_quiesce_txg_waiting ||
		    tx->tx_quiesced_txg != 0))
			txg_thread_wait(tx, &cpr, &tx->tx_quiesce_more_cv, 0);

		if (tx->tx_exiting)
			txg_thread_exit(tx, &cpr, &tx->tx_quiesce_thread);

		txg = tx->tx_open_txg;
		dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
		    txg, tx->tx_quiesce_txg_waiting,
		    tx->tx_sync_txg_waiting);
		mutex_exit(&tx->tx_sync_lock);
		txg_quiesce(dp, txg);
		mutex_enter(&tx->tx_sync_lock);

		/*
		 * Hand this txg off to the sync thread.
		 */
		dprintf("quiesce done, handing off txg %llu\n", txg);
		tx->tx_quiesced_txg = txg;
		cv_broadcast(&tx->tx_sync_more_cv);
		cv_broadcast(&tx->tx_quiesce_done_cv);
	}
}
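/*
 * The two wait routines below are how everyone else blocks on txg
 * state transitions.  A hypothetical sketch of the common idioms
 * (callers and arguments are assumptions, not code from this file):
 *
 *	txg_wait_synced(dp, 0);		// block until everything dirty
 *					// as of now is on stable storage
 *	txg_wait_synced(dp, txg);	// block until the given txg syncs
 *	txg_wait_open(dp, 0);		// block until a new txg opens
 */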
void
txg_wait_synced(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;

	mutex_enter(&tx->tx_sync_lock);
	ASSERT(tx->tx_threads == 3);
	if (txg == 0)
		txg = tx->tx_open_txg;
	if (tx->tx_sync_txg_waiting < txg)
		tx->tx_sync_txg_waiting = txg;
	dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
	    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
	while (tx->tx_synced_txg < txg) {
		dprintf("broadcasting sync more "
		    "tx_synced=%llu waiting=%llu dp=%p\n",
		    tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
		cv_broadcast(&tx->tx_sync_more_cv);
		cv_wait(&tx->tx_sync_done_cv, &tx->tx_sync_lock);
	}
	mutex_exit(&tx->tx_sync_lock);
}

void
txg_wait_open(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;

	mutex_enter(&tx->tx_sync_lock);
	ASSERT(tx->tx_threads == 3);
	if (txg == 0)
		txg = tx->tx_open_txg + 1;
	if (tx->tx_quiesce_txg_waiting < txg)
		tx->tx_quiesce_txg_waiting = txg;
	dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
	    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
	while (tx->tx_open_txg < txg) {
		cv_broadcast(&tx->tx_quiesce_more_cv);
		cv_wait(&tx->tx_quiesce_done_cv, &tx->tx_sync_lock);
	}
	mutex_exit(&tx->tx_sync_lock);
}

static void
txg_timelimit_thread(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	callb_cpr_t cpr;

	txg_thread_enter(tx, &cpr);

	while (!tx->tx_exiting) {
		uint64_t txg = tx->tx_open_txg + 1;

		txg_thread_wait(tx, &cpr, &tx->tx_timeout_exit_cv, txg_time);

		if (tx->tx_quiesce_txg_waiting < txg)
			tx->tx_quiesce_txg_waiting = txg;

		while (!tx->tx_exiting && tx->tx_open_txg < txg) {
			dprintf("pushing out %llu\n", txg);
			cv_broadcast(&tx->tx_quiesce_more_cv);
			txg_thread_wait(tx, &cpr, &tx->tx_quiesce_done_cv, 0);
		}
	}
	txg_thread_exit(tx, &cpr, &tx->tx_timelimit_thread);
}

int
txg_stalled(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;

	return (tx->tx_quiesce_txg_waiting > tx->tx_open_txg);
}

void
txg_suspend(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;

	/* XXX some code paths suspend when they are already suspended! */
	rw_enter(&tx->tx_suspend, RW_READER);
}

void
txg_resume(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;

	rw_exit(&tx->tx_suspend);
}

/*
 * Per-txg object lists.
 */
void
txg_list_create(txg_list_t *tl, size_t offset)
{
	int t;

	mutex_init(&tl->tl_lock, NULL, MUTEX_DEFAULT, NULL);

	tl->tl_offset = offset;

	for (t = 0; t < TXG_SIZE; t++)
		tl->tl_head[t] = NULL;
}

void
txg_list_destroy(txg_list_t *tl)
{
	int t;

	for (t = 0; t < TXG_SIZE; t++)
		ASSERT(txg_list_empty(tl, t));

	mutex_destroy(&tl->tl_lock);
}

int
txg_list_empty(txg_list_t *tl, uint64_t txg)
{
	return (tl->tl_head[txg & TXG_MASK] == NULL);
}
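/*
 * Illustrative only -- a client participates in these lists by
 * embedding a txg_node_t in its own structure and handing the node's
 * offset to txg_list_create().  The foo names are hypothetical:
 *
 *	typedef struct foo {
 *		txg_node_t	foo_node;	// links for all TXG_SIZE slots
 *		...
 *	} foo_t;
 *
 *	txg_list_t fl;
 *	txg_list_create(&fl, offsetof(foo_t, foo_node));
 *	(void) txg_list_add(&fl, foo, txg);	// filed under txg & TXG_MASK
 */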
/*
 * Add an entry to the list.
 * Returns 0 if it's a new entry, 1 if it's already there.
 */
int
txg_list_add(txg_list_t *tl, void *p, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
	int already_on_list;

	mutex_enter(&tl->tl_lock);
	already_on_list = tn->tn_member[t];
	if (!already_on_list) {
		tn->tn_member[t] = 1;
		tn->tn_next[t] = tl->tl_head[t];
		tl->tl_head[t] = tn;
	}
	mutex_exit(&tl->tl_lock);

	return (already_on_list);
}

/*
 * Remove the head of the list and return it.
 */
void *
txg_list_remove(txg_list_t *tl, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn;
	void *p = NULL;

	mutex_enter(&tl->tl_lock);
	if ((tn = tl->tl_head[t]) != NULL) {
		p = (char *)tn - tl->tl_offset;
		tl->tl_head[t] = tn->tn_next[t];
		tn->tn_next[t] = NULL;
		tn->tn_member[t] = 0;
	}
	mutex_exit(&tl->tl_lock);

	return (p);
}

/*
 * Remove a specific item from the list and return it.
 */
void *
txg_list_remove_this(txg_list_t *tl, void *p, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn, **tp;

	mutex_enter(&tl->tl_lock);

	for (tp = &tl->tl_head[t]; (tn = *tp) != NULL; tp = &tn->tn_next[t]) {
		if ((char *)tn - tl->tl_offset == p) {
			*tp = tn->tn_next[t];
			tn->tn_next[t] = NULL;
			tn->tn_member[t] = 0;
			mutex_exit(&tl->tl_lock);
			return (p);
		}
	}

	mutex_exit(&tl->tl_lock);

	return (NULL);
}

int
txg_list_member(txg_list_t *tl, void *p, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);

	return (tn->tn_member[t]);
}

/*
 * Walk a txg list -- only safe if you know it's not changing.
 */
void *
txg_list_head(txg_list_t *tl, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn = tl->tl_head[t];

	return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
}

void *
txg_list_next(txg_list_t *tl, void *p, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);

	tn = tn->tn_next[t];

	return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
}
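/*
 * Illustrative only -- walking a list with the accessors above, safe
 * only while the list is not changing (foo, fl, and process are the
 * same hypothetical names as in the txg_list_add() example):
 *
 *	for (foo = txg_list_head(&fl, txg); foo != NULL;
 *	    foo = txg_list_next(&fl, foo, txg))
 *		process(foo);
 */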