// SPDX-License-Identifier: GPL-2.0

#include "bcachefs.h"
#include "btree_key_cache.h"
#include "btree_update.h"
#include "btree_write_buffer.h"
#include "buckets.h"
#include "errcode.h"
#include "error.h"
#include "journal.h"
#include "journal_io.h"
#include "journal_reclaim.h"
#include "replicas.h"
#include "sb-members.h"
#include "trace.h"

#include <linux/kthread.h>
#include <linux/sched/mm.h>

static bool __should_discard_bucket(struct journal *, struct journal_device *);

/* Free space calculations: */

static unsigned journal_space_from(struct journal_device *ja,
				   enum journal_space_from from)
{
	switch (from) {
	case journal_space_discarded:
		return ja->discard_idx;
	case journal_space_clean_ondisk:
		return ja->dirty_idx_ondisk;
	case journal_space_clean:
		return ja->dirty_idx;
	default:
		BUG();
	}
}

unsigned bch2_journal_dev_buckets_available(struct journal *j,
					    struct journal_device *ja,
					    enum journal_space_from from)
{
	if (!ja->nr)
		return 0;

	unsigned available = (journal_space_from(ja, from) -
			      ja->cur_idx - 1 + ja->nr) % ja->nr;

	/*
	 * Don't use the last bucket unless writing the new last_seq
	 * will make another bucket available:
	 */
	if (available && ja->dirty_idx_ondisk == ja->dirty_idx)
		--available;

	return available;
}
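
/*
 * Pick the journal watermark: if we're low on clean journal space, low on
 * pin fifo entries, or the btree write buffer is full, use
 * BCH_WATERMARK_reclaim instead of BCH_WATERMARK_stripe, and wake the
 * journal if the watermark just went back down:
 */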
void bch2_journal_set_watermark(struct journal *j)
{
	struct bch_fs *c = container_of(j, struct bch_fs, journal);
	bool low_on_space = j->space[journal_space_clean].total * 4 <=
		j->space[journal_space_total].total;
	bool low_on_pin = fifo_free(&j->pin) < j->pin.size / 4;
	bool low_on_wb = bch2_btree_write_buffer_must_wait(c);
	unsigned watermark = low_on_space || low_on_pin || low_on_wb
		? BCH_WATERMARK_reclaim
		: BCH_WATERMARK_stripe;

	if (track_event_change(&c->times[BCH_TIME_blocked_journal_low_on_space], low_on_space) ||
	    track_event_change(&c->times[BCH_TIME_blocked_journal_low_on_pin], low_on_pin) ||
	    track_event_change(&c->times[BCH_TIME_blocked_write_buffer_full], low_on_wb))
		trace_and_count(c, journal_full, c);

	mod_bit(JOURNAL_space_low, &j->flags, low_on_space || low_on_pin);

	swap(watermark, j->watermark);
	if (watermark > j->watermark)
		journal_wake(j);
}

static struct journal_space
journal_dev_space_available(struct journal *j, struct bch_dev *ca,
			    enum journal_space_from from)
{
	struct bch_fs *c = container_of(j, struct bch_fs, journal);
	struct journal_device *ja = &ca->journal;
	unsigned sectors, buckets, unwritten;
	unsigned bucket_size_aligned = round_down(ca->mi.bucket_size, block_sectors(c));
	u64 seq;

	if (from == journal_space_total)
		return (struct journal_space) {
			.next_entry	= bucket_size_aligned,
			.total		= bucket_size_aligned * ja->nr,
		};

	buckets = bch2_journal_dev_buckets_available(j, ja, from);
	sectors = round_down(ja->sectors_free, block_sectors(c));

	/*
	 * Note that we don't allocate the space for a journal entry
	 * until we write it out - thus, account for it here:
	 */
	for (seq = journal_last_unwritten_seq(j);
	     seq <= journal_cur_seq(j);
	     seq++) {
		unwritten = j->buf[seq & JOURNAL_BUF_MASK].sectors;

		if (!unwritten)
			continue;

		/* entry won't fit on this device, skip: */
		if (unwritten > bucket_size_aligned)
			continue;

		if (unwritten >= sectors) {
			if (!buckets) {
				sectors = 0;
				break;
			}

			buckets--;
			sectors = bucket_size_aligned;
		}

		sectors -= unwritten;
	}

	if (sectors < ca->mi.bucket_size && buckets) {
		buckets--;
		sectors = bucket_size_aligned;
	}

	return (struct journal_space) {
		.next_entry	= sectors,
		.total		= sectors + buckets * bucket_size_aligned,
	};
}

static struct journal_space __journal_space_available(struct journal *j, unsigned nr_devs_want,
			    enum journal_space_from from)
{
	struct bch_fs *c = container_of(j, struct bch_fs, journal);
	unsigned pos, nr_devs = 0;
	struct journal_space space, dev_space[BCH_SB_MEMBERS_MAX];
	unsigned min_bucket_size = U32_MAX;

	BUG_ON(nr_devs_want > ARRAY_SIZE(dev_space));

	for_each_member_device_rcu(c, ca, &c->rw_devs[BCH_DATA_journal]) {
		if (!ca->journal.nr ||
		    !ca->mi.durability)
			continue;

		min_bucket_size = min(min_bucket_size, ca->mi.bucket_size);

		space = journal_dev_space_available(j, ca, from);
		if (!space.next_entry)
			continue;

		for (pos = 0; pos < nr_devs; pos++)
			if (space.total > dev_space[pos].total)
				break;

		array_insert_item(dev_space, nr_devs, pos, space);
	}

	if (nr_devs < nr_devs_want)
		return (struct journal_space) { 0, 0 };

	/*
	 * We sorted largest to smallest, and we want the smallest out of the
	 * @nr_devs_want largest devices:
	 */
	space = dev_space[nr_devs_want - 1];
	space.next_entry = min(space.next_entry, min_bucket_size);
	return space;
}
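
/*
 * Recompute j->space[] from the current state of every journal device,
 * then update j->cur_entry_sectors and j->cur_entry_error to reflect how
 * much room the next journal entry has.  Caller must hold j->lock.
 */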
void bch2_journal_space_available(struct journal *j)
{
	struct bch_fs *c = container_of(j, struct bch_fs, journal);
	unsigned clean, clean_ondisk, total;
	unsigned max_entry_size = min(j->buf[0].buf_size >> 9,
				      j->buf[1].buf_size >> 9);
	unsigned nr_online = 0, nr_devs_want;
	bool can_discard = false;
	int ret = 0;

	lockdep_assert_held(&j->lock);
	guard(rcu)();

	for_each_member_device_rcu(c, ca, &c->rw_devs[BCH_DATA_journal]) {
		struct journal_device *ja = &ca->journal;

		if (!ja->nr)
			continue;

		while (ja->dirty_idx != ja->cur_idx &&
		       ja->bucket_seq[ja->dirty_idx] < journal_last_seq(j))
			ja->dirty_idx = (ja->dirty_idx + 1) % ja->nr;

		while (ja->dirty_idx_ondisk != ja->dirty_idx &&
		       ja->bucket_seq[ja->dirty_idx_ondisk] < j->last_seq_ondisk)
			ja->dirty_idx_ondisk = (ja->dirty_idx_ondisk + 1) % ja->nr;

		can_discard |= __should_discard_bucket(j, ja);

		max_entry_size = min_t(unsigned, max_entry_size, ca->mi.bucket_size);
		nr_online++;
	}

	j->can_discard = can_discard;

	if (nr_online < metadata_replicas_required(c)) {
		if (!(c->sb.features & BIT_ULL(BCH_FEATURE_small_image))) {
			struct printbuf buf = PRINTBUF;
			buf.atomic++;
			prt_printf(&buf, "insufficient writeable journal devices available: have %u, need %u\n"
				   "rw journal devs:", nr_online, metadata_replicas_required(c));

			for_each_member_device_rcu(c, ca, &c->rw_devs[BCH_DATA_journal])
				prt_printf(&buf, " %s", ca->name);

			bch_err(c, "%s", buf.buf);
			printbuf_exit(&buf);
		}
		ret = bch_err_throw(c, insufficient_journal_devices);
		goto out;
	}

	nr_devs_want = min_t(unsigned, nr_online, c->opts.metadata_replicas);

	for (unsigned i = 0; i < journal_space_nr; i++)
		j->space[i] = __journal_space_available(j, nr_devs_want, i);

	clean_ondisk	= j->space[journal_space_clean_ondisk].total;
	clean		= j->space[journal_space_clean].total;
	total		= j->space[journal_space_total].total;

	if (!j->space[journal_space_discarded].next_entry)
		ret = bch_err_throw(c, journal_full);

	if ((j->space[journal_space_clean_ondisk].next_entry <
	     j->space[journal_space_clean_ondisk].total) &&
	    (clean - clean_ondisk <= total / 8) &&
	    (clean_ondisk * 2 > clean))
		set_bit(JOURNAL_may_skip_flush, &j->flags);
	else
		clear_bit(JOURNAL_may_skip_flush, &j->flags);

	bch2_journal_set_watermark(j);
out:
	j->cur_entry_sectors = !ret
		? j->space[journal_space_discarded].next_entry
		: 0;
	j->cur_entry_error = ret;

	if (!ret)
		journal_wake(j);
}

/* Discards - last part of journal reclaim: */
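
/*
 * Start discarding once we're down to fewer than max(4, ja->nr / 8)
 * available buckets and the discard cursor is still behind
 * dirty_idx_ondisk:
 */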
static bool __should_discard_bucket(struct journal *j, struct journal_device *ja)
{
	unsigned min_free = max(4, ja->nr / 8);

	return bch2_journal_dev_buckets_available(j, ja, journal_space_discarded) <
		min_free &&
		ja->discard_idx != ja->dirty_idx_ondisk;
}

static bool should_discard_bucket(struct journal *j, struct journal_device *ja)
{
	spin_lock(&j->lock);
	bool ret = __should_discard_bucket(j, ja);
	spin_unlock(&j->lock);

	return ret;
}

/*
 * Advance ja->discard_idx as long as it points to buckets that are no longer
 * dirty, issuing discards if necessary:
 */
void bch2_journal_do_discards(struct journal *j)
{
	struct bch_fs *c = container_of(j, struct bch_fs, journal);

	mutex_lock(&j->discard_lock);

	for_each_rw_member(c, ca, BCH_DEV_WRITE_REF_journal_do_discards) {
		struct journal_device *ja = &ca->journal;

		while (should_discard_bucket(j, ja)) {
			if (!c->opts.nochanges &&
			    bch2_discard_opt_enabled(c, ca) &&
			    bdev_max_discard_sectors(ca->disk_sb.bdev))
				blkdev_issue_discard(ca->disk_sb.bdev,
					bucket_to_sector(ca,
						ja->buckets[ja->discard_idx]),
					ca->mi.bucket_size, GFP_NOFS);

			spin_lock(&j->lock);
			ja->discard_idx = (ja->discard_idx + 1) % ja->nr;

			bch2_journal_space_available(j);
			spin_unlock(&j->lock);
		}
	}

	mutex_unlock(&j->discard_lock);
}

/*
 * Journal entry pinning - machinery for holding a reference on a given journal
 * entry, holding it open to ensure it gets replayed during recovery:
 */

void bch2_journal_reclaim_fast(struct journal *j)
{
	bool popped = false;

	lockdep_assert_held(&j->lock);

	/*
	 * Unpin journal entries whose reference counts reached zero, meaning
	 * all btree nodes got written out
	 */
	while (!fifo_empty(&j->pin) &&
	       j->pin.front <= j->seq_ondisk &&
	       !atomic_read(&fifo_peek_front(&j->pin).count)) {
		j->pin.front++;
		popped = true;
	}

	if (popped) {
		bch2_journal_space_available(j);
		__closure_wake_up(&j->reclaim_flush_wait);
	}
}

bool __bch2_journal_pin_put(struct journal *j, u64 seq)
{
	struct journal_entry_pin_list *pin_list = journal_seq_pin(j, seq);

	return atomic_dec_and_test(&pin_list->count);
}

void bch2_journal_pin_put(struct journal *j, u64 seq)
{
	if (__bch2_journal_pin_put(j, seq)) {
		spin_lock(&j->lock);
		bch2_journal_reclaim_fast(j);
		spin_unlock(&j->lock);
	}
}
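
/*
 * Drop @pin from its pin list; returns true if this dropped the last
 * reference on the front of the pin fifo, i.e. the caller should follow
 * up with bch2_journal_reclaim_fast():
 */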
static inline bool __journal_pin_drop(struct journal *j,
				      struct journal_entry_pin *pin)
{
	struct journal_entry_pin_list *pin_list;

	if (!journal_pin_active(pin))
		return false;

	if (j->flush_in_progress == pin)
		j->flush_in_progress_dropped = true;

	pin_list = journal_seq_pin(j, pin->seq);
	pin->seq = 0;
	list_del_init(&pin->list);

	if (j->reclaim_flush_wait.list.first)
		__closure_wake_up(&j->reclaim_flush_wait);

	/*
	 * Unpinning a journal entry may make journal_next_bucket() succeed, if
	 * writing a new last_seq will now make another bucket available:
	 */
	return atomic_dec_and_test(&pin_list->count) &&
		pin_list == &fifo_peek_front(&j->pin);
}

void bch2_journal_pin_drop(struct journal *j,
			   struct journal_entry_pin *pin)
{
	spin_lock(&j->lock);
	if (__journal_pin_drop(j, pin))
		bch2_journal_reclaim_fast(j);
	spin_unlock(&j->lock);
}

static enum journal_pin_type journal_pin_type(struct journal_entry_pin *pin,
					      journal_pin_flush_fn fn)
{
	if (fn == bch2_btree_node_flush0 ||
	    fn == bch2_btree_node_flush1) {
		unsigned idx = fn == bch2_btree_node_flush1;
		struct btree *b = container_of(pin, struct btree, writes[idx].journal);

		return JOURNAL_PIN_TYPE_btree0 - b->c.level;
	} else if (fn == bch2_btree_key_cache_journal_flush)
		return JOURNAL_PIN_TYPE_key_cache;
	else
		return JOURNAL_PIN_TYPE_other;
}

static inline void bch2_journal_pin_set_locked(struct journal *j, u64 seq,
					       struct journal_entry_pin *pin,
					       journal_pin_flush_fn flush_fn,
					       enum journal_pin_type type)
{
	struct journal_entry_pin_list *pin_list = journal_seq_pin(j, seq);

	/*
	 * flush_fn is how we identify journal pins in debugfs, so must always
	 * exist, even if it doesn't do anything:
	 */
	BUG_ON(!flush_fn);

	atomic_inc(&pin_list->count);
	pin->seq = seq;
	pin->flush = flush_fn;

	if (list_empty(&pin_list->unflushed[type]) &&
	    j->reclaim_flush_wait.list.first)
		__closure_wake_up(&j->reclaim_flush_wait);

	list_add(&pin->list, &pin_list->unflushed[type]);
}

void bch2_journal_pin_copy(struct journal *j,
			   struct journal_entry_pin *dst,
			   struct journal_entry_pin *src,
			   journal_pin_flush_fn flush_fn)
{
	spin_lock(&j->lock);

	u64 seq = READ_ONCE(src->seq);

	if (seq < journal_last_seq(j)) {
		/*
		 * bch2_journal_pin_copy() raced with bch2_journal_pin_drop() on
		 * the src pin - with the pin dropped, the entry to pin might no
		 * longer exist, but that means there's no longer anything to
		 * copy and we can bail out here:
		 */
		spin_unlock(&j->lock);
		return;
	}

	bool reclaim = __journal_pin_drop(j, dst);

	bch2_journal_pin_set_locked(j, seq, dst, flush_fn, journal_pin_type(dst, flush_fn));

	if (reclaim)
		bch2_journal_reclaim_fast(j);

	/*
	 * If the journal is currently full, we might want to call flush_fn
	 * immediately:
	 */
	if (seq == journal_last_seq(j))
		journal_wake(j);
	spin_unlock(&j->lock);
}

void bch2_journal_pin_set(struct journal *j, u64 seq,
			  struct journal_entry_pin *pin,
			  journal_pin_flush_fn flush_fn)
{
	spin_lock(&j->lock);

	BUG_ON(seq < journal_last_seq(j));

	bool reclaim = __journal_pin_drop(j, pin);

	bch2_journal_pin_set_locked(j, seq, pin, flush_fn, journal_pin_type(pin, flush_fn));

	if (reclaim)
		bch2_journal_reclaim_fast(j);
	/*
	 * If the journal is currently full, we might want to call flush_fn
	 * immediately:
	 */
	if (seq == journal_last_seq(j))
		journal_wake(j);

	spin_unlock(&j->lock);
}

/**
 * bch2_journal_pin_flush: ensure journal pin callback is no longer running
 * @j: journal object
 * @pin: pin to flush
 */
void bch2_journal_pin_flush(struct journal *j, struct journal_entry_pin *pin)
{
	BUG_ON(journal_pin_active(pin));

	wait_event(j->pin_flush_wait, j->flush_in_progress != pin);
}

/*
 * Journal reclaim: flush references to open journal entries to reclaim space in
 * the journal
 *
 * May be done by the journal code in the background as needed to free up space
 * for more journal entries, or as part of doing a clean shutdown, or to migrate
 * data off of a specific device:
 */
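
/*
 * Find the next pin to flush: walk the pin fifo in sequence order and
 * return the first unflushed pin whose type is allowed, where
 * @allowed_below_seq and @allowed_above_seq are bitmasks of
 * journal_pin_type applying below and above @seq_to_flush respectively:
 */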
static struct journal_entry_pin *
journal_get_next_pin(struct journal *j,
		     u64 seq_to_flush,
		     unsigned allowed_below_seq,
		     unsigned allowed_above_seq,
		     u64 *seq)
{
	struct journal_entry_pin_list *pin_list;
	struct journal_entry_pin *ret = NULL;

	fifo_for_each_entry_ptr(pin_list, &j->pin, *seq) {
		if (*seq > seq_to_flush && !allowed_above_seq)
			break;

		for (unsigned i = 0; i < JOURNAL_PIN_TYPE_NR; i++)
			if (((BIT(i) & allowed_below_seq) && *seq <= seq_to_flush) ||
			    (BIT(i) & allowed_above_seq)) {
				ret = list_first_entry_or_null(&pin_list->unflushed[i],
					struct journal_entry_pin, list);
				if (ret)
					return ret;
			}
	}

	return NULL;
}

/* returns the number of journal pins flushed (nonzero if we did work): */
static size_t journal_flush_pins(struct journal *j,
				 u64 seq_to_flush,
				 unsigned allowed_below_seq,
				 unsigned allowed_above_seq,
				 unsigned min_any,
				 unsigned min_key_cache)
{
	struct journal_entry_pin *pin;
	size_t nr_flushed = 0;
	journal_pin_flush_fn flush_fn;
	u64 seq;
	int err;

	lockdep_assert_held(&j->reclaim_lock);

	while (1) {
		unsigned allowed_above = allowed_above_seq;
		unsigned allowed_below = allowed_below_seq;

		if (min_any) {
			allowed_above |= ~0;
			allowed_below |= ~0;
		}

		if (min_key_cache) {
			allowed_above |= BIT(JOURNAL_PIN_TYPE_key_cache);
			allowed_below |= BIT(JOURNAL_PIN_TYPE_key_cache);
		}

		cond_resched();

		j->last_flushed = jiffies;

		spin_lock(&j->lock);
		pin = journal_get_next_pin(j, seq_to_flush,
					   allowed_below,
					   allowed_above, &seq);
		if (pin) {
			BUG_ON(j->flush_in_progress);
			j->flush_in_progress = pin;
			j->flush_in_progress_dropped = false;
			flush_fn = pin->flush;
		}
		spin_unlock(&j->lock);

		if (!pin)
			break;

		if (min_key_cache && pin->flush == bch2_btree_key_cache_journal_flush)
			min_key_cache--;

		if (min_any)
			min_any--;

		err = flush_fn(j, pin, seq);

		spin_lock(&j->lock);
		/* Pin might have been dropped or rearmed: */
		if (likely(!err && !j->flush_in_progress_dropped))
			list_move(&pin->list, &journal_seq_pin(j, seq)->flushed[journal_pin_type(pin, flush_fn)]);
		j->flush_in_progress = NULL;
		j->flush_in_progress_dropped = false;
		spin_unlock(&j->lock);

		wake_up(&j->pin_flush_wait);

		if (err)
			break;

		nr_flushed++;
	}

	return nr_flushed;
}
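
/*
 * Pick a target sequence number for reclaim: flush enough that no journal
 * device is more than half full and the pin fifo is no more than half
 * full:
 */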
static u64 journal_seq_to_flush(struct journal *j)
{
	struct bch_fs *c = container_of(j, struct bch_fs, journal);
	u64 seq_to_flush = 0;

	guard(spinlock)(&j->lock);
	guard(rcu)();

	for_each_rw_member_rcu(c, ca) {
		struct journal_device *ja = &ca->journal;
		unsigned nr_buckets, bucket_to_flush;

		if (!ja->nr)
			continue;

		/* Try to keep the journal at most half full: */
		nr_buckets = ja->nr / 2;

		bucket_to_flush = (ja->cur_idx + nr_buckets) % ja->nr;
		seq_to_flush = max(seq_to_flush,
				   ja->bucket_seq[bucket_to_flush]);
	}

	/* Also flush if the pin fifo is more than half full */
	return max_t(s64, seq_to_flush,
		     (s64) journal_cur_seq(j) -
		     (j->pin.size >> 1));
}

/**
 * __bch2_journal_reclaim - free up journal buckets
 * @j: journal object
 * @direct: direct or background reclaim?
 * @kicked: requested to run since we last ran?
 *
 * Background journal reclaim writes out btree nodes. It should be run
 * early enough so that we never completely run out of journal buckets.
 *
 * High watermarks for triggering background reclaim:
 * - FIFO has fewer than 512 entries left
 * - fewer than 25% journal buckets free
 *
 * Background reclaim runs until low watermarks are reached:
 * - FIFO has more than 1024 entries left
 * - more than 50% journal buckets free
 *
 * As long as a reclaim can complete in the time it takes to fill up
 * 512 journal entries or 25% of all journal buckets, then
 * journal_next_bucket() should not stall.
 */
static int __bch2_journal_reclaim(struct journal *j, bool direct, bool kicked)
{
	struct bch_fs *c = container_of(j, struct bch_fs, journal);
	struct btree_cache *bc = &c->btree_cache;
	bool kthread = (current->flags & PF_KTHREAD) != 0;
	u64 seq_to_flush;
	size_t min_nr, min_key_cache, nr_flushed;
	unsigned flags;
	int ret = 0;

	/*
	 * We can't invoke memory reclaim while holding the reclaim_lock -
	 * journal reclaim is required to make progress for memory reclaim
	 * (cleaning the caches), so we can't get stuck in memory reclaim while
	 * we're holding the reclaim lock:
	 */
	lockdep_assert_held(&j->reclaim_lock);
	flags = memalloc_noreclaim_save();

	do {
		if (kthread && kthread_should_stop())
			break;

		ret = bch2_journal_error(j);
		if (ret)
			break;

		/* XXX shove journal discards off to another thread */
		bch2_journal_do_discards(j);

		seq_to_flush = journal_seq_to_flush(j);
		min_nr = 0;

		/*
		 * If it's been longer than c->opts.journal_reclaim_delay since
		 * we last flushed, make sure to flush at least one journal pin:
		 */
		if (time_after(jiffies, j->last_flushed +
			       msecs_to_jiffies(c->opts.journal_reclaim_delay)))
			min_nr = 1;

		if (j->watermark != BCH_WATERMARK_stripe)
			min_nr = 1;

		size_t btree_cache_live = bc->live[0].nr + bc->live[1].nr;
		if (atomic_long_read(&bc->nr_dirty) * 2 > btree_cache_live)
			min_nr = 1;

		min_key_cache = min(bch2_nr_btree_keys_need_flush(c), (size_t) 128);

		trace_and_count(c, journal_reclaim_start, c,
				direct, kicked,
				min_nr, min_key_cache,
				atomic_long_read(&bc->nr_dirty), btree_cache_live,
				atomic_long_read(&c->btree_key_cache.nr_dirty),
				atomic_long_read(&c->btree_key_cache.nr_keys));

		nr_flushed = journal_flush_pins(j, seq_to_flush,
						~0, 0,
						min_nr, min_key_cache);

		if (direct)
			j->nr_direct_reclaim += nr_flushed;
		else
			j->nr_background_reclaim += nr_flushed;
		trace_and_count(c, journal_reclaim_finish, c, nr_flushed);

		if (nr_flushed)
			wake_up(&j->reclaim_wait);
	} while ((min_nr || min_key_cache) && nr_flushed && !direct);

	memalloc_noreclaim_restore(flags);

	return ret;
}

int bch2_journal_reclaim(struct journal *j)
{
	return __bch2_journal_reclaim(j, true, true);
}
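
/*
 * Background reclaim thread: repeatedly runs reclaim, then sleeps until
 * the next scheduled run (journal_reclaim_delay), until it's kicked, or
 * indefinitely while the pin fifo is empty:
 */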
static int bch2_journal_reclaim_thread(void *arg)
{
	struct journal *j = arg;
	struct bch_fs *c = container_of(j, struct bch_fs, journal);
	unsigned long delay, now;
	bool journal_empty;
	int ret = 0;

	set_freezable();

	j->last_flushed = jiffies;

	while (!ret && !kthread_should_stop()) {
		bool kicked = j->reclaim_kicked;

		j->reclaim_kicked = false;

		mutex_lock(&j->reclaim_lock);
		ret = __bch2_journal_reclaim(j, false, kicked);
		mutex_unlock(&j->reclaim_lock);

		now = jiffies;
		delay = msecs_to_jiffies(c->opts.journal_reclaim_delay);
		j->next_reclaim = j->last_flushed + delay;

		if (!time_in_range(j->next_reclaim, now, now + delay))
			j->next_reclaim = now + delay;

		while (1) {
			set_current_state(TASK_INTERRUPTIBLE|TASK_FREEZABLE);
			if (kthread_should_stop())
				break;
			if (j->reclaim_kicked)
				break;

			spin_lock(&j->lock);
			journal_empty = fifo_empty(&j->pin);
			spin_unlock(&j->lock);

			long timeout = j->next_reclaim - jiffies;

			if (journal_empty)
				schedule();
			else if (timeout > 0)
				schedule_timeout(timeout);
			else
				break;
		}
		__set_current_state(TASK_RUNNING);
	}

	return 0;
}

void bch2_journal_reclaim_stop(struct journal *j)
{
	struct task_struct *p = j->reclaim_thread;

	j->reclaim_thread = NULL;

	if (p) {
		kthread_stop(p);
		put_task_struct(p);
	}
}

int bch2_journal_reclaim_start(struct journal *j)
{
	struct bch_fs *c = container_of(j, struct bch_fs, journal);
	struct task_struct *p;
	int ret;

	if (j->reclaim_thread)
		return 0;

	p = kthread_create(bch2_journal_reclaim_thread, j,
			   "bch-reclaim/%s", c->name);
	ret = PTR_ERR_OR_ZERO(p);
	bch_err_msg(c, ret, "creating journal reclaim thread");
	if (ret)
		return ret;

	get_task_struct(p);
	j->reclaim_thread = p;
	wake_up_process(p);
	return 0;
}

static bool journal_pins_still_flushing(struct journal *j, u64 seq_to_flush,
					unsigned types)
{
	struct journal_entry_pin_list *pin_list;
	u64 seq;

	spin_lock(&j->lock);
	fifo_for_each_entry_ptr(pin_list, &j->pin, seq) {
		if (seq > seq_to_flush)
			break;

		for (unsigned i = 0; i < JOURNAL_PIN_TYPE_NR; i++)
			if ((BIT(i) & types) &&
			    (!list_empty(&pin_list->unflushed[i]) ||
			     !list_empty(&pin_list->flushed[i]))) {
				spin_unlock(&j->lock);
				return true;
			}
	}
	spin_unlock(&j->lock);

	return false;
}

static bool journal_flush_pins_or_still_flushing(struct journal *j, u64 seq_to_flush,
						 unsigned types)
{
	return journal_flush_pins(j, seq_to_flush, types, 0, 0, 0) ||
		journal_pins_still_flushing(j, seq_to_flush, types);
}

static int journal_flush_done(struct journal *j, u64 seq_to_flush,
			      bool *did_work)
{
	int ret = 0;

	ret = bch2_journal_error(j);
	if (ret)
		return ret;

	mutex_lock(&j->reclaim_lock);

	for (int type = JOURNAL_PIN_TYPE_NR - 1;
	     type >= 0;
	     --type)
		if (journal_flush_pins_or_still_flushing(j, seq_to_flush, BIT(type))) {
			*did_work = true;
			goto unlock;
		}

	if (seq_to_flush > journal_cur_seq(j))
		bch2_journal_entry_close(j);

	spin_lock(&j->lock);
	/*
	 * If journal replay hasn't completed, the unreplayed journal entries
	 * hold refs on their corresponding sequence numbers
	 */
	ret = !test_bit(JOURNAL_replay_done, &j->flags) ||
		journal_last_seq(j) > seq_to_flush ||
		!fifo_used(&j->pin);

	spin_unlock(&j->lock);
unlock:
	mutex_unlock(&j->reclaim_lock);

	return ret;
}

bool bch2_journal_flush_pins(struct journal *j, u64 seq_to_flush)
{
	/* time_stats this */
	bool did_work = false;

	if (!test_bit(JOURNAL_running, &j->flags))
		return false;

	closure_wait_event(&j->reclaim_flush_wait,
		journal_flush_done(j, seq_to_flush, &did_work));

	return did_work;
}
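
/*
 * Flush all journal pins referencing @dev_idx (or, if @dev_idx is
 * negative, all pins with fewer than metadata_replicas copies), then
 * update the journal replicas entries to match the remaining pinned
 * entries:
 */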
int bch2_journal_flush_device_pins(struct journal *j, int dev_idx)
{
	struct bch_fs *c = container_of(j, struct bch_fs, journal);
	struct journal_entry_pin_list *p;
	u64 iter, seq = 0;
	int ret = 0;

	spin_lock(&j->lock);
	fifo_for_each_entry_ptr(p, &j->pin, iter)
		if (dev_idx >= 0
		    ? bch2_dev_list_has_dev(p->devs, dev_idx)
		    : p->devs.nr < c->opts.metadata_replicas)
			seq = iter;
	spin_unlock(&j->lock);

	bch2_journal_flush_pins(j, seq);

	ret = bch2_journal_error(j);
	if (ret)
		return ret;

	mutex_lock(&c->replicas_gc_lock);
	bch2_replicas_gc_start(c, 1 << BCH_DATA_journal);

	/*
	 * Now that we've populated replicas_gc, write to the journal to mark
	 * active journal devices. This handles the case where the journal might
	 * be empty. Otherwise we could clear all journal replicas and
	 * temporarily put the fs into an unrecoverable state. Journal recovery
	 * expects to find devices marked for journal data on unclean mount.
	 */
	ret = bch2_journal_meta(&c->journal);
	if (ret)
		goto err;

	seq = 0;
	spin_lock(&j->lock);
	while (!ret) {
		union bch_replicas_padded replicas;

		seq = max(seq, journal_last_seq(j));
		if (seq >= j->pin.back)
			break;
		bch2_devlist_to_replicas(&replicas.e, BCH_DATA_journal,
					 journal_seq_pin(j, seq)->devs);
		seq++;

		if (replicas.e.nr_devs) {
			spin_unlock(&j->lock);
			ret = bch2_mark_replicas(c, &replicas.e);
			spin_lock(&j->lock);
		}
	}
	spin_unlock(&j->lock);
err:
	ret = bch2_replicas_gc_end(c, ret);
	mutex_unlock(&c->replicas_gc_lock);

	return ret;
}

bool bch2_journal_seq_pins_to_text(struct printbuf *out, struct journal *j, u64 *seq)
{
	struct journal_entry_pin_list *pin_list;
	struct journal_entry_pin *pin;

	spin_lock(&j->lock);
	if (!test_bit(JOURNAL_running, &j->flags)) {
		spin_unlock(&j->lock);
		return true;
	}

	*seq = max(*seq, j->pin.front);

	if (*seq >= j->pin.back) {
		spin_unlock(&j->lock);
		return true;
	}

	out->atomic++;

	pin_list = journal_seq_pin(j, *seq);

	prt_printf(out, "%llu: count %u\n", *seq, atomic_read(&pin_list->count));
	printbuf_indent_add(out, 2);

	prt_printf(out, "unflushed:\n");
	for (unsigned i = 0; i < ARRAY_SIZE(pin_list->unflushed); i++)
		list_for_each_entry(pin, &pin_list->unflushed[i], list)
			prt_printf(out, "\t%px %ps\n", pin, pin->flush);

	prt_printf(out, "flushed:\n");
	for (unsigned i = 0; i < ARRAY_SIZE(pin_list->flushed); i++)
		list_for_each_entry(pin, &pin_list->flushed[i], list)
			prt_printf(out, "\t%px %ps\n", pin, pin->flush);

	printbuf_indent_sub(out, 2);

	--out->atomic;
	spin_unlock(&j->lock);

	return false;
}

void bch2_journal_pins_to_text(struct printbuf *out, struct journal *j)
{
	u64 seq = 0;

	while (!bch2_journal_seq_pins_to_text(out, j, &seq))
		seq++;
}