/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright 2006 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */

#pragma ident	"%Z%%M%	%I%	%E% SMI"

#include <sys/zfs_context.h>
#include <sys/txg_impl.h>
#include <sys/dmu_impl.h>
#include <sys/dsl_pool.h>
#include <sys/callb.h>

/*
 * Pool-wide transaction groups.
 */

static void txg_sync_thread(dsl_pool_t *dp);
static void txg_quiesce_thread(dsl_pool_t *dp);
static void txg_timelimit_thread(dsl_pool_t *dp);

int txg_time = 5;	/* max 5 seconds worth of delta per txg */

/*
 * Prepare the txg subsystem.
 */
void
txg_init(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;
	int c;
	bzero(tx, sizeof (tx_state_t));

	tx->tx_cpu = kmem_zalloc(max_ncpus * sizeof (tx_cpu_t), KM_SLEEP);

	for (c = 0; c < max_ncpus; c++)
		mutex_init(&tx->tx_cpu[c].tc_lock, NULL, MUTEX_DEFAULT, NULL);

	rw_init(&tx->tx_suspend, NULL, RW_DEFAULT, NULL);
	mutex_init(&tx->tx_sync_lock, NULL, MUTEX_DEFAULT, NULL);

	tx->tx_open_txg = txg;
}

/*
 * Close down the txg subsystem.
 */
void
txg_fini(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	int c;

	ASSERT(tx->tx_threads == 0);

	rw_destroy(&tx->tx_suspend);
	mutex_destroy(&tx->tx_sync_lock);

	for (c = 0; c < max_ncpus; c++)
		mutex_destroy(&tx->tx_cpu[c].tc_lock);

	kmem_free(tx->tx_cpu, max_ncpus * sizeof (tx_cpu_t));

	bzero(tx, sizeof (tx_state_t));
}
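/*
 * Illustrative lifecycle sketch (a hypothetical caller, not code from
 * this file): the per-CPU txg state must exist before the service
 * threads start, and the threads must be reaped before it is freed,
 * as the ASSERT(tx->tx_threads == 0) in txg_fini() checks.
 *
 *	txg_init(dp, txg);	// allocate and zero per-CPU txg state
 *	txg_sync_start(dp);	// spawn quiesce, sync, timelimit threads
 *	...			// pool is active; txgs open, quiesce, sync
 *	txg_sync_stop(dp);	// drain in-flight work, reap all 3 threads
 *	txg_fini(dp);		// tear down locks, free per-CPU state
 */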
/*
 * Start syncing transaction groups.
 */
void
txg_sync_start(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;

	mutex_enter(&tx->tx_sync_lock);

	dprintf("pool %p\n", dp);

	ASSERT(tx->tx_threads == 0);

	tx->tx_threads = 3;

	tx->tx_quiesce_thread = thread_create(NULL, 0, txg_quiesce_thread,
	    dp, 0, &p0, TS_RUN, minclsyspri);

	tx->tx_sync_thread = thread_create(NULL, 0, txg_sync_thread,
	    dp, 0, &p0, TS_RUN, minclsyspri);

	tx->tx_timelimit_thread = thread_create(NULL, 0, txg_timelimit_thread,
	    dp, 0, &p0, TS_RUN, minclsyspri);

	mutex_exit(&tx->tx_sync_lock);
}

static void
txg_thread_enter(tx_state_t *tx, callb_cpr_t *cpr)
{
	CALLB_CPR_INIT(cpr, &tx->tx_sync_lock, callb_generic_cpr, FTAG);
	mutex_enter(&tx->tx_sync_lock);
}

static void
txg_thread_exit(tx_state_t *tx, callb_cpr_t *cpr, kthread_t **tpp)
{
	ASSERT(*tpp != NULL);
	*tpp = NULL;
	tx->tx_threads--;
	cv_broadcast(&tx->tx_exit_cv);
	CALLB_CPR_EXIT(cpr);		/* drops &tx->tx_sync_lock */
	thread_exit();
}

static void
txg_thread_wait(tx_state_t *tx, callb_cpr_t *cpr, kcondvar_t *cv, int secmax)
{
	CALLB_CPR_SAFE_BEGIN(cpr);

	if (secmax)
		(void) cv_timedwait(cv, &tx->tx_sync_lock,
		    lbolt + secmax * hz);
	else
		cv_wait(cv, &tx->tx_sync_lock);

	CALLB_CPR_SAFE_END(cpr, &tx->tx_sync_lock);
}

/*
 * Stop syncing transaction groups.
 */
void
txg_sync_stop(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;

	dprintf("pool %p\n", dp);
	/*
	 * Finish off any work in progress.
	 */
	ASSERT(tx->tx_threads == 3);
	txg_wait_synced(dp, 0);

	/*
	 * Wake all 3 sync threads (one per state) and wait for them to die.
	 */
	mutex_enter(&tx->tx_sync_lock);

	ASSERT(tx->tx_threads == 3);

	tx->tx_exiting = 1;

	cv_broadcast(&tx->tx_quiesce_more_cv);
	cv_broadcast(&tx->tx_quiesce_done_cv);
	cv_broadcast(&tx->tx_sync_more_cv);
	cv_broadcast(&tx->tx_timeout_exit_cv);

	while (tx->tx_threads != 0)
		cv_wait(&tx->tx_exit_cv, &tx->tx_sync_lock);

	tx->tx_exiting = 0;

	mutex_exit(&tx->tx_sync_lock);
}

uint64_t
txg_hold_open(dsl_pool_t *dp, txg_handle_t *th)
{
	tx_state_t *tx = &dp->dp_tx;
	tx_cpu_t *tc = &tx->tx_cpu[CPU_SEQID];
	uint64_t txg;

	mutex_enter(&tc->tc_lock);

	txg = tx->tx_open_txg;
	tc->tc_count[txg & TXG_MASK]++;

	th->th_cpu = tc;
	th->th_txg = txg;

	return (txg);
}

void
txg_rele_to_quiesce(txg_handle_t *th)
{
	tx_cpu_t *tc = th->th_cpu;

	mutex_exit(&tc->tc_lock);
}

void
txg_rele_to_sync(txg_handle_t *th)
{
	tx_cpu_t *tc = th->th_cpu;
	int g = th->th_txg & TXG_MASK;

	mutex_enter(&tc->tc_lock);
	ASSERT(tc->tc_count[g] != 0);
	if (--tc->tc_count[g] == 0)
		cv_broadcast(&tc->tc_cv[g]);
	mutex_exit(&tc->tc_lock);

	th->th_cpu = NULL;	/* defensive */
}
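/*
 * Usage sketch for the hold/release protocol above (a hypothetical
 * caller; the DMU's transaction code is the real consumer).  Note that
 * txg_hold_open() returns with the per-CPU tc_lock still held, so the
 * caller must drop it promptly with txg_rele_to_quiesce():
 *
 *	txg_handle_t th;
 *	uint64_t txg = txg_hold_open(dp, &th);	// join the open txg
 *	txg_rele_to_quiesce(&th);		// drop tc_lock; hold remains
 *	...					// dirty data in this txg
 *	txg_rele_to_sync(&th);			// drop hold; txg may quiesce
 */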
static void
txg_quiesce(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;
	int g = txg & TXG_MASK;
	int c;

	/*
	 * Grab all tx_cpu locks so nobody else can get into this txg.
	 */
	for (c = 0; c < max_ncpus; c++)
		mutex_enter(&tx->tx_cpu[c].tc_lock);

	ASSERT(txg == tx->tx_open_txg);
	tx->tx_open_txg++;

	/*
	 * Now that we've incremented tx_open_txg, we can let threads
	 * enter the next transaction group.
	 */
	for (c = 0; c < max_ncpus; c++)
		mutex_exit(&tx->tx_cpu[c].tc_lock);

	/*
	 * Quiesce the transaction group by waiting for everyone to
	 * call txg_rele_to_sync().
	 */
	for (c = 0; c < max_ncpus; c++) {
		tx_cpu_t *tc = &tx->tx_cpu[c];
		mutex_enter(&tc->tc_lock);
		while (tc->tc_count[g] != 0)
			cv_wait(&tc->tc_cv[g], &tc->tc_lock);
		mutex_exit(&tc->tc_lock);
	}
}

static void
txg_sync_thread(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	callb_cpr_t cpr;

	txg_thread_enter(tx, &cpr);

	for (;;) {
		uint64_t txg;

		/*
		 * We sync when there's someone waiting on us, or the
		 * quiesce thread has handed off a txg to us.
		 */
		while (!tx->tx_exiting &&
		    tx->tx_synced_txg >= tx->tx_sync_txg_waiting &&
		    tx->tx_quiesced_txg == 0) {
			dprintf("waiting; tx_synced=%llu waiting=%llu dp=%p\n",
			    tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
			txg_thread_wait(tx, &cpr, &tx->tx_sync_more_cv, 0);
		}

		/*
		 * Wait until the quiesce thread hands off a txg to us,
		 * prompting it to do so if necessary.
		 */
		while (!tx->tx_exiting && tx->tx_quiesced_txg == 0) {
			if (tx->tx_quiesce_txg_waiting < tx->tx_open_txg+1)
				tx->tx_quiesce_txg_waiting = tx->tx_open_txg+1;
			cv_broadcast(&tx->tx_quiesce_more_cv);
			txg_thread_wait(tx, &cpr, &tx->tx_quiesce_done_cv, 0);
		}

		if (tx->tx_exiting)
			txg_thread_exit(tx, &cpr, &tx->tx_sync_thread);

		rw_enter(&tx->tx_suspend, RW_WRITER);

		/*
		 * Consume the quiesced txg which has been handed off to
		 * us.  This may cause the quiescing thread to now be
		 * able to quiesce another txg, so we must signal it.
		 */
		txg = tx->tx_quiesced_txg;
		tx->tx_quiesced_txg = 0;
		tx->tx_syncing_txg = txg;
		cv_broadcast(&tx->tx_quiesce_more_cv);
		rw_exit(&tx->tx_suspend);

		dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
		    txg, tx->tx_quiesce_txg_waiting,
		    tx->tx_sync_txg_waiting);
		mutex_exit(&tx->tx_sync_lock);
		spa_sync(dp->dp_spa, txg);
		mutex_enter(&tx->tx_sync_lock);
		rw_enter(&tx->tx_suspend, RW_WRITER);
		tx->tx_synced_txg = txg;
		tx->tx_syncing_txg = 0;
		rw_exit(&tx->tx_suspend);
		cv_broadcast(&tx->tx_sync_done_cv);
	}
}
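/*
 * The sync thread above and the quiesce thread below form a two-stage
 * pipeline, with at most one txg in each stage:
 *
 *	open txg --txg_quiesce()--> tx_quiesced_txg --spa_sync()--> synced
 *
 * Handshake condition variables:
 *	tx_quiesce_more_cv	ask the quiesce thread for another txg
 *	tx_quiesce_done_cv	quiesce thread announces a handoff
 *	tx_sync_more_cv		waiters prod the sync thread
 *	tx_sync_done_cv		sync thread announces tx_synced_txg advanced
 */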
static void
txg_quiesce_thread(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	callb_cpr_t cpr;

	txg_thread_enter(tx, &cpr);

	for (;;) {
		uint64_t txg;

		/*
		 * We quiesce when there's someone waiting on us.
		 * However, we can only have one txg in "quiescing" or
		 * "quiesced, waiting to sync" state.  So we wait until
		 * the "quiesced, waiting to sync" txg has been consumed
		 * by the sync thread.
		 */
		while (!tx->tx_exiting &&
		    (tx->tx_open_txg >= tx->tx_quiesce_txg_waiting ||
		    tx->tx_quiesced_txg != 0))
			txg_thread_wait(tx, &cpr, &tx->tx_quiesce_more_cv, 0);

		if (tx->tx_exiting)
			txg_thread_exit(tx, &cpr, &tx->tx_quiesce_thread);

		txg = tx->tx_open_txg;
		dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
		    txg, tx->tx_quiesce_txg_waiting,
		    tx->tx_sync_txg_waiting);
		mutex_exit(&tx->tx_sync_lock);
		txg_quiesce(dp, txg);
		mutex_enter(&tx->tx_sync_lock);

		/*
		 * Hand this txg off to the sync thread.
		 */
		dprintf("quiesce done, handing off txg %llu\n", txg);
		tx->tx_quiesced_txg = txg;
		cv_broadcast(&tx->tx_sync_more_cv);
		cv_broadcast(&tx->tx_quiesce_done_cv);
	}
}

void
txg_wait_synced(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;

	mutex_enter(&tx->tx_sync_lock);
	ASSERT(tx->tx_threads == 3);
	if (txg == 0)
		txg = tx->tx_open_txg;
	if (tx->tx_sync_txg_waiting < txg)
		tx->tx_sync_txg_waiting = txg;
	dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
	    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
	while (tx->tx_synced_txg < txg) {
		dprintf("broadcasting sync more "
		    "tx_synced=%llu waiting=%llu dp=%p\n",
		    tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
		cv_broadcast(&tx->tx_sync_more_cv);
		cv_wait(&tx->tx_sync_done_cv, &tx->tx_sync_lock);
	}
	mutex_exit(&tx->tx_sync_lock);
}

void
txg_wait_open(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;

	mutex_enter(&tx->tx_sync_lock);
	ASSERT(tx->tx_threads == 3);
	if (txg == 0)
		txg = tx->tx_open_txg + 1;
	if (tx->tx_quiesce_txg_waiting < txg)
		tx->tx_quiesce_txg_waiting = txg;
	dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
	    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
	while (tx->tx_open_txg < txg) {
		cv_broadcast(&tx->tx_quiesce_more_cv);
		cv_wait(&tx->tx_quiesce_done_cv, &tx->tx_sync_lock);
	}
	mutex_exit(&tx->tx_sync_lock);
}

static void
txg_timelimit_thread(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	callb_cpr_t cpr;

	txg_thread_enter(tx, &cpr);

	while (!tx->tx_exiting) {
		uint64_t txg = tx->tx_open_txg + 1;

		txg_thread_wait(tx, &cpr, &tx->tx_timeout_exit_cv, txg_time);

		if (tx->tx_quiesce_txg_waiting < txg)
			tx->tx_quiesce_txg_waiting = txg;

		while (!tx->tx_exiting && tx->tx_open_txg < txg) {
			dprintf("pushing out %llu\n", txg);
			cv_broadcast(&tx->tx_quiesce_more_cv);
			txg_thread_wait(tx, &cpr, &tx->tx_quiesce_done_cv, 0);
		}
	}
	txg_thread_exit(tx, &cpr, &tx->tx_timelimit_thread);
}

int
txg_stalled(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	return (tx->tx_quiesce_txg_waiting > tx->tx_open_txg);
}

void
txg_suspend(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	/* XXX some code paths suspend when they are already suspended! */
	rw_enter(&tx->tx_suspend, RW_READER);
}

void
txg_resume(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	rw_exit(&tx->tx_suspend);
}

/*
 * Per-txg object lists.
 */
void
txg_list_create(txg_list_t *tl, size_t offset)
{
	int t;

	mutex_init(&tl->tl_lock, NULL, MUTEX_DEFAULT, NULL);

	tl->tl_offset = offset;

	for (t = 0; t < TXG_SIZE; t++)
		tl->tl_head[t] = NULL;
}

void
txg_list_destroy(txg_list_t *tl)
{
	int t;

	for (t = 0; t < TXG_SIZE; t++)
		ASSERT(txg_list_empty(tl, t));

	mutex_destroy(&tl->tl_lock);
}

int
txg_list_empty(txg_list_t *tl, uint64_t txg)
{
	return (tl->tl_head[txg & TXG_MASK] == NULL);
}
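/*
 * Embedding sketch (my_obj_t and mo_node are hypothetical names): a
 * txg_list_t links objects through a txg_node_t placed at a fixed
 * offset inside them, one singly-linked list per TXG_SIZE ring slot:
 *
 *	typedef struct my_obj {
 *		...
 *		txg_node_t mo_node;	// linkage used by txg_list_t
 *	} my_obj_t;
 *
 *	txg_list_create(&tl, offsetof(my_obj_t, mo_node));
 *	(void) txg_list_add(&tl, obj, txg);	// files obj under txg's slot
 */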
/*
 * Add an entry to the list.
 * Returns 0 if it's a new entry, 1 if it's already there.
 */
int
txg_list_add(txg_list_t *tl, void *p, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
	int already_on_list;

	mutex_enter(&tl->tl_lock);
	already_on_list = tn->tn_member[t];
	if (!already_on_list) {
		tn->tn_member[t] = 1;
		tn->tn_next[t] = tl->tl_head[t];
		tl->tl_head[t] = tn;
	}
	mutex_exit(&tl->tl_lock);

	return (already_on_list);
}

/*
 * Remove the head of the list and return it.
 */
void *
txg_list_remove(txg_list_t *tl, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn;
	void *p = NULL;

	mutex_enter(&tl->tl_lock);
	if ((tn = tl->tl_head[t]) != NULL) {
		p = (char *)tn - tl->tl_offset;
		tl->tl_head[t] = tn->tn_next[t];
		tn->tn_next[t] = NULL;
		tn->tn_member[t] = 0;
	}
	mutex_exit(&tl->tl_lock);

	return (p);
}

/*
 * Remove a specific item from the list and return it.
 */
void *
txg_list_remove_this(txg_list_t *tl, void *p, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn, **tp;

	mutex_enter(&tl->tl_lock);

	for (tp = &tl->tl_head[t]; (tn = *tp) != NULL; tp = &tn->tn_next[t]) {
		if ((char *)tn - tl->tl_offset == p) {
			*tp = tn->tn_next[t];
			tn->tn_next[t] = NULL;
			tn->tn_member[t] = 0;
			mutex_exit(&tl->tl_lock);
			return (p);
		}
	}

	mutex_exit(&tl->tl_lock);

	return (NULL);
}

int
txg_list_member(txg_list_t *tl, void *p, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);

	return (tn->tn_member[t]);
}

/*
 * Walk a txg list -- only safe if you know it's not changing.
 */
void *
txg_list_head(txg_list_t *tl, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn = tl->tl_head[t];

	return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
}

void *
txg_list_next(txg_list_t *tl, void *p, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);

	tn = tn->tn_next[t];

	return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
}
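/*
 * Walk sketch (examine() is a hypothetical callback), subject to the
 * caveat above that the list must not change underfoot; a consumer
 * racing with concurrent adds should drain via txg_list_remove()
 * instead:
 *
 *	void *p;
 *	for (p = txg_list_head(&tl, txg); p != NULL;
 *	    p = txg_list_next(&tl, p, txg))
 *		examine(p);
 */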