xref: /linux/fs/bcachefs/journal_reclaim.c (revision e3234e547a4db0572e271e490d044bdb4cb7233b)
1 // SPDX-License-Identifier: GPL-2.0
2 
3 #include "bcachefs.h"
4 #include "btree_key_cache.h"
5 #include "btree_update.h"
6 #include "buckets.h"
7 #include "errcode.h"
8 #include "error.h"
9 #include "journal.h"
10 #include "journal_io.h"
11 #include "journal_reclaim.h"
12 #include "replicas.h"
13 #include "sb-members.h"
14 #include "trace.h"
15 
16 #include <linux/kthread.h>
17 #include <linux/sched/mm.h>
18 
19 /* Free space calculations: */
20 
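/*
 * Map an enum journal_space_from to the corresponding bucket index in the
 * journal_device ring: the discard, clean-on-disk or clean pointer.
 */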
21 static unsigned journal_space_from(struct journal_device *ja,
22 				   enum journal_space_from from)
23 {
24 	switch (from) {
25 	case journal_space_discarded:
26 		return ja->discard_idx;
27 	case journal_space_clean_ondisk:
28 		return ja->dirty_idx_ondisk;
29 	case journal_space_clean:
30 		return ja->dirty_idx;
31 	default:
32 		BUG();
33 	}
34 }
35 
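/*
 * Number of journal buckets available on @ja: the distance, in ring order,
 * from the current write position (cur_idx) to the index selected by @from.
 */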
36 unsigned bch2_journal_dev_buckets_available(struct journal *j,
37 					    struct journal_device *ja,
38 					    enum journal_space_from from)
39 {
40 	unsigned available = (journal_space_from(ja, from) -
41 			      ja->cur_idx - 1 + ja->nr) % ja->nr;
42 
43 	/*
44 	 * Don't use the last bucket unless writing the new last_seq
45 	 * will make another bucket available:
46 	 */
47 	if (available && ja->dirty_idx_ondisk == ja->dirty_idx)
48 		--available;
49 
50 	return available;
51 }
52 
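/*
 * Bump the journal watermark to BCH_WATERMARK_reclaim when we're low on space
 * or the pin FIFO is more than 3/4 full; wake journal waiters whenever the
 * watermark is lowered again.
 */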
53 static inline void journal_set_watermark(struct journal *j, bool low_on_space)
54 {
55 	unsigned watermark = BCH_WATERMARK_stripe;
56 
57 	if (low_on_space)
58 		watermark = max_t(unsigned, watermark, BCH_WATERMARK_reclaim);
59 	if (fifo_free(&j->pin) < j->pin.size / 4)
60 		watermark = max_t(unsigned, watermark, BCH_WATERMARK_reclaim);
61 
62 	if (watermark == j->watermark)
63 		return;
64 
65 	swap(watermark, j->watermark);
66 	if (watermark > j->watermark)
67 		journal_wake(j);
68 }
69 
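/*
 * Space available for journalling on a single device, for the given
 * journal_space_from: @next_entry is how many sectors the next journal entry
 * may use on this device, @total is the total free space - both accounting
 * for journal entries that have been opened but not yet written out.
 */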
70 static struct journal_space
71 journal_dev_space_available(struct journal *j, struct bch_dev *ca,
72 			    enum journal_space_from from)
73 {
74 	struct journal_device *ja = &ca->journal;
75 	unsigned sectors, buckets, unwritten;
76 	u64 seq;
77 
78 	if (from == journal_space_total)
79 		return (struct journal_space) {
80 			.next_entry	= ca->mi.bucket_size,
81 			.total		= ca->mi.bucket_size * ja->nr,
82 		};
83 
84 	buckets = bch2_journal_dev_buckets_available(j, ja, from);
85 	sectors = ja->sectors_free;
86 
87 	/*
88 	 * Note that we don't allocate the space for a journal entry
89 	 * until we write it out - thus, account for it here:
90 	 */
91 	for (seq = journal_last_unwritten_seq(j);
92 	     seq <= journal_cur_seq(j);
93 	     seq++) {
94 		unwritten = j->buf[seq & JOURNAL_BUF_MASK].sectors;
95 
96 		if (!unwritten)
97 			continue;
98 
99 		/* entry won't fit on this device, skip: */
100 		if (unwritten > ca->mi.bucket_size)
101 			continue;
102 
103 		if (unwritten >= sectors) {
104 			if (!buckets) {
105 				sectors = 0;
106 				break;
107 			}
108 
109 			buckets--;
110 			sectors = ca->mi.bucket_size;
111 		}
112 
113 		sectors -= unwritten;
114 	}
115 
116 	if (sectors < ca->mi.bucket_size && buckets) {
117 		buckets--;
118 		sectors = ca->mi.bucket_size;
119 	}
120 
121 	return (struct journal_space) {
122 		.next_entry	= sectors,
123 		.total		= sectors + buckets * ca->mi.bucket_size,
124 	};
125 }
126 
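/*
 * Journal space available for writing @nr_devs_want replicas: zero if fewer
 * than @nr_devs_want devices have space for another journal entry.
 */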
127 static struct journal_space __journal_space_available(struct journal *j, unsigned nr_devs_want,
128 			    enum journal_space_from from)
129 {
130 	struct bch_fs *c = container_of(j, struct bch_fs, journal);
131 	struct bch_dev *ca;
132 	unsigned i, pos, nr_devs = 0;
133 	struct journal_space space, dev_space[BCH_SB_MEMBERS_MAX];
134 
135 	BUG_ON(nr_devs_want > ARRAY_SIZE(dev_space));
136 
137 	rcu_read_lock();
138 	for_each_member_device_rcu(ca, c, i,
139 				   &c->rw_devs[BCH_DATA_journal]) {
140 		if (!ca->journal.nr)
141 			continue;
142 
143 		space = journal_dev_space_available(j, ca, from);
144 		if (!space.next_entry)
145 			continue;
146 
147 		for (pos = 0; pos < nr_devs; pos++)
148 			if (space.total > dev_space[pos].total)
149 				break;
150 
151 		array_insert_item(dev_space, nr_devs, pos, space);
152 	}
153 	rcu_read_unlock();
154 
155 	if (nr_devs < nr_devs_want)
156 		return (struct journal_space) { 0, 0 };
157 
158 	/*
159 	 * We sorted largest to smallest, and we want the smallest out of the
160 	 * @nr_devs_want largest devices:
161 	 */
162 	return dev_space[nr_devs_want - 1];
163 }
164 
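/*
 * Recompute how much space the journal has available: advance each device's
 * clean/clean-on-disk bucket pointers, fill in j->space[], update the
 * watermark and whether journal writes may skip flushing, and set
 * cur_entry_sectors/cur_entry_error for the next journal entry.
 *
 * Must be called with j->lock held.
 */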
165 void bch2_journal_space_available(struct journal *j)
166 {
167 	struct bch_fs *c = container_of(j, struct bch_fs, journal);
168 	struct bch_dev *ca;
169 	unsigned clean, clean_ondisk, total;
170 	unsigned max_entry_size	 = min(j->buf[0].buf_size >> 9,
171 				       j->buf[1].buf_size >> 9);
172 	unsigned i, nr_online = 0, nr_devs_want;
173 	bool can_discard = false;
174 	int ret = 0;
175 
176 	lockdep_assert_held(&j->lock);
177 
178 	rcu_read_lock();
179 	for_each_member_device_rcu(ca, c, i,
180 				   &c->rw_devs[BCH_DATA_journal]) {
181 		struct journal_device *ja = &ca->journal;
182 
183 		if (!ja->nr)
184 			continue;
185 
186 		while (ja->dirty_idx != ja->cur_idx &&
187 		       ja->bucket_seq[ja->dirty_idx] < journal_last_seq(j))
188 			ja->dirty_idx = (ja->dirty_idx + 1) % ja->nr;
189 
190 		while (ja->dirty_idx_ondisk != ja->dirty_idx &&
191 		       ja->bucket_seq[ja->dirty_idx_ondisk] < j->last_seq_ondisk)
192 			ja->dirty_idx_ondisk = (ja->dirty_idx_ondisk + 1) % ja->nr;
193 
194 		if (ja->discard_idx != ja->dirty_idx_ondisk)
195 			can_discard = true;
196 
197 		max_entry_size = min_t(unsigned, max_entry_size, ca->mi.bucket_size);
198 		nr_online++;
199 	}
200 	rcu_read_unlock();
201 
202 	j->can_discard = can_discard;
203 
204 	if (nr_online < c->opts.metadata_replicas_required) {
205 		ret = JOURNAL_ERR_insufficient_devices;
206 		goto out;
207 	}
208 
209 	nr_devs_want = min_t(unsigned, nr_online, c->opts.metadata_replicas);
210 
211 	for (i = 0; i < journal_space_nr; i++)
212 		j->space[i] = __journal_space_available(j, nr_devs_want, i);
213 
214 	clean_ondisk	= j->space[journal_space_clean_ondisk].total;
215 	clean		= j->space[journal_space_clean].total;
216 	total		= j->space[journal_space_total].total;
217 
218 	if (!j->space[journal_space_discarded].next_entry)
219 		ret = JOURNAL_ERR_journal_full;
220 
221 	if ((j->space[journal_space_clean_ondisk].next_entry <
222 	     j->space[journal_space_clean_ondisk].total) &&
223 	    (clean - clean_ondisk <= total / 8) &&
224 	    (clean_ondisk * 2 > clean))
225 		set_bit(JOURNAL_MAY_SKIP_FLUSH, &j->flags);
226 	else
227 		clear_bit(JOURNAL_MAY_SKIP_FLUSH, &j->flags);
228 
229 	journal_set_watermark(j, clean * 4 <= total);
230 out:
231 	j->cur_entry_sectors	= !ret ? j->space[journal_space_discarded].next_entry : 0;
232 	j->cur_entry_error	= ret;
233 
234 	if (!ret)
235 		journal_wake(j);
236 }
237 
238 /* Discards - last part of journal reclaim: */
239 
240 static bool should_discard_bucket(struct journal *j, struct journal_device *ja)
241 {
242 	bool ret;
243 
244 	spin_lock(&j->lock);
245 	ret = ja->discard_idx != ja->dirty_idx_ondisk;
246 	spin_unlock(&j->lock);
247 
248 	return ret;
249 }
250 
251 /*
252  * Advance ja->discard_idx as long as it points to buckets that are no longer
253  * dirty, issuing discards if necessary:
254  */
255 void bch2_journal_do_discards(struct journal *j)
256 {
257 	struct bch_fs *c = container_of(j, struct bch_fs, journal);
258 	struct bch_dev *ca;
259 	unsigned iter;
260 
261 	mutex_lock(&j->discard_lock);
262 
263 	for_each_rw_member(ca, c, iter) {
264 		struct journal_device *ja = &ca->journal;
265 
266 		while (should_discard_bucket(j, ja)) {
267 			if (!c->opts.nochanges &&
268 			    ca->mi.discard &&
269 			    bdev_max_discard_sectors(ca->disk_sb.bdev))
270 				blkdev_issue_discard(ca->disk_sb.bdev,
271 					bucket_to_sector(ca,
272 						ja->buckets[ja->discard_idx]),
273 					ca->mi.bucket_size, GFP_NOFS);
274 
275 			spin_lock(&j->lock);
276 			ja->discard_idx = (ja->discard_idx + 1) % ja->nr;
277 
278 			bch2_journal_space_available(j);
279 			spin_unlock(&j->lock);
280 		}
281 	}
282 
283 	mutex_unlock(&j->discard_lock);
284 }
285 
286 /*
287  * Journal entry pinning - machinery for holding a reference on a given journal
288  * entry, holding it open to ensure it gets replayed during recovery:
289  */
290 
291 void bch2_journal_reclaim_fast(struct journal *j)
292 {
293 	bool popped = false;
294 
295 	lockdep_assert_held(&j->lock);
296 
297 	/*
298 	 * Unpin journal entries whose reference counts reached zero, meaning
299 	 * all btree nodes got written out
300 	 */
301 	while (!fifo_empty(&j->pin) &&
302 	       !atomic_read(&fifo_peek_front(&j->pin).count)) {
303 		j->pin.front++;
304 		popped = true;
305 	}
306 
307 	if (popped)
308 		bch2_journal_space_available(j);
309 }
310 
311 bool __bch2_journal_pin_put(struct journal *j, u64 seq)
312 {
313 	struct journal_entry_pin_list *pin_list = journal_seq_pin(j, seq);
314 
315 	return atomic_dec_and_test(&pin_list->count);
316 }
317 
318 void bch2_journal_pin_put(struct journal *j, u64 seq)
319 {
320 	if (__bch2_journal_pin_put(j, seq)) {
321 		spin_lock(&j->lock);
322 		bch2_journal_reclaim_fast(j);
323 		spin_unlock(&j->lock);
324 	}
325 }
326 
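/*
 * Remove @pin from its pin list; returns true if this emptied the entry at the
 * front of the pin FIFO, in which case the caller should run
 * bch2_journal_reclaim_fast().
 */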
327 static inline bool __journal_pin_drop(struct journal *j,
328 				      struct journal_entry_pin *pin)
329 {
330 	struct journal_entry_pin_list *pin_list;
331 
332 	if (!journal_pin_active(pin))
333 		return false;
334 
335 	if (j->flush_in_progress == pin)
336 		j->flush_in_progress_dropped = true;
337 
338 	pin_list = journal_seq_pin(j, pin->seq);
339 	pin->seq = 0;
340 	list_del_init(&pin->list);
341 
342 	/*
343 	 * Unpinning a journal entry may make journal_next_bucket() succeed, if
344 	 * writing a new last_seq will now make another bucket available:
345 	 */
346 	return atomic_dec_and_test(&pin_list->count) &&
347 		pin_list == &fifo_peek_front(&j->pin);
348 }
349 
350 void bch2_journal_pin_drop(struct journal *j,
351 			   struct journal_entry_pin *pin)
352 {
353 	spin_lock(&j->lock);
354 	if (__journal_pin_drop(j, pin))
355 		bch2_journal_reclaim_fast(j);
356 	spin_unlock(&j->lock);
357 }
358 
359 static enum journal_pin_type journal_pin_type(journal_pin_flush_fn fn)
360 {
361 	if (fn == bch2_btree_node_flush0 ||
362 	    fn == bch2_btree_node_flush1)
363 		return JOURNAL_PIN_btree;
364 	else if (fn == bch2_btree_key_cache_journal_flush)
365 		return JOURNAL_PIN_key_cache;
366 	else
367 		return JOURNAL_PIN_other;
368 }
369 
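/*
 * (Re)arm @pin to hold journal sequence number @seq: any pin it currently
 * holds is dropped, the pin list's refcount is taken, and the pin is added to
 * the list for its pin type - or to the flushed list if no @flush_fn is given.
 */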
370 void bch2_journal_pin_set(struct journal *j, u64 seq,
371 			  struct journal_entry_pin *pin,
372 			  journal_pin_flush_fn flush_fn)
373 {
374 	struct journal_entry_pin_list *pin_list;
375 	bool reclaim;
376 
377 	spin_lock(&j->lock);
378 
379 	if (seq < journal_last_seq(j)) {
380 		/*
381 		 * bch2_journal_pin_copy() raced with bch2_journal_pin_drop() on
382 		 * the src pin - with the pin dropped, the entry to pin might no
383 		 * longer exist, but that means there's no longer anything to
384 		 * copy and we can bail out here:
385 		 */
386 		spin_unlock(&j->lock);
387 		return;
388 	}
389 
390 	pin_list = journal_seq_pin(j, seq);
391 
392 	reclaim = __journal_pin_drop(j, pin);
393 
394 	atomic_inc(&pin_list->count);
395 	pin->seq	= seq;
396 	pin->flush	= flush_fn;
397 
398 	if (flush_fn)
399 		list_add(&pin->list, &pin_list->list[journal_pin_type(flush_fn)]);
400 	else
401 		list_add(&pin->list, &pin_list->flushed);
402 
403 	if (reclaim)
404 		bch2_journal_reclaim_fast(j);
405 	spin_unlock(&j->lock);
406 
407 	/*
408 	 * If the journal is currently full,  we might want to call flush_fn
409 	 * If the journal is currently full, we might want to call flush_fn
410 	 */
411 	journal_wake(j);
412 }
413 
414 /**
415  * bch2_journal_pin_flush: ensure journal pin callback is no longer running
416  * @j:		journal object
417  * @pin:	pin to flush
418  */
419 void bch2_journal_pin_flush(struct journal *j, struct journal_entry_pin *pin)
420 {
421 	BUG_ON(journal_pin_active(pin));
422 
423 	wait_event(j->pin_flush_wait, j->flush_in_progress != pin);
424 }
425 
426 /*
427  * Journal reclaim: flush references to open journal entries to reclaim space in
428  * the journal
429  *
430  * May be done by the journal code in the background as needed to free up space
431  * for more journal entries, or as part of doing a clean shutdown, or to migrate
432  * data off of a specific device:
433  */
434 
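/*
 * Find the next journal pin to flush: @allowed_below_seq and
 * @allowed_above_seq are bitmasks of journal pin types that may be flushed at
 * sequence numbers <= and > @seq_to_flush respectively; *@seq is set to the
 * sequence number the returned pin belongs to.
 */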
435 static struct journal_entry_pin *
436 journal_get_next_pin(struct journal *j,
437 		     u64 seq_to_flush,
438 		     unsigned allowed_below_seq,
439 		     unsigned allowed_above_seq,
440 		     u64 *seq)
441 {
442 	struct journal_entry_pin_list *pin_list;
443 	struct journal_entry_pin *ret = NULL;
444 	unsigned i;
445 
446 	fifo_for_each_entry_ptr(pin_list, &j->pin, *seq) {
447 		if (*seq > seq_to_flush && !allowed_above_seq)
448 			break;
449 
450 		for (i = 0; i < JOURNAL_PIN_NR; i++)
451 			if ((((1U << i) & allowed_below_seq) && *seq <= seq_to_flush) ||
452 			    ((1U << i) & allowed_above_seq)) {
453 				ret = list_first_entry_or_null(&pin_list->list[i],
454 					struct journal_entry_pin, list);
455 				if (ret)
456 					return ret;
457 			}
458 	}
459 
460 	return NULL;
461 }
462 
463 /* returns the number of pins flushed - nonzero iff we did work */
464 static size_t journal_flush_pins(struct journal *j,
465 				 u64 seq_to_flush,
466 				 unsigned allowed_below_seq,
467 				 unsigned allowed_above_seq,
468 				 unsigned min_any,
469 				 unsigned min_key_cache)
470 {
471 	struct journal_entry_pin *pin;
472 	size_t nr_flushed = 0;
473 	journal_pin_flush_fn flush_fn;
474 	u64 seq;
475 	int err;
476 
477 	lockdep_assert_held(&j->reclaim_lock);
478 
479 	while (1) {
480 		unsigned allowed_above = allowed_above_seq;
481 		unsigned allowed_below = allowed_below_seq;
482 
483 		if (min_any) {
484 			allowed_above |= ~0;
485 			allowed_below |= ~0;
486 		}
487 
488 		if (min_key_cache) {
489 			allowed_above |= 1U << JOURNAL_PIN_key_cache;
490 			allowed_below |= 1U << JOURNAL_PIN_key_cache;
491 		}
492 
493 		cond_resched();
494 
495 		j->last_flushed = jiffies;
496 
497 		spin_lock(&j->lock);
498 		pin = journal_get_next_pin(j, seq_to_flush, allowed_below, allowed_above, &seq);
499 		if (pin) {
500 			BUG_ON(j->flush_in_progress);
501 			j->flush_in_progress = pin;
502 			j->flush_in_progress_dropped = false;
503 			flush_fn = pin->flush;
504 		}
505 		spin_unlock(&j->lock);
506 
507 		if (!pin)
508 			break;
509 
510 		if (min_key_cache && pin->flush == bch2_btree_key_cache_journal_flush)
511 			min_key_cache--;
512 
513 		if (min_any)
514 			min_any--;
515 
516 		err = flush_fn(j, pin, seq);
517 
518 		spin_lock(&j->lock);
519 		/* Pin might have been dropped or rearmed: */
520 		if (likely(!err && !j->flush_in_progress_dropped))
521 			list_move(&pin->list, &journal_seq_pin(j, seq)->flushed);
522 		j->flush_in_progress = NULL;
523 		j->flush_in_progress_dropped = false;
524 		spin_unlock(&j->lock);
525 
526 		wake_up(&j->pin_flush_wait);
527 
528 		if (err)
529 			break;
530 
531 		nr_flushed++;
532 	}
533 
534 	return nr_flushed;
535 }
536 
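/*
 * Pick a sequence number to flush up to: far enough to keep each device's
 * journal at most half full, and the pin FIFO no more than half full.
 */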
537 static u64 journal_seq_to_flush(struct journal *j)
538 {
539 	struct bch_fs *c = container_of(j, struct bch_fs, journal);
540 	struct bch_dev *ca;
541 	u64 seq_to_flush = 0;
542 	unsigned iter;
543 
544 	spin_lock(&j->lock);
545 
546 	for_each_rw_member(ca, c, iter) {
547 		struct journal_device *ja = &ca->journal;
548 		unsigned nr_buckets, bucket_to_flush;
549 
550 		if (!ja->nr)
551 			continue;
552 
553 		/* Try to keep the journal at most half full: */
554 		nr_buckets = ja->nr / 2;
555 
556 		nr_buckets = min(nr_buckets, ja->nr);
557 
558 		bucket_to_flush = (ja->cur_idx + nr_buckets) % ja->nr;
559 		seq_to_flush = max(seq_to_flush,
560 				   ja->bucket_seq[bucket_to_flush]);
561 	}
562 
563 	/* Also flush if the pin fifo is more than half full */
564 	seq_to_flush = max_t(s64, seq_to_flush,
565 			     (s64) journal_cur_seq(j) -
566 			     (j->pin.size >> 1));
567 	spin_unlock(&j->lock);
568 
569 	return seq_to_flush;
570 }
571 
572 /**
573  * __bch2_journal_reclaim - free up journal buckets
574  * @j:		journal object
575  * @direct:	direct or background reclaim?
576  * @kicked:	requested to run since we last ran?
577  * Returns:	0 on success, or -EIO if the journal has been shutdown
578  *
579  * Background journal reclaim writes out btree nodes. It should be run
580  * early enough so that we never completely run out of journal buckets.
581  *
582  * High watermarks for triggering background reclaim:
583  * - FIFO has fewer than 512 entries left
584  * - fewer than 25% journal buckets free
585  *
586  * Background reclaim runs until low watermarks are reached:
587  * - FIFO has more than 1024 entries left
588  * - more than 50% journal buckets free
589  *
590  * As long as a reclaim can complete in the time it takes to fill up
591  * 512 journal entries or 25% of all journal buckets, then
592  * journal_next_bucket() should not stall.
593  */
594 static int __bch2_journal_reclaim(struct journal *j, bool direct, bool kicked)
595 {
596 	struct bch_fs *c = container_of(j, struct bch_fs, journal);
597 	bool kthread = (current->flags & PF_KTHREAD) != 0;
598 	u64 seq_to_flush;
599 	size_t min_nr, min_key_cache, nr_flushed;
600 	unsigned flags;
601 	int ret = 0;
602 
603 	/*
604 	 * We can't invoke memory reclaim while holding the reclaim_lock -
605 	 * journal reclaim is required to make progress for memory reclaim
606 	 * (cleaning the caches), so we can't get stuck in memory reclaim while
607 	 * we're holding the reclaim lock:
608 	 */
609 	lockdep_assert_held(&j->reclaim_lock);
610 	flags = memalloc_noreclaim_save();
611 
612 	do {
613 		if (kthread && kthread_should_stop())
614 			break;
615 
616 		if (bch2_journal_error(j)) {
617 			ret = -EIO;
618 			break;
619 		}
620 
621 		bch2_journal_do_discards(j);
622 
623 		seq_to_flush = journal_seq_to_flush(j);
624 		min_nr = 0;
625 
626 		/*
627 		 * If it's been longer than c->opts.journal_reclaim_delay since we last flushed,
628 		 * make sure to flush at least one journal pin:
629 		 */
630 		if (time_after(jiffies, j->last_flushed +
631 			       msecs_to_jiffies(c->opts.journal_reclaim_delay)))
632 			min_nr = 1;
633 
634 		if (j->watermark != BCH_WATERMARK_stripe)
635 			min_nr = 1;
636 
637 		if (atomic_read(&c->btree_cache.dirty) * 2 > c->btree_cache.used)
638 			min_nr = 1;
639 
640 		min_key_cache = min(bch2_nr_btree_keys_need_flush(c), (size_t) 128);
641 
642 		trace_and_count(c, journal_reclaim_start, c,
643 				direct, kicked,
644 				min_nr, min_key_cache,
645 				atomic_read(&c->btree_cache.dirty),
646 				c->btree_cache.used,
647 				atomic_long_read(&c->btree_key_cache.nr_dirty),
648 				atomic_long_read(&c->btree_key_cache.nr_keys));
649 
650 		nr_flushed = journal_flush_pins(j, seq_to_flush,
651 						~0, 0,
652 						min_nr, min_key_cache);
653 
654 		if (direct)
655 			j->nr_direct_reclaim += nr_flushed;
656 		else
657 			j->nr_background_reclaim += nr_flushed;
658 		trace_and_count(c, journal_reclaim_finish, c, nr_flushed);
659 
660 		if (nr_flushed)
661 			wake_up(&j->reclaim_wait);
662 	} while ((min_nr || min_key_cache) && nr_flushed && !direct);
663 
664 	memalloc_noreclaim_restore(flags);
665 
666 	return ret;
667 }
668 
669 int bch2_journal_reclaim(struct journal *j)
670 {
671 	return __bch2_journal_reclaim(j, true, true);
672 }
673 
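/*
 * Background reclaim thread: runs reclaim, then sleeps until it's kicked, told
 * to stop, or - if there are pinned journal entries - until the reclaim delay
 * has expired.
 */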
674 static int bch2_journal_reclaim_thread(void *arg)
675 {
676 	struct journal *j = arg;
677 	struct bch_fs *c = container_of(j, struct bch_fs, journal);
678 	unsigned long delay, now;
679 	bool journal_empty;
680 	int ret = 0;
681 
682 	set_freezable();
683 
684 	j->last_flushed = jiffies;
685 
686 	while (!ret && !kthread_should_stop()) {
687 		bool kicked = j->reclaim_kicked;
688 
689 		j->reclaim_kicked = false;
690 
691 		mutex_lock(&j->reclaim_lock);
692 		ret = __bch2_journal_reclaim(j, false, kicked);
693 		mutex_unlock(&j->reclaim_lock);
694 
695 		now = jiffies;
696 		delay = msecs_to_jiffies(c->opts.journal_reclaim_delay);
697 		j->next_reclaim = j->last_flushed + delay;
698 
699 		if (!time_in_range(j->next_reclaim, now, now + delay))
700 			j->next_reclaim = now + delay;
701 
702 		while (1) {
703 			set_current_state(TASK_INTERRUPTIBLE|TASK_FREEZABLE);
704 			if (kthread_should_stop())
705 				break;
706 			if (j->reclaim_kicked)
707 				break;
708 
709 			spin_lock(&j->lock);
710 			journal_empty = fifo_empty(&j->pin);
711 			spin_unlock(&j->lock);
712 
713 			if (journal_empty)
714 				schedule();
715 			else if (time_after(j->next_reclaim, jiffies))
716 				schedule_timeout(j->next_reclaim - jiffies);
717 			else
718 				break;
719 		}
720 		__set_current_state(TASK_RUNNING);
721 	}
722 
723 	return 0;
724 }
725 
726 void bch2_journal_reclaim_stop(struct journal *j)
727 {
728 	struct task_struct *p = j->reclaim_thread;
729 
730 	j->reclaim_thread = NULL;
731 
732 	if (p) {
733 		kthread_stop(p);
734 		put_task_struct(p);
735 	}
736 }
737 
738 int bch2_journal_reclaim_start(struct journal *j)
739 {
740 	struct bch_fs *c = container_of(j, struct bch_fs, journal);
741 	struct task_struct *p;
742 	int ret;
743 
744 	if (j->reclaim_thread)
745 		return 0;
746 
747 	p = kthread_create(bch2_journal_reclaim_thread, j,
748 			   "bch-reclaim/%s", c->name);
749 	ret = PTR_ERR_OR_ZERO(p);
750 	if (ret) {
751 		bch_err_msg(c, ret, "creating journal reclaim thread");
752 		return ret;
753 	}
754 
755 	get_task_struct(p);
756 	j->reclaim_thread = p;
757 	wake_up_process(p);
758 	return 0;
759 }
760 
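/*
 * Condition for bch2_journal_flush_pins(): flush key cache and other pins,
 * then btree node pins, and report whether everything up to @seq_to_flush has
 * been flushed - or can't be yet, because journal replay hasn't finished.
 */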
761 static int journal_flush_done(struct journal *j, u64 seq_to_flush,
762 			      bool *did_work)
763 {
764 	int ret;
765 
766 	ret = bch2_journal_error(j);
767 	if (ret)
768 		return ret;
769 
770 	mutex_lock(&j->reclaim_lock);
771 
772 	if (journal_flush_pins(j, seq_to_flush,
773 			       (1U << JOURNAL_PIN_key_cache)|
774 			       (1U << JOURNAL_PIN_other), 0, 0, 0) ||
775 	    journal_flush_pins(j, seq_to_flush,
776 			       (1U << JOURNAL_PIN_btree), 0, 0, 0))
777 		*did_work = true;
778 
779 	spin_lock(&j->lock);
780 	/*
781 	 * If journal replay hasn't completed, the unreplayed journal entries
782 	 * hold refs on their corresponding sequence numbers
783 	 */
784 	ret = !test_bit(JOURNAL_REPLAY_DONE, &j->flags) ||
785 		journal_last_seq(j) > seq_to_flush ||
786 		!fifo_used(&j->pin);
787 
788 	spin_unlock(&j->lock);
789 	mutex_unlock(&j->reclaim_lock);
790 
791 	return ret;
792 }
793 
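/*
 * Flush all journal pins with sequence numbers <= @seq_to_flush, waiting for
 * the flushes to complete; returns true if any work was done.
 */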
794 bool bch2_journal_flush_pins(struct journal *j, u64 seq_to_flush)
795 {
796 	bool did_work = false;
797 
798 	if (!test_bit(JOURNAL_STARTED, &j->flags))
799 		return false;
800 
801 	closure_wait_event(&j->async_wait,
802 		journal_flush_done(j, seq_to_flush, &did_work));
803 
804 	return did_work;
805 }
806 
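/*
 * Flush journal pins so that no dirty journal entry references @dev_idx (or,
 * if @dev_idx is negative, no entry has fewer than metadata_replicas copies),
 * then rebuild the journal replicas entries via replicas gc.
 */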
807 int bch2_journal_flush_device_pins(struct journal *j, int dev_idx)
808 {
809 	struct bch_fs *c = container_of(j, struct bch_fs, journal);
810 	struct journal_entry_pin_list *p;
811 	u64 iter, seq = 0;
812 	int ret = 0;
813 
814 	spin_lock(&j->lock);
815 	fifo_for_each_entry_ptr(p, &j->pin, iter)
816 		if (dev_idx >= 0
817 		    ? bch2_dev_list_has_dev(p->devs, dev_idx)
818 		    : p->devs.nr < c->opts.metadata_replicas)
819 			seq = iter;
820 	spin_unlock(&j->lock);
821 
822 	bch2_journal_flush_pins(j, seq);
823 
824 	ret = bch2_journal_error(j);
825 	if (ret)
826 		return ret;
827 
828 	mutex_lock(&c->replicas_gc_lock);
829 	bch2_replicas_gc_start(c, 1 << BCH_DATA_journal);
830 
831 	/*
832 	 * Now that we've populated replicas_gc, write to the journal to mark
833 	 * active journal devices. This handles the case where the journal might
834 	 * be empty. Otherwise we could clear all journal replicas and
835 	 * temporarily put the fs into an unrecoverable state. Journal recovery
836 	 * expects to find devices marked for journal data on unclean mount.
837 	 */
838 	ret = bch2_journal_meta(&c->journal);
839 	if (ret)
840 		goto err;
841 
842 	seq = 0;
843 	spin_lock(&j->lock);
844 	while (!ret) {
845 		struct bch_replicas_padded replicas;
846 
847 		seq = max(seq, journal_last_seq(j));
848 		if (seq >= j->pin.back)
849 			break;
850 		bch2_devlist_to_replicas(&replicas.e, BCH_DATA_journal,
851 					 journal_seq_pin(j, seq)->devs);
852 		seq++;
853 
854 		spin_unlock(&j->lock);
855 		ret = bch2_mark_replicas(c, &replicas.e);
856 		spin_lock(&j->lock);
857 	}
858 	spin_unlock(&j->lock);
859 err:
860 	ret = bch2_replicas_gc_end(c, ret);
861 	mutex_unlock(&c->replicas_gc_lock);
862 
863 	return ret;
864 }
865