1 /* 2 * CDDL HEADER START 3 * 4 * The contents of this file are subject to the terms of the 5 * Common Development and Distribution License, Version 1.0 only 6 * (the "License"). You may not use this file except in compliance 7 * with the License. 8 * 9 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 10 * or http://www.opensolaris.org/os/licensing. 11 * See the License for the specific language governing permissions 12 * and limitations under the License. 13 * 14 * When distributing Covered Code, include this CDDL HEADER in each 15 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 16 * If applicable, add the following below this CDDL HEADER, with the 17 * fields enclosed by brackets "[]" replaced with your own identifying 18 * information: Portions Copyright [yyyy] [name of copyright owner] 19 * 20 * CDDL HEADER END 21 */ 22 /* 23 * Copyright 2005 Sun Microsystems, Inc. All rights reserved. 24 * Use is subject to license terms. 25 */ 26 27 #pragma ident "%Z%%M% %I% %E% SMI" 28 29 #include <sys/zfs_context.h> 30 #include <sys/txg_impl.h> 31 #include <sys/dmu_impl.h> 32 #include <sys/dsl_pool.h> 33 #include <sys/callb.h> 34 35 /* 36 * Pool-wide transaction groups. 37 */ 38 39 static void txg_sync_thread(dsl_pool_t *dp); 40 static void txg_quiesce_thread(dsl_pool_t *dp); 41 static void txg_timelimit_thread(dsl_pool_t *dp); 42 43 int txg_time = 5; /* max 5 seconds worth of delta per txg */ 44 45 /* 46 * Prepare the txg subsystem. 47 */ 48 void 49 txg_init(dsl_pool_t *dp, uint64_t txg) 50 { 51 tx_state_t *tx = &dp->dp_tx; 52 53 bzero(tx, sizeof (tx_state_t)); 54 55 tx->tx_cpu = kmem_zalloc(max_ncpus * sizeof (tx_cpu_t), KM_SLEEP); 56 57 rw_init(&tx->tx_suspend, NULL, RW_DEFAULT, NULL); 58 59 tx->tx_open_txg = txg; 60 } 61 62 /* 63 * Close down the txg subsystem. 64 */ 65 void 66 txg_fini(dsl_pool_t *dp) 67 { 68 tx_state_t *tx = &dp->dp_tx; 69 70 ASSERT(tx->tx_threads == 0); 71 72 rw_destroy(&tx->tx_suspend); 73 74 kmem_free(tx->tx_cpu, max_ncpus * sizeof (tx_cpu_t)); 75 76 bzero(tx, sizeof (tx_state_t)); 77 } 78 79 /* 80 * Start syncing transaction groups. 81 */ 82 void 83 txg_sync_start(dsl_pool_t *dp) 84 { 85 tx_state_t *tx = &dp->dp_tx; 86 87 mutex_enter(&tx->tx_sync_lock); 88 89 dprintf("pool %p\n", dp); 90 91 ASSERT(tx->tx_threads == 0); 92 93 tx->tx_threads = 3; 94 95 tx->tx_quiesce_thread = thread_create(NULL, 0, txg_quiesce_thread, 96 dp, 0, &p0, TS_RUN, minclsyspri); 97 98 tx->tx_sync_thread = thread_create(NULL, 0, txg_sync_thread, 99 dp, 0, &p0, TS_RUN, minclsyspri); 100 101 tx->tx_timelimit_thread = thread_create(NULL, 0, txg_timelimit_thread, 102 dp, 0, &p0, TS_RUN, minclsyspri); 103 104 mutex_exit(&tx->tx_sync_lock); 105 } 106 107 static void 108 txg_thread_enter(tx_state_t *tx, callb_cpr_t *cpr) 109 { 110 CALLB_CPR_INIT(cpr, &tx->tx_sync_lock, callb_generic_cpr, FTAG); 111 mutex_enter(&tx->tx_sync_lock); 112 } 113 114 static void 115 txg_thread_exit(tx_state_t *tx, callb_cpr_t *cpr, kthread_t **tpp) 116 { 117 ASSERT(*tpp != NULL); 118 *tpp = NULL; 119 tx->tx_threads--; 120 cv_broadcast(&tx->tx_exit_cv); 121 CALLB_CPR_EXIT(cpr); /* drops &tx->tx_sync_lock */ 122 thread_exit(); 123 } 124 125 static void 126 txg_thread_wait(tx_state_t *tx, callb_cpr_t *cpr, kcondvar_t *cv, int secmax) 127 { 128 CALLB_CPR_SAFE_BEGIN(cpr); 129 130 if (secmax) 131 (void) cv_timedwait(cv, &tx->tx_sync_lock, lbolt + secmax * hz); 132 else 133 cv_wait(cv, &tx->tx_sync_lock); 134 135 CALLB_CPR_SAFE_END(cpr, &tx->tx_sync_lock); 136 } 137 138 /* 139 * Stop syncing transaction groups. 140 */ 141 void 142 txg_sync_stop(dsl_pool_t *dp) 143 { 144 tx_state_t *tx = &dp->dp_tx; 145 146 dprintf("pool %p\n", dp); 147 /* 148 * Finish off any work in progress. 149 */ 150 ASSERT(tx->tx_threads == 3); 151 txg_wait_synced(dp, 0); 152 153 /* 154 * Wake all 3 sync threads (one per state) and wait for them to die. 155 */ 156 mutex_enter(&tx->tx_sync_lock); 157 158 ASSERT(tx->tx_threads == 3); 159 160 tx->tx_exiting = 1; 161 162 cv_broadcast(&tx->tx_quiesce_more_cv); 163 cv_broadcast(&tx->tx_quiesce_done_cv); 164 cv_broadcast(&tx->tx_sync_more_cv); 165 cv_broadcast(&tx->tx_timeout_exit_cv); 166 167 while (tx->tx_threads != 0) 168 cv_wait(&tx->tx_exit_cv, &tx->tx_sync_lock); 169 170 tx->tx_exiting = 0; 171 172 mutex_exit(&tx->tx_sync_lock); 173 } 174 175 uint64_t 176 txg_hold_open(dsl_pool_t *dp, txg_handle_t *th) 177 { 178 tx_state_t *tx = &dp->dp_tx; 179 tx_cpu_t *tc = &tx->tx_cpu[CPU_SEQID]; 180 uint64_t txg; 181 182 mutex_enter(&tc->tc_lock); 183 184 txg = tx->tx_open_txg; 185 tc->tc_count[txg & TXG_MASK]++; 186 187 th->th_cpu = tc; 188 th->th_txg = txg; 189 190 return (txg); 191 } 192 193 void 194 txg_rele_to_quiesce(txg_handle_t *th) 195 { 196 tx_cpu_t *tc = th->th_cpu; 197 198 mutex_exit(&tc->tc_lock); 199 } 200 201 void 202 txg_rele_to_sync(txg_handle_t *th) 203 { 204 tx_cpu_t *tc = th->th_cpu; 205 int g = th->th_txg & TXG_MASK; 206 207 mutex_enter(&tc->tc_lock); 208 ASSERT(tc->tc_count[g] != 0); 209 if (--tc->tc_count[g] == 0) 210 cv_broadcast(&tc->tc_cv[g]); 211 mutex_exit(&tc->tc_lock); 212 213 th->th_cpu = NULL; /* defensive */ 214 } 215 216 static void 217 txg_quiesce(dsl_pool_t *dp, uint64_t txg) 218 { 219 tx_state_t *tx = &dp->dp_tx; 220 int g = txg & TXG_MASK; 221 int c; 222 223 /* 224 * Grab all tx_cpu locks so nobody else can get into this txg. 225 */ 226 for (c = 0; c < max_ncpus; c++) 227 mutex_enter(&tx->tx_cpu[c].tc_lock); 228 229 ASSERT(txg == tx->tx_open_txg); 230 tx->tx_open_txg++; 231 232 /* 233 * Now that we've incremented tx_open_txg, we can let threads 234 * enter the next transaction group. 235 */ 236 for (c = 0; c < max_ncpus; c++) 237 mutex_exit(&tx->tx_cpu[c].tc_lock); 238 239 /* 240 * Quiesce the transaction group by waiting for everyone to txg_exit(). 241 */ 242 for (c = 0; c < max_ncpus; c++) { 243 tx_cpu_t *tc = &tx->tx_cpu[c]; 244 mutex_enter(&tc->tc_lock); 245 while (tc->tc_count[g] != 0) 246 cv_wait(&tc->tc_cv[g], &tc->tc_lock); 247 mutex_exit(&tc->tc_lock); 248 } 249 } 250 251 static void 252 txg_sync_thread(dsl_pool_t *dp) 253 { 254 tx_state_t *tx = &dp->dp_tx; 255 callb_cpr_t cpr; 256 257 txg_thread_enter(tx, &cpr); 258 259 for (;;) { 260 uint64_t txg; 261 262 /* 263 * We sync when there's someone waiting on us, or the 264 * quiesce thread has handed off a txg to us. 265 */ 266 while (!tx->tx_exiting && 267 tx->tx_synced_txg >= tx->tx_sync_txg_waiting && 268 tx->tx_quiesced_txg == 0) { 269 dprintf("waiting; tx_synced=%llu waiting=%llu dp=%p\n", 270 tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp); 271 txg_thread_wait(tx, &cpr, &tx->tx_sync_more_cv, 0); 272 } 273 274 /* 275 * Wait until the quiesce thread hands off a txg to us, 276 * prompting it to do so if necessary. 277 */ 278 while (!tx->tx_exiting && tx->tx_quiesced_txg == 0) { 279 if (tx->tx_quiesce_txg_waiting < tx->tx_open_txg+1) 280 tx->tx_quiesce_txg_waiting = tx->tx_open_txg+1; 281 cv_broadcast(&tx->tx_quiesce_more_cv); 282 txg_thread_wait(tx, &cpr, &tx->tx_quiesce_done_cv, 0); 283 } 284 285 if (tx->tx_exiting) 286 txg_thread_exit(tx, &cpr, &tx->tx_sync_thread); 287 288 rw_enter(&tx->tx_suspend, RW_WRITER); 289 290 /* 291 * Consume the quiesced txg which has been handed off to 292 * us. This may cause the quiescing thread to now be 293 * able to quiesce another txg, so we must signal it. 294 */ 295 txg = tx->tx_quiesced_txg; 296 tx->tx_quiesced_txg = 0; 297 tx->tx_syncing_txg = txg; 298 cv_broadcast(&tx->tx_quiesce_more_cv); 299 rw_exit(&tx->tx_suspend); 300 301 dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n", 302 txg, tx->tx_quiesce_txg_waiting, 303 tx->tx_sync_txg_waiting); 304 mutex_exit(&tx->tx_sync_lock); 305 spa_sync(dp->dp_spa, txg); 306 mutex_enter(&tx->tx_sync_lock); 307 rw_enter(&tx->tx_suspend, RW_WRITER); 308 tx->tx_synced_txg = txg; 309 tx->tx_syncing_txg = 0; 310 rw_exit(&tx->tx_suspend); 311 cv_broadcast(&tx->tx_sync_done_cv); 312 } 313 } 314 315 static void 316 txg_quiesce_thread(dsl_pool_t *dp) 317 { 318 tx_state_t *tx = &dp->dp_tx; 319 callb_cpr_t cpr; 320 321 txg_thread_enter(tx, &cpr); 322 323 for (;;) { 324 uint64_t txg; 325 326 /* 327 * We quiesce when there's someone waiting on us. 328 * However, we can only have one txg in "quiescing" or 329 * "quiesced, waiting to sync" state. So we wait until 330 * the "quiesced, waiting to sync" txg has been consumed 331 * by the sync thread. 332 */ 333 while (!tx->tx_exiting && 334 (tx->tx_open_txg >= tx->tx_quiesce_txg_waiting || 335 tx->tx_quiesced_txg != 0)) 336 txg_thread_wait(tx, &cpr, &tx->tx_quiesce_more_cv, 0); 337 338 if (tx->tx_exiting) 339 txg_thread_exit(tx, &cpr, &tx->tx_quiesce_thread); 340 341 txg = tx->tx_open_txg; 342 dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n", 343 txg, tx->tx_quiesce_txg_waiting, 344 tx->tx_sync_txg_waiting); 345 mutex_exit(&tx->tx_sync_lock); 346 txg_quiesce(dp, txg); 347 mutex_enter(&tx->tx_sync_lock); 348 349 /* 350 * Hand this txg off to the sync thread. 351 */ 352 dprintf("quiesce done, handing off txg %llu\n", txg); 353 tx->tx_quiesced_txg = txg; 354 cv_broadcast(&tx->tx_sync_more_cv); 355 cv_broadcast(&tx->tx_quiesce_done_cv); 356 } 357 } 358 359 void 360 txg_wait_synced(dsl_pool_t *dp, uint64_t txg) 361 { 362 tx_state_t *tx = &dp->dp_tx; 363 364 mutex_enter(&tx->tx_sync_lock); 365 ASSERT(tx->tx_threads == 3); 366 if (txg == 0) 367 txg = tx->tx_open_txg; 368 if (tx->tx_sync_txg_waiting < txg) 369 tx->tx_sync_txg_waiting = txg; 370 dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n", 371 txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting); 372 while (tx->tx_synced_txg < txg) { 373 dprintf("broadcasting sync more " 374 "tx_synced=%llu waiting=%llu dp=%p\n", 375 tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp); 376 cv_broadcast(&tx->tx_sync_more_cv); 377 cv_wait(&tx->tx_sync_done_cv, &tx->tx_sync_lock); 378 } 379 mutex_exit(&tx->tx_sync_lock); 380 } 381 382 void 383 txg_wait_open(dsl_pool_t *dp, uint64_t txg) 384 { 385 tx_state_t *tx = &dp->dp_tx; 386 387 mutex_enter(&tx->tx_sync_lock); 388 ASSERT(tx->tx_threads == 3); 389 if (txg == 0) 390 txg = tx->tx_open_txg + 1; 391 if (tx->tx_quiesce_txg_waiting < txg) 392 tx->tx_quiesce_txg_waiting = txg; 393 dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n", 394 txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting); 395 while (tx->tx_open_txg < txg) { 396 cv_broadcast(&tx->tx_quiesce_more_cv); 397 cv_wait(&tx->tx_quiesce_done_cv, &tx->tx_sync_lock); 398 } 399 mutex_exit(&tx->tx_sync_lock); 400 } 401 402 static void 403 txg_timelimit_thread(dsl_pool_t *dp) 404 { 405 tx_state_t *tx = &dp->dp_tx; 406 callb_cpr_t cpr; 407 408 txg_thread_enter(tx, &cpr); 409 410 while (!tx->tx_exiting) { 411 uint64_t txg = tx->tx_open_txg + 1; 412 413 txg_thread_wait(tx, &cpr, &tx->tx_timeout_exit_cv, txg_time); 414 415 if (tx->tx_quiesce_txg_waiting < txg) 416 tx->tx_quiesce_txg_waiting = txg; 417 418 while (!tx->tx_exiting && tx->tx_open_txg < txg) { 419 dprintf("pushing out %llu\n", txg); 420 cv_broadcast(&tx->tx_quiesce_more_cv); 421 txg_thread_wait(tx, &cpr, &tx->tx_quiesce_done_cv, 0); 422 } 423 } 424 txg_thread_exit(tx, &cpr, &tx->tx_timelimit_thread); 425 } 426 427 int 428 txg_stalled(dsl_pool_t *dp) 429 { 430 tx_state_t *tx = &dp->dp_tx; 431 return (tx->tx_quiesce_txg_waiting > tx->tx_open_txg); 432 } 433 434 void 435 txg_suspend(dsl_pool_t *dp) 436 { 437 tx_state_t *tx = &dp->dp_tx; 438 /* XXX some code paths suspend when they are already suspended! */ 439 rw_enter(&tx->tx_suspend, RW_READER); 440 } 441 442 void 443 txg_resume(dsl_pool_t *dp) 444 { 445 tx_state_t *tx = &dp->dp_tx; 446 rw_exit(&tx->tx_suspend); 447 } 448 449 /* 450 * Per-txg object lists. 451 */ 452 void 453 txg_list_create(txg_list_t *tl, size_t offset) 454 { 455 int t; 456 457 mutex_init(&tl->tl_lock, NULL, MUTEX_DEFAULT, NULL); 458 459 tl->tl_offset = offset; 460 461 for (t = 0; t < TXG_SIZE; t++) 462 tl->tl_head[t] = NULL; 463 } 464 465 void 466 txg_list_destroy(txg_list_t *tl) 467 { 468 int t; 469 470 for (t = 0; t < TXG_SIZE; t++) 471 ASSERT(txg_list_empty(tl, t)); 472 473 mutex_destroy(&tl->tl_lock); 474 } 475 476 int 477 txg_list_empty(txg_list_t *tl, uint64_t txg) 478 { 479 return (tl->tl_head[txg & TXG_MASK] == NULL); 480 } 481 482 /* 483 * Add an entry to the list. 484 * Returns 0 if it's a new entry, 1 if it's already there. 485 */ 486 int 487 txg_list_add(txg_list_t *tl, void *p, uint64_t txg) 488 { 489 int t = txg & TXG_MASK; 490 txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset); 491 int already_on_list; 492 493 mutex_enter(&tl->tl_lock); 494 already_on_list = tn->tn_member[t]; 495 if (!already_on_list) { 496 tn->tn_member[t] = 1; 497 tn->tn_next[t] = tl->tl_head[t]; 498 tl->tl_head[t] = tn; 499 } 500 mutex_exit(&tl->tl_lock); 501 502 return (already_on_list); 503 } 504 505 /* 506 * Remove the head of the list and return it. 507 */ 508 void * 509 txg_list_remove(txg_list_t *tl, uint64_t txg) 510 { 511 int t = txg & TXG_MASK; 512 txg_node_t *tn; 513 void *p = NULL; 514 515 mutex_enter(&tl->tl_lock); 516 if ((tn = tl->tl_head[t]) != NULL) { 517 p = (char *)tn - tl->tl_offset; 518 tl->tl_head[t] = tn->tn_next[t]; 519 tn->tn_next[t] = NULL; 520 tn->tn_member[t] = 0; 521 } 522 mutex_exit(&tl->tl_lock); 523 524 return (p); 525 } 526 527 /* 528 * Remove a specific item from the list and return it. 529 */ 530 void * 531 txg_list_remove_this(txg_list_t *tl, void *p, uint64_t txg) 532 { 533 int t = txg & TXG_MASK; 534 txg_node_t *tn, **tp; 535 536 mutex_enter(&tl->tl_lock); 537 538 for (tp = &tl->tl_head[t]; (tn = *tp) != NULL; tp = &tn->tn_next[t]) { 539 if ((char *)tn - tl->tl_offset == p) { 540 *tp = tn->tn_next[t]; 541 tn->tn_next[t] = NULL; 542 tn->tn_member[t] = 0; 543 mutex_exit(&tl->tl_lock); 544 return (p); 545 } 546 } 547 548 mutex_exit(&tl->tl_lock); 549 550 return (NULL); 551 } 552 553 int 554 txg_list_member(txg_list_t *tl, void *p, uint64_t txg) 555 { 556 int t = txg & TXG_MASK; 557 txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset); 558 559 return (tn->tn_member[t]); 560 } 561 562 /* 563 * Walk a txg list -- only safe if you know it's not changing. 564 */ 565 void * 566 txg_list_head(txg_list_t *tl, uint64_t txg) 567 { 568 int t = txg & TXG_MASK; 569 txg_node_t *tn = tl->tl_head[t]; 570 571 return (tn == NULL ? NULL : (char *)tn - tl->tl_offset); 572 } 573 574 void * 575 txg_list_next(txg_list_t *tl, void *p, uint64_t txg) 576 { 577 int t = txg & TXG_MASK; 578 txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset); 579 580 tn = tn->tn_next[t]; 581 582 return (tn == NULL ? NULL : (char *)tn - tl->tl_offset); 583 } 584