// SPDX-License-Identifier: GPL-2.0
/*
 * bcachefs journalling code, for btree insertions
 *
 * Copyright 2012 Google, Inc.
 */

#include "bcachefs.h"
#include "alloc.h"
#include "bkey_methods.h"
#include "btree_gc.h"
#include "buckets.h"
#include "journal.h"
#include "journal_io.h"
#include "journal_reclaim.h"
#include "journal_seq_blacklist.h"
#include "super-io.h"
#include "trace.h"

static bool journal_entry_is_open(struct journal *j)
{
	return j->reservations.cur_entry_offset < JOURNAL_ENTRY_CLOSED_VAL;
}

void bch2_journal_buf_put_slowpath(struct journal *j, bool need_write_just_set)
{
	struct journal_buf *w = journal_prev_buf(j);

	atomic_dec_bug(&journal_seq_pin(j, le64_to_cpu(w->data->seq))->count);

	if (!need_write_just_set &&
	    test_bit(JOURNAL_NEED_WRITE, &j->flags))
		bch2_time_stats_update(j->delay_time,
				       j->need_write_time);
#if 0
	closure_call(&j->io, bch2_journal_write, NULL, NULL);
#else
	/* Shut sparse up: */
	closure_init(&j->io, NULL);
	set_closure_fn(&j->io, bch2_journal_write, NULL);
	bch2_journal_write(&j->io);
#endif
}

static void journal_pin_new_entry(struct journal *j, int count)
{
	struct journal_entry_pin_list *p;

	/*
	 * The fifo_push() needs to happen at the same time as j->seq is
	 * incremented for journal_last_seq() to be calculated correctly
	 */
	atomic64_inc(&j->seq);
	p = fifo_push_ref(&j->pin);

	INIT_LIST_HEAD(&p->list);
	INIT_LIST_HEAD(&p->flushed);
	atomic_set(&p->count, count);
	p->devs.nr = 0;
}

static void bch2_journal_buf_init(struct journal *j)
{
	struct journal_buf *buf = journal_cur_buf(j);

	memset(buf->has_inode, 0, sizeof(buf->has_inode));

	memset(buf->data, 0, sizeof(*buf->data));
	buf->data->seq	= cpu_to_le64(journal_cur_seq(j));
	buf->data->u64s	= 0;
}

static inline size_t journal_entry_u64s_reserve(struct journal_buf *buf)
{
	return BTREE_ID_NR * (JSET_KEYS_U64s + BKEY_EXTENT_U64s_MAX);
}

static inline bool journal_entry_empty(struct jset *j)
{
	struct jset_entry *i;

	if (j->seq != j->last_seq)
		return false;

	vstruct_for_each(j, i)
		if (i->type || i->u64s)
			return false;
	return true;
}

static enum {
	JOURNAL_ENTRY_ERROR,
	JOURNAL_ENTRY_INUSE,
	JOURNAL_ENTRY_CLOSED,
	JOURNAL_UNLOCKED,
} journal_buf_switch(struct journal *j, bool need_write_just_set)
{
	struct bch_fs *c = container_of(j, struct bch_fs, journal);
	struct journal_buf *buf;
	union journal_res_state old, new;
	u64 v = atomic64_read(&j->reservations.counter);

	lockdep_assert_held(&j->lock);

	do {
		old.v = new.v = v;
		if (old.cur_entry_offset == JOURNAL_ENTRY_CLOSED_VAL)
			return JOURNAL_ENTRY_CLOSED;

		if (old.cur_entry_offset == JOURNAL_ENTRY_ERROR_VAL)
			return JOURNAL_ENTRY_ERROR;

		if (new.prev_buf_unwritten)
			return JOURNAL_ENTRY_INUSE;

		/*
		 * avoid race between setting buf->data->u64s and
		 * journal_res_put starting write:
		 */
		journal_state_inc(&new);

		new.cur_entry_offset = JOURNAL_ENTRY_CLOSED_VAL;
		new.idx++;
		new.prev_buf_unwritten = 1;

		BUG_ON(journal_state_count(new, new.idx));
	} while ((v = atomic64_cmpxchg(&j->reservations.counter,
				       old.v, new.v)) != old.v);

	clear_bit(JOURNAL_NEED_WRITE, &j->flags);

	buf = &j->buf[old.idx];
	buf->data->u64s		= cpu_to_le32(old.cur_entry_offset);

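	/*
	 * prev_buf_sectors is the on-disk size this entry will need,
	 * including the reserve for btree roots (which only get added right
	 * before the write), rounded up to the filesystem block size:
	 */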
	j->prev_buf_sectors =
		vstruct_blocks_plus(buf->data, c->block_bits,
				    journal_entry_u64s_reserve(buf)) *
		c->opts.block_size;
	BUG_ON(j->prev_buf_sectors > j->cur_buf_sectors);

	/*
	 * We have to set last_seq here, _before_ opening a new journal entry:
	 *
	 * A thread may replace an old pin with a new pin on its current
	 * journal reservation - the expectation being that the journal will
	 * contain either what the old pin protected or what the new pin
	 * protects.
	 *
	 * After the old pin is dropped journal_last_seq() won't include the old
	 * pin, so we can only write the updated last_seq on the entry that
	 * contains whatever the new pin protects.
	 *
	 * Restated, we can _not_ update last_seq for a given entry if there
	 * could be a newer entry open with reservations/pins that have been
	 * taken against it.
	 *
	 * Hence, we want to update/set last_seq on the current journal entry
	 * right before we open a new one:
	 */
	bch2_journal_reclaim_fast(j);
	buf->data->last_seq	= cpu_to_le64(journal_last_seq(j));

	if (journal_entry_empty(buf->data))
		clear_bit(JOURNAL_NOT_EMPTY, &j->flags);
	else
		set_bit(JOURNAL_NOT_EMPTY, &j->flags);

	journal_pin_new_entry(j, 1);

	bch2_journal_buf_init(j);

	cancel_delayed_work(&j->write_work);
	spin_unlock(&j->lock);

	if (c->bucket_journal_seq > 1 << 14) {
		c->bucket_journal_seq = 0;
		bch2_bucket_seq_cleanup(c);
	}

	c->bucket_journal_seq++;

	/* ugh - might be called from __journal_res_get() under wait_event() */
	__set_current_state(TASK_RUNNING);
	bch2_journal_buf_put(j, old.idx, need_write_just_set);

	return JOURNAL_UNLOCKED;
}

void bch2_journal_halt(struct journal *j)
{
	union journal_res_state old, new;
	u64 v = atomic64_read(&j->reservations.counter);

	do {
		old.v = new.v = v;
		if (old.cur_entry_offset == JOURNAL_ENTRY_ERROR_VAL)
			return;

		new.cur_entry_offset = JOURNAL_ENTRY_ERROR_VAL;
	} while ((v = atomic64_cmpxchg(&j->reservations.counter,
				       old.v, new.v)) != old.v);

	journal_wake(j);
	closure_wake_up(&journal_cur_buf(j)->wait);
	closure_wake_up(&journal_prev_buf(j)->wait);
}

/*
 * should _only_ be called from journal_res_get() - when we actually want a
 * journal reservation - journal entry is open means journal is dirty:
 *
 * returns:
 * 1:		success
 * 0:		journal currently full (must wait)
 * -EROFS:	insufficient rw devices
 * -EIO:	journal error
 */
static int journal_entry_open(struct journal *j)
{
	struct journal_buf *buf = journal_cur_buf(j);
	union journal_res_state old, new;
	ssize_t u64s;
	int sectors;
	u64 v;

	lockdep_assert_held(&j->lock);
	BUG_ON(journal_entry_is_open(j));

	if (!fifo_free(&j->pin))
		return 0;

	sectors = bch2_journal_entry_sectors(j);
	if (sectors <= 0)
		return sectors;

	buf->disk_sectors	= sectors;

	sectors = min_t(unsigned, sectors, buf->size >> 9);
	j->cur_buf_sectors	= sectors;

	u64s = (sectors << 9) / sizeof(u64);

	/* Subtract the journal header */
	u64s -= sizeof(struct jset) / sizeof(u64);
	/*
	 * Btree roots, prio pointers don't get added until right before we do
	 * the write:
	 */
	u64s -= journal_entry_u64s_reserve(buf);
	u64s  = max_t(ssize_t, 0L, u64s);

	BUG_ON(u64s >= JOURNAL_ENTRY_CLOSED_VAL);

	if (u64s <= le32_to_cpu(buf->data->u64s))
		return 0;

	/*
	 * Must be set before marking the journal entry as open:
	 */
	j->cur_entry_u64s = u64s;

	v = atomic64_read(&j->reservations.counter);
	do {
		old.v = new.v = v;

		if (old.cur_entry_offset == JOURNAL_ENTRY_ERROR_VAL)
			return -EIO;

		/* Handle any already added entries */
		new.cur_entry_offset = le32_to_cpu(buf->data->u64s);
	} while ((v = atomic64_cmpxchg(&j->reservations.counter,
				       old.v, new.v)) != old.v);

	if (j->res_get_blocked_start)
		bch2_time_stats_update(j->blocked_time,
				       j->res_get_blocked_start);
	j->res_get_blocked_start = 0;

	mod_delayed_work(system_freezable_wq,
			 &j->write_work,
			 msecs_to_jiffies(j->write_delay_ms));
	journal_wake(j);
	return 1;
}

/*
 * returns true if there's nothing to flush and no journal write still in flight
 */
static bool journal_flush_write(struct journal *j)
{
	bool ret;

	spin_lock(&j->lock);
	ret = !j->reservations.prev_buf_unwritten;

	if (!journal_entry_is_open(j)) {
		spin_unlock(&j->lock);
		return ret;
	}

	set_bit(JOURNAL_NEED_WRITE, &j->flags);
	if (journal_buf_switch(j, false) == JOURNAL_UNLOCKED)
		ret = false;
	else
		spin_unlock(&j->lock);
	return ret;
}

static void journal_write_work(struct work_struct *work)
{
	struct journal *j = container_of(work, struct journal, write_work.work);

	journal_flush_write(j);
}

/*
 * Given an inode number, if that inode number has data in the journal that
 * hasn't yet been flushed, return the journal sequence number that needs to be
 * flushed:
 */
u64 bch2_inode_journal_seq(struct journal *j, u64 inode)
{
	size_t h = hash_64(inode, ilog2(sizeof(j->buf[0].has_inode) * 8));
	u64 seq = 0;

	if (!test_bit(h, j->buf[0].has_inode) &&
	    !test_bit(h, j->buf[1].has_inode))
		return 0;

	spin_lock(&j->lock);
	if (test_bit(h, journal_cur_buf(j)->has_inode))
		seq = journal_cur_seq(j);
	else if (test_bit(h, journal_prev_buf(j)->has_inode))
		seq = journal_cur_seq(j) - 1;
	spin_unlock(&j->lock);

	return seq;
}

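/*
 * Illustrative sketch of how the above pairs with bch2_journal_flush_seq()
 * below (the real fsync-style callers live outside this file; inum is a
 * hypothetical inode number):
 *
 *	u64 seq = bch2_inode_journal_seq(j, inum);
 *
 *	if (seq)
 *		ret = bch2_journal_flush_seq(j, seq);
 */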
static int __journal_res_get(struct journal *j, struct journal_res *res,
			     unsigned u64s_min, unsigned u64s_max)
{
	struct bch_fs *c = container_of(j, struct bch_fs, journal);
	struct journal_buf *buf;
	int ret;
retry:
	ret = journal_res_get_fast(j, res, u64s_min, u64s_max);
	if (ret)
		return ret;

	spin_lock(&j->lock);
	/*
	 * Recheck after taking the lock, so we don't race with another thread
	 * that just did journal_entry_open() and call journal_entry_close()
	 * unnecessarily
	 */
	ret = journal_res_get_fast(j, res, u64s_min, u64s_max);
	if (ret) {
		spin_unlock(&j->lock);
		return 1;
	}

	/*
	 * If we couldn't get a reservation because the current buf filled up,
	 * and we had room for a bigger entry on disk, signal that we want to
	 * realloc the journal bufs:
	 */
	buf = journal_cur_buf(j);
	if (journal_entry_is_open(j) &&
	    buf->size >> 9 < buf->disk_sectors &&
	    buf->size < JOURNAL_ENTRY_SIZE_MAX)
		j->buf_size_want = max(j->buf_size_want, buf->size << 1);

	/*
	 * Close the current journal entry if necessary, then try to start a
	 * new one:
	 */
	switch (journal_buf_switch(j, false)) {
	case JOURNAL_ENTRY_ERROR:
		spin_unlock(&j->lock);
		return -EROFS;
	case JOURNAL_ENTRY_INUSE:
		/* haven't finished writing out the previous one: */
		spin_unlock(&j->lock);
		trace_journal_entry_full(c);
		goto blocked;
	case JOURNAL_ENTRY_CLOSED:
		break;
	case JOURNAL_UNLOCKED:
		goto retry;
	}

	/* We now have a new, closed journal buf - see if we can open it: */
	ret = journal_entry_open(j);
	spin_unlock(&j->lock);

	if (ret < 0)
		return ret;
	if (ret)
		goto retry;

	/* Journal's full, we have to wait */

	/*
	 * Direct reclaim - can't rely on reclaim from work item
	 * due to freezing:
	 */
	bch2_journal_reclaim_work(&j->reclaim_work.work);

	trace_journal_full(c);
blocked:
	if (!j->res_get_blocked_start)
		j->res_get_blocked_start = local_clock() ?: 1;
	return 0;
}

/*
 * Essentially the entry function to the journaling code. When bcachefs is
 * doing a btree insert, it calls this function to get the current journal
 * write. The journal write is the structure used to set up journal writes.
 * The calling function will then add its keys to the structure, queuing them
 * for the next write.
 *
 * To ensure forward progress, the current task must not be holding any
 * btree node write locks.
 */
int bch2_journal_res_get_slowpath(struct journal *j, struct journal_res *res,
				  unsigned u64s_min, unsigned u64s_max)
{
	int ret;

	wait_event(j->wait,
		   (ret = __journal_res_get(j, res, u64s_min,
					    u64s_max)));
	return ret < 0 ? ret : 0;
}

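/*
 * Illustrative sketch of the reservation lifecycle (the actual btree insert
 * path lives in the btree update code, not here):
 *
 *	struct journal_res res = { 0 };
 *
 *	ret = bch2_journal_res_get(j, &res, u64s, u64s);
 *	if (ret)
 *		return ret;
 *
 *	... copy keys into the reservation, via the helpers in journal.h ...
 *
 *	bch2_journal_res_put(j, &res);
 */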
u64 bch2_journal_last_unwritten_seq(struct journal *j)
{
	u64 seq;

	spin_lock(&j->lock);
	seq = journal_cur_seq(j);
	if (j->reservations.prev_buf_unwritten)
		seq--;
	spin_unlock(&j->lock);

	return seq;
}

/**
 * bch2_journal_open_seq_async - try to open a new journal entry if @seq isn't
 * open yet, or wait if we cannot
 *
 * used by the btree interior update machinery, when it needs to write a new
 * btree root - every journal entry contains the roots of all the btrees, so it
 * doesn't need to bother with getting a journal reservation
 */
int bch2_journal_open_seq_async(struct journal *j, u64 seq, struct closure *parent)
{
	int ret;

	spin_lock(&j->lock);
	BUG_ON(seq > journal_cur_seq(j));

	if (seq < journal_cur_seq(j) ||
	    journal_entry_is_open(j)) {
		spin_unlock(&j->lock);
		return 1;
	}

	ret = journal_entry_open(j);
	if (!ret)
		closure_wait(&j->async_wait, parent);
	spin_unlock(&j->lock);

	if (!ret)
		bch2_journal_reclaim_work(&j->reclaim_work.work);

	return ret;
}

/**
 * bch2_journal_wait_on_seq - wait for a journal entry to be written
 *
 * does _not_ cause @seq to be written immediately - if there is no other
 * activity to cause the relevant journal entry to be filled up or flushed it
 * can wait for an arbitrary amount of time (up to @j->write_delay_ms, which is
 * configurable).
 */
void bch2_journal_wait_on_seq(struct journal *j, u64 seq, struct closure *parent)
{
	spin_lock(&j->lock);

	BUG_ON(seq > journal_cur_seq(j));

	if (bch2_journal_error(j)) {
		spin_unlock(&j->lock);
		return;
	}

	if (seq == journal_cur_seq(j)) {
		if (!closure_wait(&journal_cur_buf(j)->wait, parent))
			BUG();
	} else if (seq + 1 == journal_cur_seq(j) &&
		   j->reservations.prev_buf_unwritten) {
		if (!closure_wait(&journal_prev_buf(j)->wait, parent))
			BUG();

		smp_mb();

		/* check if raced with write completion (or failure) */
		if (!j->reservations.prev_buf_unwritten ||
		    bch2_journal_error(j))
			closure_wake_up(&journal_prev_buf(j)->wait);
	}

	spin_unlock(&j->lock);
}

/**
 * bch2_journal_flush_seq_async - wait for a journal entry to be written
 *
 * like bch2_journal_wait_on_seq, except that it triggers a write immediately if
 * necessary
 */
void bch2_journal_flush_seq_async(struct journal *j, u64 seq, struct closure *parent)
{
	struct journal_buf *buf;

	spin_lock(&j->lock);

	BUG_ON(seq > journal_cur_seq(j));

	if (bch2_journal_error(j)) {
		spin_unlock(&j->lock);
		return;
	}

	if (seq == journal_cur_seq(j)) {
		bool set_need_write = false;

		buf = journal_cur_buf(j);

		if (parent && !closure_wait(&buf->wait, parent))
			BUG();

		if (!test_and_set_bit(JOURNAL_NEED_WRITE, &j->flags)) {
			j->need_write_time = local_clock();
			set_need_write = true;
		}

		switch (journal_buf_switch(j, set_need_write)) {
		case JOURNAL_ENTRY_ERROR:
			if (parent)
				closure_wake_up(&buf->wait);
			break;
		case JOURNAL_ENTRY_CLOSED:
			/*
			 * Journal entry hasn't been opened yet, but caller
			 * claims it has something
			 */
			BUG();
		case JOURNAL_ENTRY_INUSE:
			break;
		case JOURNAL_UNLOCKED:
			return;
		}
	} else if (parent &&
		   seq + 1 == journal_cur_seq(j) &&
		   j->reservations.prev_buf_unwritten) {
		buf = journal_prev_buf(j);

		if (!closure_wait(&buf->wait, parent))
			BUG();

		smp_mb();

		/* check if raced with write completion (or failure) */
		if (!j->reservations.prev_buf_unwritten ||
		    bch2_journal_error(j))
			closure_wake_up(&buf->wait);
	}

	spin_unlock(&j->lock);
}

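/*
 * Illustrative sketch of waiting on a flush with an on-stack closure (a
 * hypothetical caller, not code from this file):
 *
 *	struct closure cl;
 *
 *	closure_init_stack(&cl);
 *	bch2_journal_flush_seq_async(j, seq, &cl);
 *	closure_sync(&cl);
 */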
static int journal_seq_flushed(struct journal *j, u64 seq)
{
	struct journal_buf *buf;
	int ret = 1;

	spin_lock(&j->lock);
	BUG_ON(seq > journal_cur_seq(j));

	if (seq == journal_cur_seq(j)) {
		bool set_need_write = false;

		ret = 0;

		buf = journal_cur_buf(j);

		if (!test_and_set_bit(JOURNAL_NEED_WRITE, &j->flags)) {
			j->need_write_time = local_clock();
			set_need_write = true;
		}

		switch (journal_buf_switch(j, set_need_write)) {
		case JOURNAL_ENTRY_ERROR:
			ret = -EIO;
			break;
		case JOURNAL_ENTRY_CLOSED:
			/*
			 * Journal entry hasn't been opened yet, but caller
			 * claims it has something
			 */
			BUG();
		case JOURNAL_ENTRY_INUSE:
			break;
		case JOURNAL_UNLOCKED:
			return 0;
		}
	} else if (seq + 1 == journal_cur_seq(j) &&
		   j->reservations.prev_buf_unwritten) {
		ret = bch2_journal_error(j);
	}

	spin_unlock(&j->lock);

	return ret;
}

int bch2_journal_flush_seq(struct journal *j, u64 seq)
{
	u64 start_time = local_clock();
	int ret, ret2;

	ret = wait_event_killable(j->wait, (ret2 = journal_seq_flushed(j, seq)));

	bch2_time_stats_update(j->flush_seq_time, start_time);

	return ret ?: ret2 < 0 ? ret2 : 0;
}

/**
 * bch2_journal_meta_async - force a journal entry to be written
 */
void bch2_journal_meta_async(struct journal *j, struct closure *parent)
{
	struct journal_res res;
	unsigned u64s = jset_u64s(0);

	memset(&res, 0, sizeof(res));

	bch2_journal_res_get(j, &res, u64s, u64s);
	bch2_journal_res_put(j, &res);

	bch2_journal_flush_seq_async(j, res.seq, parent);
}

int bch2_journal_meta(struct journal *j)
{
	struct journal_res res;
	unsigned u64s = jset_u64s(0);
	int ret;

	memset(&res, 0, sizeof(res));

	ret = bch2_journal_res_get(j, &res, u64s, u64s);
	if (ret)
		return ret;

	bch2_journal_res_put(j, &res);

	return bch2_journal_flush_seq(j, res.seq);
}

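/*
 * Illustrative: a caller that just needs the current btree roots (which go
 * into every journal entry) to reach disk - e.g. bch2_fs_journal_stop()
 * below - can simply do:
 *
 *	ret = bch2_journal_meta(j);
 */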
/*
 * bch2_journal_flush_async - if there is an open journal entry, or a journal
 * still being written, write it and wait for the write to complete
 */
void bch2_journal_flush_async(struct journal *j, struct closure *parent)
{
	u64 seq, journal_seq;

	spin_lock(&j->lock);
	journal_seq = journal_cur_seq(j);

	if (journal_entry_is_open(j)) {
		seq = journal_seq;
	} else if (journal_seq) {
		seq = journal_seq - 1;
	} else {
		spin_unlock(&j->lock);
		return;
	}
	spin_unlock(&j->lock);

	bch2_journal_flush_seq_async(j, seq, parent);
}

int bch2_journal_flush(struct journal *j)
{
	u64 seq, journal_seq;

	spin_lock(&j->lock);
	journal_seq = journal_cur_seq(j);

	if (journal_entry_is_open(j)) {
		seq = journal_seq;
	} else if (journal_seq) {
		seq = journal_seq - 1;
	} else {
		spin_unlock(&j->lock);
		return 0;
	}
	spin_unlock(&j->lock);

	return bch2_journal_flush_seq(j, seq);
}

/* allocate journal on a device: */

static int __bch2_set_nr_journal_buckets(struct bch_dev *ca, unsigned nr,
					 bool new_fs, struct closure *cl)
{
	struct bch_fs *c = ca->fs;
	struct journal_device *ja = &ca->journal;
	struct bch_sb_field_journal *journal_buckets;
	u64 *new_bucket_seq = NULL, *new_buckets = NULL;
	int ret = 0;

	/* don't handle reducing nr of buckets yet: */
	if (nr <= ja->nr)
		return 0;

	ret = -ENOMEM;
	new_buckets	= kzalloc(nr * sizeof(u64), GFP_KERNEL);
	new_bucket_seq	= kzalloc(nr * sizeof(u64), GFP_KERNEL);
	if (!new_buckets || !new_bucket_seq)
		goto err;

	journal_buckets = bch2_sb_resize_journal(&ca->disk_sb,
				nr + sizeof(*journal_buckets) / sizeof(u64));
	if (!journal_buckets)
		goto err;

	/*
	 * We may be called from the device add path, before the new device has
	 * actually been added to the running filesystem:
	 */
	if (c)
		spin_lock(&c->journal.lock);

	memcpy(new_buckets,	ja->buckets,	ja->nr * sizeof(u64));
	memcpy(new_bucket_seq,	ja->bucket_seq,	ja->nr * sizeof(u64));
	swap(new_buckets,	ja->buckets);
	swap(new_bucket_seq,	ja->bucket_seq);

	if (c)
		spin_unlock(&c->journal.lock);

	while (ja->nr < nr) {
		struct open_bucket *ob = NULL;
		long bucket;

		if (new_fs) {
			bucket = bch2_bucket_alloc_new_fs(ca);
			if (bucket < 0) {
				ret = -ENOSPC;
				goto err;
			}
		} else {
			int ob_idx = bch2_bucket_alloc(c, ca, RESERVE_ALLOC, false, cl);
			if (ob_idx < 0) {
				ret = cl ? -EAGAIN : -ENOSPC;
				goto err;
			}

			ob = c->open_buckets + ob_idx;
			bucket = sector_to_bucket(ca, ob->ptr.offset);
		}

		if (c) {
			percpu_down_read(&c->usage_lock);
			spin_lock(&c->journal.lock);
		} else {
			preempt_disable();
		}

		__array_insert_item(ja->buckets,		ja->nr, ja->last_idx);
		__array_insert_item(ja->bucket_seq,		ja->nr, ja->last_idx);
		__array_insert_item(journal_buckets->buckets,	ja->nr, ja->last_idx);

		ja->buckets[ja->last_idx] = bucket;
		ja->bucket_seq[ja->last_idx] = 0;
		journal_buckets->buckets[ja->last_idx] = cpu_to_le64(bucket);

		if (ja->last_idx < ja->nr) {
			if (ja->cur_idx >= ja->last_idx)
				ja->cur_idx++;
			ja->last_idx++;
		}
		ja->nr++;

		bch2_mark_metadata_bucket(c, ca, bucket, BCH_DATA_JOURNAL,
					  ca->mi.bucket_size,
					  gc_phase(GC_PHASE_SB),
					  new_fs
					  ? BCH_BUCKET_MARK_MAY_MAKE_UNAVAILABLE
					  : 0);

		if (c) {
			spin_unlock(&c->journal.lock);
			percpu_up_read(&c->usage_lock);
		} else {
			preempt_enable();
		}

		if (!new_fs)
			bch2_open_bucket_put(c, ob);
	}

	ret = 0;
err:
	kfree(new_bucket_seq);
	kfree(new_buckets);

	return ret;
}

/*
 * Allocate more journal space at runtime - not currently making use of it, but
 * the code works:
 */
int bch2_set_nr_journal_buckets(struct bch_fs *c, struct bch_dev *ca,
				unsigned nr)
{
	struct journal_device *ja = &ca->journal;
	struct closure cl;
	unsigned current_nr;
	int ret;

	closure_init_stack(&cl);

	do {
		struct disk_reservation disk_res = { 0, 0 };

		closure_sync(&cl);

		mutex_lock(&c->sb_lock);
		current_nr = ja->nr;

		/*
		 * note: journal buckets aren't really counted as _sectors_
		 * used yet, so we don't need the disk reservation to avoid the
		 * BUG_ON() in buckets.c when space used goes up without a
		 * reservation - but we do need the reservation to ensure we'll
		 * actually be able to allocate:
		 */

		if (bch2_disk_reservation_get(c, &disk_res,
				bucket_to_sector(ca, nr - ja->nr), 1, 0)) {
			mutex_unlock(&c->sb_lock);
			return -ENOSPC;
		}

		ret = __bch2_set_nr_journal_buckets(ca, nr, false, &cl);

		bch2_disk_reservation_put(c, &disk_res);

		if (ja->nr != current_nr)
			bch2_write_super(c);
		mutex_unlock(&c->sb_lock);
	} while (ret == -EAGAIN);

	return ret;
}

int bch2_dev_journal_alloc(struct bch_dev *ca)
{
	unsigned nr;

	if (dynamic_fault("bcachefs:add:journal_alloc"))
		return -ENOMEM;

	/*
	 * clamp journal size to 1024 buckets or 512MB (in sectors), whichever
	 * is smaller:
	 */
	nr = clamp_t(unsigned, ca->mi.nbuckets >> 8,
		     BCH_JOURNAL_BUCKETS_MIN,
		     min(1 << 10,
			 (1 << 20) / ca->mi.bucket_size));

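	/*
	 * Illustrative numbers: a device with 2^18 buckets of 2048 sectors
	 * (1MB) each would want nbuckets >> 8 = 1024 buckets, but is capped
	 * at (1 << 20) / 2048 = 512 buckets, i.e. 512MB of journal.
	 */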
	return __bch2_set_nr_journal_buckets(ca, nr, true, NULL);
}

/* startup/shutdown: */

static bool bch2_journal_writing_to_device(struct journal *j, unsigned dev_idx)
{
	union journal_res_state state;
	struct journal_buf *w;
	bool ret;

	spin_lock(&j->lock);
	state = READ_ONCE(j->reservations);
	w = j->buf + !state.idx;

	ret = state.prev_buf_unwritten &&
		bch2_extent_has_device(bkey_i_to_s_c_extent(&w->key), dev_idx);
	spin_unlock(&j->lock);

	return ret;
}

void bch2_dev_journal_stop(struct journal *j, struct bch_dev *ca)
{
	spin_lock(&j->lock);
	bch2_extent_drop_device(bkey_i_to_s_extent(&j->key), ca->dev_idx);
	spin_unlock(&j->lock);

	wait_event(j->wait, !bch2_journal_writing_to_device(j, ca->dev_idx));
}

void bch2_fs_journal_stop(struct journal *j)
{
	struct bch_fs *c = container_of(j, struct bch_fs, journal);

	wait_event(j->wait, journal_flush_write(j));

	/* do we need to write another journal entry? */
	if (test_bit(JOURNAL_NOT_EMPTY, &j->flags) ||
	    c->btree_roots_dirty)
		bch2_journal_meta(j);

	BUG_ON(!bch2_journal_error(j) &&
	       test_bit(JOURNAL_NOT_EMPTY, &j->flags));

	cancel_delayed_work_sync(&j->write_work);
	cancel_delayed_work_sync(&j->reclaim_work);
}

void bch2_fs_journal_start(struct journal *j)
{
	struct journal_seq_blacklist *bl;
	u64 blacklist = 0;

	list_for_each_entry(bl, &j->seq_blacklist, list)
		blacklist = max(blacklist, bl->end);

	spin_lock(&j->lock);

	set_bit(JOURNAL_STARTED, &j->flags);

	while (journal_cur_seq(j) < blacklist)
		journal_pin_new_entry(j, 0);

	/*
	 * journal_buf_switch() only inits the next journal entry when it
	 * closes an open journal entry - the very first journal entry gets
	 * initialized here:
	 */
	journal_pin_new_entry(j, 1);
	bch2_journal_buf_init(j);

	spin_unlock(&j->lock);

	/*
	 * Adding entries to the next journal entry before allocating space on
	 * disk for the next journal entry - this is ok, because these entries
	 * only have to go down with the next journal entry we write:
	 */
	bch2_journal_seq_blacklist_write(j);

	queue_delayed_work(system_freezable_wq, &j->reclaim_work, 0);
}

/* init/exit: */

void bch2_dev_journal_exit(struct bch_dev *ca)
{
	kfree(ca->journal.bio);
	kfree(ca->journal.buckets);
	kfree(ca->journal.bucket_seq);

	ca->journal.bio		= NULL;
	ca->journal.buckets	= NULL;
	ca->journal.bucket_seq	= NULL;
}

int bch2_dev_journal_init(struct bch_dev *ca, struct bch_sb *sb)
{
	struct journal_device *ja = &ca->journal;
	struct bch_sb_field_journal *journal_buckets =
		bch2_sb_get_journal(sb);
	unsigned i, nr_bvecs;

	ja->nr = bch2_nr_journal_buckets(journal_buckets);

	ja->bucket_seq = kcalloc(ja->nr, sizeof(u64), GFP_KERNEL);
	if (!ja->bucket_seq)
		return -ENOMEM;

	nr_bvecs = DIV_ROUND_UP(JOURNAL_ENTRY_SIZE_MAX, PAGE_SIZE);

	ca->journal.bio = bio_kmalloc(nr_bvecs, GFP_KERNEL);
	if (!ca->journal.bio)
		return -ENOMEM;

	bio_init(ca->journal.bio, NULL, ca->journal.bio->bi_inline_vecs, nr_bvecs, 0);

	ja->buckets = kcalloc(ja->nr, sizeof(u64), GFP_KERNEL);
	if (!ja->buckets)
		return -ENOMEM;

	for (i = 0; i < ja->nr; i++)
		ja->buckets[i] = le64_to_cpu(journal_buckets->buckets[i]);

	return 0;
}

void bch2_fs_journal_exit(struct journal *j)
{
	kvpfree(j->buf[1].data, j->buf[1].size);
	kvpfree(j->buf[0].data, j->buf[0].size);
	free_fifo(&j->pin);
}

int bch2_fs_journal_init(struct journal *j)
{
	struct bch_fs *c = container_of(j, struct bch_fs, journal);
	static struct lock_class_key res_key;
	int ret = 0;

	pr_verbose_init(c->opts, "");

	spin_lock_init(&j->lock);
	spin_lock_init(&j->err_lock);
	init_waitqueue_head(&j->wait);
	INIT_DELAYED_WORK(&j->write_work, journal_write_work);
	INIT_DELAYED_WORK(&j->reclaim_work, bch2_journal_reclaim_work);
	init_waitqueue_head(&j->pin_flush_wait);
	mutex_init(&j->blacklist_lock);
	INIT_LIST_HEAD(&j->seq_blacklist);
	mutex_init(&j->reclaim_lock);

	lockdep_init_map(&j->res_map, "journal res", &res_key, 0);

	j->buf[0].size		= JOURNAL_ENTRY_SIZE_MIN;
	j->buf[1].size		= JOURNAL_ENTRY_SIZE_MIN;
	j->write_delay_ms	= 1000;
	j->reclaim_delay_ms	= 100;

	bkey_extent_init(&j->key);

	atomic64_set(&j->reservations.counter,
		((union journal_res_state)
		 { .cur_entry_offset = JOURNAL_ENTRY_CLOSED_VAL }).v);

	if (!(init_fifo(&j->pin, JOURNAL_PIN, GFP_KERNEL)) ||
	    !(j->buf[0].data = kvpmalloc(j->buf[0].size, GFP_KERNEL)) ||
	    !(j->buf[1].data = kvpmalloc(j->buf[1].size, GFP_KERNEL))) {
		ret = -ENOMEM;
		goto out;
	}

	j->pin.front = j->pin.back = 1;
out:
	pr_verbose_init(c->opts, "ret %i", ret);
	return ret;
}

/* debug: */

ssize_t bch2_journal_print_debug(struct journal *j, char *buf)
{
	struct bch_fs *c = container_of(j, struct bch_fs, journal);
	union journal_res_state *s = &j->reservations;
	struct bch_dev *ca;
	unsigned iter;
	ssize_t ret = 0;

	rcu_read_lock();
	spin_lock(&j->lock);

	ret += scnprintf(buf + ret, PAGE_SIZE - ret,
			 "active journal entries:\t%llu\n"
			 "seq:\t\t\t%llu\n"
			 "last_seq:\t\t%llu\n"
			 "last_seq_ondisk:\t%llu\n"
			 "reservation count:\t%u\n"
			 "reservation offset:\t%u\n"
			 "current entry u64s:\t%u\n"
			 "io in flight:\t\t%i\n"
			 "need write:\t\t%i\n"
			 "dirty:\t\t\t%i\n"
			 "replay done:\t\t%i\n",
			 fifo_used(&j->pin),
			 journal_cur_seq(j),
			 journal_last_seq(j),
			 j->last_seq_ondisk,
			 journal_state_count(*s, s->idx),
			 s->cur_entry_offset,
			 j->cur_entry_u64s,
			 s->prev_buf_unwritten,
			 test_bit(JOURNAL_NEED_WRITE, &j->flags),
			 journal_entry_is_open(j),
			 test_bit(JOURNAL_REPLAY_DONE, &j->flags));

	for_each_member_device_rcu(ca, c, iter,
				   &c->rw_devs[BCH_DATA_JOURNAL]) {
		struct journal_device *ja = &ca->journal;

		if (!ja->nr)
			continue;

		ret += scnprintf(buf + ret, PAGE_SIZE - ret,
				 "dev %u:\n"
				 "\tnr\t\t%u\n"
				 "\tcur_idx\t\t%u (seq %llu)\n"
				 "\tlast_idx\t%u (seq %llu)\n",
				 iter, ja->nr,
				 ja->cur_idx,	ja->bucket_seq[ja->cur_idx],
				 ja->last_idx,	ja->bucket_seq[ja->last_idx]);
	}

	spin_unlock(&j->lock);
	rcu_read_unlock();

	return ret;
}

ssize_t bch2_journal_print_pins(struct journal *j, char *buf)
{
	struct journal_entry_pin_list *pin_list;
	struct journal_entry_pin *pin;
	ssize_t ret = 0;
	u64 i;

	spin_lock(&j->lock);
	fifo_for_each_entry_ptr(pin_list, &j->pin, i) {
		ret += scnprintf(buf + ret, PAGE_SIZE - ret,
				 "%llu: count %u\n",
				 i, atomic_read(&pin_list->count));

		list_for_each_entry(pin, &pin_list->list, list)
			ret += scnprintf(buf + ret, PAGE_SIZE - ret,
					 "\t%p %pf\n",
					 pin, pin->flush);

		if (!list_empty(&pin_list->flushed))
			ret += scnprintf(buf + ret, PAGE_SIZE - ret,
					 "flushed:\n");

		list_for_each_entry(pin, &pin_list->flushed, list)
			ret += scnprintf(buf + ret, PAGE_SIZE - ret,
					 "\t%p %pf\n",
					 pin, pin->flush);
	}
	spin_unlock(&j->lock);

	return ret;
}