// SPDX-License-Identifier: GPL-2.0

#include "bcachefs.h"
#include "btree_key_cache.h"
#include "btree_update.h"
#include "btree_write_buffer.h"
#include "buckets.h"
#include "errcode.h"
#include "error.h"
#include "journal.h"
#include "journal_io.h"
#include "journal_reclaim.h"
#include "replicas.h"
#include "sb-members.h"
#include "trace.h"

#include <linux/kthread.h>
#include <linux/sched/mm.h>

static bool __should_discard_bucket(struct journal *, struct journal_device *);

/* Free space calculations: */

static unsigned journal_space_from(struct journal_device *ja,
                                   enum journal_space_from from)
{
        switch (from) {
        case journal_space_discarded:
                return ja->discard_idx;
        case journal_space_clean_ondisk:
                return ja->dirty_idx_ondisk;
        case journal_space_clean:
                return ja->dirty_idx;
        default:
                BUG();
        }
}

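/*
 * bch2_journal_dev_buckets_available - number of free journal buckets on a
 * device, counted from the index selected by @from (discarded, clean on disk,
 * or clean) up to, but not including, the bucket currently being written.
 *
 * Illustrative example (assumed values, not taken from a real device): with
 * ja->nr = 8, ja->cur_idx = 5 and a discard_idx of 2, the ring arithmetic
 * below gives (2 - 5 - 1 + 8) % 8 = 4 available buckets.
 */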
unsigned bch2_journal_dev_buckets_available(struct journal *j,
                                            struct journal_device *ja,
                                            enum journal_space_from from)
{
        if (!ja->nr)
                return 0;

        unsigned available = (journal_space_from(ja, from) -
                              ja->cur_idx - 1 + ja->nr) % ja->nr;

        /*
         * Don't use the last bucket unless writing the new last_seq
         * will make another bucket available:
         */
        if (available && ja->dirty_idx_ondisk == ja->dirty_idx)
                --available;

        return available;
}

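/*
 * Recompute the journal watermark: if clean journal space or the pin FIFO is
 * running low, or the btree write buffer must be flushed, raise the watermark
 * to BCH_WATERMARK_reclaim, restricting new journal reservations to
 * high-priority (reclaim) operations; wake the journal when the watermark is
 * lowered again.
 */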
void bch2_journal_set_watermark(struct journal *j)
{
        struct bch_fs *c = container_of(j, struct bch_fs, journal);
        bool low_on_space = j->space[journal_space_clean].total * 4 <=
                j->space[journal_space_total].total;
        bool low_on_pin = fifo_free(&j->pin) < j->pin.size / 4;
        bool low_on_wb = bch2_btree_write_buffer_must_wait(c);
        unsigned watermark = low_on_space || low_on_pin || low_on_wb
                ? BCH_WATERMARK_reclaim
                : BCH_WATERMARK_stripe;

        if (track_event_change(&c->times[BCH_TIME_blocked_journal_low_on_space], low_on_space) ||
            track_event_change(&c->times[BCH_TIME_blocked_journal_low_on_pin], low_on_pin) ||
            track_event_change(&c->times[BCH_TIME_blocked_write_buffer_full], low_on_wb))
                trace_and_count(c, journal_full, c);

        mod_bit(JOURNAL_space_low, &j->flags, low_on_space || low_on_pin);

        swap(watermark, j->watermark);
        if (watermark > j->watermark)
                journal_wake(j);
}

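/*
 * journal_dev_space_available - how much room is left for journal writes on
 * one device: the number of sectors the next entry can use, plus total free
 * space, accounting for entries that have been opened but not yet written out.
 */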
static struct journal_space
journal_dev_space_available(struct journal *j, struct bch_dev *ca,
                            enum journal_space_from from)
{
        struct bch_fs *c = container_of(j, struct bch_fs, journal);
        struct journal_device *ja = &ca->journal;
        unsigned sectors, buckets, unwritten;
        unsigned bucket_size_aligned = round_down(ca->mi.bucket_size, block_sectors(c));
        u64 seq;

        if (from == journal_space_total)
                return (struct journal_space) {
                        .next_entry = bucket_size_aligned,
                        .total      = bucket_size_aligned * ja->nr,
                };

        buckets = bch2_journal_dev_buckets_available(j, ja, from);
        sectors = round_down(ja->sectors_free, block_sectors(c));

        /*
         * Note that we don't allocate the space for a journal entry
         * until we write it out - thus, account for it here:
         */
        for (seq = journal_last_unwritten_seq(j);
             seq <= journal_cur_seq(j);
             seq++) {
                unwritten = j->buf[seq & JOURNAL_BUF_MASK].sectors;

                if (!unwritten)
                        continue;

                /* entry won't fit on this device, skip: */
                if (unwritten > bucket_size_aligned)
                        continue;

                if (unwritten >= sectors) {
                        if (!buckets) {
                                sectors = 0;
                                break;
                        }

                        buckets--;
                        sectors = bucket_size_aligned;
                }

                sectors -= unwritten;
        }

        if (sectors < ca->mi.bucket_size && buckets) {
                buckets--;
                sectors = bucket_size_aligned;
        }

        return (struct journal_space) {
                .next_entry = sectors,
                .total      = sectors + buckets * bucket_size_aligned,
        };
}

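/*
 * __journal_space_available - combine per-device space into a filesystem-wide
 * figure: gather space for each journal device with durability, sort largest
 * to smallest, and return the space of the @nr_devs_want'th largest device
 * (or zero if fewer than that many devices have room for another entry).
 */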
static struct journal_space __journal_space_available(struct journal *j, unsigned nr_devs_want,
                                                      enum journal_space_from from)
{
        struct bch_fs *c = container_of(j, struct bch_fs, journal);
        unsigned pos, nr_devs = 0;
        struct journal_space space, dev_space[BCH_SB_MEMBERS_MAX];
        unsigned min_bucket_size = U32_MAX;

        BUG_ON(nr_devs_want > ARRAY_SIZE(dev_space));

        for_each_member_device_rcu(c, ca, &c->rw_devs[BCH_DATA_journal]) {
                if (!ca->journal.nr ||
                    !ca->mi.durability)
                        continue;

                min_bucket_size = min(min_bucket_size, ca->mi.bucket_size);

                space = journal_dev_space_available(j, ca, from);
                if (!space.next_entry)
                        continue;

                for (pos = 0; pos < nr_devs; pos++)
                        if (space.total > dev_space[pos].total)
                                break;

                array_insert_item(dev_space, nr_devs, pos, space);
        }

        if (nr_devs < nr_devs_want)
                return (struct journal_space) { 0, 0 };

        /*
         * It's possible for bucket size to be misaligned w.r.t. the filesystem
         * block size:
         */
        min_bucket_size = round_down(min_bucket_size, block_sectors(c));

        /*
         * We sorted largest to smallest, and we want the smallest out of the
         * @nr_devs_want largest devices:
         */
        space = dev_space[nr_devs_want - 1];
        space.next_entry = min(space.next_entry, min_bucket_size);
        return space;
}

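/*
 * bch2_journal_space_available - recalculate j->space[], advance each device's
 * dirty indexes past entries that are no longer needed, and update
 * j->cur_entry_sectors and j->cur_entry_error accordingly. Must be called with
 * j->lock held.
 */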
void bch2_journal_space_available(struct journal *j)
{
        struct bch_fs *c = container_of(j, struct bch_fs, journal);
        unsigned clean, clean_ondisk, total;
        unsigned max_entry_size = min(j->buf[0].buf_size >> 9,
                                      j->buf[1].buf_size >> 9);
        unsigned nr_online = 0, nr_devs_want;
        bool can_discard = false;
        int ret = 0;

        lockdep_assert_held(&j->lock);
        guard(rcu)();

        for_each_member_device_rcu(c, ca, &c->rw_devs[BCH_DATA_journal]) {
                struct journal_device *ja = &ca->journal;

                if (!ja->nr)
                        continue;

                while (ja->dirty_idx != ja->cur_idx &&
                       ja->bucket_seq[ja->dirty_idx] < journal_last_seq(j))
                        ja->dirty_idx = (ja->dirty_idx + 1) % ja->nr;

                while (ja->dirty_idx_ondisk != ja->dirty_idx &&
                       ja->bucket_seq[ja->dirty_idx_ondisk] < j->last_seq_ondisk)
                        ja->dirty_idx_ondisk = (ja->dirty_idx_ondisk + 1) % ja->nr;

                can_discard |= __should_discard_bucket(j, ja);

                max_entry_size = min_t(unsigned, max_entry_size, ca->mi.bucket_size);
                nr_online++;
        }

        j->can_discard = can_discard;

        if (nr_online < metadata_replicas_required(c)) {
                if (!(c->sb.features & BIT_ULL(BCH_FEATURE_small_image))) {
                        struct printbuf buf = PRINTBUF;
                        buf.atomic++;
                        prt_printf(&buf, "insufficient writeable journal devices available: have %u, need %u\n"
                                   "rw journal devs:", nr_online, metadata_replicas_required(c));

                        for_each_member_device_rcu(c, ca, &c->rw_devs[BCH_DATA_journal])
                                prt_printf(&buf, " %s", ca->name);

                        bch_err(c, "%s", buf.buf);
                        printbuf_exit(&buf);
                }
                ret = bch_err_throw(c, insufficient_journal_devices);
                goto out;
        }

        nr_devs_want = min_t(unsigned, nr_online, c->opts.metadata_replicas);

        for (unsigned i = 0; i < journal_space_nr; i++)
                j->space[i] = __journal_space_available(j, nr_devs_want, i);

        clean_ondisk = j->space[journal_space_clean_ondisk].total;
        clean        = j->space[journal_space_clean].total;
        total        = j->space[journal_space_total].total;

        if (!j->space[journal_space_discarded].next_entry)
                ret = bch_err_throw(c, journal_full);

        if ((j->space[journal_space_clean_ondisk].next_entry <
             j->space[journal_space_clean_ondisk].total) &&
            (clean - clean_ondisk <= total / 8) &&
            (clean_ondisk * 2 > clean))
                set_bit(JOURNAL_may_skip_flush, &j->flags);
        else
                clear_bit(JOURNAL_may_skip_flush, &j->flags);

        bch2_journal_set_watermark(j);
out:
        j->cur_entry_sectors = !ret
                ? j->space[journal_space_discarded].next_entry
                : 0;
        j->cur_entry_error = ret;

        if (!ret)
                journal_wake(j);
}

/* Discards - last part of journal reclaim: */

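/*
 * Discard more buckets once fewer than max(4, ja->nr / 8) discarded buckets
 * remain available and there are clean-on-disk buckets that haven't been
 * discarded yet.
 */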
static bool __should_discard_bucket(struct journal *j, struct journal_device *ja)
{
        unsigned min_free = max(4, ja->nr / 8);

        return bch2_journal_dev_buckets_available(j, ja, journal_space_discarded) <
                min_free &&
                ja->discard_idx != ja->dirty_idx_ondisk;
}

static bool should_discard_bucket(struct journal *j, struct journal_device *ja)
{
        spin_lock(&j->lock);
        bool ret = __should_discard_bucket(j, ja);
        spin_unlock(&j->lock);

        return ret;
}

/*
 * Advance ja->discard_idx as long as it points to buckets that are no longer
 * dirty, issuing discards if necessary:
 */
void bch2_journal_do_discards(struct journal *j)
{
        struct bch_fs *c = container_of(j, struct bch_fs, journal);

        mutex_lock(&j->discard_lock);

        for_each_rw_member(c, ca, BCH_DEV_WRITE_REF_journal_do_discards) {
                struct journal_device *ja = &ca->journal;

                while (should_discard_bucket(j, ja)) {
                        if (!c->opts.nochanges &&
                            bch2_discard_opt_enabled(c, ca) &&
                            bdev_max_discard_sectors(ca->disk_sb.bdev))
                                blkdev_issue_discard(ca->disk_sb.bdev,
                                        bucket_to_sector(ca,
                                                ja->buckets[ja->discard_idx]),
                                        ca->mi.bucket_size, GFP_NOFS);

                        spin_lock(&j->lock);
                        ja->discard_idx = (ja->discard_idx + 1) % ja->nr;

                        bch2_journal_space_available(j);
                        spin_unlock(&j->lock);
                }
        }

        mutex_unlock(&j->discard_lock);
}

/*
 * Journal entry pinning - machinery for holding a reference on a given journal
 * entry, holding it open to ensure it gets replayed during recovery:
 */

void bch2_journal_reclaim_fast(struct journal *j)
{
        bool popped = false;

        lockdep_assert_held(&j->lock);

        /*
         * Unpin journal entries whose reference counts reached zero, meaning
         * all btree nodes got written out
         */
        while (!fifo_empty(&j->pin) &&
               j->pin.front <= j->seq_ondisk &&
               !atomic_read(&fifo_peek_front(&j->pin).count)) {
                j->pin.front++;
                popped = true;
        }

        if (popped) {
                bch2_journal_space_available(j);
                __closure_wake_up(&j->reclaim_flush_wait);
        }
}

bool __bch2_journal_pin_put(struct journal *j, u64 seq)
{
        struct journal_entry_pin_list *pin_list = journal_seq_pin(j, seq);

        return atomic_dec_and_test(&pin_list->count);
}

void bch2_journal_pin_put(struct journal *j, u64 seq)
{
        if (__bch2_journal_pin_put(j, seq)) {
                spin_lock(&j->lock);
                bch2_journal_reclaim_fast(j);
                spin_unlock(&j->lock);
        }
}

static inline bool __journal_pin_drop(struct journal *j,
                                      struct journal_entry_pin *pin)
{
        struct journal_entry_pin_list *pin_list;

        if (!journal_pin_active(pin))
                return false;

        if (j->flush_in_progress == pin)
                j->flush_in_progress_dropped = true;

        pin_list = journal_seq_pin(j, pin->seq);
        pin->seq = 0;
        list_del_init(&pin->list);

        if (j->reclaim_flush_wait.list.first)
                __closure_wake_up(&j->reclaim_flush_wait);

        /*
         * Unpinning a journal entry may make journal_next_bucket() succeed, if
         * writing a new last_seq will now make another bucket available:
         */
        return atomic_dec_and_test(&pin_list->count) &&
                pin_list == &fifo_peek_front(&j->pin);
}

void bch2_journal_pin_drop(struct journal *j,
                           struct journal_entry_pin *pin)
{
        spin_lock(&j->lock);
        if (__journal_pin_drop(j, pin))
                bch2_journal_reclaim_fast(j);
        spin_unlock(&j->lock);
}

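/*
 * Classify a pin by its flush callback: btree node writes are bucketed by
 * level (JOURNAL_PIN_TYPE_btree0 - level), key cache flushes get their own
 * type, and everything else is JOURNAL_PIN_TYPE_other.
 */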
static enum journal_pin_type journal_pin_type(struct journal_entry_pin *pin,
                                              journal_pin_flush_fn fn)
{
        if (fn == bch2_btree_node_flush0 ||
            fn == bch2_btree_node_flush1) {
                unsigned idx = fn == bch2_btree_node_flush1;
                struct btree *b = container_of(pin, struct btree, writes[idx].journal);

                return JOURNAL_PIN_TYPE_btree0 - b->c.level;
        } else if (fn == bch2_btree_key_cache_journal_flush)
                return JOURNAL_PIN_TYPE_key_cache;
        else
                return JOURNAL_PIN_TYPE_other;
}

static inline void bch2_journal_pin_set_locked(struct journal *j, u64 seq,
                                               struct journal_entry_pin *pin,
                                               journal_pin_flush_fn flush_fn,
                                               enum journal_pin_type type)
{
        struct journal_entry_pin_list *pin_list = journal_seq_pin(j, seq);

        /*
         * flush_fn is how we identify journal pins in debugfs, so must always
         * exist, even if it doesn't do anything:
         */
        BUG_ON(!flush_fn);

        atomic_inc(&pin_list->count);
        pin->seq = seq;
        pin->flush = flush_fn;

        if (list_empty(&pin_list->unflushed[type]) &&
            j->reclaim_flush_wait.list.first)
                __closure_wake_up(&j->reclaim_flush_wait);

        list_add(&pin->list, &pin_list->unflushed[type]);
}

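/*
 * bch2_journal_pin_copy - pin @dst at the same sequence number as @src,
 * dropping whatever @dst was previously pinning. If @src was concurrently
 * dropped and its entry already reclaimed, there is nothing left to copy and
 * this is a no-op.
 */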
void bch2_journal_pin_copy(struct journal *j,
                           struct journal_entry_pin *dst,
                           struct journal_entry_pin *src,
                           journal_pin_flush_fn flush_fn)
{
        spin_lock(&j->lock);

        u64 seq = READ_ONCE(src->seq);

        if (seq < journal_last_seq(j)) {
                /*
                 * bch2_journal_pin_copy() raced with bch2_journal_pin_drop() on
                 * the src pin - with the pin dropped, the entry to pin might no
                 * longer exist, but that means there's no longer anything to
                 * copy and we can bail out here:
                 */
                spin_unlock(&j->lock);
                return;
        }

        bool reclaim = __journal_pin_drop(j, dst);

        bch2_journal_pin_set_locked(j, seq, dst, flush_fn, journal_pin_type(dst, flush_fn));

        if (reclaim)
                bch2_journal_reclaim_fast(j);

        /*
         * If the journal is currently full, we might want to call flush_fn
         * immediately:
         */
        if (seq == journal_last_seq(j))
                journal_wake(j);
        spin_unlock(&j->lock);
}

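/*
 * bch2_journal_pin_set - (re)arm @pin against journal sequence number @seq,
 * with @flush_fn as the callback reclaim will use to flush it.
 */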
void bch2_journal_pin_set(struct journal *j, u64 seq,
                          struct journal_entry_pin *pin,
                          journal_pin_flush_fn flush_fn)
{
        spin_lock(&j->lock);

        BUG_ON(seq < journal_last_seq(j));

        bool reclaim = __journal_pin_drop(j, pin);

        bch2_journal_pin_set_locked(j, seq, pin, flush_fn, journal_pin_type(pin, flush_fn));

        if (reclaim)
                bch2_journal_reclaim_fast(j);
        /*
         * If the journal is currently full, we might want to call flush_fn
         * immediately:
         */
        if (seq == journal_last_seq(j))
                journal_wake(j);

        spin_unlock(&j->lock);
}

/**
 * bch2_journal_pin_flush: ensure journal pin callback is no longer running
 * @j: journal object
 * @pin: pin to flush
 */
void bch2_journal_pin_flush(struct journal *j, struct journal_entry_pin *pin)
{
        BUG_ON(journal_pin_active(pin));

        wait_event(j->pin_flush_wait, j->flush_in_progress != pin);
}

/*
 * Journal reclaim: flush references to open journal entries to reclaim space in
 * the journal
 *
 * May be done by the journal code in the background as needed to free up space
 * for more journal entries, or as part of doing a clean shutdown, or to migrate
 * data off of a specific device:
 */

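/*
 * journal_get_next_pin - find the next unflushed pin to flush.
 * @allowed_below_seq is a bitmask of JOURNAL_PIN_TYPEs that may be returned
 * for entries at or below @seq_to_flush; @allowed_above_seq allows types
 * regardless of sequence number. *seq is set to the entry's sequence number.
 */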
static struct journal_entry_pin *
journal_get_next_pin(struct journal *j,
                     u64 seq_to_flush,
                     unsigned allowed_below_seq,
                     unsigned allowed_above_seq,
                     u64 *seq)
{
        struct journal_entry_pin_list *pin_list;
        struct journal_entry_pin *ret = NULL;

        fifo_for_each_entry_ptr(pin_list, &j->pin, *seq) {
                if (*seq > seq_to_flush && !allowed_above_seq)
                        break;

                for (unsigned i = 0; i < JOURNAL_PIN_TYPE_NR; i++)
                        if (((BIT(i) & allowed_below_seq) && *seq <= seq_to_flush) ||
                            (BIT(i) & allowed_above_seq)) {
                                ret = list_first_entry_or_null(&pin_list->unflushed[i],
                                        struct journal_entry_pin, list);
                                if (ret)
                                        return ret;
                        }
        }

        return NULL;
}

/* returns the number of journal pins flushed */
static size_t journal_flush_pins(struct journal *j,
                                 u64 seq_to_flush,
                                 unsigned allowed_below_seq,
                                 unsigned allowed_above_seq,
                                 unsigned min_any,
                                 unsigned min_key_cache)
{
        struct journal_entry_pin *pin;
        size_t nr_flushed = 0;
        journal_pin_flush_fn flush_fn;
        u64 seq;
        int err;

        lockdep_assert_held(&j->reclaim_lock);

        while (1) {
                unsigned allowed_above = allowed_above_seq;
                unsigned allowed_below = allowed_below_seq;

                if (min_any) {
                        allowed_above |= ~0;
                        allowed_below |= ~0;
                }

                if (min_key_cache) {
                        allowed_above |= BIT(JOURNAL_PIN_TYPE_key_cache);
                        allowed_below |= BIT(JOURNAL_PIN_TYPE_key_cache);
                }

                cond_resched();

                j->last_flushed = jiffies;

                spin_lock(&j->lock);
                pin = journal_get_next_pin(j, seq_to_flush,
                                           allowed_below,
                                           allowed_above, &seq);
                if (pin) {
                        BUG_ON(j->flush_in_progress);
                        j->flush_in_progress = pin;
                        j->flush_in_progress_dropped = false;
                        flush_fn = pin->flush;
                }
                spin_unlock(&j->lock);

                if (!pin)
                        break;

                if (min_key_cache && pin->flush == bch2_btree_key_cache_journal_flush)
                        min_key_cache--;

                if (min_any)
                        min_any--;

                err = flush_fn(j, pin, seq);

                spin_lock(&j->lock);
                /* Pin might have been dropped or rearmed: */
                if (likely(!err && !j->flush_in_progress_dropped))
                        list_move(&pin->list, &journal_seq_pin(j, seq)->flushed[journal_pin_type(pin, flush_fn)]);
                j->flush_in_progress = NULL;
                j->flush_in_progress_dropped = false;
                spin_unlock(&j->lock);

                wake_up(&j->pin_flush_wait);

                if (err)
                        break;

                nr_flushed++;
        }

        return nr_flushed;
}

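/*
 * journal_seq_to_flush - pick the sequence number reclaim should flush up to:
 * enough to keep each device's journal at most half full, and the pin FIFO at
 * most half full.
 */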
static u64 journal_seq_to_flush(struct journal *j)
{
        struct bch_fs *c = container_of(j, struct bch_fs, journal);
        u64 seq_to_flush = 0;

        guard(spinlock)(&j->lock);
        guard(rcu)();

        for_each_rw_member_rcu(c, ca) {
                struct journal_device *ja = &ca->journal;
                unsigned nr_buckets, bucket_to_flush;

                if (!ja->nr)
                        continue;

                /* Try to keep the journal at most half full: */
                nr_buckets = ja->nr / 2;

                bucket_to_flush = (ja->cur_idx + nr_buckets) % ja->nr;
                seq_to_flush = max(seq_to_flush,
                                   ja->bucket_seq[bucket_to_flush]);
        }

        /* Also flush if the pin fifo is more than half full */
        return max_t(s64, seq_to_flush,
                     (s64) journal_cur_seq(j) -
                     (j->pin.size >> 1));
}

/**
 * __bch2_journal_reclaim - free up journal buckets
 * @j: journal object
 * @direct: direct or background reclaim?
 * @kicked: requested to run since we last ran?
 *
 * Background journal reclaim writes out btree nodes. It should be run
 * early enough so that we never completely run out of journal buckets.
 *
 * High watermarks for triggering background reclaim:
 * - FIFO has fewer than 512 entries left
 * - fewer than 25% journal buckets free
 *
 * Background reclaim runs until low watermarks are reached:
 * - FIFO has more than 1024 entries left
 * - more than 50% journal buckets free
 *
 * As long as a reclaim can complete in the time it takes to fill up
 * 512 journal entries or 25% of all journal buckets, then
 * journal_next_bucket() should not stall.
 */
static int __bch2_journal_reclaim(struct journal *j, bool direct, bool kicked)
{
        struct bch_fs *c = container_of(j, struct bch_fs, journal);
        struct btree_cache *bc = &c->btree_cache;
        bool kthread = (current->flags & PF_KTHREAD) != 0;
        u64 seq_to_flush;
        size_t min_nr, min_key_cache, nr_flushed;
        unsigned flags;
        int ret = 0;

        /*
         * We can't invoke memory reclaim while holding the reclaim_lock -
         * journal reclaim is required to make progress for memory reclaim
         * (cleaning the caches), so we can't get stuck in memory reclaim while
         * we're holding the reclaim lock:
         */
        lockdep_assert_held(&j->reclaim_lock);
        flags = memalloc_noreclaim_save();

        do {
                if (kthread && kthread_should_stop())
                        break;

                ret = bch2_journal_error(j);
                if (ret)
                        break;

                /* XXX shove journal discards off to another thread */
                bch2_journal_do_discards(j);

                seq_to_flush = journal_seq_to_flush(j);
                min_nr = 0;

                /*
                 * If it's been longer than j->reclaim_delay_ms since we last flushed,
                 * make sure to flush at least one journal pin:
                 */
                if (time_after(jiffies, j->last_flushed +
                               msecs_to_jiffies(c->opts.journal_reclaim_delay)))
                        min_nr = 1;

                if (j->watermark != BCH_WATERMARK_stripe)
                        min_nr = 1;

                size_t btree_cache_live = bc->live[0].nr + bc->live[1].nr;
                if (atomic_long_read(&bc->nr_dirty) * 2 > btree_cache_live)
                        min_nr = 1;

                min_key_cache = min(bch2_nr_btree_keys_need_flush(c), (size_t) 128);

                trace_and_count(c, journal_reclaim_start, c,
                                direct, kicked,
                                min_nr, min_key_cache,
                                atomic_long_read(&bc->nr_dirty), btree_cache_live,
                                atomic_long_read(&c->btree_key_cache.nr_dirty),
                                atomic_long_read(&c->btree_key_cache.nr_keys));

                nr_flushed = journal_flush_pins(j, seq_to_flush,
                                                ~0, 0,
                                                min_nr, min_key_cache);

                if (direct)
                        j->nr_direct_reclaim += nr_flushed;
                else
                        j->nr_background_reclaim += nr_flushed;
                trace_and_count(c, journal_reclaim_finish, c, nr_flushed);

                if (nr_flushed)
                        wake_up(&j->reclaim_wait);
        } while ((min_nr || min_key_cache) && nr_flushed && !direct);

        memalloc_noreclaim_restore(flags);

        return ret;
}

int bch2_journal_reclaim(struct journal *j)
{
        return __bch2_journal_reclaim(j, true, true);
}

static int bch2_journal_reclaim_thread(void *arg)
{
        struct journal *j = arg;
        struct bch_fs *c = container_of(j, struct bch_fs, journal);
        unsigned long delay, now;
        bool journal_empty;
        int ret = 0;

        set_freezable();

        j->last_flushed = jiffies;

        while (!ret && !kthread_should_stop()) {
                bool kicked = j->reclaim_kicked;

                j->reclaim_kicked = false;

                mutex_lock(&j->reclaim_lock);
                ret = __bch2_journal_reclaim(j, false, kicked);
                mutex_unlock(&j->reclaim_lock);

                now = jiffies;
                delay = msecs_to_jiffies(c->opts.journal_reclaim_delay);
                j->next_reclaim = j->last_flushed + delay;

                if (!time_in_range(j->next_reclaim, now, now + delay))
                        j->next_reclaim = now + delay;

                while (1) {
                        set_current_state(TASK_INTERRUPTIBLE|TASK_FREEZABLE);
                        if (kthread_should_stop())
                                break;
                        if (j->reclaim_kicked)
                                break;

                        spin_lock(&j->lock);
                        journal_empty = fifo_empty(&j->pin);
                        spin_unlock(&j->lock);

                        long timeout = j->next_reclaim - jiffies;

                        if (journal_empty)
                                schedule();
                        else if (timeout > 0)
                                schedule_timeout(timeout);
                        else
                                break;
                }
                __set_current_state(TASK_RUNNING);
        }

        return 0;
}

void bch2_journal_reclaim_stop(struct journal *j)
{
        struct task_struct *p = j->reclaim_thread;

        j->reclaim_thread = NULL;

        if (p) {
                kthread_stop(p);
                put_task_struct(p);
        }
}

int bch2_journal_reclaim_start(struct journal *j)
{
        struct bch_fs *c = container_of(j, struct bch_fs, journal);
        struct task_struct *p;
        int ret;

        if (j->reclaim_thread)
                return 0;

        p = kthread_create(bch2_journal_reclaim_thread, j,
                           "bch-reclaim/%s", c->name);
        ret = PTR_ERR_OR_ZERO(p);
        bch_err_msg(c, ret, "creating journal reclaim thread");
        if (ret)
                return ret;

        get_task_struct(p);
        j->reclaim_thread = p;
        wake_up_process(p);
        return 0;
}

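/*
 * Returns true if any entry at or below @seq_to_flush still has pins of the
 * given @types (a bitmask of JOURNAL_PIN_TYPEs) on its unflushed or flushed
 * lists.
 */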
static bool journal_pins_still_flushing(struct journal *j, u64 seq_to_flush,
                                        unsigned types)
{
        struct journal_entry_pin_list *pin_list;
        u64 seq;

        spin_lock(&j->lock);
        fifo_for_each_entry_ptr(pin_list, &j->pin, seq) {
                if (seq > seq_to_flush)
                        break;

                for (unsigned i = 0; i < JOURNAL_PIN_TYPE_NR; i++)
                        if ((BIT(i) & types) &&
                            (!list_empty(&pin_list->unflushed[i]) ||
                             !list_empty(&pin_list->flushed[i]))) {
                                spin_unlock(&j->lock);
                                return true;
                        }
        }
        spin_unlock(&j->lock);

        return false;
}

static bool journal_flush_pins_or_still_flushing(struct journal *j, u64 seq_to_flush,
                                                 unsigned types)
{
        return journal_flush_pins(j, seq_to_flush, types, 0, 0, 0) ||
                journal_pins_still_flushing(j, seq_to_flush, types);
}

static int journal_flush_done(struct journal *j, u64 seq_to_flush,
                              bool *did_work)
{
        int ret = 0;

        ret = bch2_journal_error(j);
        if (ret)
                return ret;

        mutex_lock(&j->reclaim_lock);

        for (int type = JOURNAL_PIN_TYPE_NR - 1;
             type >= 0;
             --type)
                if (journal_flush_pins_or_still_flushing(j, seq_to_flush, BIT(type))) {
                        *did_work = true;
                        goto unlock;
                }

        if (seq_to_flush > journal_cur_seq(j))
                bch2_journal_entry_close(j);

        spin_lock(&j->lock);
        /*
         * If journal replay hasn't completed, the unreplayed journal entries
         * hold refs on their corresponding sequence numbers
         */
        ret = !test_bit(JOURNAL_replay_done, &j->flags) ||
                journal_last_seq(j) > seq_to_flush ||
                !fifo_used(&j->pin);

        spin_unlock(&j->lock);
unlock:
        mutex_unlock(&j->reclaim_lock);

        return ret;
}

bool bch2_journal_flush_pins(struct journal *j, u64 seq_to_flush)
{
        /* time_stats this */
        bool did_work = false;

        if (!test_bit(JOURNAL_running, &j->flags))
                return false;

        closure_wait_event(&j->reclaim_flush_wait,
                           journal_flush_done(j, seq_to_flush, &did_work));

        return did_work;
}

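/*
 * bch2_journal_flush_device_pins - flush journal entries referencing device
 * @dev_idx (or, if @dev_idx is negative, entries with fewer than
 * metadata_replicas copies), then rebuild the journal replicas entries via
 * replicas_gc so the device no longer holds required journal data.
 */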
int bch2_journal_flush_device_pins(struct journal *j, int dev_idx)
{
        struct bch_fs *c = container_of(j, struct bch_fs, journal);
        struct journal_entry_pin_list *p;
        u64 iter, seq = 0;
        int ret = 0;

        spin_lock(&j->lock);
        fifo_for_each_entry_ptr(p, &j->pin, iter)
                if (dev_idx >= 0
                    ? bch2_dev_list_has_dev(p->devs, dev_idx)
                    : p->devs.nr < c->opts.metadata_replicas)
                        seq = iter;
        spin_unlock(&j->lock);

        bch2_journal_flush_pins(j, seq);

        ret = bch2_journal_error(j);
        if (ret)
                return ret;

        mutex_lock(&c->replicas_gc_lock);
        bch2_replicas_gc_start(c, 1 << BCH_DATA_journal);

        /*
         * Now that we've populated replicas_gc, write to the journal to mark
         * active journal devices. This handles the case where the journal might
         * be empty. Otherwise we could clear all journal replicas and
         * temporarily put the fs into an unrecoverable state. Journal recovery
         * expects to find devices marked for journal data on unclean mount.
         */
        ret = bch2_journal_meta(&c->journal);
        if (ret)
                goto err;

        seq = 0;
        spin_lock(&j->lock);
        while (!ret) {
                union bch_replicas_padded replicas;

                seq = max(seq, journal_last_seq(j));
                if (seq >= j->pin.back)
                        break;
                bch2_devlist_to_replicas(&replicas.e, BCH_DATA_journal,
                                         journal_seq_pin(j, seq)->devs);
                seq++;

                if (replicas.e.nr_devs) {
                        spin_unlock(&j->lock);
                        ret = bch2_mark_replicas(c, &replicas.e);
                        spin_lock(&j->lock);
                }
        }
        spin_unlock(&j->lock);
err:
        ret = bch2_replicas_gc_end(c, ret);
        mutex_unlock(&c->replicas_gc_lock);

        return ret;
}

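/*
 * Print the pin lists for the first active journal entry at or after *seq;
 * returns true when there is nothing more to print.
 */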
bool bch2_journal_seq_pins_to_text(struct printbuf *out, struct journal *j, u64 *seq)
{
        struct journal_entry_pin_list *pin_list;
        struct journal_entry_pin *pin;

        spin_lock(&j->lock);
        if (!test_bit(JOURNAL_running, &j->flags)) {
                spin_unlock(&j->lock);
                return true;
        }

        *seq = max(*seq, j->pin.front);

        if (*seq >= j->pin.back) {
                spin_unlock(&j->lock);
                return true;
        }

        out->atomic++;

        pin_list = journal_seq_pin(j, *seq);

        prt_printf(out, "%llu: count %u\n", *seq, atomic_read(&pin_list->count));
        printbuf_indent_add(out, 2);

        prt_printf(out, "unflushed:\n");
        for (unsigned i = 0; i < ARRAY_SIZE(pin_list->unflushed); i++)
                list_for_each_entry(pin, &pin_list->unflushed[i], list)
                        prt_printf(out, "\t%px %ps\n", pin, pin->flush);

        prt_printf(out, "flushed:\n");
        for (unsigned i = 0; i < ARRAY_SIZE(pin_list->flushed); i++)
                list_for_each_entry(pin, &pin_list->flushed[i], list)
                        prt_printf(out, "\t%px %ps\n", pin, pin->flush);

        printbuf_indent_sub(out, 2);

        --out->atomic;
        spin_unlock(&j->lock);

        return false;
}

void bch2_journal_pins_to_text(struct printbuf *out, struct journal *j)
{
        u64 seq = 0;

        while (!bch2_journal_seq_pins_to_text(out, j, &seq))
                seq++;
}