// SPDX-License-Identifier: GPL-2.0
/*
 * bcachefs journalling code, for btree insertions
 *
 * Copyright 2012 Google, Inc.
 */

#include "bcachefs.h"
#include "alloc.h"
#include "bkey_methods.h"
#include "btree_gc.h"
#include "buckets.h"
#include "journal.h"
#include "journal_io.h"
#include "journal_reclaim.h"
#include "journal_seq_blacklist.h"
#include "super-io.h"
#include "trace.h"

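/*
 * A journal entry is "open" (still accepting reservations) as long as the
 * current entry offset hasn't been set to one of the CLOSED/ERROR sentinel
 * values:
 */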
static bool journal_entry_is_open(struct journal *j)
{
	return j->reservations.cur_entry_offset < JOURNAL_ENTRY_CLOSED_VAL;
}

void bch2_journal_buf_put_slowpath(struct journal *j, bool need_write_just_set)
{
	struct journal_buf *w = journal_prev_buf(j);

	atomic_dec_bug(&journal_seq_pin(j, le64_to_cpu(w->data->seq))->count);

	if (!need_write_just_set &&
	    test_bit(JOURNAL_NEED_WRITE, &j->flags))
		bch2_time_stats_update(j->delay_time,
				       j->need_write_time);
#if 0
	closure_call(&j->io, bch2_journal_write, NULL, NULL);
#else
	/* Shut sparse up: */
	closure_init(&j->io, NULL);
	set_closure_fn(&j->io, bch2_journal_write, NULL);
	bch2_journal_write(&j->io);
#endif
}

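/*
 * Advance to the next journal sequence number: bump j->seq and push a new
 * (initially empty) pin list onto the fifo, with an initial refcount of
 * @count:
 */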
static void journal_pin_new_entry(struct journal *j, int count)
{
	struct journal_entry_pin_list *p;

	/*
	 * The fifo_push() needs to happen at the same time as j->seq is
	 * incremented for journal_last_seq() to be calculated correctly
	 */
	atomic64_inc(&j->seq);
	p = fifo_push_ref(&j->pin);

	INIT_LIST_HEAD(&p->list);
	INIT_LIST_HEAD(&p->flushed);
	atomic_set(&p->count, count);
	p->devs.nr = 0;
}

static void bch2_journal_buf_init(struct journal *j)
{
	struct journal_buf *buf = journal_cur_buf(j);

	memset(buf->has_inode, 0, sizeof(buf->has_inode));

	memset(buf->data, 0, sizeof(*buf->data));
	buf->data->seq	= cpu_to_le64(journal_cur_seq(j));
	buf->data->u64s	= 0;
}

static inline size_t journal_entry_u64s_reserve(struct journal_buf *buf)
{
	return BTREE_ID_NR * (JSET_KEYS_U64s + BKEY_EXTENT_U64s_MAX);
}

static inline bool journal_entry_empty(struct jset *j)
{
	struct jset_entry *i;

	if (j->seq != j->last_seq)
		return false;

	vstruct_for_each(j, i)
		if (i->type || i->u64s)
			return false;
	return true;
}

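/*
 * Close the currently open journal entry (if any) and switch to the other
 * buffer: finalizes buf->data->u64s and last_seq, opens the next entry, and
 * kicks off a write of the old one.
 *
 * Returns JOURNAL_UNLOCKED if it dropped j->lock and started the write;
 * for the other return values j->lock is still held by the caller.
 */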
static enum {
	JOURNAL_ENTRY_ERROR,
	JOURNAL_ENTRY_INUSE,
	JOURNAL_ENTRY_CLOSED,
	JOURNAL_UNLOCKED,
} journal_buf_switch(struct journal *j, bool need_write_just_set)
{
	struct bch_fs *c = container_of(j, struct bch_fs, journal);
	struct journal_buf *buf;
	union journal_res_state old, new;
	u64 v = atomic64_read(&j->reservations.counter);

	lockdep_assert_held(&j->lock);

	do {
		old.v = new.v = v;
		if (old.cur_entry_offset == JOURNAL_ENTRY_CLOSED_VAL)
			return JOURNAL_ENTRY_CLOSED;

		if (old.cur_entry_offset == JOURNAL_ENTRY_ERROR_VAL)
			return JOURNAL_ENTRY_ERROR;

		if (new.prev_buf_unwritten)
			return JOURNAL_ENTRY_INUSE;

		/*
		 * avoid race between setting buf->data->u64s and
		 * journal_res_put starting write:
		 */
		journal_state_inc(&new);

		new.cur_entry_offset = JOURNAL_ENTRY_CLOSED_VAL;
		new.idx++;
		new.prev_buf_unwritten = 1;

		BUG_ON(journal_state_count(new, new.idx));
	} while ((v = atomic64_cmpxchg(&j->reservations.counter,
				       old.v, new.v)) != old.v);

	clear_bit(JOURNAL_NEED_WRITE, &j->flags);

	buf = &j->buf[old.idx];
	buf->data->u64s		= cpu_to_le32(old.cur_entry_offset);

	j->prev_buf_sectors =
		vstruct_blocks_plus(buf->data, c->block_bits,
				    journal_entry_u64s_reserve(buf)) *
		c->opts.block_size;
	BUG_ON(j->prev_buf_sectors > j->cur_buf_sectors);

	/*
	 * We have to set last_seq here, _before_ opening a new journal entry:
	 *
	 * A thread may replace an old pin with a new pin on its current
	 * journal reservation - the expectation being that the journal will
	 * contain either what the old pin protected or what the new pin
	 * protects.
	 *
	 * After the old pin is dropped journal_last_seq() won't include the old
	 * pin, so we can only write the updated last_seq on the entry that
	 * contains whatever the new pin protects.
	 *
	 * Restated, we can _not_ update last_seq for a given entry if there
	 * could be a newer entry open with reservations/pins that have been
	 * taken against it.
	 *
	 * Hence, we want to update/set last_seq on the current journal entry
	 * right before we open a new one:
	 */
	bch2_journal_reclaim_fast(j);
	buf->data->last_seq	= cpu_to_le64(journal_last_seq(j));

	if (journal_entry_empty(buf->data))
		clear_bit(JOURNAL_NOT_EMPTY, &j->flags);
	else
		set_bit(JOURNAL_NOT_EMPTY, &j->flags);

	journal_pin_new_entry(j, 1);

	bch2_journal_buf_init(j);

	cancel_delayed_work(&j->write_work);
	spin_unlock(&j->lock);

	if (c->bucket_journal_seq > 1 << 14) {
		c->bucket_journal_seq = 0;
		bch2_bucket_seq_cleanup(c);
	}

	c->bucket_journal_seq++;

	/* ugh - might be called from __journal_res_get() under wait_event() */
	__set_current_state(TASK_RUNNING);
	bch2_journal_buf_put(j, old.idx, need_write_just_set);

	return JOURNAL_UNLOCKED;
}

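/*
 * Put the journal into an error state: no further reservations can be taken,
 * and anyone waiting on the current or previous buffer is woken up.
 */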
void bch2_journal_halt(struct journal *j)
{
	union journal_res_state old, new;
	u64 v = atomic64_read(&j->reservations.counter);

	do {
		old.v = new.v = v;
		if (old.cur_entry_offset == JOURNAL_ENTRY_ERROR_VAL)
			return;

		new.cur_entry_offset = JOURNAL_ENTRY_ERROR_VAL;
	} while ((v = atomic64_cmpxchg(&j->reservations.counter,
				       old.v, new.v)) != old.v);

	journal_wake(j);
	closure_wake_up(&journal_cur_buf(j)->wait);
	closure_wake_up(&journal_prev_buf(j)->wait);
}

/*
 * should _only_ be called from journal_res_get() - when we actually want a
 * journal reservation - journal entry is open means journal is dirty:
 *
 * returns:
 * 1:		success
 * 0:		journal currently full (must wait)
 * -EROFS:	insufficient rw devices
 * -EIO:	journal error
 */
static int journal_entry_open(struct journal *j)
{
	struct journal_buf *buf = journal_cur_buf(j);
	union journal_res_state old, new;
	ssize_t u64s;
	int sectors;
	u64 v;

	lockdep_assert_held(&j->lock);
	BUG_ON(journal_entry_is_open(j));

	if (!fifo_free(&j->pin))
		return 0;

	sectors = bch2_journal_entry_sectors(j);
	if (sectors <= 0)
		return sectors;

	buf->disk_sectors	= sectors;

	sectors = min_t(unsigned, sectors, buf->size >> 9);
	j->cur_buf_sectors	= sectors;

	u64s = (sectors << 9) / sizeof(u64);
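	/* e.g. an 8 sector (4 KiB) entry gives 512 u64s before the reserves below */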

	/* Subtract the journal header */
	u64s -= sizeof(struct jset) / sizeof(u64);
	/*
	 * Btree roots, prio pointers don't get added until right before we do
	 * the write:
	 */
	u64s -= journal_entry_u64s_reserve(buf);
	u64s  = max_t(ssize_t, 0L, u64s);

	BUG_ON(u64s >= JOURNAL_ENTRY_CLOSED_VAL);

	if (u64s <= le32_to_cpu(buf->data->u64s))
		return 0;

	/*
	 * Must be set before marking the journal entry as open:
	 */
	j->cur_entry_u64s = u64s;

	v = atomic64_read(&j->reservations.counter);
	do {
		old.v = new.v = v;

		if (old.cur_entry_offset == JOURNAL_ENTRY_ERROR_VAL)
			return -EIO;

		/* Handle any already added entries */
		new.cur_entry_offset = le32_to_cpu(buf->data->u64s);
	} while ((v = atomic64_cmpxchg(&j->reservations.counter,
				       old.v, new.v)) != old.v);

	if (j->res_get_blocked_start)
		bch2_time_stats_update(j->blocked_time,
				       j->res_get_blocked_start);
	j->res_get_blocked_start = 0;

	mod_delayed_work(system_freezable_wq,
			 &j->write_work,
			 msecs_to_jiffies(j->write_delay_ms));
	journal_wake(j);
	return 1;
}

/*
 * returns true if there's nothing to flush and no journal write still in flight
 */
static bool journal_flush_write(struct journal *j)
{
	bool ret;

	spin_lock(&j->lock);
	ret = !j->reservations.prev_buf_unwritten;

	if (!journal_entry_is_open(j)) {
		spin_unlock(&j->lock);
		return ret;
	}

	set_bit(JOURNAL_NEED_WRITE, &j->flags);
	if (journal_buf_switch(j, false) == JOURNAL_UNLOCKED)
		ret = false;
	else
		spin_unlock(&j->lock);
	return ret;
}

static void journal_write_work(struct work_struct *work)
{
	struct journal *j = container_of(work, struct journal, write_work.work);

	journal_flush_write(j);
}

/*
 * Given an inode number, if that inode number has data in the journal that
 * hasn't yet been flushed, return the journal sequence number that needs to be
 * flushed:
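 *
 * Note: has_inode is a bitmap indexed by a hash of the inode number, so this
 * can return false positives for inodes that merely collide in the hash.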
 */
u64 bch2_inode_journal_seq(struct journal *j, u64 inode)
{
	size_t h = hash_64(inode, ilog2(sizeof(j->buf[0].has_inode) * 8));
	u64 seq = 0;

	if (!test_bit(h, j->buf[0].has_inode) &&
	    !test_bit(h, j->buf[1].has_inode))
		return 0;

	spin_lock(&j->lock);
	if (test_bit(h, journal_cur_buf(j)->has_inode))
		seq = journal_cur_seq(j);
	else if (test_bit(h, journal_prev_buf(j)->has_inode))
		seq = journal_cur_seq(j) - 1;
	spin_unlock(&j->lock);

	return seq;
}

static int __journal_res_get(struct journal *j, struct journal_res *res,
			      unsigned u64s_min, unsigned u64s_max)
{
	struct bch_fs *c = container_of(j, struct bch_fs, journal);
	struct journal_buf *buf;
	int ret;
retry:
	ret = journal_res_get_fast(j, res, u64s_min, u64s_max);
	if (ret)
		return ret;

	spin_lock(&j->lock);
	/*
	 * Recheck after taking the lock, so we don't race with another thread
	 * that just did journal_entry_open(), and call journal_entry_close()
	 * unnecessarily
	 */
	ret = journal_res_get_fast(j, res, u64s_min, u64s_max);
	if (ret) {
		spin_unlock(&j->lock);
		return 1;
	}

	/*
	 * If we couldn't get a reservation because the current buf filled up,
	 * and we had room for a bigger entry on disk, signal that we want to
	 * realloc the journal bufs:
	 */
	buf = journal_cur_buf(j);
	if (journal_entry_is_open(j) &&
	    buf->size >> 9 < buf->disk_sectors &&
	    buf->size < JOURNAL_ENTRY_SIZE_MAX)
		j->buf_size_want = max(j->buf_size_want, buf->size << 1);

	/*
	 * Close the current journal entry if necessary, then try to start a new
	 * one:
	 */
	switch (journal_buf_switch(j, false)) {
	case JOURNAL_ENTRY_ERROR:
		spin_unlock(&j->lock);
		return -EROFS;
	case JOURNAL_ENTRY_INUSE:
		/* haven't finished writing out the previous one: */
		spin_unlock(&j->lock);
		trace_journal_entry_full(c);
		goto blocked;
	case JOURNAL_ENTRY_CLOSED:
		break;
	case JOURNAL_UNLOCKED:
		goto retry;
	}

	/* We now have a new, closed journal buf - see if we can open it: */
	ret = journal_entry_open(j);
	spin_unlock(&j->lock);

	if (ret < 0)
		return ret;
	if (ret)
		goto retry;

	/* Journal's full, we have to wait */

	/*
	 * Direct reclaim - can't rely on reclaim from work item
	 * due to freezing..
	 */
	bch2_journal_reclaim_work(&j->reclaim_work.work);

	trace_journal_full(c);
blocked:
	if (!j->res_get_blocked_start)
		j->res_get_blocked_start = local_clock() ?: 1;
	return 0;
}

/*
 * Essentially the entry function to the journaling code. When bcachefs is doing
 * a btree insert, it calls this function to get the current journal write.
 * The journal write is the structure used to set up journal writes. The
 * calling function will then add its keys to the structure, queuing them for
 * the next write.
 *
 * To ensure forward progress, the current task must not be holding any
 * btree node write locks.
 */
int bch2_journal_res_get_slowpath(struct journal *j, struct journal_res *res,
				 unsigned u64s_min, unsigned u64s_max)
{
	int ret;

	wait_event(j->wait,
		   (ret = __journal_res_get(j, res, u64s_min,
					    u64s_max)));
	return ret < 0 ? ret : 0;
}
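
/*
 * A minimal usage sketch (mirroring bch2_journal_meta() below - the step that
 * copies keys into the reservation lives in the btree update path, not here):
 *
 *	struct journal_res res;
 *	unsigned u64s = jset_u64s(0);
 *
 *	memset(&res, 0, sizeof(res));
 *
 *	ret = bch2_journal_res_get(j, &res, u64s, u64s);
 *	if (ret)
 *		return ret;
 *	// ... add keys to the reservation here ...
 *	bch2_journal_res_put(j, &res);
 *	ret = bch2_journal_flush_seq(j, res.seq);
 */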

u64 bch2_journal_last_unwritten_seq(struct journal *j)
{
	u64 seq;

	spin_lock(&j->lock);
	seq = journal_cur_seq(j);
	if (j->reservations.prev_buf_unwritten)
		seq--;
	spin_unlock(&j->lock);

	return seq;
}

/**
 * bch2_journal_open_seq_async - try to open a new journal entry if @seq isn't
 * open yet, or wait if we cannot
 *
 * used by the btree interior update machinery, when it needs to write a new
 * btree root - every journal entry contains the roots of all the btrees, so it
 * doesn't need to bother with getting a journal reservation
 */
int bch2_journal_open_seq_async(struct journal *j, u64 seq, struct closure *parent)
{
	int ret;

	spin_lock(&j->lock);
	BUG_ON(seq > journal_cur_seq(j));

	if (seq < journal_cur_seq(j) ||
	    journal_entry_is_open(j)) {
		spin_unlock(&j->lock);
		return 1;
	}

	ret = journal_entry_open(j);
	if (!ret)
		closure_wait(&j->async_wait, parent);
	spin_unlock(&j->lock);

	if (!ret)
		bch2_journal_reclaim_work(&j->reclaim_work.work);

	return ret;
}

/**
 * bch2_journal_wait_on_seq - wait for a journal entry to be written
 *
 * does _not_ cause @seq to be written immediately - if there is no other
 * activity to cause the relevant journal entry to be filled up or flushed it
 * can wait for an arbitrary amount of time (up to @j->write_delay_ms, which is
 * configurable).
 */
void bch2_journal_wait_on_seq(struct journal *j, u64 seq, struct closure *parent)
{
	spin_lock(&j->lock);

	BUG_ON(seq > journal_cur_seq(j));

	if (bch2_journal_error(j)) {
		spin_unlock(&j->lock);
		return;
	}

	if (seq == journal_cur_seq(j)) {
		if (!closure_wait(&journal_cur_buf(j)->wait, parent))
			BUG();
	} else if (seq + 1 == journal_cur_seq(j) &&
		   j->reservations.prev_buf_unwritten) {
		if (!closure_wait(&journal_prev_buf(j)->wait, parent))
			BUG();

		smp_mb();

		/* check if raced with write completion (or failure) */
		if (!j->reservations.prev_buf_unwritten ||
		    bch2_journal_error(j))
			closure_wake_up(&journal_prev_buf(j)->wait);
	}

	spin_unlock(&j->lock);
}

/**
 * bch2_journal_flush_seq_async - wait for a journal entry to be written
 *
 * like bch2_journal_wait_on_seq, except that it triggers a write immediately if
 * necessary
 */
void bch2_journal_flush_seq_async(struct journal *j, u64 seq, struct closure *parent)
{
	struct journal_buf *buf;

	spin_lock(&j->lock);

	BUG_ON(seq > journal_cur_seq(j));

	if (bch2_journal_error(j)) {
		spin_unlock(&j->lock);
		return;
	}

	if (seq == journal_cur_seq(j)) {
		bool set_need_write = false;

		buf = journal_cur_buf(j);

		if (parent && !closure_wait(&buf->wait, parent))
			BUG();

		if (!test_and_set_bit(JOURNAL_NEED_WRITE, &j->flags)) {
			j->need_write_time = local_clock();
			set_need_write = true;
		}

		switch (journal_buf_switch(j, set_need_write)) {
		case JOURNAL_ENTRY_ERROR:
			if (parent)
				closure_wake_up(&buf->wait);
			break;
		case JOURNAL_ENTRY_CLOSED:
			/*
			 * Journal entry hasn't been opened yet, but caller
			 * claims it has something
			 */
			BUG();
		case JOURNAL_ENTRY_INUSE:
			break;
		case JOURNAL_UNLOCKED:
			return;
		}
	} else if (parent &&
		   seq + 1 == journal_cur_seq(j) &&
		   j->reservations.prev_buf_unwritten) {
		buf = journal_prev_buf(j);

		if (!closure_wait(&buf->wait, parent))
			BUG();

		smp_mb();

		/* check if raced with write completion (or failure) */
		if (!j->reservations.prev_buf_unwritten ||
		    bch2_journal_error(j))
			closure_wake_up(&buf->wait);
	}

	spin_unlock(&j->lock);
}

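/*
 * Helper for bch2_journal_flush_seq(): returns nonzero once the entry @seq is
 * no longer open or in flight (or a negative error), 0 if the caller needs to
 * keep waiting; closes/kicks the current entry if @seq is still open.
 */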
static int journal_seq_flushed(struct journal *j, u64 seq)
{
	struct journal_buf *buf;
	int ret = 1;

	spin_lock(&j->lock);
	BUG_ON(seq > journal_cur_seq(j));

	if (seq == journal_cur_seq(j)) {
		bool set_need_write = false;

		ret = 0;

		buf = journal_cur_buf(j);

		if (!test_and_set_bit(JOURNAL_NEED_WRITE, &j->flags)) {
			j->need_write_time = local_clock();
			set_need_write = true;
		}

		switch (journal_buf_switch(j, set_need_write)) {
		case JOURNAL_ENTRY_ERROR:
			ret = -EIO;
			break;
		case JOURNAL_ENTRY_CLOSED:
			/*
			 * Journal entry hasn't been opened yet, but caller
			 * claims it has something
			 */
			BUG();
		case JOURNAL_ENTRY_INUSE:
			break;
		case JOURNAL_UNLOCKED:
			return 0;
		}
	} else if (seq + 1 == journal_cur_seq(j) &&
		   j->reservations.prev_buf_unwritten) {
		ret = bch2_journal_error(j);
	}

	spin_unlock(&j->lock);

	return ret;
}

int bch2_journal_flush_seq(struct journal *j, u64 seq)
{
	u64 start_time = local_clock();
	int ret, ret2;

	ret = wait_event_killable(j->wait, (ret2 = journal_seq_flushed(j, seq)));

	bch2_time_stats_update(j->flush_seq_time, start_time);

	return ret ?: ret2 < 0 ? ret2 : 0;
}

/**
 * bch2_journal_meta_async - force a journal entry to be written
 */
void bch2_journal_meta_async(struct journal *j, struct closure *parent)
{
	struct journal_res res;
	unsigned u64s = jset_u64s(0);

	memset(&res, 0, sizeof(res));

	bch2_journal_res_get(j, &res, u64s, u64s);
	bch2_journal_res_put(j, &res);

	bch2_journal_flush_seq_async(j, res.seq, parent);
}

int bch2_journal_meta(struct journal *j)
{
	struct journal_res res;
	unsigned u64s = jset_u64s(0);
	int ret;

	memset(&res, 0, sizeof(res));

	ret = bch2_journal_res_get(j, &res, u64s, u64s);
	if (ret)
		return ret;

	bch2_journal_res_put(j, &res);

	return bch2_journal_flush_seq(j, res.seq);
}

/*
 * bch2_journal_flush_async - if there is an open journal entry, or a journal
 * still being written, write it and wait for the write to complete
 */
void bch2_journal_flush_async(struct journal *j, struct closure *parent)
{
	u64 seq, journal_seq;

	spin_lock(&j->lock);
	journal_seq = journal_cur_seq(j);

	if (journal_entry_is_open(j)) {
		seq = journal_seq;
	} else if (journal_seq) {
		seq = journal_seq - 1;
	} else {
		spin_unlock(&j->lock);
		return;
	}
	spin_unlock(&j->lock);

	bch2_journal_flush_seq_async(j, seq, parent);
}

int bch2_journal_flush(struct journal *j)
{
	u64 seq, journal_seq;

	spin_lock(&j->lock);
	journal_seq = journal_cur_seq(j);

	if (journal_entry_is_open(j)) {
		seq = journal_seq;
	} else if (journal_seq) {
		seq = journal_seq - 1;
	} else {
		spin_unlock(&j->lock);
		return 0;
	}
	spin_unlock(&j->lock);

	return bch2_journal_flush_seq(j, seq);
}

/* allocate journal on a device: */

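/*
 * Grow a device's journal to @nr buckets: allocate the new buckets, add them
 * to both the superblock's journal field and the in-memory journal_device,
 * and mark them as journal metadata. Shrinking isn't handled yet.
 */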
static int __bch2_set_nr_journal_buckets(struct bch_dev *ca, unsigned nr,
					 bool new_fs, struct closure *cl)
{
	struct bch_fs *c = ca->fs;
	struct journal_device *ja = &ca->journal;
	struct bch_sb_field_journal *journal_buckets;
	u64 *new_bucket_seq = NULL, *new_buckets = NULL;
	int ret = 0;

	/* don't handle reducing nr of buckets yet: */
	if (nr <= ja->nr)
		return 0;

	ret = -ENOMEM;
	new_buckets	= kzalloc(nr * sizeof(u64), GFP_KERNEL);
	new_bucket_seq	= kzalloc(nr * sizeof(u64), GFP_KERNEL);
	if (!new_buckets || !new_bucket_seq)
		goto err;

	journal_buckets = bch2_sb_resize_journal(&ca->disk_sb,
				nr + sizeof(*journal_buckets) / sizeof(u64));
	if (!journal_buckets)
		goto err;

	/*
	 * We may be called from the device add path, before the new device has
	 * actually been added to the running filesystem:
	 */
	if (c)
		spin_lock(&c->journal.lock);

	memcpy(new_buckets,	ja->buckets,	ja->nr * sizeof(u64));
	memcpy(new_bucket_seq,	ja->bucket_seq,	ja->nr * sizeof(u64));
	swap(new_buckets,	ja->buckets);
	swap(new_bucket_seq,	ja->bucket_seq);

	if (c)
		spin_unlock(&c->journal.lock);

	while (ja->nr < nr) {
		struct open_bucket *ob = NULL;
		long bucket;

		if (new_fs) {
			bucket = bch2_bucket_alloc_new_fs(ca);
			if (bucket < 0) {
				ret = -ENOSPC;
				goto err;
			}
		} else {
			int ob_idx = bch2_bucket_alloc(c, ca, RESERVE_ALLOC, false, cl);
			if (ob_idx < 0) {
				ret = cl ? -EAGAIN : -ENOSPC;
				goto err;
			}

			ob = c->open_buckets + ob_idx;
			bucket = sector_to_bucket(ca, ob->ptr.offset);
		}

		if (c) {
			percpu_down_read(&c->usage_lock);
			spin_lock(&c->journal.lock);
		} else {
			preempt_disable();
		}

		__array_insert_item(ja->buckets,		ja->nr, ja->last_idx);
		__array_insert_item(ja->bucket_seq,		ja->nr, ja->last_idx);
		__array_insert_item(journal_buckets->buckets,	ja->nr, ja->last_idx);

		ja->buckets[ja->last_idx] = bucket;
		ja->bucket_seq[ja->last_idx] = 0;
		journal_buckets->buckets[ja->last_idx] = cpu_to_le64(bucket);

		if (ja->last_idx < ja->nr) {
			if (ja->cur_idx >= ja->last_idx)
				ja->cur_idx++;
			ja->last_idx++;
		}
		ja->nr++;

		bch2_mark_metadata_bucket(c, ca, bucket, BCH_DATA_JOURNAL,
				ca->mi.bucket_size,
				gc_phase(GC_PHASE_SB),
				new_fs
				? BCH_BUCKET_MARK_MAY_MAKE_UNAVAILABLE
				: 0);

		if (c) {
			spin_unlock(&c->journal.lock);
			percpu_up_read(&c->usage_lock);
		} else {
			preempt_enable();
		}

		if (!new_fs)
			bch2_open_bucket_put(c, ob);
	}

	ret = 0;
err:
	kfree(new_bucket_seq);
	kfree(new_buckets);

	return ret;
}

/*
 * Allocate more journal space at runtime - not currently making use of it, but
 * the code works:
 */
int bch2_set_nr_journal_buckets(struct bch_fs *c, struct bch_dev *ca,
				unsigned nr)
{
	struct journal_device *ja = &ca->journal;
	struct closure cl;
	unsigned current_nr;
	int ret;

	closure_init_stack(&cl);

	do {
		struct disk_reservation disk_res = { 0, 0 };

		closure_sync(&cl);

		mutex_lock(&c->sb_lock);
		current_nr = ja->nr;

		/*
		 * note: journal buckets aren't really counted as _sectors_ used yet, so
		 * we don't need the disk reservation to avoid the BUG_ON() in buckets.c
		 * when space used goes up without a reservation - but we do need the
		 * reservation to ensure we'll actually be able to allocate:
		 */

		if (bch2_disk_reservation_get(c, &disk_res,
				bucket_to_sector(ca, nr - ja->nr), 1, 0)) {
			mutex_unlock(&c->sb_lock);
			return -ENOSPC;
		}

		ret = __bch2_set_nr_journal_buckets(ca, nr, false, &cl);

		bch2_disk_reservation_put(c, &disk_res);

		if (ja->nr != current_nr)
			bch2_write_super(c);
		mutex_unlock(&c->sb_lock);
	} while (ret == -EAGAIN);

	return ret;
}

int bch2_dev_journal_alloc(struct bch_dev *ca)
{
	unsigned nr;

	if (dynamic_fault("bcachefs:add:journal_alloc"))
		return -ENOMEM;

	/*
	 * clamp journal size to 1024 buckets or 512MB (in sectors), whichever
	 * is smaller:
	 */
	nr = clamp_t(unsigned, ca->mi.nbuckets >> 8,
		     BCH_JOURNAL_BUCKETS_MIN,
		     min(1 << 10,
			 (1 << 20) / ca->mi.bucket_size));
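	/*
	 * e.g. a device with 2^20 buckets of 128 KiB (256 sectors):
	 * nbuckets >> 8 = 4096, capped at min(1024, (1 << 20) / 256) = 1024
	 * buckets, i.e. 128 MiB of journal.
	 */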

	return __bch2_set_nr_journal_buckets(ca, nr, true, NULL);
}

/* startup/shutdown: */

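/* Is the most recent (still unwritten) journal write targeting @dev_idx? */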
static bool bch2_journal_writing_to_device(struct journal *j, unsigned dev_idx)
{
	union journal_res_state state;
	struct journal_buf *w;
	bool ret;

	spin_lock(&j->lock);
	state = READ_ONCE(j->reservations);
	w = j->buf + !state.idx;

	ret = state.prev_buf_unwritten &&
		bch2_extent_has_device(bkey_i_to_s_c_extent(&w->key), dev_idx);
	spin_unlock(&j->lock);

	return ret;
}

void bch2_dev_journal_stop(struct journal *j, struct bch_dev *ca)
{
	spin_lock(&j->lock);
	bch2_extent_drop_device(bkey_i_to_s_extent(&j->key), ca->dev_idx);
	spin_unlock(&j->lock);

	wait_event(j->wait, !bch2_journal_writing_to_device(j, ca->dev_idx));
}

void bch2_fs_journal_stop(struct journal *j)
{
	struct bch_fs *c = container_of(j, struct bch_fs, journal);

	wait_event(j->wait, journal_flush_write(j));

	/* do we need to write another journal entry? */
	if (test_bit(JOURNAL_NOT_EMPTY, &j->flags) ||
	    c->btree_roots_dirty)
		bch2_journal_meta(j);

	BUG_ON(!bch2_journal_error(j) &&
	       test_bit(JOURNAL_NOT_EMPTY, &j->flags));

	cancel_delayed_work_sync(&j->write_work);
	cancel_delayed_work_sync(&j->reclaim_work);
}

void bch2_fs_journal_start(struct journal *j)
{
	struct journal_seq_blacklist *bl;
	u64 blacklist = 0;

	list_for_each_entry(bl, &j->seq_blacklist, list)
		blacklist = max(blacklist, bl->end);

	spin_lock(&j->lock);

	set_bit(JOURNAL_STARTED, &j->flags);

	while (journal_cur_seq(j) < blacklist)
		journal_pin_new_entry(j, 0);

	/*
	 * journal_buf_switch() only inits the next journal entry when it
	 * closes an open journal entry - the very first journal entry gets
	 * initialized here:
	 */
	journal_pin_new_entry(j, 1);
	bch2_journal_buf_init(j);

	spin_unlock(&j->lock);

	/*
	 * Adding entries to the next journal entry before allocating space on
	 * disk for the next journal entry - this is ok, because these entries
	 * only have to go down with the next journal entry we write:
	 */
	bch2_journal_seq_blacklist_write(j);

	queue_delayed_work(system_freezable_wq, &j->reclaim_work, 0);
}

/* init/exit: */

void bch2_dev_journal_exit(struct bch_dev *ca)
{
	kfree(ca->journal.bio);
	kfree(ca->journal.buckets);
	kfree(ca->journal.bucket_seq);

	ca->journal.bio		= NULL;
	ca->journal.buckets	= NULL;
	ca->journal.bucket_seq	= NULL;
}

int bch2_dev_journal_init(struct bch_dev *ca, struct bch_sb *sb)
{
	struct journal_device *ja = &ca->journal;
	struct bch_sb_field_journal *journal_buckets =
		bch2_sb_get_journal(sb);
	unsigned i, nr_bvecs;

	ja->nr = bch2_nr_journal_buckets(journal_buckets);

	ja->bucket_seq = kcalloc(ja->nr, sizeof(u64), GFP_KERNEL);
	if (!ja->bucket_seq)
		return -ENOMEM;

	nr_bvecs = DIV_ROUND_UP(JOURNAL_ENTRY_SIZE_MAX, PAGE_SIZE);

	ca->journal.bio = bio_kmalloc(nr_bvecs, GFP_KERNEL);
	if (!ca->journal.bio)
		return -ENOMEM;

	bio_init(ca->journal.bio, NULL, ca->journal.bio->bi_inline_vecs, nr_bvecs, 0);

	ja->buckets = kcalloc(ja->nr, sizeof(u64), GFP_KERNEL);
	if (!ja->buckets)
		return -ENOMEM;

	for (i = 0; i < ja->nr; i++)
		ja->buckets[i] = le64_to_cpu(journal_buckets->buckets[i]);

	return 0;
}

void bch2_fs_journal_exit(struct journal *j)
{
	kvpfree(j->buf[1].data, j->buf[1].size);
	kvpfree(j->buf[0].data, j->buf[0].size);
	free_fifo(&j->pin);
}

int bch2_fs_journal_init(struct journal *j)
{
	struct bch_fs *c = container_of(j, struct bch_fs, journal);
	static struct lock_class_key res_key;
	int ret = 0;

	pr_verbose_init(c->opts, "");

	spin_lock_init(&j->lock);
	spin_lock_init(&j->err_lock);
	init_waitqueue_head(&j->wait);
	INIT_DELAYED_WORK(&j->write_work, journal_write_work);
	INIT_DELAYED_WORK(&j->reclaim_work, bch2_journal_reclaim_work);
	init_waitqueue_head(&j->pin_flush_wait);
	mutex_init(&j->blacklist_lock);
	INIT_LIST_HEAD(&j->seq_blacklist);
	mutex_init(&j->reclaim_lock);

	lockdep_init_map(&j->res_map, "journal res", &res_key, 0);

	j->buf[0].size		= JOURNAL_ENTRY_SIZE_MIN;
	j->buf[1].size		= JOURNAL_ENTRY_SIZE_MIN;
	j->write_delay_ms	= 1000;
	j->reclaim_delay_ms	= 100;

	bkey_extent_init(&j->key);

	atomic64_set(&j->reservations.counter,
		((union journal_res_state)
		 { .cur_entry_offset = JOURNAL_ENTRY_CLOSED_VAL }).v);

	if (!(init_fifo(&j->pin, JOURNAL_PIN, GFP_KERNEL)) ||
	    !(j->buf[0].data = kvpmalloc(j->buf[0].size, GFP_KERNEL)) ||
	    !(j->buf[1].data = kvpmalloc(j->buf[1].size, GFP_KERNEL))) {
		ret = -ENOMEM;
		goto out;
	}

	j->pin.front = j->pin.back = 1;
out:
	pr_verbose_init(c->opts, "ret %i", ret);
	return ret;
}

/* debug: */

ssize_t bch2_journal_print_debug(struct journal *j, char *buf)
{
	struct bch_fs *c = container_of(j, struct bch_fs, journal);
	union journal_res_state *s = &j->reservations;
	struct bch_dev *ca;
	unsigned iter;
	ssize_t ret = 0;

	rcu_read_lock();
	spin_lock(&j->lock);

	ret += scnprintf(buf + ret, PAGE_SIZE - ret,
			 "active journal entries:\t%llu\n"
			 "seq:\t\t\t%llu\n"
			 "last_seq:\t\t%llu\n"
			 "last_seq_ondisk:\t%llu\n"
			 "reservation count:\t%u\n"
			 "reservation offset:\t%u\n"
			 "current entry u64s:\t%u\n"
			 "io in flight:\t\t%i\n"
			 "need write:\t\t%i\n"
			 "dirty:\t\t\t%i\n"
			 "replay done:\t\t%i\n",
			 fifo_used(&j->pin),
			 journal_cur_seq(j),
			 journal_last_seq(j),
			 j->last_seq_ondisk,
			 journal_state_count(*s, s->idx),
			 s->cur_entry_offset,
			 j->cur_entry_u64s,
			 s->prev_buf_unwritten,
			 test_bit(JOURNAL_NEED_WRITE,	&j->flags),
			 journal_entry_is_open(j),
			 test_bit(JOURNAL_REPLAY_DONE,	&j->flags));

	for_each_member_device_rcu(ca, c, iter,
				   &c->rw_devs[BCH_DATA_JOURNAL]) {
		struct journal_device *ja = &ca->journal;

		if (!ja->nr)
			continue;

		ret += scnprintf(buf + ret, PAGE_SIZE - ret,
				 "dev %u:\n"
				 "\tnr\t\t%u\n"
				 "\tcur_idx\t\t%u (seq %llu)\n"
				 "\tlast_idx\t%u (seq %llu)\n",
				 iter, ja->nr,
				 ja->cur_idx,	ja->bucket_seq[ja->cur_idx],
				 ja->last_idx,	ja->bucket_seq[ja->last_idx]);
	}

	spin_unlock(&j->lock);
	rcu_read_unlock();

	return ret;
}

ssize_t bch2_journal_print_pins(struct journal *j, char *buf)
{
	struct journal_entry_pin_list *pin_list;
	struct journal_entry_pin *pin;
	ssize_t ret = 0;
	u64 i;

	spin_lock(&j->lock);
	fifo_for_each_entry_ptr(pin_list, &j->pin, i) {
		ret += scnprintf(buf + ret, PAGE_SIZE - ret,
				 "%llu: count %u\n",
				 i, atomic_read(&pin_list->count));

		list_for_each_entry(pin, &pin_list->list, list)
			ret += scnprintf(buf + ret, PAGE_SIZE - ret,
					 "\t%p %pf\n",
					 pin, pin->flush);

		if (!list_empty(&pin_list->flushed))
			ret += scnprintf(buf + ret, PAGE_SIZE - ret,
					 "flushed:\n");

		list_for_each_entry(pin, &pin_list->flushed, list)
			ret += scnprintf(buf + ret, PAGE_SIZE - ret,
					 "\t%p %pf\n",
					 pin, pin->flush);
	}
	spin_unlock(&j->lock);

	return ret;
}