1 // SPDX-License-Identifier: GPL-2.0 2 /* 3 * bcachefs journalling code, for btree insertions 4 * 5 * Copyright 2012 Google, Inc. 6 */ 7 8 #include "bcachefs.h" 9 #include "alloc_foreground.h" 10 #include "bkey_methods.h" 11 #include "btree_gc.h" 12 #include "btree_update.h" 13 #include "btree_write_buffer.h" 14 #include "buckets.h" 15 #include "enumerated_ref.h" 16 #include "error.h" 17 #include "journal.h" 18 #include "journal_io.h" 19 #include "journal_reclaim.h" 20 #include "journal_sb.h" 21 #include "journal_seq_blacklist.h" 22 #include "trace.h" 23 24 static inline bool journal_seq_unwritten(struct journal *j, u64 seq) 25 { 26 return seq > j->seq_ondisk; 27 } 28 29 static bool __journal_entry_is_open(union journal_res_state state) 30 { 31 return state.cur_entry_offset < JOURNAL_ENTRY_CLOSED_VAL; 32 } 33 34 static inline unsigned nr_unwritten_journal_entries(struct journal *j) 35 { 36 return atomic64_read(&j->seq) - j->seq_ondisk; 37 } 38 39 static bool journal_entry_is_open(struct journal *j) 40 { 41 return __journal_entry_is_open(j->reservations); 42 } 43 44 static void bch2_journal_buf_to_text(struct printbuf *out, struct journal *j, u64 seq) 45 { 46 union journal_res_state s = READ_ONCE(j->reservations); 47 unsigned i = seq & JOURNAL_BUF_MASK; 48 struct journal_buf *buf = j->buf + i; 49 50 prt_printf(out, "seq:\t%llu\n", seq); 51 printbuf_indent_add(out, 2); 52 53 if (!buf->write_started) 54 prt_printf(out, "refcount:\t%u\n", journal_state_count(s, i & JOURNAL_STATE_BUF_MASK)); 55 56 struct closure *cl = &buf->io; 57 int r = atomic_read(&cl->remaining); 58 prt_printf(out, "io:\t%pS r %i\n", cl->fn, r & CLOSURE_REMAINING_MASK); 59 60 if (buf->data) { 61 prt_printf(out, "size:\t"); 62 prt_human_readable_u64(out, vstruct_bytes(buf->data)); 63 prt_newline(out); 64 } 65 66 prt_printf(out, "expires:\t%li jiffies\n", buf->expires - jiffies); 67 68 prt_printf(out, "flags:\t"); 69 if (buf->noflush) 70 prt_str(out, "noflush "); 71 if (buf->must_flush) 72 prt_str(out, "must_flush "); 73 if (buf->separate_flush) 74 prt_str(out, "separate_flush "); 75 if (buf->need_flush_to_write_buffer) 76 prt_str(out, "need_flush_to_write_buffer "); 77 if (buf->write_started) 78 prt_str(out, "write_started "); 79 if (buf->write_allocated) 80 prt_str(out, "write_allocated "); 81 if (buf->write_done) 82 prt_str(out, "write_done"); 83 prt_newline(out); 84 85 printbuf_indent_sub(out, 2); 86 } 87 88 static void bch2_journal_bufs_to_text(struct printbuf *out, struct journal *j) 89 { 90 lockdep_assert_held(&j->lock); 91 out->atomic++; 92 93 if (!out->nr_tabstops) 94 printbuf_tabstop_push(out, 24); 95 96 for (u64 seq = journal_last_unwritten_seq(j); 97 seq <= journal_cur_seq(j); 98 seq++) 99 bch2_journal_buf_to_text(out, j, seq); 100 prt_printf(out, "last buf %s\n", journal_entry_is_open(j) ? "open" : "closed"); 101 102 --out->atomic; 103 } 104 105 static inline struct journal_buf * 106 journal_seq_to_buf(struct journal *j, u64 seq) 107 { 108 struct journal_buf *buf = NULL; 109 110 EBUG_ON(seq > journal_cur_seq(j)); 111 112 if (journal_seq_unwritten(j, seq)) 113 buf = j->buf + (seq & JOURNAL_BUF_MASK); 114 return buf; 115 } 116 117 static void journal_pin_list_init(struct journal_entry_pin_list *p, int count) 118 { 119 for (unsigned i = 0; i < ARRAY_SIZE(p->unflushed); i++) 120 INIT_LIST_HEAD(&p->unflushed[i]); 121 for (unsigned i = 0; i < ARRAY_SIZE(p->flushed); i++) 122 INIT_LIST_HEAD(&p->flushed[i]); 123 atomic_set(&p->count, count); 124 p->devs.nr = 0; 125 } 126 127 /* 128 * Detect stuck journal conditions and trigger shutdown. Technically the journal 129 * can end up stuck for a variety of reasons, such as a blocked I/O, journal 130 * reservation lockup, etc. Since this is a fatal error with potentially 131 * unpredictable characteristics, we want to be fairly conservative before we 132 * decide to shut things down. 133 * 134 * Consider the journal stuck when it appears full with no ability to commit 135 * btree transactions, to discard journal buckets, nor acquire priority 136 * (reserved watermark) reservation. 137 */ 138 static inline bool 139 journal_error_check_stuck(struct journal *j, int error, unsigned flags) 140 { 141 struct bch_fs *c = container_of(j, struct bch_fs, journal); 142 bool stuck = false; 143 struct printbuf buf = PRINTBUF; 144 145 buf.atomic++; 146 147 if (!(error == -BCH_ERR_journal_full || 148 error == -BCH_ERR_journal_pin_full) || 149 nr_unwritten_journal_entries(j) || 150 (flags & BCH_WATERMARK_MASK) != BCH_WATERMARK_reclaim) 151 return stuck; 152 153 spin_lock(&j->lock); 154 155 if (j->can_discard) { 156 spin_unlock(&j->lock); 157 return stuck; 158 } 159 160 stuck = true; 161 162 /* 163 * The journal shutdown path will set ->err_seq, but do it here first to 164 * serialize against concurrent failures and avoid duplicate error 165 * reports. 166 */ 167 if (j->err_seq) { 168 spin_unlock(&j->lock); 169 return stuck; 170 } 171 j->err_seq = journal_cur_seq(j); 172 173 __bch2_journal_debug_to_text(&buf, j); 174 spin_unlock(&j->lock); 175 prt_printf(&buf, bch2_fmt(c, "Journal stuck! Hava a pre-reservation but journal full (error %s)"), 176 bch2_err_str(error)); 177 bch2_print_str(c, KERN_ERR, buf.buf); 178 179 printbuf_reset(&buf); 180 bch2_journal_pins_to_text(&buf, j); 181 bch_err(c, "Journal pins:\n%s", buf.buf); 182 printbuf_exit(&buf); 183 184 bch2_fatal_error(c); 185 dump_stack(); 186 187 return stuck; 188 } 189 190 void bch2_journal_do_writes(struct journal *j) 191 { 192 for (u64 seq = journal_last_unwritten_seq(j); 193 seq <= journal_cur_seq(j); 194 seq++) { 195 unsigned idx = seq & JOURNAL_BUF_MASK; 196 struct journal_buf *w = j->buf + idx; 197 198 if (w->write_started && !w->write_allocated) 199 break; 200 if (w->write_started) 201 continue; 202 203 if (!journal_state_seq_count(j, j->reservations, seq)) { 204 j->seq_write_started = seq; 205 w->write_started = true; 206 closure_call(&w->io, bch2_journal_write, j->wq, NULL); 207 } 208 209 break; 210 } 211 } 212 213 /* 214 * Final processing when the last reference of a journal buffer has been 215 * dropped. Drop the pin list reference acquired at journal entry open and write 216 * the buffer, if requested. 217 */ 218 void bch2_journal_buf_put_final(struct journal *j, u64 seq) 219 { 220 lockdep_assert_held(&j->lock); 221 222 if (__bch2_journal_pin_put(j, seq)) 223 bch2_journal_reclaim_fast(j); 224 bch2_journal_do_writes(j); 225 226 /* 227 * for __bch2_next_write_buffer_flush_journal_buf(), when quiescing an 228 * open journal entry 229 */ 230 wake_up(&j->wait); 231 } 232 233 /* 234 * Returns true if journal entry is now closed: 235 * 236 * We don't close a journal_buf until the next journal_buf is finished writing, 237 * and can be opened again - this also initializes the next journal_buf: 238 */ 239 static void __journal_entry_close(struct journal *j, unsigned closed_val, bool trace) 240 { 241 struct bch_fs *c = container_of(j, struct bch_fs, journal); 242 struct journal_buf *buf = journal_cur_buf(j); 243 union journal_res_state old, new; 244 unsigned sectors; 245 246 BUG_ON(closed_val != JOURNAL_ENTRY_CLOSED_VAL && 247 closed_val != JOURNAL_ENTRY_ERROR_VAL); 248 249 lockdep_assert_held(&j->lock); 250 251 old.v = atomic64_read(&j->reservations.counter); 252 do { 253 new.v = old.v; 254 new.cur_entry_offset = closed_val; 255 256 if (old.cur_entry_offset == JOURNAL_ENTRY_ERROR_VAL || 257 old.cur_entry_offset == new.cur_entry_offset) 258 return; 259 } while (!atomic64_try_cmpxchg(&j->reservations.counter, 260 &old.v, new.v)); 261 262 if (!__journal_entry_is_open(old)) 263 return; 264 265 if (old.cur_entry_offset == JOURNAL_ENTRY_BLOCKED_VAL) 266 old.cur_entry_offset = j->cur_entry_offset_if_blocked; 267 268 /* Close out old buffer: */ 269 buf->data->u64s = cpu_to_le32(old.cur_entry_offset); 270 271 if (trace_journal_entry_close_enabled() && trace) { 272 struct printbuf pbuf = PRINTBUF; 273 pbuf.atomic++; 274 275 prt_str(&pbuf, "entry size: "); 276 prt_human_readable_u64(&pbuf, vstruct_bytes(buf->data)); 277 prt_newline(&pbuf); 278 bch2_prt_task_backtrace(&pbuf, current, 1, GFP_NOWAIT); 279 trace_journal_entry_close(c, pbuf.buf); 280 printbuf_exit(&pbuf); 281 } 282 283 sectors = vstruct_blocks_plus(buf->data, c->block_bits, 284 buf->u64s_reserved) << c->block_bits; 285 if (unlikely(sectors > buf->sectors)) { 286 struct printbuf err = PRINTBUF; 287 err.atomic++; 288 289 prt_printf(&err, "journal entry overran reserved space: %u > %u\n", 290 sectors, buf->sectors); 291 prt_printf(&err, "buf u64s %u u64s reserved %u cur_entry_u64s %u block_bits %u\n", 292 le32_to_cpu(buf->data->u64s), buf->u64s_reserved, 293 j->cur_entry_u64s, 294 c->block_bits); 295 prt_printf(&err, "fatal error - emergency read only"); 296 bch2_journal_halt_locked(j); 297 298 bch_err(c, "%s", err.buf); 299 printbuf_exit(&err); 300 return; 301 } 302 303 buf->sectors = sectors; 304 305 /* 306 * We have to set last_seq here, _before_ opening a new journal entry: 307 * 308 * A threads may replace an old pin with a new pin on their current 309 * journal reservation - the expectation being that the journal will 310 * contain either what the old pin protected or what the new pin 311 * protects. 312 * 313 * After the old pin is dropped journal_last_seq() won't include the old 314 * pin, so we can only write the updated last_seq on the entry that 315 * contains whatever the new pin protects. 316 * 317 * Restated, we can _not_ update last_seq for a given entry if there 318 * could be a newer entry open with reservations/pins that have been 319 * taken against it. 320 * 321 * Hence, we want update/set last_seq on the current journal entry right 322 * before we open a new one: 323 */ 324 buf->last_seq = journal_last_seq(j); 325 buf->data->last_seq = cpu_to_le64(buf->last_seq); 326 BUG_ON(buf->last_seq > le64_to_cpu(buf->data->seq)); 327 328 cancel_delayed_work(&j->write_work); 329 330 bch2_journal_space_available(j); 331 332 __bch2_journal_buf_put(j, le64_to_cpu(buf->data->seq)); 333 } 334 335 void bch2_journal_halt_locked(struct journal *j) 336 { 337 lockdep_assert_held(&j->lock); 338 339 __journal_entry_close(j, JOURNAL_ENTRY_ERROR_VAL, true); 340 if (!j->err_seq) 341 j->err_seq = journal_cur_seq(j); 342 journal_wake(j); 343 } 344 345 void bch2_journal_halt(struct journal *j) 346 { 347 spin_lock(&j->lock); 348 bch2_journal_halt_locked(j); 349 spin_unlock(&j->lock); 350 } 351 352 static bool journal_entry_want_write(struct journal *j) 353 { 354 bool ret = !journal_entry_is_open(j) || 355 journal_cur_seq(j) == journal_last_unwritten_seq(j); 356 357 /* Don't close it yet if we already have a write in flight: */ 358 if (ret) 359 __journal_entry_close(j, JOURNAL_ENTRY_CLOSED_VAL, true); 360 else if (nr_unwritten_journal_entries(j)) { 361 struct journal_buf *buf = journal_cur_buf(j); 362 363 if (!buf->flush_time) { 364 buf->flush_time = local_clock() ?: 1; 365 buf->expires = jiffies; 366 } 367 } 368 369 return ret; 370 } 371 372 bool bch2_journal_entry_close(struct journal *j) 373 { 374 bool ret; 375 376 spin_lock(&j->lock); 377 ret = journal_entry_want_write(j); 378 spin_unlock(&j->lock); 379 380 return ret; 381 } 382 383 /* 384 * should _only_ called from journal_res_get() - when we actually want a 385 * journal reservation - journal entry is open means journal is dirty: 386 */ 387 static int journal_entry_open(struct journal *j) 388 { 389 struct bch_fs *c = container_of(j, struct bch_fs, journal); 390 struct journal_buf *buf = j->buf + 391 ((journal_cur_seq(j) + 1) & JOURNAL_BUF_MASK); 392 union journal_res_state old, new; 393 int u64s; 394 395 lockdep_assert_held(&j->lock); 396 BUG_ON(journal_entry_is_open(j)); 397 BUG_ON(BCH_SB_CLEAN(c->disk_sb.sb)); 398 399 if (j->blocked) 400 return bch_err_throw(c, journal_blocked); 401 402 if (j->cur_entry_error) 403 return j->cur_entry_error; 404 405 int ret = bch2_journal_error(j); 406 if (unlikely(ret)) 407 return ret; 408 409 if (!fifo_free(&j->pin)) 410 return bch_err_throw(c, journal_pin_full); 411 412 if (nr_unwritten_journal_entries(j) == ARRAY_SIZE(j->buf)) 413 return bch_err_throw(c, journal_max_in_flight); 414 415 if (atomic64_read(&j->seq) - j->seq_write_started == JOURNAL_STATE_BUF_NR) 416 return bch_err_throw(c, journal_max_open); 417 418 if (unlikely(journal_cur_seq(j) >= JOURNAL_SEQ_MAX)) { 419 bch_err(c, "cannot start: journal seq overflow"); 420 if (bch2_fs_emergency_read_only_locked(c)) 421 bch_err(c, "fatal error - emergency read only"); 422 return bch_err_throw(c, journal_shutdown); 423 } 424 425 if (!j->free_buf && !buf->data) 426 return bch_err_throw(c, journal_buf_enomem); /* will retry after write completion frees up a buf */ 427 428 BUG_ON(!j->cur_entry_sectors); 429 430 if (!buf->data) { 431 swap(buf->data, j->free_buf); 432 swap(buf->buf_size, j->free_buf_size); 433 } 434 435 buf->expires = 436 (journal_cur_seq(j) == j->flushed_seq_ondisk 437 ? jiffies 438 : j->last_flush_write) + 439 msecs_to_jiffies(c->opts.journal_flush_delay); 440 441 buf->u64s_reserved = j->entry_u64s_reserved; 442 buf->disk_sectors = j->cur_entry_sectors; 443 buf->sectors = min(buf->disk_sectors, buf->buf_size >> 9); 444 445 u64s = (int) (buf->sectors << 9) / sizeof(u64) - 446 journal_entry_overhead(j); 447 u64s = clamp_t(int, u64s, 0, JOURNAL_ENTRY_CLOSED_VAL - 1); 448 449 if (u64s <= (ssize_t) j->early_journal_entries.nr) 450 return bch_err_throw(c, journal_full); 451 452 if (fifo_empty(&j->pin) && j->reclaim_thread) 453 wake_up_process(j->reclaim_thread); 454 455 /* 456 * The fifo_push() needs to happen at the same time as j->seq is 457 * incremented for journal_last_seq() to be calculated correctly 458 */ 459 atomic64_inc(&j->seq); 460 journal_pin_list_init(fifo_push_ref(&j->pin), 1); 461 462 if (unlikely(bch2_journal_seq_is_blacklisted(c, journal_cur_seq(j), false))) { 463 bch_err(c, "attempting to open blacklisted journal seq %llu", 464 journal_cur_seq(j)); 465 if (bch2_fs_emergency_read_only_locked(c)) 466 bch_err(c, "fatal error - emergency read only"); 467 return bch_err_throw(c, journal_shutdown); 468 } 469 470 BUG_ON(j->pin.back - 1 != atomic64_read(&j->seq)); 471 472 BUG_ON(j->buf + (journal_cur_seq(j) & JOURNAL_BUF_MASK) != buf); 473 474 bkey_extent_init(&buf->key); 475 buf->noflush = false; 476 buf->must_flush = false; 477 buf->separate_flush = false; 478 buf->flush_time = 0; 479 buf->need_flush_to_write_buffer = true; 480 buf->write_started = false; 481 buf->write_allocated = false; 482 buf->write_done = false; 483 484 memset(buf->data, 0, sizeof(*buf->data)); 485 buf->data->seq = cpu_to_le64(journal_cur_seq(j)); 486 buf->data->u64s = 0; 487 488 if (j->early_journal_entries.nr) { 489 memcpy(buf->data->_data, j->early_journal_entries.data, 490 j->early_journal_entries.nr * sizeof(u64)); 491 le32_add_cpu(&buf->data->u64s, j->early_journal_entries.nr); 492 } 493 494 /* 495 * Must be set before marking the journal entry as open: 496 */ 497 j->cur_entry_u64s = u64s; 498 499 old.v = atomic64_read(&j->reservations.counter); 500 do { 501 new.v = old.v; 502 503 BUG_ON(old.cur_entry_offset == JOURNAL_ENTRY_ERROR_VAL); 504 505 new.idx++; 506 BUG_ON(journal_state_count(new, new.idx)); 507 BUG_ON(new.idx != (journal_cur_seq(j) & JOURNAL_STATE_BUF_MASK)); 508 509 journal_state_inc(&new); 510 511 /* Handle any already added entries */ 512 new.cur_entry_offset = le32_to_cpu(buf->data->u64s); 513 } while (!atomic64_try_cmpxchg(&j->reservations.counter, 514 &old.v, new.v)); 515 516 if (nr_unwritten_journal_entries(j) == 1) 517 mod_delayed_work(j->wq, 518 &j->write_work, 519 msecs_to_jiffies(c->opts.journal_flush_delay)); 520 journal_wake(j); 521 522 if (j->early_journal_entries.nr) 523 darray_exit(&j->early_journal_entries); 524 return 0; 525 } 526 527 static bool journal_quiesced(struct journal *j) 528 { 529 bool ret = atomic64_read(&j->seq) == j->seq_ondisk; 530 531 if (!ret) 532 bch2_journal_entry_close(j); 533 return ret; 534 } 535 536 static void journal_quiesce(struct journal *j) 537 { 538 wait_event(j->wait, journal_quiesced(j)); 539 } 540 541 static void journal_write_work(struct work_struct *work) 542 { 543 struct journal *j = container_of(work, struct journal, write_work.work); 544 545 spin_lock(&j->lock); 546 if (__journal_entry_is_open(j->reservations)) { 547 long delta = journal_cur_buf(j)->expires - jiffies; 548 549 if (delta > 0) 550 mod_delayed_work(j->wq, &j->write_work, delta); 551 else 552 __journal_entry_close(j, JOURNAL_ENTRY_CLOSED_VAL, true); 553 } 554 spin_unlock(&j->lock); 555 } 556 557 static void journal_buf_prealloc(struct journal *j) 558 { 559 if (j->free_buf && 560 j->free_buf_size >= j->buf_size_want) 561 return; 562 563 unsigned buf_size = j->buf_size_want; 564 565 spin_unlock(&j->lock); 566 void *buf = kvmalloc(buf_size, GFP_NOFS); 567 spin_lock(&j->lock); 568 569 if (buf && 570 (!j->free_buf || 571 buf_size > j->free_buf_size)) { 572 swap(buf, j->free_buf); 573 swap(buf_size, j->free_buf_size); 574 } 575 576 if (unlikely(buf)) { 577 spin_unlock(&j->lock); 578 /* kvfree can sleep */ 579 kvfree(buf); 580 spin_lock(&j->lock); 581 } 582 } 583 584 static int __journal_res_get(struct journal *j, struct journal_res *res, 585 unsigned flags) 586 { 587 struct bch_fs *c = container_of(j, struct bch_fs, journal); 588 struct journal_buf *buf; 589 bool can_discard; 590 int ret; 591 retry: 592 if (journal_res_get_fast(j, res, flags)) 593 return 0; 594 595 ret = bch2_journal_error(j); 596 if (unlikely(ret)) 597 return ret; 598 599 if (j->blocked) 600 return bch_err_throw(c, journal_blocked); 601 602 if ((flags & BCH_WATERMARK_MASK) < j->watermark) { 603 ret = bch_err_throw(c, journal_full); 604 can_discard = j->can_discard; 605 goto out; 606 } 607 608 if (nr_unwritten_journal_entries(j) == ARRAY_SIZE(j->buf) && !journal_entry_is_open(j)) { 609 ret = bch_err_throw(c, journal_max_in_flight); 610 goto out; 611 } 612 613 spin_lock(&j->lock); 614 615 journal_buf_prealloc(j); 616 617 /* 618 * Recheck after taking the lock, so we don't race with another thread 619 * that just did journal_entry_open() and call bch2_journal_entry_close() 620 * unnecessarily 621 */ 622 if (journal_res_get_fast(j, res, flags)) { 623 ret = 0; 624 goto unlock; 625 } 626 627 /* 628 * If we couldn't get a reservation because the current buf filled up, 629 * and we had room for a bigger entry on disk, signal that we want to 630 * realloc the journal bufs: 631 */ 632 buf = journal_cur_buf(j); 633 if (journal_entry_is_open(j) && 634 buf->buf_size >> 9 < buf->disk_sectors && 635 buf->buf_size < JOURNAL_ENTRY_SIZE_MAX) 636 j->buf_size_want = max(j->buf_size_want, buf->buf_size << 1); 637 638 __journal_entry_close(j, JOURNAL_ENTRY_CLOSED_VAL, false); 639 ret = journal_entry_open(j) ?: -BCH_ERR_journal_retry_open; 640 unlock: 641 can_discard = j->can_discard; 642 spin_unlock(&j->lock); 643 out: 644 if (likely(!ret)) 645 return 0; 646 if (ret == -BCH_ERR_journal_retry_open) 647 goto retry; 648 649 if (journal_error_check_stuck(j, ret, flags)) 650 ret = bch_err_throw(c, journal_stuck); 651 652 if (ret == -BCH_ERR_journal_max_in_flight && 653 track_event_change(&c->times[BCH_TIME_blocked_journal_max_in_flight], true) && 654 trace_journal_entry_full_enabled()) { 655 struct printbuf buf = PRINTBUF; 656 657 bch2_printbuf_make_room(&buf, 4096); 658 659 spin_lock(&j->lock); 660 prt_printf(&buf, "seq %llu\n", journal_cur_seq(j)); 661 bch2_journal_bufs_to_text(&buf, j); 662 spin_unlock(&j->lock); 663 664 trace_journal_entry_full(c, buf.buf); 665 printbuf_exit(&buf); 666 count_event(c, journal_entry_full); 667 } 668 669 if (ret == -BCH_ERR_journal_max_open && 670 track_event_change(&c->times[BCH_TIME_blocked_journal_max_open], true) && 671 trace_journal_entry_full_enabled()) { 672 struct printbuf buf = PRINTBUF; 673 674 bch2_printbuf_make_room(&buf, 4096); 675 676 spin_lock(&j->lock); 677 prt_printf(&buf, "seq %llu\n", journal_cur_seq(j)); 678 bch2_journal_bufs_to_text(&buf, j); 679 spin_unlock(&j->lock); 680 681 trace_journal_entry_full(c, buf.buf); 682 printbuf_exit(&buf); 683 count_event(c, journal_entry_full); 684 } 685 686 /* 687 * Journal is full - can't rely on reclaim from work item due to 688 * freezing: 689 */ 690 if ((ret == -BCH_ERR_journal_full || 691 ret == -BCH_ERR_journal_pin_full) && 692 !(flags & JOURNAL_RES_GET_NONBLOCK)) { 693 if (can_discard) { 694 bch2_journal_do_discards(j); 695 goto retry; 696 } 697 698 if (mutex_trylock(&j->reclaim_lock)) { 699 bch2_journal_reclaim(j); 700 mutex_unlock(&j->reclaim_lock); 701 } 702 } 703 704 return ret; 705 } 706 707 static unsigned max_dev_latency(struct bch_fs *c) 708 { 709 u64 nsecs = 0; 710 711 guard(rcu)(); 712 for_each_rw_member_rcu(c, ca) 713 nsecs = max(nsecs, ca->io_latency[WRITE].stats.max_duration); 714 715 return nsecs_to_jiffies(nsecs); 716 } 717 718 /* 719 * Essentially the entry function to the journaling code. When bcachefs is doing 720 * a btree insert, it calls this function to get the current journal write. 721 * Journal write is the structure used set up journal writes. The calling 722 * function will then add its keys to the structure, queuing them for the next 723 * write. 724 * 725 * To ensure forward progress, the current task must not be holding any 726 * btree node write locks. 727 */ 728 int bch2_journal_res_get_slowpath(struct journal *j, struct journal_res *res, 729 unsigned flags, 730 struct btree_trans *trans) 731 { 732 int ret; 733 734 if (closure_wait_event_timeout(&j->async_wait, 735 !bch2_err_matches(ret = __journal_res_get(j, res, flags), BCH_ERR_operation_blocked) || 736 (flags & JOURNAL_RES_GET_NONBLOCK), 737 HZ)) 738 return ret; 739 740 if (trans) 741 bch2_trans_unlock_long(trans); 742 743 struct bch_fs *c = container_of(j, struct bch_fs, journal); 744 int remaining_wait = max(max_dev_latency(c) * 2, HZ * 10); 745 746 remaining_wait = max(0, remaining_wait - HZ); 747 748 if (closure_wait_event_timeout(&j->async_wait, 749 !bch2_err_matches(ret = __journal_res_get(j, res, flags), BCH_ERR_operation_blocked) || 750 (flags & JOURNAL_RES_GET_NONBLOCK), 751 remaining_wait)) 752 return ret; 753 754 struct printbuf buf = PRINTBUF; 755 bch2_journal_debug_to_text(&buf, j); 756 bch2_print_str(c, KERN_ERR, buf.buf); 757 prt_printf(&buf, bch2_fmt(c, "Journal stuck? Waited for 10 seconds, err %s"), bch2_err_str(ret)); 758 printbuf_exit(&buf); 759 760 closure_wait_event(&j->async_wait, 761 !bch2_err_matches(ret = __journal_res_get(j, res, flags), BCH_ERR_operation_blocked) || 762 (flags & JOURNAL_RES_GET_NONBLOCK)); 763 return ret; 764 } 765 766 /* journal_entry_res: */ 767 768 void bch2_journal_entry_res_resize(struct journal *j, 769 struct journal_entry_res *res, 770 unsigned new_u64s) 771 { 772 union journal_res_state state; 773 int d = new_u64s - res->u64s; 774 775 spin_lock(&j->lock); 776 777 j->entry_u64s_reserved += d; 778 if (d <= 0) 779 goto out; 780 781 j->cur_entry_u64s = max_t(int, 0, j->cur_entry_u64s - d); 782 state = READ_ONCE(j->reservations); 783 784 if (state.cur_entry_offset < JOURNAL_ENTRY_CLOSED_VAL && 785 state.cur_entry_offset > j->cur_entry_u64s) { 786 j->cur_entry_u64s += d; 787 /* 788 * Not enough room in current journal entry, have to flush it: 789 */ 790 __journal_entry_close(j, JOURNAL_ENTRY_CLOSED_VAL, true); 791 } else { 792 journal_cur_buf(j)->u64s_reserved += d; 793 } 794 out: 795 spin_unlock(&j->lock); 796 res->u64s += d; 797 } 798 799 /* journal flushing: */ 800 801 /** 802 * bch2_journal_flush_seq_async - wait for a journal entry to be written 803 * @j: journal object 804 * @seq: seq to flush 805 * @parent: closure object to wait with 806 * Returns: 1 if @seq has already been flushed, 0 if @seq is being flushed, 807 * -BCH_ERR_journal_flush_err if @seq will never be flushed 808 * 809 * Like bch2_journal_wait_on_seq, except that it triggers a write immediately if 810 * necessary 811 */ 812 int bch2_journal_flush_seq_async(struct journal *j, u64 seq, 813 struct closure *parent) 814 { 815 struct bch_fs *c = container_of(j, struct bch_fs, journal); 816 struct journal_buf *buf; 817 int ret = 0; 818 819 if (seq <= j->flushed_seq_ondisk) 820 return 1; 821 822 spin_lock(&j->lock); 823 824 if (WARN_ONCE(seq > journal_cur_seq(j), 825 "requested to flush journal seq %llu, but currently at %llu", 826 seq, journal_cur_seq(j))) 827 goto out; 828 829 /* Recheck under lock: */ 830 if (j->err_seq && seq >= j->err_seq) { 831 ret = bch_err_throw(c, journal_flush_err); 832 goto out; 833 } 834 835 if (seq <= j->flushed_seq_ondisk) { 836 ret = 1; 837 goto out; 838 } 839 840 /* if seq was written, but not flushed - flush a newer one instead */ 841 seq = max(seq, journal_last_unwritten_seq(j)); 842 843 recheck_need_open: 844 if (seq > journal_cur_seq(j)) { 845 struct journal_res res = { 0 }; 846 847 if (journal_entry_is_open(j)) 848 __journal_entry_close(j, JOURNAL_ENTRY_CLOSED_VAL, true); 849 850 spin_unlock(&j->lock); 851 852 /* 853 * We're called from bch2_journal_flush_seq() -> wait_event(); 854 * but this might block. We won't usually block, so we won't 855 * livelock: 856 */ 857 sched_annotate_sleep(); 858 ret = bch2_journal_res_get(j, &res, jset_u64s(0), 0, NULL); 859 if (ret) 860 return ret; 861 862 seq = res.seq; 863 buf = journal_seq_to_buf(j, seq); 864 buf->must_flush = true; 865 866 if (!buf->flush_time) { 867 buf->flush_time = local_clock() ?: 1; 868 buf->expires = jiffies; 869 } 870 871 if (parent && !closure_wait(&buf->wait, parent)) 872 BUG(); 873 874 bch2_journal_res_put(j, &res); 875 876 spin_lock(&j->lock); 877 goto want_write; 878 } 879 880 /* 881 * if write was kicked off without a flush, or if we promised it 882 * wouldn't be a flush, flush the next sequence number instead 883 */ 884 buf = journal_seq_to_buf(j, seq); 885 if (buf->noflush) { 886 seq++; 887 goto recheck_need_open; 888 } 889 890 buf->must_flush = true; 891 j->flushing_seq = max(j->flushing_seq, seq); 892 893 if (parent && !closure_wait(&buf->wait, parent)) 894 BUG(); 895 want_write: 896 if (seq == journal_cur_seq(j)) 897 journal_entry_want_write(j); 898 out: 899 spin_unlock(&j->lock); 900 return ret; 901 } 902 903 int bch2_journal_flush_seq(struct journal *j, u64 seq, unsigned task_state) 904 { 905 u64 start_time = local_clock(); 906 int ret, ret2; 907 908 /* 909 * Don't update time_stats when @seq is already flushed: 910 */ 911 if (seq <= j->flushed_seq_ondisk) 912 return 0; 913 914 ret = wait_event_state(j->wait, 915 (ret2 = bch2_journal_flush_seq_async(j, seq, NULL)), 916 task_state); 917 918 if (!ret) 919 bch2_time_stats_update(j->flush_seq_time, start_time); 920 921 return ret ?: ret2 < 0 ? ret2 : 0; 922 } 923 924 /* 925 * bch2_journal_flush_async - if there is an open journal entry, or a journal 926 * still being written, write it and wait for the write to complete 927 */ 928 void bch2_journal_flush_async(struct journal *j, struct closure *parent) 929 { 930 bch2_journal_flush_seq_async(j, atomic64_read(&j->seq), parent); 931 } 932 933 int bch2_journal_flush(struct journal *j) 934 { 935 return bch2_journal_flush_seq(j, atomic64_read(&j->seq), TASK_UNINTERRUPTIBLE); 936 } 937 938 /* 939 * bch2_journal_noflush_seq - ask the journal not to issue any flushes in the 940 * range [start, end) 941 * @seq 942 */ 943 bool bch2_journal_noflush_seq(struct journal *j, u64 start, u64 end) 944 { 945 struct bch_fs *c = container_of(j, struct bch_fs, journal); 946 u64 unwritten_seq; 947 bool ret = false; 948 949 if (!(c->sb.features & (1ULL << BCH_FEATURE_journal_no_flush))) 950 return false; 951 952 if (c->journal.flushed_seq_ondisk >= start) 953 return false; 954 955 spin_lock(&j->lock); 956 if (c->journal.flushed_seq_ondisk >= start) 957 goto out; 958 959 for (unwritten_seq = journal_last_unwritten_seq(j); 960 unwritten_seq < end; 961 unwritten_seq++) { 962 struct journal_buf *buf = journal_seq_to_buf(j, unwritten_seq); 963 964 /* journal flush already in flight, or flush requseted */ 965 if (buf->must_flush) 966 goto out; 967 968 buf->noflush = true; 969 } 970 971 ret = true; 972 out: 973 spin_unlock(&j->lock); 974 return ret; 975 } 976 977 static int __bch2_journal_meta(struct journal *j) 978 { 979 struct journal_res res = {}; 980 int ret = bch2_journal_res_get(j, &res, jset_u64s(0), 0, NULL); 981 if (ret) 982 return ret; 983 984 struct journal_buf *buf = j->buf + (res.seq & JOURNAL_BUF_MASK); 985 buf->must_flush = true; 986 987 if (!buf->flush_time) { 988 buf->flush_time = local_clock() ?: 1; 989 buf->expires = jiffies; 990 } 991 992 bch2_journal_res_put(j, &res); 993 994 return bch2_journal_flush_seq(j, res.seq, TASK_UNINTERRUPTIBLE); 995 } 996 997 int bch2_journal_meta(struct journal *j) 998 { 999 struct bch_fs *c = container_of(j, struct bch_fs, journal); 1000 1001 if (!enumerated_ref_tryget(&c->writes, BCH_WRITE_REF_journal)) 1002 return bch_err_throw(c, erofs_no_writes); 1003 1004 int ret = __bch2_journal_meta(j); 1005 enumerated_ref_put(&c->writes, BCH_WRITE_REF_journal); 1006 return ret; 1007 } 1008 1009 /* block/unlock the journal: */ 1010 1011 void bch2_journal_unblock(struct journal *j) 1012 { 1013 spin_lock(&j->lock); 1014 if (!--j->blocked && 1015 j->cur_entry_offset_if_blocked < JOURNAL_ENTRY_CLOSED_VAL && 1016 j->reservations.cur_entry_offset == JOURNAL_ENTRY_BLOCKED_VAL) { 1017 union journal_res_state old, new; 1018 1019 old.v = atomic64_read(&j->reservations.counter); 1020 do { 1021 new.v = old.v; 1022 new.cur_entry_offset = j->cur_entry_offset_if_blocked; 1023 } while (!atomic64_try_cmpxchg(&j->reservations.counter, &old.v, new.v)); 1024 } 1025 spin_unlock(&j->lock); 1026 1027 journal_wake(j); 1028 } 1029 1030 static void __bch2_journal_block(struct journal *j) 1031 { 1032 if (!j->blocked++) { 1033 union journal_res_state old, new; 1034 1035 old.v = atomic64_read(&j->reservations.counter); 1036 do { 1037 j->cur_entry_offset_if_blocked = old.cur_entry_offset; 1038 1039 if (j->cur_entry_offset_if_blocked >= JOURNAL_ENTRY_CLOSED_VAL) 1040 break; 1041 1042 new.v = old.v; 1043 new.cur_entry_offset = JOURNAL_ENTRY_BLOCKED_VAL; 1044 } while (!atomic64_try_cmpxchg(&j->reservations.counter, &old.v, new.v)); 1045 1046 if (old.cur_entry_offset < JOURNAL_ENTRY_BLOCKED_VAL) 1047 journal_cur_buf(j)->data->u64s = cpu_to_le32(old.cur_entry_offset); 1048 } 1049 } 1050 1051 void bch2_journal_block(struct journal *j) 1052 { 1053 spin_lock(&j->lock); 1054 __bch2_journal_block(j); 1055 spin_unlock(&j->lock); 1056 1057 journal_quiesce(j); 1058 } 1059 1060 static struct journal_buf *__bch2_next_write_buffer_flush_journal_buf(struct journal *j, 1061 u64 max_seq, bool *blocked) 1062 { 1063 struct journal_buf *ret = NULL; 1064 1065 /* We're inside wait_event(), but using mutex_lock(: */ 1066 sched_annotate_sleep(); 1067 mutex_lock(&j->buf_lock); 1068 spin_lock(&j->lock); 1069 max_seq = min(max_seq, journal_cur_seq(j)); 1070 1071 for (u64 seq = journal_last_unwritten_seq(j); 1072 seq <= max_seq; 1073 seq++) { 1074 unsigned idx = seq & JOURNAL_BUF_MASK; 1075 struct journal_buf *buf = j->buf + idx; 1076 1077 if (buf->need_flush_to_write_buffer) { 1078 union journal_res_state s; 1079 s.v = atomic64_read_acquire(&j->reservations.counter); 1080 1081 unsigned open = seq == journal_cur_seq(j) && __journal_entry_is_open(s); 1082 1083 if (open && !*blocked) { 1084 __bch2_journal_block(j); 1085 *blocked = true; 1086 } 1087 1088 ret = journal_state_count(s, idx & JOURNAL_STATE_BUF_MASK) > open 1089 ? ERR_PTR(-EAGAIN) 1090 : buf; 1091 break; 1092 } 1093 } 1094 1095 spin_unlock(&j->lock); 1096 if (IS_ERR_OR_NULL(ret)) 1097 mutex_unlock(&j->buf_lock); 1098 return ret; 1099 } 1100 1101 struct journal_buf *bch2_next_write_buffer_flush_journal_buf(struct journal *j, 1102 u64 max_seq, bool *blocked) 1103 { 1104 struct journal_buf *ret; 1105 *blocked = false; 1106 1107 wait_event(j->wait, (ret = __bch2_next_write_buffer_flush_journal_buf(j, 1108 max_seq, blocked)) != ERR_PTR(-EAGAIN)); 1109 if (IS_ERR_OR_NULL(ret) && *blocked) 1110 bch2_journal_unblock(j); 1111 1112 return ret; 1113 } 1114 1115 /* allocate journal on a device: */ 1116 1117 static int bch2_set_nr_journal_buckets_iter(struct bch_dev *ca, unsigned nr, 1118 bool new_fs, struct closure *cl) 1119 { 1120 struct bch_fs *c = ca->fs; 1121 struct journal_device *ja = &ca->journal; 1122 u64 *new_bucket_seq = NULL, *new_buckets = NULL; 1123 struct open_bucket **ob = NULL; 1124 long *bu = NULL; 1125 unsigned i, pos, nr_got = 0, nr_want = nr - ja->nr; 1126 int ret = 0; 1127 1128 BUG_ON(nr <= ja->nr); 1129 1130 bu = kcalloc(nr_want, sizeof(*bu), GFP_KERNEL); 1131 ob = kcalloc(nr_want, sizeof(*ob), GFP_KERNEL); 1132 new_buckets = kcalloc(nr, sizeof(u64), GFP_KERNEL); 1133 new_bucket_seq = kcalloc(nr, sizeof(u64), GFP_KERNEL); 1134 if (!bu || !ob || !new_buckets || !new_bucket_seq) { 1135 ret = bch_err_throw(c, ENOMEM_set_nr_journal_buckets); 1136 goto err_free; 1137 } 1138 1139 for (nr_got = 0; nr_got < nr_want; nr_got++) { 1140 enum bch_watermark watermark = new_fs 1141 ? BCH_WATERMARK_btree 1142 : BCH_WATERMARK_normal; 1143 1144 ob[nr_got] = bch2_bucket_alloc(c, ca, watermark, 1145 BCH_DATA_journal, cl); 1146 ret = PTR_ERR_OR_ZERO(ob[nr_got]); 1147 if (ret) 1148 break; 1149 1150 if (!new_fs) { 1151 ret = bch2_trans_run(c, 1152 bch2_trans_mark_metadata_bucket(trans, ca, 1153 ob[nr_got]->bucket, BCH_DATA_journal, 1154 ca->mi.bucket_size, BTREE_TRIGGER_transactional)); 1155 if (ret) { 1156 bch2_open_bucket_put(c, ob[nr_got]); 1157 bch_err_msg(c, ret, "marking new journal buckets"); 1158 break; 1159 } 1160 } 1161 1162 bu[nr_got] = ob[nr_got]->bucket; 1163 } 1164 1165 if (!nr_got) 1166 goto err_free; 1167 1168 /* Don't return an error if we successfully allocated some buckets: */ 1169 ret = 0; 1170 1171 if (c) { 1172 bch2_journal_flush_all_pins(&c->journal); 1173 bch2_journal_block(&c->journal); 1174 mutex_lock(&c->sb_lock); 1175 } 1176 1177 memcpy(new_buckets, ja->buckets, ja->nr * sizeof(u64)); 1178 memcpy(new_bucket_seq, ja->bucket_seq, ja->nr * sizeof(u64)); 1179 1180 BUG_ON(ja->discard_idx > ja->nr); 1181 1182 pos = ja->discard_idx ?: ja->nr; 1183 1184 memmove(new_buckets + pos + nr_got, 1185 new_buckets + pos, 1186 sizeof(new_buckets[0]) * (ja->nr - pos)); 1187 memmove(new_bucket_seq + pos + nr_got, 1188 new_bucket_seq + pos, 1189 sizeof(new_bucket_seq[0]) * (ja->nr - pos)); 1190 1191 for (i = 0; i < nr_got; i++) { 1192 new_buckets[pos + i] = bu[i]; 1193 new_bucket_seq[pos + i] = 0; 1194 } 1195 1196 nr = ja->nr + nr_got; 1197 1198 ret = bch2_journal_buckets_to_sb(c, ca, new_buckets, nr); 1199 if (ret) 1200 goto err_unblock; 1201 1202 bch2_write_super(c); 1203 1204 /* Commit: */ 1205 if (c) 1206 spin_lock(&c->journal.lock); 1207 1208 swap(new_buckets, ja->buckets); 1209 swap(new_bucket_seq, ja->bucket_seq); 1210 ja->nr = nr; 1211 1212 if (pos <= ja->discard_idx) 1213 ja->discard_idx = (ja->discard_idx + nr_got) % ja->nr; 1214 if (pos <= ja->dirty_idx_ondisk) 1215 ja->dirty_idx_ondisk = (ja->dirty_idx_ondisk + nr_got) % ja->nr; 1216 if (pos <= ja->dirty_idx) 1217 ja->dirty_idx = (ja->dirty_idx + nr_got) % ja->nr; 1218 if (pos <= ja->cur_idx) 1219 ja->cur_idx = (ja->cur_idx + nr_got) % ja->nr; 1220 1221 if (c) 1222 spin_unlock(&c->journal.lock); 1223 err_unblock: 1224 if (c) { 1225 bch2_journal_unblock(&c->journal); 1226 mutex_unlock(&c->sb_lock); 1227 } 1228 1229 if (ret && !new_fs) 1230 for (i = 0; i < nr_got; i++) 1231 bch2_trans_run(c, 1232 bch2_trans_mark_metadata_bucket(trans, ca, 1233 bu[i], BCH_DATA_free, 0, 1234 BTREE_TRIGGER_transactional)); 1235 err_free: 1236 for (i = 0; i < nr_got; i++) 1237 bch2_open_bucket_put(c, ob[i]); 1238 1239 kfree(new_bucket_seq); 1240 kfree(new_buckets); 1241 kfree(ob); 1242 kfree(bu); 1243 return ret; 1244 } 1245 1246 static int bch2_set_nr_journal_buckets_loop(struct bch_fs *c, struct bch_dev *ca, 1247 unsigned nr, bool new_fs) 1248 { 1249 struct journal_device *ja = &ca->journal; 1250 int ret = 0; 1251 1252 struct closure cl; 1253 closure_init_stack(&cl); 1254 1255 /* don't handle reducing nr of buckets yet: */ 1256 if (nr < ja->nr) 1257 return 0; 1258 1259 while (!ret && ja->nr < nr) { 1260 struct disk_reservation disk_res = { 0, 0, 0 }; 1261 1262 /* 1263 * note: journal buckets aren't really counted as _sectors_ used yet, so 1264 * we don't need the disk reservation to avoid the BUG_ON() in buckets.c 1265 * when space used goes up without a reservation - but we do need the 1266 * reservation to ensure we'll actually be able to allocate: 1267 * 1268 * XXX: that's not right, disk reservations only ensure a 1269 * filesystem-wide allocation will succeed, this is a device 1270 * specific allocation - we can hang here: 1271 */ 1272 if (!new_fs) { 1273 ret = bch2_disk_reservation_get(c, &disk_res, 1274 bucket_to_sector(ca, nr - ja->nr), 1, 0); 1275 if (ret) 1276 break; 1277 } 1278 1279 ret = bch2_set_nr_journal_buckets_iter(ca, nr, new_fs, &cl); 1280 1281 if (ret == -BCH_ERR_bucket_alloc_blocked || 1282 ret == -BCH_ERR_open_buckets_empty) 1283 ret = 0; /* wait and retry */ 1284 1285 bch2_disk_reservation_put(c, &disk_res); 1286 bch2_wait_on_allocator(c, &cl); 1287 } 1288 1289 return ret; 1290 } 1291 1292 /* 1293 * Allocate more journal space at runtime - not currently making use if it, but 1294 * the code works: 1295 */ 1296 int bch2_set_nr_journal_buckets(struct bch_fs *c, struct bch_dev *ca, 1297 unsigned nr) 1298 { 1299 down_write(&c->state_lock); 1300 int ret = bch2_set_nr_journal_buckets_loop(c, ca, nr, false); 1301 up_write(&c->state_lock); 1302 1303 bch_err_fn(c, ret); 1304 return ret; 1305 } 1306 1307 int bch2_dev_journal_bucket_delete(struct bch_dev *ca, u64 b) 1308 { 1309 struct bch_fs *c = ca->fs; 1310 struct journal *j = &c->journal; 1311 struct journal_device *ja = &ca->journal; 1312 1313 guard(mutex)(&c->sb_lock); 1314 unsigned pos; 1315 for (pos = 0; pos < ja->nr; pos++) 1316 if (ja->buckets[pos] == b) 1317 break; 1318 1319 if (pos == ja->nr) { 1320 bch_err(ca, "journal bucket %llu not found when deleting", b); 1321 return -EINVAL; 1322 } 1323 1324 u64 *new_buckets = kcalloc(ja->nr, sizeof(u64), GFP_KERNEL);; 1325 if (!new_buckets) 1326 return bch_err_throw(c, ENOMEM_set_nr_journal_buckets); 1327 1328 memcpy(new_buckets, ja->buckets, ja->nr * sizeof(u64)); 1329 memmove(&new_buckets[pos], 1330 &new_buckets[pos + 1], 1331 (ja->nr - 1 - pos) * sizeof(new_buckets[0])); 1332 1333 int ret = bch2_journal_buckets_to_sb(c, ca, ja->buckets, ja->nr - 1) ?: 1334 bch2_write_super(c); 1335 if (ret) { 1336 kfree(new_buckets); 1337 return ret; 1338 } 1339 1340 scoped_guard(spinlock, &j->lock) { 1341 if (pos < ja->discard_idx) 1342 --ja->discard_idx; 1343 if (pos < ja->dirty_idx_ondisk) 1344 --ja->dirty_idx_ondisk; 1345 if (pos < ja->dirty_idx) 1346 --ja->dirty_idx; 1347 if (pos < ja->cur_idx) 1348 --ja->cur_idx; 1349 1350 ja->nr--; 1351 1352 memmove(&ja->buckets[pos], 1353 &ja->buckets[pos + 1], 1354 (ja->nr - pos) * sizeof(ja->buckets[0])); 1355 1356 memmove(&ja->bucket_seq[pos], 1357 &ja->bucket_seq[pos + 1], 1358 (ja->nr - pos) * sizeof(ja->bucket_seq[0])); 1359 1360 bch2_journal_space_available(j); 1361 } 1362 1363 kfree(new_buckets); 1364 return 0; 1365 } 1366 1367 int bch2_dev_journal_alloc(struct bch_dev *ca, bool new_fs) 1368 { 1369 struct bch_fs *c = ca->fs; 1370 1371 if (!(ca->mi.data_allowed & BIT(BCH_DATA_journal))) 1372 return 0; 1373 1374 if (c->sb.features & BIT_ULL(BCH_FEATURE_small_image)) { 1375 bch_err(c, "cannot allocate journal, filesystem is an unresized image file"); 1376 return bch_err_throw(c, erofs_filesystem_full); 1377 } 1378 1379 unsigned nr; 1380 int ret; 1381 1382 if (dynamic_fault("bcachefs:add:journal_alloc")) { 1383 ret = bch_err_throw(c, ENOMEM_set_nr_journal_buckets); 1384 goto err; 1385 } 1386 1387 /* 1/128th of the device by default: */ 1388 nr = ca->mi.nbuckets >> 7; 1389 1390 /* 1391 * clamp journal size to 8192 buckets or 8GB (in sectors), whichever 1392 * is smaller: 1393 */ 1394 nr = clamp_t(unsigned, nr, 1395 BCH_JOURNAL_BUCKETS_MIN, 1396 min(1 << 13, 1397 (1 << 24) / ca->mi.bucket_size)); 1398 1399 ret = bch2_set_nr_journal_buckets_loop(c, ca, nr, new_fs); 1400 err: 1401 bch_err_fn(ca, ret); 1402 return ret; 1403 } 1404 1405 int bch2_fs_journal_alloc(struct bch_fs *c) 1406 { 1407 for_each_online_member(c, ca, BCH_DEV_READ_REF_fs_journal_alloc) { 1408 if (ca->journal.nr) 1409 continue; 1410 1411 int ret = bch2_dev_journal_alloc(ca, true); 1412 if (ret) { 1413 enumerated_ref_put(&ca->io_ref[READ], 1414 BCH_DEV_READ_REF_fs_journal_alloc); 1415 return ret; 1416 } 1417 } 1418 1419 return 0; 1420 } 1421 1422 /* startup/shutdown: */ 1423 1424 static bool bch2_journal_writing_to_device(struct journal *j, unsigned dev_idx) 1425 { 1426 bool ret = false; 1427 u64 seq; 1428 1429 spin_lock(&j->lock); 1430 for (seq = journal_last_unwritten_seq(j); 1431 seq <= journal_cur_seq(j) && !ret; 1432 seq++) { 1433 struct journal_buf *buf = journal_seq_to_buf(j, seq); 1434 1435 if (bch2_bkey_has_device_c(bkey_i_to_s_c(&buf->key), dev_idx)) 1436 ret = true; 1437 } 1438 spin_unlock(&j->lock); 1439 1440 return ret; 1441 } 1442 1443 void bch2_dev_journal_stop(struct journal *j, struct bch_dev *ca) 1444 { 1445 wait_event(j->wait, !bch2_journal_writing_to_device(j, ca->dev_idx)); 1446 } 1447 1448 void bch2_fs_journal_stop(struct journal *j) 1449 { 1450 if (!test_bit(JOURNAL_running, &j->flags)) 1451 return; 1452 1453 bch2_journal_reclaim_stop(j); 1454 bch2_journal_flush_all_pins(j); 1455 1456 wait_event(j->wait, bch2_journal_entry_close(j)); 1457 1458 /* 1459 * Always write a new journal entry, to make sure the clock hands are up 1460 * to date (and match the superblock) 1461 */ 1462 __bch2_journal_meta(j); 1463 1464 journal_quiesce(j); 1465 cancel_delayed_work_sync(&j->write_work); 1466 1467 WARN(!bch2_journal_error(j) && 1468 test_bit(JOURNAL_replay_done, &j->flags) && 1469 j->last_empty_seq != journal_cur_seq(j), 1470 "journal shutdown error: cur seq %llu but last empty seq %llu", 1471 journal_cur_seq(j), j->last_empty_seq); 1472 1473 if (!bch2_journal_error(j)) 1474 clear_bit(JOURNAL_running, &j->flags); 1475 } 1476 1477 int bch2_fs_journal_start(struct journal *j, u64 last_seq, u64 cur_seq) 1478 { 1479 struct bch_fs *c = container_of(j, struct bch_fs, journal); 1480 struct journal_entry_pin_list *p; 1481 struct journal_replay *i, **_i; 1482 struct genradix_iter iter; 1483 bool had_entries = false; 1484 1485 /* 1486 * 1487 * XXX pick most recent non blacklisted sequence number 1488 */ 1489 1490 cur_seq = max(cur_seq, bch2_journal_last_blacklisted_seq(c)); 1491 1492 if (cur_seq >= JOURNAL_SEQ_MAX) { 1493 bch_err(c, "cannot start: journal seq overflow"); 1494 return -EINVAL; 1495 } 1496 1497 /* Clean filesystem? */ 1498 if (!last_seq) 1499 last_seq = cur_seq; 1500 1501 u64 nr = cur_seq - last_seq; 1502 1503 /* 1504 * Extra fudge factor, in case we crashed when the journal pin fifo was 1505 * nearly or completely full. We'll need to be able to open additional 1506 * journal entries (at least a few) in order for journal replay to get 1507 * going: 1508 */ 1509 nr += nr / 4; 1510 1511 nr = max(nr, JOURNAL_PIN); 1512 init_fifo(&j->pin, roundup_pow_of_two(nr), GFP_KERNEL); 1513 if (!j->pin.data) { 1514 bch_err(c, "error reallocating journal fifo (%llu open entries)", nr); 1515 return bch_err_throw(c, ENOMEM_journal_pin_fifo); 1516 } 1517 1518 j->replay_journal_seq = last_seq; 1519 j->replay_journal_seq_end = cur_seq; 1520 j->last_seq_ondisk = last_seq; 1521 j->flushed_seq_ondisk = cur_seq - 1; 1522 j->seq_write_started = cur_seq - 1; 1523 j->seq_ondisk = cur_seq - 1; 1524 j->pin.front = last_seq; 1525 j->pin.back = cur_seq; 1526 atomic64_set(&j->seq, cur_seq - 1); 1527 1528 u64 seq; 1529 fifo_for_each_entry_ptr(p, &j->pin, seq) 1530 journal_pin_list_init(p, 1); 1531 1532 genradix_for_each(&c->journal_entries, iter, _i) { 1533 i = *_i; 1534 1535 if (journal_replay_ignore(i)) 1536 continue; 1537 1538 seq = le64_to_cpu(i->j.seq); 1539 BUG_ON(seq >= cur_seq); 1540 1541 if (seq < last_seq) 1542 continue; 1543 1544 if (journal_entry_empty(&i->j)) 1545 j->last_empty_seq = le64_to_cpu(i->j.seq); 1546 1547 p = journal_seq_pin(j, seq); 1548 1549 p->devs.nr = 0; 1550 darray_for_each(i->ptrs, ptr) 1551 bch2_dev_list_add_dev(&p->devs, ptr->dev); 1552 1553 had_entries = true; 1554 } 1555 1556 if (!had_entries) 1557 j->last_empty_seq = cur_seq - 1; /* to match j->seq */ 1558 1559 spin_lock(&j->lock); 1560 j->last_flush_write = jiffies; 1561 1562 j->reservations.idx = journal_cur_seq(j); 1563 1564 c->last_bucket_seq_cleanup = journal_cur_seq(j); 1565 spin_unlock(&j->lock); 1566 1567 return 0; 1568 } 1569 1570 void bch2_journal_set_replay_done(struct journal *j) 1571 { 1572 /* 1573 * journal_space_available must happen before setting JOURNAL_running 1574 * JOURNAL_running must happen before JOURNAL_replay_done 1575 */ 1576 spin_lock(&j->lock); 1577 bch2_journal_space_available(j); 1578 1579 set_bit(JOURNAL_need_flush_write, &j->flags); 1580 set_bit(JOURNAL_running, &j->flags); 1581 set_bit(JOURNAL_replay_done, &j->flags); 1582 spin_unlock(&j->lock); 1583 } 1584 1585 /* init/exit: */ 1586 1587 void bch2_dev_journal_exit(struct bch_dev *ca) 1588 { 1589 struct journal_device *ja = &ca->journal; 1590 1591 for (unsigned i = 0; i < ARRAY_SIZE(ja->bio); i++) { 1592 kfree(ja->bio[i]); 1593 ja->bio[i] = NULL; 1594 } 1595 1596 kfree(ja->buckets); 1597 kfree(ja->bucket_seq); 1598 ja->buckets = NULL; 1599 ja->bucket_seq = NULL; 1600 } 1601 1602 int bch2_dev_journal_init(struct bch_dev *ca, struct bch_sb *sb) 1603 { 1604 struct bch_fs *c = ca->fs; 1605 struct journal_device *ja = &ca->journal; 1606 struct bch_sb_field_journal *journal_buckets = 1607 bch2_sb_field_get(sb, journal); 1608 struct bch_sb_field_journal_v2 *journal_buckets_v2 = 1609 bch2_sb_field_get(sb, journal_v2); 1610 1611 ja->nr = 0; 1612 1613 if (journal_buckets_v2) { 1614 unsigned nr = bch2_sb_field_journal_v2_nr_entries(journal_buckets_v2); 1615 1616 for (unsigned i = 0; i < nr; i++) 1617 ja->nr += le64_to_cpu(journal_buckets_v2->d[i].nr); 1618 } else if (journal_buckets) { 1619 ja->nr = bch2_nr_journal_buckets(journal_buckets); 1620 } 1621 1622 ja->bucket_seq = kcalloc(ja->nr, sizeof(u64), GFP_KERNEL); 1623 if (!ja->bucket_seq) 1624 return bch_err_throw(c, ENOMEM_dev_journal_init); 1625 1626 unsigned nr_bvecs = DIV_ROUND_UP(JOURNAL_ENTRY_SIZE_MAX, PAGE_SIZE); 1627 1628 for (unsigned i = 0; i < ARRAY_SIZE(ja->bio); i++) { 1629 ja->bio[i] = kzalloc(struct_size(ja->bio[i], bio.bi_inline_vecs, 1630 nr_bvecs), GFP_KERNEL); 1631 if (!ja->bio[i]) 1632 return bch_err_throw(c, ENOMEM_dev_journal_init); 1633 1634 ja->bio[i]->ca = ca; 1635 ja->bio[i]->buf_idx = i; 1636 bio_init(&ja->bio[i]->bio, NULL, ja->bio[i]->bio.bi_inline_vecs, nr_bvecs, 0); 1637 } 1638 1639 ja->buckets = kcalloc(ja->nr, sizeof(u64), GFP_KERNEL); 1640 if (!ja->buckets) 1641 return bch_err_throw(c, ENOMEM_dev_journal_init); 1642 1643 if (journal_buckets_v2) { 1644 unsigned nr = bch2_sb_field_journal_v2_nr_entries(journal_buckets_v2); 1645 unsigned dst = 0; 1646 1647 for (unsigned i = 0; i < nr; i++) 1648 for (unsigned j = 0; j < le64_to_cpu(journal_buckets_v2->d[i].nr); j++) 1649 ja->buckets[dst++] = 1650 le64_to_cpu(journal_buckets_v2->d[i].start) + j; 1651 } else if (journal_buckets) { 1652 for (unsigned i = 0; i < ja->nr; i++) 1653 ja->buckets[i] = le64_to_cpu(journal_buckets->buckets[i]); 1654 } 1655 1656 return 0; 1657 } 1658 1659 void bch2_fs_journal_exit(struct journal *j) 1660 { 1661 if (j->wq) 1662 destroy_workqueue(j->wq); 1663 1664 darray_exit(&j->early_journal_entries); 1665 1666 for (unsigned i = 0; i < ARRAY_SIZE(j->buf); i++) 1667 kvfree(j->buf[i].data); 1668 kvfree(j->free_buf); 1669 free_fifo(&j->pin); 1670 } 1671 1672 void bch2_fs_journal_init_early(struct journal *j) 1673 { 1674 static struct lock_class_key res_key; 1675 1676 mutex_init(&j->buf_lock); 1677 spin_lock_init(&j->lock); 1678 spin_lock_init(&j->err_lock); 1679 init_waitqueue_head(&j->wait); 1680 INIT_DELAYED_WORK(&j->write_work, journal_write_work); 1681 init_waitqueue_head(&j->reclaim_wait); 1682 init_waitqueue_head(&j->pin_flush_wait); 1683 mutex_init(&j->reclaim_lock); 1684 mutex_init(&j->discard_lock); 1685 1686 lockdep_init_map(&j->res_map, "journal res", &res_key, 0); 1687 1688 atomic64_set(&j->reservations.counter, 1689 ((union journal_res_state) 1690 { .cur_entry_offset = JOURNAL_ENTRY_CLOSED_VAL }).v); 1691 } 1692 1693 int bch2_fs_journal_init(struct journal *j) 1694 { 1695 struct bch_fs *c = container_of(j, struct bch_fs, journal); 1696 1697 j->free_buf_size = j->buf_size_want = JOURNAL_ENTRY_SIZE_MIN; 1698 j->free_buf = kvmalloc(j->free_buf_size, GFP_KERNEL); 1699 if (!j->free_buf) 1700 return bch_err_throw(c, ENOMEM_journal_buf); 1701 1702 for (unsigned i = 0; i < ARRAY_SIZE(j->buf); i++) 1703 j->buf[i].idx = i; 1704 1705 j->wq = alloc_workqueue("bcachefs_journal", 1706 WQ_HIGHPRI|WQ_FREEZABLE|WQ_UNBOUND|WQ_MEM_RECLAIM, 512); 1707 if (!j->wq) 1708 return bch_err_throw(c, ENOMEM_fs_other_alloc); 1709 return 0; 1710 } 1711 1712 /* debug: */ 1713 1714 static const char * const bch2_journal_flags_strs[] = { 1715 #define x(n) #n, 1716 JOURNAL_FLAGS() 1717 #undef x 1718 NULL 1719 }; 1720 1721 void __bch2_journal_debug_to_text(struct printbuf *out, struct journal *j) 1722 { 1723 struct bch_fs *c = container_of(j, struct bch_fs, journal); 1724 union journal_res_state s; 1725 unsigned long now = jiffies; 1726 u64 nr_writes = j->nr_flush_writes + j->nr_noflush_writes; 1727 1728 printbuf_tabstops_reset(out); 1729 printbuf_tabstop_push(out, 28); 1730 out->atomic++; 1731 1732 guard(rcu)(); 1733 s = READ_ONCE(j->reservations); 1734 1735 prt_printf(out, "flags:\t"); 1736 prt_bitflags(out, bch2_journal_flags_strs, j->flags); 1737 prt_newline(out); 1738 prt_printf(out, "dirty journal entries:\t%llu/%llu\n", fifo_used(&j->pin), j->pin.size); 1739 prt_printf(out, "seq:\t%llu\n", journal_cur_seq(j)); 1740 prt_printf(out, "seq_ondisk:\t%llu\n", j->seq_ondisk); 1741 prt_printf(out, "last_seq:\t%llu\n", journal_last_seq(j)); 1742 prt_printf(out, "last_seq_ondisk:\t%llu\n", j->last_seq_ondisk); 1743 prt_printf(out, "flushed_seq_ondisk:\t%llu\n", j->flushed_seq_ondisk); 1744 prt_printf(out, "watermark:\t%s\n", bch2_watermarks[j->watermark]); 1745 prt_printf(out, "each entry reserved:\t%u\n", j->entry_u64s_reserved); 1746 prt_printf(out, "nr flush writes:\t%llu\n", j->nr_flush_writes); 1747 prt_printf(out, "nr noflush writes:\t%llu\n", j->nr_noflush_writes); 1748 prt_printf(out, "average write size:\t"); 1749 prt_human_readable_u64(out, nr_writes ? div64_u64(j->entry_bytes_written, nr_writes) : 0); 1750 prt_newline(out); 1751 prt_printf(out, "free buf:\t%u\n", j->free_buf ? j->free_buf_size : 0); 1752 prt_printf(out, "nr direct reclaim:\t%llu\n", j->nr_direct_reclaim); 1753 prt_printf(out, "nr background reclaim:\t%llu\n", j->nr_background_reclaim); 1754 prt_printf(out, "reclaim kicked:\t%u\n", j->reclaim_kicked); 1755 prt_printf(out, "reclaim runs in:\t%u ms\n", time_after(j->next_reclaim, now) 1756 ? jiffies_to_msecs(j->next_reclaim - jiffies) : 0); 1757 prt_printf(out, "blocked:\t%u\n", j->blocked); 1758 prt_printf(out, "current entry sectors:\t%u\n", j->cur_entry_sectors); 1759 prt_printf(out, "current entry error:\t%s\n", bch2_err_str(j->cur_entry_error)); 1760 prt_printf(out, "current entry:\t"); 1761 1762 switch (s.cur_entry_offset) { 1763 case JOURNAL_ENTRY_ERROR_VAL: 1764 prt_printf(out, "error\n"); 1765 break; 1766 case JOURNAL_ENTRY_CLOSED_VAL: 1767 prt_printf(out, "closed\n"); 1768 break; 1769 case JOURNAL_ENTRY_BLOCKED_VAL: 1770 prt_printf(out, "blocked\n"); 1771 break; 1772 default: 1773 prt_printf(out, "%u/%u\n", s.cur_entry_offset, j->cur_entry_u64s); 1774 break; 1775 } 1776 1777 prt_printf(out, "unwritten entries:\n"); 1778 bch2_journal_bufs_to_text(out, j); 1779 1780 prt_printf(out, "space:\n"); 1781 printbuf_indent_add(out, 2); 1782 prt_printf(out, "discarded\t%u:%u\n", 1783 j->space[journal_space_discarded].next_entry, 1784 j->space[journal_space_discarded].total); 1785 prt_printf(out, "clean ondisk\t%u:%u\n", 1786 j->space[journal_space_clean_ondisk].next_entry, 1787 j->space[journal_space_clean_ondisk].total); 1788 prt_printf(out, "clean\t%u:%u\n", 1789 j->space[journal_space_clean].next_entry, 1790 j->space[journal_space_clean].total); 1791 prt_printf(out, "total\t%u:%u\n", 1792 j->space[journal_space_total].next_entry, 1793 j->space[journal_space_total].total); 1794 printbuf_indent_sub(out, 2); 1795 1796 for_each_member_device_rcu(c, ca, &c->rw_devs[BCH_DATA_journal]) { 1797 if (!ca->mi.durability) 1798 continue; 1799 1800 struct journal_device *ja = &ca->journal; 1801 1802 if (!test_bit(ca->dev_idx, c->rw_devs[BCH_DATA_journal].d)) 1803 continue; 1804 1805 if (!ja->nr) 1806 continue; 1807 1808 prt_printf(out, "dev %u:\n", ca->dev_idx); 1809 prt_printf(out, "durability %u:\n", ca->mi.durability); 1810 printbuf_indent_add(out, 2); 1811 prt_printf(out, "nr\t%u\n", ja->nr); 1812 prt_printf(out, "bucket size\t%u\n", ca->mi.bucket_size); 1813 prt_printf(out, "available\t%u:%u\n", bch2_journal_dev_buckets_available(j, ja, journal_space_discarded), ja->sectors_free); 1814 prt_printf(out, "discard_idx\t%u\n", ja->discard_idx); 1815 prt_printf(out, "dirty_ondisk\t%u (seq %llu)\n",ja->dirty_idx_ondisk, ja->bucket_seq[ja->dirty_idx_ondisk]); 1816 prt_printf(out, "dirty_idx\t%u (seq %llu)\n", ja->dirty_idx, ja->bucket_seq[ja->dirty_idx]); 1817 prt_printf(out, "cur_idx\t%u (seq %llu)\n", ja->cur_idx, ja->bucket_seq[ja->cur_idx]); 1818 printbuf_indent_sub(out, 2); 1819 } 1820 1821 prt_printf(out, "replicas want %u need %u\n", c->opts.metadata_replicas, c->opts.metadata_replicas_required); 1822 1823 --out->atomic; 1824 } 1825 1826 void bch2_journal_debug_to_text(struct printbuf *out, struct journal *j) 1827 { 1828 spin_lock(&j->lock); 1829 __bch2_journal_debug_to_text(out, j); 1830 spin_unlock(&j->lock); 1831 } 1832