// SPDX-License-Identifier: GPL-2.0

#include "bcachefs.h"
#include "btree_key_cache.h"
#include "btree_update.h"
#include "btree_write_buffer.h"
#include "buckets.h"
#include "errcode.h"
#include "error.h"
#include "journal.h"
#include "journal_io.h"
#include "journal_reclaim.h"
#include "replicas.h"
#include "sb-members.h"
#include "trace.h"

#include <linux/kthread.h>
#include <linux/sched/mm.h>

static bool __should_discard_bucket(struct journal *, struct journal_device *);

/* Free space calculations: */

static unsigned journal_space_from(struct journal_device *ja,
				   enum journal_space_from from)
{
	switch (from) {
	case journal_space_discarded:
		return ja->discard_idx;
	case journal_space_clean_ondisk:
		return ja->dirty_idx_ondisk;
	case journal_space_clean:
		return ja->dirty_idx;
	default:
		BUG();
	}
}

unsigned bch2_journal_dev_buckets_available(struct journal *j,
					    struct journal_device *ja,
					    enum journal_space_from from)
{
	if (!ja->nr)
		return 0;

	unsigned available = (journal_space_from(ja, from) -
			      ja->cur_idx - 1 + ja->nr) % ja->nr;

	/*
	 * Don't use the last bucket unless writing the new last_seq
	 * will make another bucket available:
	 */
	if (available && ja->dirty_idx_ondisk == ja->dirty_idx)
		--available;

	return available;
}

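/*
 * Pick the watermark that throttles other transactions when the journal is
 * getting tight.  Roughly: we switch from BCH_WATERMARK_stripe to
 * BCH_WATERMARK_reclaim when clean journal space falls to a quarter of the
 * total, when the pin FIFO has less than a quarter of its entries free, or
 * when the btree write buffer is full enough that flushing it must wait:
 */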
void bch2_journal_set_watermark(struct journal *j)
{
	struct bch_fs *c = container_of(j, struct bch_fs, journal);
	bool low_on_space = j->space[journal_space_clean].total * 4 <=
		j->space[journal_space_total].total;
	bool low_on_pin = fifo_free(&j->pin) < j->pin.size / 4;
	bool low_on_wb = bch2_btree_write_buffer_must_wait(c);
	unsigned watermark = low_on_space || low_on_pin || low_on_wb
		? BCH_WATERMARK_reclaim
		: BCH_WATERMARK_stripe;

	if (track_event_change(&c->times[BCH_TIME_blocked_journal_low_on_space], low_on_space) ||
	    track_event_change(&c->times[BCH_TIME_blocked_journal_low_on_pin], low_on_pin) ||
	    track_event_change(&c->times[BCH_TIME_blocked_write_buffer_full], low_on_wb))
		trace_and_count(c, journal_full, c);

	mod_bit(JOURNAL_space_low, &j->flags, low_on_space || low_on_pin);

	swap(watermark, j->watermark);
	if (watermark > j->watermark)
		journal_wake(j);
}

static struct journal_space
journal_dev_space_available(struct journal *j, struct bch_dev *ca,
			    enum journal_space_from from)
{
	struct bch_fs *c = container_of(j, struct bch_fs, journal);
	struct journal_device *ja = &ca->journal;
	unsigned sectors, buckets, unwritten;
	unsigned bucket_size_aligned = round_down(ca->mi.bucket_size, block_sectors(c));
	u64 seq;

	if (from == journal_space_total)
		return (struct journal_space) {
			.next_entry	= bucket_size_aligned,
			.total		= bucket_size_aligned * ja->nr,
		};

	buckets = bch2_journal_dev_buckets_available(j, ja, from);
	sectors = round_down(ja->sectors_free, block_sectors(c));

	/*
	 * Note that we don't allocate the space for a journal entry
	 * until we write it out - thus, account for it here:
	 */
	for (seq = journal_last_unwritten_seq(j);
	     seq <= journal_cur_seq(j);
	     seq++) {
		unwritten = j->buf[seq & JOURNAL_BUF_MASK].sectors;

		if (!unwritten)
			continue;

		/* entry won't fit on this device, skip: */
		if (unwritten > bucket_size_aligned)
			continue;

		if (unwritten >= sectors) {
			if (!buckets) {
				sectors = 0;
				break;
			}

			buckets--;
			sectors = bucket_size_aligned;
		}

		sectors -= unwritten;
	}

	if (sectors < ca->mi.bucket_size && buckets) {
		buckets--;
		sectors = bucket_size_aligned;
	}

	return (struct journal_space) {
		.next_entry	= sectors,
		.total		= sectors + buckets * bucket_size_aligned,
	};
}

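/*
 * Combine per-device space into one answer: we collect
 * journal_dev_space_available() for each journal device with nonzero
 * durability, keep the results sorted from largest to smallest by total, and
 * take the @nr_devs_want'th largest entry - the most space we can use while
 * still writing to @nr_devs_want devices.  For example (illustrative numbers),
 * with devices reporting totals of 800, 500 and 200 sectors and
 * nr_devs_want == 2, the returned total is 500:
 */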
static struct journal_space __journal_space_available(struct journal *j, unsigned nr_devs_want,
			    enum journal_space_from from)
{
	struct bch_fs *c = container_of(j, struct bch_fs, journal);
	unsigned pos, nr_devs = 0;
	struct journal_space space, dev_space[BCH_SB_MEMBERS_MAX];
	unsigned min_bucket_size = U32_MAX;

	BUG_ON(nr_devs_want > ARRAY_SIZE(dev_space));

	for_each_member_device_rcu(c, ca, &c->rw_devs[BCH_DATA_journal]) {
		if (!ca->journal.nr ||
		    !ca->mi.durability)
			continue;

		min_bucket_size = min(min_bucket_size, ca->mi.bucket_size);

		space = journal_dev_space_available(j, ca, from);
		if (!space.next_entry)
			continue;

		for (pos = 0; pos < nr_devs; pos++)
			if (space.total > dev_space[pos].total)
				break;

		array_insert_item(dev_space, nr_devs, pos, space);
	}

	if (nr_devs < nr_devs_want)
		return (struct journal_space) { 0, 0 };

	/*
	 * It's possible for bucket size to be misaligned w.r.t. the filesystem
	 * block size:
	 */
	min_bucket_size = round_down(min_bucket_size, block_sectors(c));

	/*
	 * We sorted largest to smallest, and we want the smallest out of the
	 * @nr_devs_want largest devices:
	 */
	space = dev_space[nr_devs_want - 1];
	space.next_entry = min(space.next_entry, min_bucket_size);
	return space;
}

void bch2_journal_space_available(struct journal *j)
{
	struct bch_fs *c = container_of(j, struct bch_fs, journal);
	unsigned clean, clean_ondisk, total;
	unsigned max_entry_size = min(j->buf[0].buf_size >> 9,
				      j->buf[1].buf_size >> 9);
	unsigned nr_online = 0, nr_devs_want;
	bool can_discard = false;
	int ret = 0;

	lockdep_assert_held(&j->lock);
	guard(rcu)();

	for_each_member_device_rcu(c, ca, &c->rw_devs[BCH_DATA_journal]) {
		struct journal_device *ja = &ca->journal;

		if (!ja->nr)
			continue;

		while (ja->dirty_idx != ja->cur_idx &&
		       ja->bucket_seq[ja->dirty_idx] < journal_last_seq(j))
			ja->dirty_idx = (ja->dirty_idx + 1) % ja->nr;

		while (ja->dirty_idx_ondisk != ja->dirty_idx &&
		       ja->bucket_seq[ja->dirty_idx_ondisk] < j->last_seq_ondisk)
			ja->dirty_idx_ondisk = (ja->dirty_idx_ondisk + 1) % ja->nr;

		can_discard |= __should_discard_bucket(j, ja);

		max_entry_size = min_t(unsigned, max_entry_size, ca->mi.bucket_size);
		nr_online++;
	}

	j->can_discard = can_discard;

	if (nr_online < metadata_replicas_required(c)) {
		if (!(c->sb.features & BIT_ULL(BCH_FEATURE_small_image))) {
			struct printbuf buf = PRINTBUF;
			buf.atomic++;
			prt_printf(&buf, "insufficient writeable journal devices available: have %u, need %u\n"
				   "rw journal devs:", nr_online, metadata_replicas_required(c));

			for_each_member_device_rcu(c, ca, &c->rw_devs[BCH_DATA_journal])
				prt_printf(&buf, " %s", ca->name);

			bch_err(c, "%s", buf.buf);
			printbuf_exit(&buf);
		}
		ret = bch_err_throw(c, insufficient_journal_devices);
		goto out;
	}

	nr_devs_want = min_t(unsigned, nr_online, c->opts.metadata_replicas);

	for (unsigned i = 0; i < journal_space_nr; i++)
		j->space[i] = __journal_space_available(j, nr_devs_want, i);

	clean_ondisk	= j->space[journal_space_clean_ondisk].total;
	clean		= j->space[journal_space_clean].total;
	total		= j->space[journal_space_total].total;

	if (!j->space[journal_space_discarded].next_entry)
		ret = bch_err_throw(c, journal_full);

	if ((j->space[journal_space_clean_ondisk].next_entry <
	     j->space[journal_space_clean_ondisk].total) &&
	    (clean - clean_ondisk <= total / 8) &&
	    (clean_ondisk * 2 > clean))
		set_bit(JOURNAL_may_skip_flush, &j->flags);
	else
		clear_bit(JOURNAL_may_skip_flush, &j->flags);

	bch2_journal_set_watermark(j);
out:
	j->cur_entry_sectors	= !ret
		? j->space[journal_space_discarded].next_entry
		: 0;
	j->cur_entry_error	= ret;

	if (!ret)
		journal_wake(j);
}

/* Discards - last part of journal reclaim: */

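/*
 * A bucket is a candidate for discard once it sits between ja->discard_idx
 * and ja->dirty_idx_ondisk - i.e. the entries it held are behind last_seq,
 * and that last_seq has itself been written out.  We don't bother issuing
 * discards until we're running low, though: __should_discard_bucket() only
 * returns true when fewer than max(4, ja->nr / 8) already-discarded buckets
 * are available:
 */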
static bool __should_discard_bucket(struct journal *j, struct journal_device *ja)
{
	unsigned min_free = max(4, ja->nr / 8);

	return bch2_journal_dev_buckets_available(j, ja, journal_space_discarded) <
		min_free &&
		ja->discard_idx != ja->dirty_idx_ondisk;
}

static bool should_discard_bucket(struct journal *j, struct journal_device *ja)
{
	spin_lock(&j->lock);
	bool ret = __should_discard_bucket(j, ja);
	spin_unlock(&j->lock);

	return ret;
}

/*
 * Advance ja->discard_idx as long as it points to buckets that are no longer
 * dirty, issuing discards if necessary:
 */
void bch2_journal_do_discards(struct journal *j)
{
	struct bch_fs *c = container_of(j, struct bch_fs, journal);

	mutex_lock(&j->discard_lock);

	for_each_rw_member(c, ca, BCH_DEV_WRITE_REF_journal_do_discards) {
		struct journal_device *ja = &ca->journal;

		while (should_discard_bucket(j, ja)) {
			if (!c->opts.nochanges &&
			    bch2_discard_opt_enabled(c, ca) &&
			    bdev_max_discard_sectors(ca->disk_sb.bdev))
				blkdev_issue_discard(ca->disk_sb.bdev,
					bucket_to_sector(ca,
						ja->buckets[ja->discard_idx]),
					ca->mi.bucket_size, GFP_NOFS);

			spin_lock(&j->lock);
			ja->discard_idx = (ja->discard_idx + 1) % ja->nr;

			bch2_journal_space_available(j);
			spin_unlock(&j->lock);
		}
	}

	mutex_unlock(&j->discard_lock);
}

/*
 * Journal entry pinning - machinery for holding a reference on a given journal
 * entry, holding it open to ensure it gets replayed during recovery:
 */

void bch2_journal_reclaim_fast(struct journal *j)
{
	bool popped = false;

	lockdep_assert_held(&j->lock);

	/*
	 * Unpin journal entries whose reference counts reached zero, meaning
	 * all btree nodes got written out
	 */
	while (!fifo_empty(&j->pin) &&
	       j->pin.front <= j->seq_ondisk &&
	       !atomic_read(&fifo_peek_front(&j->pin).count)) {
		j->pin.front++;
		popped = true;
	}

	if (popped) {
		bch2_journal_space_available(j);
		__closure_wake_up(&j->reclaim_flush_wait);
	}
}

bool __bch2_journal_pin_put(struct journal *j, u64 seq)
{
	struct journal_entry_pin_list *pin_list = journal_seq_pin(j, seq);

	return atomic_dec_and_test(&pin_list->count);
}

void bch2_journal_pin_put(struct journal *j, u64 seq)
{
	if (__bch2_journal_pin_put(j, seq)) {
		spin_lock(&j->lock);
		bch2_journal_reclaim_fast(j);
		spin_unlock(&j->lock);
	}
}

static inline bool __journal_pin_drop(struct journal *j,
				      struct journal_entry_pin *pin)
{
	struct journal_entry_pin_list *pin_list;

	if (!journal_pin_active(pin))
		return false;

	if (j->flush_in_progress == pin)
		j->flush_in_progress_dropped = true;

	pin_list = journal_seq_pin(j, pin->seq);
	pin->seq = 0;
	list_del_init(&pin->list);

	if (j->reclaim_flush_wait.list.first)
		__closure_wake_up(&j->reclaim_flush_wait);

	/*
	 * Unpinning a journal entry may make journal_next_bucket() succeed, if
	 * writing a new last_seq will now make another bucket available:
	 */
	return atomic_dec_and_test(&pin_list->count) &&
		pin_list == &fifo_peek_front(&j->pin);
}

void bch2_journal_pin_drop(struct journal *j,
			   struct journal_entry_pin *pin)
{
	spin_lock(&j->lock);
	if (__journal_pin_drop(j, pin))
		bch2_journal_reclaim_fast(j);
	spin_unlock(&j->lock);
}

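/*
 * Pins are segregated by the type of their flush callback - btree node
 * writes (by level), key cache flushes, and everything else - so that
 * journal_flush_pins() and journal_flush_done() can select which kinds of
 * pins to flush, and in which order:
 */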
static enum journal_pin_type journal_pin_type(struct journal_entry_pin *pin,
					      journal_pin_flush_fn fn)
{
	if (fn == bch2_btree_node_flush0 ||
	    fn == bch2_btree_node_flush1) {
		unsigned idx = fn == bch2_btree_node_flush1;
		struct btree *b = container_of(pin, struct btree, writes[idx].journal);

		return JOURNAL_PIN_TYPE_btree0 - b->c.level;
	} else if (fn == bch2_btree_key_cache_journal_flush)
		return JOURNAL_PIN_TYPE_key_cache;
	else
		return JOURNAL_PIN_TYPE_other;
}

static inline void bch2_journal_pin_set_locked(struct journal *j, u64 seq,
			  struct journal_entry_pin *pin,
			  journal_pin_flush_fn flush_fn,
			  enum journal_pin_type type)
{
	struct journal_entry_pin_list *pin_list = journal_seq_pin(j, seq);

	/*
	 * flush_fn is how we identify journal pins in debugfs, so must always
	 * exist, even if it doesn't do anything:
	 */
	BUG_ON(!flush_fn);

	atomic_inc(&pin_list->count);
	pin->seq	= seq;
	pin->flush	= flush_fn;

	if (list_empty(&pin_list->unflushed[type]) &&
	    j->reclaim_flush_wait.list.first)
		__closure_wake_up(&j->reclaim_flush_wait);

	list_add(&pin->list, &pin_list->unflushed[type]);
}

void bch2_journal_pin_copy(struct journal *j,
			   struct journal_entry_pin *dst,
			   struct journal_entry_pin *src,
			   journal_pin_flush_fn flush_fn)
{
	spin_lock(&j->lock);

	u64 seq = READ_ONCE(src->seq);

	if (seq < journal_last_seq(j)) {
		/*
		 * bch2_journal_pin_copy() raced with bch2_journal_pin_drop() on
		 * the src pin - with the pin dropped, the entry to pin might no
		 * longer exist, but that means there's no longer anything to
		 * copy and we can bail out here:
		 */
		spin_unlock(&j->lock);
		return;
	}

	bool reclaim = __journal_pin_drop(j, dst);

	bch2_journal_pin_set_locked(j, seq, dst, flush_fn, journal_pin_type(dst, flush_fn));

	if (reclaim)
		bch2_journal_reclaim_fast(j);

	/*
	 * If the journal is currently full, we might want to call flush_fn
	 * immediately:
	 */
	if (seq == journal_last_seq(j))
		journal_wake(j);
	spin_unlock(&j->lock);
}

void bch2_journal_pin_set(struct journal *j, u64 seq,
			  struct journal_entry_pin *pin,
			  journal_pin_flush_fn flush_fn)
{
	spin_lock(&j->lock);

	BUG_ON(seq < journal_last_seq(j));

	bool reclaim = __journal_pin_drop(j, pin);

	bch2_journal_pin_set_locked(j, seq, pin, flush_fn, journal_pin_type(pin, flush_fn));

	if (reclaim)
		bch2_journal_reclaim_fast(j);
	/*
	 * If the journal is currently full, we might want to call flush_fn
	 * immediately:
	 */
	if (seq == journal_last_seq(j))
		journal_wake(j);

	spin_unlock(&j->lock);
}

/**
 * bch2_journal_pin_flush: ensure journal pin callback is no longer running
 * @j: journal object
 * @pin: pin to flush
 */
void bch2_journal_pin_flush(struct journal *j, struct journal_entry_pin *pin)
{
	BUG_ON(journal_pin_active(pin));

	wait_event(j->pin_flush_wait, j->flush_in_progress != pin);
}

/*
 * Journal reclaim: flush references to open journal entries to reclaim space in
 * the journal
 *
 * May be done by the journal code in the background as needed to free up space
 * for more journal entries, or as part of doing a clean shutdown, or to migrate
 * data off of a specific device:
 */

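/*
 * journal_get_next_pin() walks the pin FIFO looking for an unflushed pin to
 * flush next.  @allowed_below_seq and @allowed_above_seq are bitmasks of
 * journal pin types (BIT(JOURNAL_PIN_TYPE_*)): types in @allowed_above_seq
 * are considered at any sequence number, types in @allowed_below_seq only up
 * to @seq_to_flush:
 */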
static struct journal_entry_pin *
journal_get_next_pin(struct journal *j,
		     u64 seq_to_flush,
		     unsigned allowed_below_seq,
		     unsigned allowed_above_seq,
		     u64 *seq)
{
	struct journal_entry_pin_list *pin_list;
	struct journal_entry_pin *ret = NULL;

	fifo_for_each_entry_ptr(pin_list, &j->pin, *seq) {
		if (*seq > seq_to_flush && !allowed_above_seq)
			break;

		for (unsigned i = 0; i < JOURNAL_PIN_TYPE_NR; i++)
			if (((BIT(i) & allowed_below_seq) && *seq <= seq_to_flush) ||
			    (BIT(i) & allowed_above_seq)) {
				ret = list_first_entry_or_null(&pin_list->unflushed[i],
					struct journal_entry_pin, list);
				if (ret)
					return ret;
			}
	}

	return NULL;
}

/* returns the number of pins flushed (nonzero if we did work) */
static size_t journal_flush_pins(struct journal *j,
				 u64 seq_to_flush,
				 unsigned allowed_below_seq,
				 unsigned allowed_above_seq,
				 unsigned min_any,
				 unsigned min_key_cache)
{
	struct journal_entry_pin *pin;
	size_t nr_flushed = 0;
	journal_pin_flush_fn flush_fn;
	u64 seq;
	int err;

	lockdep_assert_held(&j->reclaim_lock);

	while (1) {
		unsigned allowed_above = allowed_above_seq;
		unsigned allowed_below = allowed_below_seq;

		if (min_any) {
			allowed_above |= ~0;
			allowed_below |= ~0;
		}

		if (min_key_cache) {
			allowed_above |= BIT(JOURNAL_PIN_TYPE_key_cache);
			allowed_below |= BIT(JOURNAL_PIN_TYPE_key_cache);
		}

		cond_resched();

		j->last_flushed = jiffies;

		spin_lock(&j->lock);
		pin = journal_get_next_pin(j, seq_to_flush,
					   allowed_below,
					   allowed_above, &seq);
		if (pin) {
			BUG_ON(j->flush_in_progress);
			j->flush_in_progress	= pin;
			j->flush_in_progress_dropped = false;
			flush_fn = pin->flush;
		}
		spin_unlock(&j->lock);

		if (!pin)
			break;

		if (min_key_cache && pin->flush == bch2_btree_key_cache_journal_flush)
			min_key_cache--;

		if (min_any)
			min_any--;

		err = flush_fn(j, pin, seq);

		spin_lock(&j->lock);
		/* Pin might have been dropped or rearmed: */
		if (likely(!err && !j->flush_in_progress_dropped))
			list_move(&pin->list, &journal_seq_pin(j, seq)->flushed[journal_pin_type(pin, flush_fn)]);
		j->flush_in_progress = NULL;
		j->flush_in_progress_dropped = false;
		spin_unlock(&j->lock);

		wake_up(&j->pin_flush_wait);

		if (err)
			break;

		nr_flushed++;
	}

	return nr_flushed;
}

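/*
 * Pick a target sequence number for reclaim: flush enough that each device's
 * journal is at most half full, and that the pin FIFO is at most half full.
 * For example, with ja->nr == 8 buckets and ja->cur_idx == 5, we want
 * everything up to the sequence number recorded for bucket (5 + 4) % 8 == 1
 * flushed:
 */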
static u64 journal_seq_to_flush(struct journal *j)
{
	struct bch_fs *c = container_of(j, struct bch_fs, journal);
	u64 seq_to_flush = 0;

	guard(spinlock)(&j->lock);
	guard(rcu)();

	for_each_rw_member_rcu(c, ca) {
		struct journal_device *ja = &ca->journal;
		unsigned nr_buckets, bucket_to_flush;

		if (!ja->nr)
			continue;

		/* Try to keep the journal at most half full: */
		nr_buckets = ja->nr / 2;

		bucket_to_flush = (ja->cur_idx + nr_buckets) % ja->nr;
		seq_to_flush = max(seq_to_flush,
				   ja->bucket_seq[bucket_to_flush]);
	}

	/* Also flush if the pin fifo is more than half full */
	return max_t(s64, seq_to_flush,
		     (s64) journal_cur_seq(j) -
		     (j->pin.size >> 1));
}

/**
 * __bch2_journal_reclaim - free up journal buckets
 * @j: journal object
 * @direct: direct or background reclaim?
 * @kicked: requested to run since we last ran?
 *
 * Background journal reclaim writes out btree nodes. It should be run
 * early enough so that we never completely run out of journal buckets.
 *
 * High watermarks for triggering background reclaim:
 * - FIFO has fewer than 512 entries left
 * - fewer than 25% journal buckets free
 *
 * Background reclaim runs until low watermarks are reached:
 * - FIFO has more than 1024 entries left
 * - more than 50% journal buckets free
 *
 * As long as a reclaim can complete in the time it takes to fill up
 * 512 journal entries or 25% of all journal buckets, then
 * journal_next_bucket() should not stall.
 */
static int __bch2_journal_reclaim(struct journal *j, bool direct, bool kicked)
{
	struct bch_fs *c = container_of(j, struct bch_fs, journal);
	struct btree_cache *bc = &c->btree_cache;
	bool kthread = (current->flags & PF_KTHREAD) != 0;
	u64 seq_to_flush;
	size_t min_nr, min_key_cache, nr_flushed;
	unsigned flags;
	int ret = 0;

	/*
	 * We can't invoke memory reclaim while holding the reclaim_lock -
	 * journal reclaim is required to make progress for memory reclaim
	 * (cleaning the caches), so we can't get stuck in memory reclaim while
	 * we're holding the reclaim lock:
	 */
	lockdep_assert_held(&j->reclaim_lock);
	flags = memalloc_noreclaim_save();

	do {
		if (kthread && kthread_should_stop())
			break;

		ret = bch2_journal_error(j);
		if (ret)
			break;

		/* XXX shove journal discards off to another thread */
		bch2_journal_do_discards(j);

		seq_to_flush = journal_seq_to_flush(j);
		min_nr = 0;

		/*
		 * If it's been longer than journal_reclaim_delay since we last
		 * flushed, make sure to flush at least one journal pin:
		 */
		if (time_after(jiffies, j->last_flushed +
			       msecs_to_jiffies(c->opts.journal_reclaim_delay)))
			min_nr = 1;

		if (j->watermark != BCH_WATERMARK_stripe)
			min_nr = 1;

		size_t btree_cache_live = bc->live[0].nr + bc->live[1].nr;
		if (atomic_long_read(&bc->nr_dirty) * 2 > btree_cache_live)
			min_nr = 1;

		min_key_cache = min(bch2_nr_btree_keys_need_flush(c), (size_t) 128);

		trace_and_count(c, journal_reclaim_start, c,
				direct, kicked,
				min_nr, min_key_cache,
				atomic_long_read(&bc->nr_dirty), btree_cache_live,
				atomic_long_read(&c->btree_key_cache.nr_dirty),
				atomic_long_read(&c->btree_key_cache.nr_keys));

		nr_flushed = journal_flush_pins(j, seq_to_flush,
						~0, 0,
						min_nr, min_key_cache);

		if (direct)
			j->nr_direct_reclaim += nr_flushed;
		else
			j->nr_background_reclaim += nr_flushed;
		trace_and_count(c, journal_reclaim_finish, c, nr_flushed);

		if (nr_flushed)
			wake_up(&j->reclaim_wait);
	} while ((min_nr || min_key_cache) && nr_flushed && !direct);

	memalloc_noreclaim_restore(flags);

	return ret;
}

int bch2_journal_reclaim(struct journal *j)
{
	return __bch2_journal_reclaim(j, true, true);
}

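/*
 * The background reclaim thread: runs __bch2_journal_reclaim() with
 * j->reclaim_lock held, then sleeps until it's kicked or until the next
 * reclaim deadline (journal_reclaim_delay after the last flush); if nothing
 * is pinned in the journal it sleeps until kicked:
 */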
static int bch2_journal_reclaim_thread(void *arg)
{
	struct journal *j = arg;
	struct bch_fs *c = container_of(j, struct bch_fs, journal);
	unsigned long delay, now;
	bool journal_empty;
	int ret = 0;

	set_freezable();

	j->last_flushed = jiffies;

	while (!ret && !kthread_should_stop()) {
		bool kicked = j->reclaim_kicked;

		j->reclaim_kicked = false;

		mutex_lock(&j->reclaim_lock);
		ret = __bch2_journal_reclaim(j, false, kicked);
		mutex_unlock(&j->reclaim_lock);

		now = jiffies;
		delay = msecs_to_jiffies(c->opts.journal_reclaim_delay);
		j->next_reclaim = j->last_flushed + delay;

		if (!time_in_range(j->next_reclaim, now, now + delay))
			j->next_reclaim = now + delay;

		while (1) {
			set_current_state(TASK_INTERRUPTIBLE|TASK_FREEZABLE);
			if (kthread_should_stop())
				break;
			if (j->reclaim_kicked)
				break;

			spin_lock(&j->lock);
			journal_empty = fifo_empty(&j->pin);
			spin_unlock(&j->lock);

			long timeout = j->next_reclaim - jiffies;

			if (journal_empty)
				schedule();
			else if (timeout > 0)
				schedule_timeout(timeout);
			else
				break;
		}
		__set_current_state(TASK_RUNNING);
	}

	return 0;
}

void bch2_journal_reclaim_stop(struct journal *j)
{
	struct task_struct *p = j->reclaim_thread;

	j->reclaim_thread = NULL;

	if (p) {
		kthread_stop(p);
		put_task_struct(p);
	}
}

int bch2_journal_reclaim_start(struct journal *j)
{
	struct bch_fs *c = container_of(j, struct bch_fs, journal);
	struct task_struct *p;
	int ret;

	if (j->reclaim_thread)
		return 0;

	p = kthread_create(bch2_journal_reclaim_thread, j,
			   "bch-reclaim/%s", c->name);
	ret = PTR_ERR_OR_ZERO(p);
	bch_err_msg(c, ret, "creating journal reclaim thread");
	if (ret)
		return ret;

	get_task_struct(p);
	j->reclaim_thread = p;
	wake_up_process(p);
	return 0;
}

static bool journal_pins_still_flushing(struct journal *j, u64 seq_to_flush,
					unsigned types)
{
	struct journal_entry_pin_list *pin_list;
	u64 seq;

	spin_lock(&j->lock);
	fifo_for_each_entry_ptr(pin_list, &j->pin, seq) {
		if (seq > seq_to_flush)
			break;

		for (unsigned i = 0; i < JOURNAL_PIN_TYPE_NR; i++)
			if ((BIT(i) & types) &&
			    (!list_empty(&pin_list->unflushed[i]) ||
			     !list_empty(&pin_list->flushed[i]))) {
				spin_unlock(&j->lock);
				return true;
			}
	}
	spin_unlock(&j->lock);

	return false;
}

static bool journal_flush_pins_or_still_flushing(struct journal *j, u64 seq_to_flush,
						 unsigned types)
{
	return journal_flush_pins(j, seq_to_flush, types, 0, 0, 0) ||
		journal_pins_still_flushing(j, seq_to_flush, types);
}

static int journal_flush_done(struct journal *j, u64 seq_to_flush,
			      bool *did_work)
{
	int ret = 0;

	ret = bch2_journal_error(j);
	if (ret)
		return ret;

	mutex_lock(&j->reclaim_lock);

	for (int type = JOURNAL_PIN_TYPE_NR - 1;
	     type >= 0;
	     --type)
		if (journal_flush_pins_or_still_flushing(j, seq_to_flush, BIT(type))) {
			*did_work = true;
			goto unlock;
		}

	if (seq_to_flush > journal_cur_seq(j))
		bch2_journal_entry_close(j);

	spin_lock(&j->lock);
	/*
	 * If journal replay hasn't completed, the unreplayed journal entries
	 * hold refs on their corresponding sequence numbers
	 */
	ret = !test_bit(JOURNAL_replay_done, &j->flags) ||
		journal_last_seq(j) > seq_to_flush ||
		!fifo_used(&j->pin);

	spin_unlock(&j->lock);
unlock:
	mutex_unlock(&j->reclaim_lock);

	return ret;
}

bool bch2_journal_flush_pins(struct journal *j, u64 seq_to_flush)
{
	/* time_stats this */
	bool did_work = false;

	if (!test_bit(JOURNAL_running, &j->flags))
		return false;

	closure_wait_event(&j->reclaim_flush_wait,
			   journal_flush_done(j, seq_to_flush, &did_work));

	return did_work;
}

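/*
 * Flush all journal pins referencing @dev_idx (or, if @dev_idx < 0, all
 * entries with fewer than metadata_replicas copies), then rebuild the
 * journal's replicas entries from the pins that remain - e.g. when migrating
 * data off of a device:
 */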
int bch2_journal_flush_device_pins(struct journal *j, int dev_idx)
{
	struct bch_fs *c = container_of(j, struct bch_fs, journal);
	struct journal_entry_pin_list *p;
	u64 iter, seq = 0;
	int ret = 0;

	spin_lock(&j->lock);
	fifo_for_each_entry_ptr(p, &j->pin, iter)
		if (dev_idx >= 0
		    ? bch2_dev_list_has_dev(p->devs, dev_idx)
		    : p->devs.nr < c->opts.metadata_replicas)
			seq = iter;
	spin_unlock(&j->lock);

	bch2_journal_flush_pins(j, seq);

	ret = bch2_journal_error(j);
	if (ret)
		return ret;

	mutex_lock(&c->replicas_gc_lock);
	bch2_replicas_gc_start(c, 1 << BCH_DATA_journal);

	/*
	 * Now that we've populated replicas_gc, write to the journal to mark
	 * active journal devices. This handles the case where the journal might
	 * be empty. Otherwise we could clear all journal replicas and
	 * temporarily put the fs into an unrecoverable state. Journal recovery
	 * expects to find devices marked for journal data on unclean mount.
	 */
	ret = bch2_journal_meta(&c->journal);
	if (ret)
		goto err;

	seq = 0;
	spin_lock(&j->lock);
	while (!ret) {
		union bch_replicas_padded replicas;

		seq = max(seq, journal_last_seq(j));
		if (seq >= j->pin.back)
			break;
		bch2_devlist_to_replicas(&replicas.e, BCH_DATA_journal,
					 journal_seq_pin(j, seq)->devs);
		seq++;

		if (replicas.e.nr_devs) {
			spin_unlock(&j->lock);
			ret = bch2_mark_replicas(c, &replicas.e);
			spin_lock(&j->lock);
		}
	}
	spin_unlock(&j->lock);
err:
	ret = bch2_replicas_gc_end(c, ret);
	mutex_unlock(&c->replicas_gc_lock);

	return ret;
}

bool bch2_journal_seq_pins_to_text(struct printbuf *out, struct journal *j, u64 *seq)
{
	struct journal_entry_pin_list *pin_list;
	struct journal_entry_pin *pin;

	spin_lock(&j->lock);
	if (!test_bit(JOURNAL_running, &j->flags)) {
		spin_unlock(&j->lock);
		return true;
	}

	*seq = max(*seq, j->pin.front);

	if (*seq >= j->pin.back) {
		spin_unlock(&j->lock);
		return true;
	}

	out->atomic++;

	pin_list = journal_seq_pin(j, *seq);

	prt_printf(out, "%llu: count %u\n", *seq, atomic_read(&pin_list->count));
	printbuf_indent_add(out, 2);

	prt_printf(out, "unflushed:\n");
	for (unsigned i = 0; i < ARRAY_SIZE(pin_list->unflushed); i++)
		list_for_each_entry(pin, &pin_list->unflushed[i], list)
			prt_printf(out, "\t%px %ps\n", pin, pin->flush);

	prt_printf(out, "flushed:\n");
	for (unsigned i = 0; i < ARRAY_SIZE(pin_list->flushed); i++)
		list_for_each_entry(pin, &pin_list->flushed[i], list)
			prt_printf(out, "\t%px %ps\n", pin, pin->flush);

	printbuf_indent_sub(out, 2);

	--out->atomic;
	spin_unlock(&j->lock);

	return false;
}

void bch2_journal_pins_to_text(struct printbuf *out, struct journal *j)
{
	u64 seq = 0;

	while (!bch2_journal_seq_pins_to_text(out, j, &seq))
		seq++;
}