xref: /linux/fs/bcachefs/journal_reclaim.c (revision 031fba65fc202abf1f193e321be7a2c274fd88ba)
1 // SPDX-License-Identifier: GPL-2.0
2 
3 #include "bcachefs.h"
4 #include "btree_key_cache.h"
5 #include "btree_update.h"
6 #include "buckets.h"
7 #include "errcode.h"
8 #include "error.h"
9 #include "journal.h"
10 #include "journal_io.h"
11 #include "journal_reclaim.h"
12 #include "replicas.h"
13 #include "sb-members.h"
14 #include "trace.h"
15 
16 #include <linux/kthread.h>
17 #include <linux/sched/mm.h>
18 
19 /* Free space calculations: */
20 
21 static unsigned journal_space_from(struct journal_device *ja,
22 				   enum journal_space_from from)
23 {
24 	switch (from) {
25 	case journal_space_discarded:
26 		return ja->discard_idx;
27 	case journal_space_clean_ondisk:
28 		return ja->dirty_idx_ondisk;
29 	case journal_space_clean:
30 		return ja->dirty_idx;
31 	default:
32 		BUG();
33 	}
34 }
35 
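/*
 * Number of buckets on @ja usable for new journal writes: counted on the ring
 * of ja->nr buckets, strictly between ja->cur_idx and the index selected by
 * @from - e.g. with nr = 8, cur_idx = 2, discard_idx = 6, that is
 * (6 - 2 - 1 + 8) % 8 = 3 buckets in the "discarded" sense.
 */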
36 unsigned bch2_journal_dev_buckets_available(struct journal *j,
37 					    struct journal_device *ja,
38 					    enum journal_space_from from)
39 {
40 	unsigned available = (journal_space_from(ja, from) -
41 			      ja->cur_idx - 1 + ja->nr) % ja->nr;
42 
43 	/*
44 	 * Don't use the last bucket unless writing the new last_seq
45 	 * will make another bucket available:
46 	 */
47 	if (available && ja->dirty_idx_ondisk == ja->dirty_idx)
48 		--available;
49 
50 	return available;
51 }
52 
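/*
 * Update just the 'remaining' field of j->prereserved with a cmpxchg loop;
 * the 'reserved' field may be modified concurrently and is left untouched.
 */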
53 static void journal_set_remaining(struct journal *j, unsigned u64s_remaining)
54 {
55 	union journal_preres_state old, new;
56 	u64 v = atomic64_read(&j->prereserved.counter);
57 
58 	do {
59 		old.v = new.v = v;
60 		new.remaining = u64s_remaining;
61 	} while ((v = atomic64_cmpxchg(&j->prereserved.counter,
62 				       old.v, new.v)) != old.v);
63 }
64 
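/*
 * Space available on a single device, in sectors: .next_entry is the
 * contiguous space available for the next journal entry on this device,
 * .total is that plus the remaining free buckets.
 */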
65 static struct journal_space
66 journal_dev_space_available(struct journal *j, struct bch_dev *ca,
67 			    enum journal_space_from from)
68 {
69 	struct journal_device *ja = &ca->journal;
70 	unsigned sectors, buckets, unwritten;
71 	u64 seq;
72 
73 	if (from == journal_space_total)
74 		return (struct journal_space) {
75 			.next_entry	= ca->mi.bucket_size,
76 			.total		= ca->mi.bucket_size * ja->nr,
77 		};
78 
79 	buckets = bch2_journal_dev_buckets_available(j, ja, from);
80 	sectors = ja->sectors_free;
81 
82 	/*
83 	 * Note that we don't allocate the space for a journal entry

84 	 * until we write it out - thus, account for it here:
85 	 */
86 	for (seq = journal_last_unwritten_seq(j);
87 	     seq <= journal_cur_seq(j);
88 	     seq++) {
89 		unwritten = j->buf[seq & JOURNAL_BUF_MASK].sectors;
90 
91 		if (!unwritten)
92 			continue;
93 
94 		/* entry won't fit on this device, skip: */
95 		if (unwritten > ca->mi.bucket_size)
96 			continue;
97 
98 		if (unwritten >= sectors) {
99 			if (!buckets) {
100 				sectors = 0;
101 				break;
102 			}
103 
104 			buckets--;
105 			sectors = ca->mi.bucket_size;
106 		}
107 
108 		sectors -= unwritten;
109 	}
110 
111 	if (sectors < ca->mi.bucket_size && buckets) {
112 		buckets--;
113 		sectors = ca->mi.bucket_size;
114 	}
115 
116 	return (struct journal_space) {
117 		.next_entry	= sectors,
118 		.total		= sectors + buckets * ca->mi.bucket_size,
119 	};
120 }
121 
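/*
 * Space available assuming we write to @nr_devs_want devices: per-device
 * results are kept sorted by .total, largest first, and we return the
 * smallest of the @nr_devs_want largest so that an entry of that size fits
 * on all of them.
 */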
122 static struct journal_space __journal_space_available(struct journal *j, unsigned nr_devs_want,
123 			    enum journal_space_from from)
124 {
125 	struct bch_fs *c = container_of(j, struct bch_fs, journal);
126 	struct bch_dev *ca;
127 	unsigned i, pos, nr_devs = 0;
128 	struct journal_space space, dev_space[BCH_SB_MEMBERS_MAX];
129 
130 	BUG_ON(nr_devs_want > ARRAY_SIZE(dev_space));
131 
132 	rcu_read_lock();
133 	for_each_member_device_rcu(ca, c, i,
134 				   &c->rw_devs[BCH_DATA_journal]) {
135 		if (!ca->journal.nr)
136 			continue;
137 
138 		space = journal_dev_space_available(j, ca, from);
139 		if (!space.next_entry)
140 			continue;
141 
142 		for (pos = 0; pos < nr_devs; pos++)
143 			if (space.total > dev_space[pos].total)
144 				break;
145 
146 		array_insert_item(dev_space, nr_devs, pos, space);
147 	}
148 	rcu_read_unlock();
149 
150 	if (nr_devs < nr_devs_want)
151 		return (struct journal_space) { 0, 0 };
152 
153 	/*
154 	 * We sorted largest to smallest, and we want the smallest out of the
155 	 * @nr_devs_want largest devices:
156 	 */
157 	return dev_space[nr_devs_want - 1];
158 }
159 
160 void bch2_journal_space_available(struct journal *j)
161 {
162 	struct bch_fs *c = container_of(j, struct bch_fs, journal);
163 	struct bch_dev *ca;
164 	unsigned clean, clean_ondisk, total;
165 	s64 u64s_remaining = 0;
166 	unsigned max_entry_size	 = min(j->buf[0].buf_size >> 9,
167 				       j->buf[1].buf_size >> 9);
168 	unsigned i, nr_online = 0, nr_devs_want;
169 	bool can_discard = false;
170 	int ret = 0;
171 
172 	lockdep_assert_held(&j->lock);
173 
174 	rcu_read_lock();
175 	for_each_member_device_rcu(ca, c, i,
176 				   &c->rw_devs[BCH_DATA_journal]) {
177 		struct journal_device *ja = &ca->journal;
178 
179 		if (!ja->nr)
180 			continue;
181 
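		/*
		 * Advance dirty_idx past buckets whose entries are older than
		 * last_seq, and dirty_idx_ondisk past buckets that are clean
		 * on disk:
		 */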
182 		while (ja->dirty_idx != ja->cur_idx &&
183 		       ja->bucket_seq[ja->dirty_idx] < journal_last_seq(j))
184 			ja->dirty_idx = (ja->dirty_idx + 1) % ja->nr;
185 
186 		while (ja->dirty_idx_ondisk != ja->dirty_idx &&
187 		       ja->bucket_seq[ja->dirty_idx_ondisk] < j->last_seq_ondisk)
188 			ja->dirty_idx_ondisk = (ja->dirty_idx_ondisk + 1) % ja->nr;
189 
190 		if (ja->discard_idx != ja->dirty_idx_ondisk)
191 			can_discard = true;
192 
193 		max_entry_size = min_t(unsigned, max_entry_size, ca->mi.bucket_size);
194 		nr_online++;
195 	}
196 	rcu_read_unlock();
197 
198 	j->can_discard = can_discard;
199 
200 	if (nr_online < c->opts.metadata_replicas_required) {
201 		ret = JOURNAL_ERR_insufficient_devices;
202 		goto out;
203 	}
204 
205 	nr_devs_want = min_t(unsigned, nr_online, c->opts.metadata_replicas);
206 
207 	for (i = 0; i < journal_space_nr; i++)
208 		j->space[i] = __journal_space_available(j, nr_devs_want, i);
209 
210 	clean_ondisk	= j->space[journal_space_clean_ondisk].total;
211 	clean		= j->space[journal_space_clean].total;
212 	total		= j->space[journal_space_total].total;
213 
214 	if (!j->space[journal_space_discarded].next_entry)
215 		ret = JOURNAL_ERR_journal_full;
216 
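	/*
	 * Allow skipping journal flushes (noflush writes) only while the space
	 * a flush would reclaim (clean - clean_ondisk) is at most 1/8 of the
	 * journal and most of the clean space is already clean on disk:
	 */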
217 	if ((j->space[journal_space_clean_ondisk].next_entry <
218 	     j->space[journal_space_clean_ondisk].total) &&
219 	    (clean - clean_ondisk <= total / 8) &&
220 	    (clean_ondisk * 2 > clean))
221 		set_bit(JOURNAL_MAY_SKIP_FLUSH, &j->flags);
222 	else
223 		clear_bit(JOURNAL_MAY_SKIP_FLUSH, &j->flags);
224 
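	/*
	 * Prereserved space, in u64s (64 per 512 byte sector): roughly a
	 * quarter of the clean space beyond 1/8 of the journal, i.e.
	 * max(0, clean * 64 - total * 8) / 4, capped at U32_MAX.
	 */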
225 	u64s_remaining  = (u64) clean << 6;
226 	u64s_remaining -= (u64) total << 3;
227 	u64s_remaining = max(0LL, u64s_remaining);
228 	u64s_remaining /= 4;
229 	u64s_remaining = min_t(u64, u64s_remaining, U32_MAX);
230 out:
231 	j->cur_entry_sectors	= !ret ? j->space[journal_space_discarded].next_entry : 0;
232 	j->cur_entry_error	= ret;
233 	journal_set_remaining(j, u64s_remaining);
234 	journal_set_watermark(j);
235 
236 	if (!ret)
237 		journal_wake(j);
238 }
239 
240 /* Discards - last part of journal reclaim: */
241 
242 static bool should_discard_bucket(struct journal *j, struct journal_device *ja)
243 {
244 	bool ret;
245 
246 	spin_lock(&j->lock);
247 	ret = ja->discard_idx != ja->dirty_idx_ondisk;
248 	spin_unlock(&j->lock);
249 
250 	return ret;
251 }
252 
253 /*
254  * Advance ja->discard_idx as long as it points to buckets that are no longer
255  * dirty, issuing discards if necessary:
256  */
257 void bch2_journal_do_discards(struct journal *j)
258 {
259 	struct bch_fs *c = container_of(j, struct bch_fs, journal);
260 	struct bch_dev *ca;
261 	unsigned iter;
262 
263 	mutex_lock(&j->discard_lock);
264 
265 	for_each_rw_member(ca, c, iter) {
266 		struct journal_device *ja = &ca->journal;
267 
268 		while (should_discard_bucket(j, ja)) {
269 			if (!c->opts.nochanges &&
270 			    ca->mi.discard &&
271 			    bdev_max_discard_sectors(ca->disk_sb.bdev))
272 				blkdev_issue_discard(ca->disk_sb.bdev,
273 					bucket_to_sector(ca,
274 						ja->buckets[ja->discard_idx]),
275 					ca->mi.bucket_size, GFP_NOFS);
276 
277 			spin_lock(&j->lock);
278 			ja->discard_idx = (ja->discard_idx + 1) % ja->nr;
279 
280 			bch2_journal_space_available(j);
281 			spin_unlock(&j->lock);
282 		}
283 	}
284 
285 	mutex_unlock(&j->discard_lock);
286 }
287 
288 /*
289  * Journal entry pinning - machinery for holding a reference on a given journal
290  * entry, holding it open to ensure it gets replayed during recovery:
291  */
292 
293 void bch2_journal_reclaim_fast(struct journal *j)
294 {
295 	bool popped = false;
296 
297 	lockdep_assert_held(&j->lock);
298 
299 	/*
300 	 * Unpin journal entries whose reference counts reached zero, meaning
301 	 * all btree nodes got written out
302 	 */
303 	while (!fifo_empty(&j->pin) &&
304 	       !atomic_read(&fifo_peek_front(&j->pin).count)) {
305 		j->pin.front++;
306 		popped = true;
307 	}
308 
309 	if (popped)
310 		bch2_journal_space_available(j);
311 }
312 
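/*
 * Drop a reference on the pin list for @seq; returns true if it was the last
 * reference, in which case the caller should do what bch2_journal_pin_put()
 * does: take j->lock and call bch2_journal_reclaim_fast() to pop now-empty
 * entries off the pin fifo.
 */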
313 bool __bch2_journal_pin_put(struct journal *j, u64 seq)
314 {
315 	struct journal_entry_pin_list *pin_list = journal_seq_pin(j, seq);
316 
317 	return atomic_dec_and_test(&pin_list->count);
318 }
319 
320 void bch2_journal_pin_put(struct journal *j, u64 seq)
321 {
322 	if (__bch2_journal_pin_put(j, seq)) {
323 		spin_lock(&j->lock);
324 		bch2_journal_reclaim_fast(j);
325 		spin_unlock(&j->lock);
326 	}
327 }
328 
329 static inline bool __journal_pin_drop(struct journal *j,
330 				      struct journal_entry_pin *pin)
331 {
332 	struct journal_entry_pin_list *pin_list;
333 
334 	if (!journal_pin_active(pin))
335 		return false;
336 
337 	if (j->flush_in_progress == pin)
338 		j->flush_in_progress_dropped = true;
339 
340 	pin_list = journal_seq_pin(j, pin->seq);
341 	pin->seq = 0;
342 	list_del_init(&pin->list);
343 
344 	/*
345 	 * Unpinning a journal entry may make journal_next_bucket() succeed, if
346 	 * writing a new last_seq will now make another bucket available:
347 	 */
348 	return atomic_dec_and_test(&pin_list->count) &&
349 		pin_list == &fifo_peek_front(&j->pin);
350 }
351 
352 void bch2_journal_pin_drop(struct journal *j,
353 			   struct journal_entry_pin *pin)
354 {
355 	spin_lock(&j->lock);
356 	if (__journal_pin_drop(j, pin))
357 		bch2_journal_reclaim_fast(j);
358 	spin_unlock(&j->lock);
359 }
360 
361 static enum journal_pin_type journal_pin_type(journal_pin_flush_fn fn)
362 {
363 	if (fn == bch2_btree_node_flush0 ||
364 	    fn == bch2_btree_node_flush1)
365 		return JOURNAL_PIN_btree;
366 	else if (fn == bch2_btree_key_cache_journal_flush)
367 		return JOURNAL_PIN_key_cache;
368 	else
369 		return JOURNAL_PIN_other;
370 }
371 
372 void bch2_journal_pin_set(struct journal *j, u64 seq,
373 			  struct journal_entry_pin *pin,
374 			  journal_pin_flush_fn flush_fn)
375 {
376 	struct journal_entry_pin_list *pin_list;
377 	bool reclaim;
378 
379 	spin_lock(&j->lock);
380 
381 	if (seq < journal_last_seq(j)) {
382 		/*
383 		 * bch2_journal_pin_copy() raced with bch2_journal_pin_drop() on
384 		 * the src pin - with the pin dropped, the entry to pin might no
385 		 * longer exist, but that means there's no longer anything to
386 		 * copy and we can bail out here:
387 		 */
388 		spin_unlock(&j->lock);
389 		return;
390 	}
391 
392 	pin_list = journal_seq_pin(j, seq);
393 
394 	reclaim = __journal_pin_drop(j, pin);
395 
396 	atomic_inc(&pin_list->count);
397 	pin->seq	= seq;
398 	pin->flush	= flush_fn;
399 
400 	if (flush_fn)
401 		list_add(&pin->list, &pin_list->list[journal_pin_type(flush_fn)]);
402 	else
403 		list_add(&pin->list, &pin_list->flushed);
404 
405 	if (reclaim)
406 		bch2_journal_reclaim_fast(j);
407 	spin_unlock(&j->lock);
408 
409 	/*
410 	 * If the journal is currently full, we might want to call flush_fn
411 	 * immediately:
412 	 */
413 	journal_wake(j);
414 }
415 
416 /**
417  * bch2_journal_pin_flush: ensure journal pin callback is no longer running
418  * @j:		journal object
419  * @pin:	pin to flush
420  */
421 void bch2_journal_pin_flush(struct journal *j, struct journal_entry_pin *pin)
422 {
423 	BUG_ON(journal_pin_active(pin));
424 
425 	wait_event(j->pin_flush_wait, j->flush_in_progress != pin);
426 }
427 
428 /*
429  * Journal reclaim: flush references to open journal entries to reclaim space in
430  * the journal
431  *
432  * May be done by the journal code in the background as needed to free up space
433  * for more journal entries, or as part of doing a clean shutdown, or to migrate
434  * data off of a specific device:
435  */
436 
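/*
 * Return the next pin to flush: @allowed_below_seq and @allowed_above_seq are
 * bitmasks of journal_pin_type saying which pin types may be flushed for
 * entries at or below @seq_to_flush, and above it, respectively.
 */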
437 static struct journal_entry_pin *
438 journal_get_next_pin(struct journal *j,
439 		     u64 seq_to_flush,
440 		     unsigned allowed_below_seq,
441 		     unsigned allowed_above_seq,
442 		     u64 *seq)
443 {
444 	struct journal_entry_pin_list *pin_list;
445 	struct journal_entry_pin *ret = NULL;
446 	unsigned i;
447 
448 	fifo_for_each_entry_ptr(pin_list, &j->pin, *seq) {
449 		if (*seq > seq_to_flush && !allowed_above_seq)
450 			break;
451 
452 		for (i = 0; i < JOURNAL_PIN_NR; i++)
453 			if ((((1U << i) & allowed_below_seq) && *seq <= seq_to_flush) ||
454 			    ((1U << i) & allowed_above_seq)) {
455 				ret = list_first_entry_or_null(&pin_list->list[i],
456 					struct journal_entry_pin, list);
457 				if (ret)
458 					return ret;
459 			}
460 	}
461 
462 	return NULL;
463 }
464 
465 /* returns the number of pins flushed (nonzero if we did work) */
466 static size_t journal_flush_pins(struct journal *j,
467 				 u64 seq_to_flush,
468 				 unsigned allowed_below_seq,
469 				 unsigned allowed_above_seq,
470 				 unsigned min_any,
471 				 unsigned min_key_cache)
472 {
473 	struct journal_entry_pin *pin;
474 	size_t nr_flushed = 0;
475 	journal_pin_flush_fn flush_fn;
476 	u64 seq;
477 	int err;
478 
479 	lockdep_assert_held(&j->reclaim_lock);
480 
481 	while (1) {
482 		unsigned allowed_above = allowed_above_seq;
483 		unsigned allowed_below = allowed_below_seq;
484 
485 		if (min_any) {
486 			allowed_above |= ~0;
487 			allowed_below |= ~0;
488 		}
489 
490 		if (min_key_cache) {
491 			allowed_above |= 1U << JOURNAL_PIN_key_cache;
492 			allowed_below |= 1U << JOURNAL_PIN_key_cache;
493 		}
494 
495 		cond_resched();
496 
497 		j->last_flushed = jiffies;
498 
499 		spin_lock(&j->lock);
500 		pin = journal_get_next_pin(j, seq_to_flush, allowed_below, allowed_above, &seq);
501 		if (pin) {
502 			BUG_ON(j->flush_in_progress);
503 			j->flush_in_progress = pin;
504 			j->flush_in_progress_dropped = false;
505 			flush_fn = pin->flush;
506 		}
507 		spin_unlock(&j->lock);
508 
509 		if (!pin)
510 			break;
511 
512 		if (min_key_cache && pin->flush == bch2_btree_key_cache_journal_flush)
513 			min_key_cache--;
514 
515 		if (min_any)
516 			min_any--;
517 
518 		err = flush_fn(j, pin, seq);
519 
520 		spin_lock(&j->lock);
521 		/* Pin might have been dropped or rearmed: */
522 		if (likely(!err && !j->flush_in_progress_dropped))
523 			list_move(&pin->list, &journal_seq_pin(j, seq)->flushed);
524 		j->flush_in_progress = NULL;
525 		j->flush_in_progress_dropped = false;
526 		spin_unlock(&j->lock);
527 
528 		wake_up(&j->pin_flush_wait);
529 
530 		if (err)
531 			break;
532 
533 		nr_flushed++;
534 	}
535 
536 	return nr_flushed;
537 }
538 
539 static u64 journal_seq_to_flush(struct journal *j)
540 {
541 	struct bch_fs *c = container_of(j, struct bch_fs, journal);
542 	struct bch_dev *ca;
543 	u64 seq_to_flush = 0;
544 	unsigned iter;
545 
546 	spin_lock(&j->lock);
547 
548 	for_each_rw_member(ca, c, iter) {
549 		struct journal_device *ja = &ca->journal;
550 		unsigned nr_buckets, bucket_to_flush;
551 
552 		if (!ja->nr)
553 			continue;
554 
555 		/* Try to keep the journal at most half full: */
556 		nr_buckets = ja->nr / 2;
557 
558 		/* And include pre-reservations: */
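		/* (prereserved is in u64s; bucket_size << 6 = u64s per bucket) */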
559 		nr_buckets += DIV_ROUND_UP(j->prereserved.reserved,
560 					   (ca->mi.bucket_size << 6) -
561 					   journal_entry_overhead(j));
562 
563 		nr_buckets = min(nr_buckets, ja->nr);
564 
565 		bucket_to_flush = (ja->cur_idx + nr_buckets) % ja->nr;
566 		seq_to_flush = max(seq_to_flush,
567 				   ja->bucket_seq[bucket_to_flush]);
568 	}
569 
570 	/* Also flush if the pin fifo is more than half full */
571 	seq_to_flush = max_t(s64, seq_to_flush,
572 			     (s64) journal_cur_seq(j) -
573 			     (j->pin.size >> 1));
574 	spin_unlock(&j->lock);
575 
576 	return seq_to_flush;
577 }
578 
579 /**
580  * __bch2_journal_reclaim - free up journal buckets
581  * @j:		journal object
582  * @direct:	direct or background reclaim?
583  * @kicked:	requested to run since we last ran?
584  * Returns:	0 on success, or -EIO if the journal has been shutdown
585  *
586  * Background journal reclaim writes out btree nodes. It should be run
587  * early enough so that we never completely run out of journal buckets.
588  *
589  * High watermarks for triggering background reclaim:
590  * - FIFO has fewer than 512 entries left
591  * - fewer than 25% journal buckets free
592  *
593  * Background reclaim runs until low watermarks are reached:
594  * - FIFO has more than 1024 entries left
595  * - more than 50% journal buckets free
596  *
597  * As long as a reclaim can complete in the time it takes to fill up
598  * 512 journal entries or 25% of all journal buckets, then
599  * journal_next_bucket() should not stall.
600  */
601 static int __bch2_journal_reclaim(struct journal *j, bool direct, bool kicked)
602 {
603 	struct bch_fs *c = container_of(j, struct bch_fs, journal);
604 	bool kthread = (current->flags & PF_KTHREAD) != 0;
605 	u64 seq_to_flush;
606 	size_t min_nr, min_key_cache, nr_flushed;
607 	unsigned flags;
608 	int ret = 0;
609 
610 	/*
611 	 * We can't invoke memory reclaim while holding the reclaim_lock -
612 	 * journal reclaim is required to make progress for memory reclaim
613 	 * (cleaning the caches), so we can't get stuck in memory reclaim while
614 	 * we're holding the reclaim lock:
615 	 */
616 	lockdep_assert_held(&j->reclaim_lock);
617 	flags = memalloc_noreclaim_save();
618 
619 	do {
620 		if (kthread && kthread_should_stop())
621 			break;
622 
623 		if (bch2_journal_error(j)) {
624 			ret = -EIO;
625 			break;
626 		}
627 
628 		bch2_journal_do_discards(j);
629 
630 		seq_to_flush = journal_seq_to_flush(j);
631 		min_nr = 0;
632 
633 		/*
634 		 * If it's been longer than journal_reclaim_delay since we last flushed,
635 		 * make sure to flush at least one journal pin:
636 		 */
637 		if (time_after(jiffies, j->last_flushed +
638 			       msecs_to_jiffies(c->opts.journal_reclaim_delay)))
639 			min_nr = 1;
640 
641 		if (j->prereserved.reserved * 4 > j->prereserved.remaining)
642 			min_nr = 1;
643 
644 		if (fifo_free(&j->pin) <= 32)
645 			min_nr = 1;
646 
647 		if (atomic_read(&c->btree_cache.dirty) * 2 > c->btree_cache.used)
648 			min_nr = 1;
649 
650 		min_key_cache = min(bch2_nr_btree_keys_need_flush(c), (size_t) 128);
651 
652 		trace_and_count(c, journal_reclaim_start, c,
653 				direct, kicked,
654 				min_nr, min_key_cache,
655 				j->prereserved.reserved,
656 				j->prereserved.remaining,
657 				atomic_read(&c->btree_cache.dirty),
658 				c->btree_cache.used,
659 				atomic_long_read(&c->btree_key_cache.nr_dirty),
660 				atomic_long_read(&c->btree_key_cache.nr_keys));
661 
662 		nr_flushed = journal_flush_pins(j, seq_to_flush,
663 						~0, 0,
664 						min_nr, min_key_cache);
665 
666 		if (direct)
667 			j->nr_direct_reclaim += nr_flushed;
668 		else
669 			j->nr_background_reclaim += nr_flushed;
670 		trace_and_count(c, journal_reclaim_finish, c, nr_flushed);
671 
672 		if (nr_flushed)
673 			wake_up(&j->reclaim_wait);
674 	} while ((min_nr || min_key_cache) && nr_flushed && !direct);
675 
676 	memalloc_noreclaim_restore(flags);
677 
678 	return ret;
679 }
680 
681 int bch2_journal_reclaim(struct journal *j)
682 {
683 	return __bch2_journal_reclaim(j, true, true);
684 }
685 
686 static int bch2_journal_reclaim_thread(void *arg)
687 {
688 	struct journal *j = arg;
689 	struct bch_fs *c = container_of(j, struct bch_fs, journal);
690 	unsigned long delay, now;
691 	bool journal_empty;
692 	int ret = 0;
693 
694 	set_freezable();
695 
696 	j->last_flushed = jiffies;
697 
698 	while (!ret && !kthread_should_stop()) {
699 		bool kicked = j->reclaim_kicked;
700 
701 		j->reclaim_kicked = false;
702 
703 		mutex_lock(&j->reclaim_lock);
704 		ret = __bch2_journal_reclaim(j, false, kicked);
705 		mutex_unlock(&j->reclaim_lock);
706 
707 		now = jiffies;
708 		delay = msecs_to_jiffies(c->opts.journal_reclaim_delay);
709 		j->next_reclaim = j->last_flushed + delay;
710 
711 		if (!time_in_range(j->next_reclaim, now, now + delay))
712 			j->next_reclaim = now + delay;
713 
714 		while (1) {
715 			set_current_state(TASK_INTERRUPTIBLE|TASK_FREEZABLE);
716 			if (kthread_should_stop())
717 				break;
718 			if (j->reclaim_kicked)
719 				break;
720 
721 			spin_lock(&j->lock);
722 			journal_empty = fifo_empty(&j->pin);
723 			spin_unlock(&j->lock);
724 
725 			if (journal_empty)
726 				schedule();
727 			else if (time_after(j->next_reclaim, jiffies))
728 				schedule_timeout(j->next_reclaim - jiffies);
729 			else
730 				break;
731 		}
732 		__set_current_state(TASK_RUNNING);
733 	}
734 
735 	return 0;
736 }
737 
738 void bch2_journal_reclaim_stop(struct journal *j)
739 {
740 	struct task_struct *p = j->reclaim_thread;
741 
742 	j->reclaim_thread = NULL;
743 
744 	if (p) {
745 		kthread_stop(p);
746 		put_task_struct(p);
747 	}
748 }
749 
750 int bch2_journal_reclaim_start(struct journal *j)
751 {
752 	struct bch_fs *c = container_of(j, struct bch_fs, journal);
753 	struct task_struct *p;
754 	int ret;
755 
756 	if (j->reclaim_thread)
757 		return 0;
758 
759 	p = kthread_create(bch2_journal_reclaim_thread, j,
760 			   "bch-reclaim/%s", c->name);
761 	ret = PTR_ERR_OR_ZERO(p);
762 	if (ret) {
763 		bch_err_msg(c, ret, "creating journal reclaim thread");
764 		return ret;
765 	}
766 
767 	get_task_struct(p);
768 	j->reclaim_thread = p;
769 	wake_up_process(p);
770 	return 0;
771 }
772 
773 static int journal_flush_done(struct journal *j, u64 seq_to_flush,
774 			      bool *did_work)
775 {
776 	int ret;
777 
778 	ret = bch2_journal_error(j);
779 	if (ret)
780 		return ret;
781 
782 	mutex_lock(&j->reclaim_lock);
783 
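	/*
	 * Key cache and other pins are flushed first: flushing them does btree
	 * updates, which can leave btree nodes (re)dirtied and pinned, so
	 * btree node pins are flushed in a second pass:
	 */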
784 	if (journal_flush_pins(j, seq_to_flush,
785 			       (1U << JOURNAL_PIN_key_cache)|
786 			       (1U << JOURNAL_PIN_other), 0, 0, 0) ||
787 	    journal_flush_pins(j, seq_to_flush,
788 			       (1U << JOURNAL_PIN_btree), 0, 0, 0))
789 		*did_work = true;
790 
791 	spin_lock(&j->lock);
792 	/*
793 	 * If journal replay hasn't completed, the unreplayed journal entries
794 	 * hold refs on their corresponding sequence numbers
795 	 */
796 	ret = !test_bit(JOURNAL_REPLAY_DONE, &j->flags) ||
797 		journal_last_seq(j) > seq_to_flush ||
798 		!fifo_used(&j->pin);
799 
800 	spin_unlock(&j->lock);
801 	mutex_unlock(&j->reclaim_lock);
802 
803 	return ret;
804 }
805 
806 bool bch2_journal_flush_pins(struct journal *j, u64 seq_to_flush)
807 {
808 	bool did_work = false;
809 
810 	if (!test_bit(JOURNAL_STARTED, &j->flags))
811 		return false;
812 
813 	closure_wait_event(&j->async_wait,
814 		journal_flush_done(j, seq_to_flush, &did_work));
815 
816 	return did_work;
817 }
818 
819 int bch2_journal_flush_device_pins(struct journal *j, int dev_idx)
820 {
821 	struct bch_fs *c = container_of(j, struct bch_fs, journal);
822 	struct journal_entry_pin_list *p;
823 	u64 iter, seq = 0;
824 	int ret = 0;
825 
826 	spin_lock(&j->lock);
827 	fifo_for_each_entry_ptr(p, &j->pin, iter)
828 		if (dev_idx >= 0
829 		    ? bch2_dev_list_has_dev(p->devs, dev_idx)
830 		    : p->devs.nr < c->opts.metadata_replicas)
831 			seq = iter;
832 	spin_unlock(&j->lock);
833 
834 	bch2_journal_flush_pins(j, seq);
835 
836 	ret = bch2_journal_error(j);
837 	if (ret)
838 		return ret;
839 
840 	mutex_lock(&c->replicas_gc_lock);
841 	bch2_replicas_gc_start(c, 1 << BCH_DATA_journal);
842 
843 	/*
844 	 * Now that we've populated replicas_gc, write to the journal to mark
845 	 * active journal devices. This handles the case where the journal might
846 	 * be empty. Otherwise we could clear all journal replicas and
847 	 * temporarily put the fs into an unrecoverable state. Journal recovery
848 	 * expects to find devices marked for journal data on unclean mount.
849 	 */
850 	ret = bch2_journal_meta(&c->journal);
851 	if (ret)
852 		goto err;
853 
854 	seq = 0;
855 	spin_lock(&j->lock);
856 	while (!ret) {
857 		struct bch_replicas_padded replicas;
858 
859 		seq = max(seq, journal_last_seq(j));
860 		if (seq >= j->pin.back)
861 			break;
862 		bch2_devlist_to_replicas(&replicas.e, BCH_DATA_journal,
863 					 journal_seq_pin(j, seq)->devs);
864 		seq++;
865 
866 		spin_unlock(&j->lock);
867 		ret = bch2_mark_replicas(c, &replicas.e);
868 		spin_lock(&j->lock);
869 	}
870 	spin_unlock(&j->lock);
871 err:
872 	ret = bch2_replicas_gc_end(c, ret);
873 	mutex_unlock(&c->replicas_gc_lock);
874 
875 	return ret;
876 }
877