xref: /linux/fs/bcachefs/btree_write_buffer.c (revision 9738280aae592b579a25b5b1b6584c894827d3c7)
1 // SPDX-License-Identifier: GPL-2.0
2 
3 #include "bcachefs.h"
4 #include "bkey_buf.h"
5 #include "btree_locking.h"
6 #include "btree_update.h"
7 #include "btree_update_interior.h"
8 #include "btree_write_buffer.h"
9 #include "disk_accounting.h"
10 #include "enumerated_ref.h"
11 #include "error.h"
12 #include "extents.h"
13 #include "journal.h"
14 #include "journal_io.h"
15 #include "journal_reclaim.h"
16 
17 #include <linux/prefetch.h>
18 #include <linux/sort.h>
19 
20 static int bch2_btree_write_buffer_journal_flush(struct journal *,
21 				struct journal_entry_pin *, u64);
22 
23 static inline bool __wb_key_ref_cmp(const struct wb_key_ref *l, const struct wb_key_ref *r)
24 {
25 	return (cmp_int(l->hi, r->hi) ?:
26 		cmp_int(l->mi, r->mi) ?:
27 		cmp_int(l->lo, r->lo)) >= 0;
28 }
29 
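/*
 * Fast path for __wb_key_ref_cmp(): on x86-64, do the 192-bit compare as a
 * multi-word subtract (sub/sbb) and read l >= r straight out of the
 * "above or equal" condition flag; the EBUG_ON() checks it against the
 * portable version.
 */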
30 static inline bool wb_key_ref_cmp(const struct wb_key_ref *l, const struct wb_key_ref *r)
31 {
32 #ifdef CONFIG_X86_64
33 	int cmp;
34 
35 	asm("mov   (%[l]), %%rax;"
36 	    "sub   (%[r]), %%rax;"
37 	    "mov  8(%[l]), %%rax;"
38 	    "sbb  8(%[r]), %%rax;"
39 	    "mov 16(%[l]), %%rax;"
40 	    "sbb 16(%[r]), %%rax;"
41 	    : "=@ccae" (cmp)
42 	    : [l] "r" (l), [r] "r" (r)
43 	    : "rax", "cc");
44 
45 	EBUG_ON(cmp != __wb_key_ref_cmp(l, r));
46 	return cmp;
47 #else
48 	return __wb_key_ref_cmp(l, r);
49 #endif
50 }
51 
52 static int wb_key_seq_cmp(const void *_l, const void *_r)
53 {
54 	const struct btree_write_buffered_key *l = _l;
55 	const struct btree_write_buffered_key *r = _r;
56 
57 	return cmp_int(l->journal_seq, r->journal_seq);
58 }
59 
60 /* Compare excluding idx, the low 24 bits: */
61 static inline bool wb_key_eq(const void *_l, const void *_r)
62 {
63 	const struct wb_key_ref *l = _l;
64 	const struct wb_key_ref *r = _r;
65 
66 	return !((l->hi ^ r->hi)|
67 		 (l->mi ^ r->mi)|
68 		 ((l->lo >> 24) ^ (r->lo >> 24)));
69 }
70 
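/*
 * Bottom-up heapsort over the wb_key_ref array: same algorithm as the
 * generic heapsort in lib/sort.c, but with wb_key_ref_cmp() called
 * directly instead of through an indirect cmp_func_t.
 */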
71 static noinline void wb_sort(struct wb_key_ref *base, size_t num)
72 {
73 	size_t n = num, a = num / 2;
74 
75 	if (!a)		/* num < 2 || size == 0 */
76 		return;
77 
78 	for (;;) {
79 		size_t b, c, d;
80 
81 		if (a)			/* Building heap: sift down --a */
82 			--a;
83 		else if (--n)		/* Sorting: Extract root to --n */
84 			swap(base[0], base[n]);
85 		else			/* Sort complete */
86 			break;
87 
88 		/*
89 		 * Sift element at "a" down into heap.  This is the
90 		 * "bottom-up" variant, which significantly reduces
91 		 * calls to cmp_func(): we find the sift-down path all
92 		 * the way to the leaves (one compare per level), then
93 		 * backtrack to find where to insert the target element.
94 		 *
95 		 * Because elements tend to sift down close to the leaves,
96 		 * this uses fewer compares than doing two per level
97 		 * on the way down.  (A bit more than half as many on
98 		 * average, 3/4 worst-case.)
99 		 */
100 		for (b = a; c = 2*b + 1, (d = c + 1) < n;)
101 			b = wb_key_ref_cmp(base + c, base + d) ? c : d;
102 		if (d == n)		/* Special case last leaf with no sibling */
103 			b = c;
104 
105 		/* Now backtrack from "b" to the correct location for "a" */
106 		while (b != a && wb_key_ref_cmp(base + a, base + b))
107 			b = (b - 1) / 2;
108 		c = b;			/* Where "a" belongs */
109 		while (b != a) {	/* Shift it into place */
110 			b = (b - 1) / 2;
111 			swap(base[b], base[c]);
112 		}
113 	}
114 }
115 
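/*
 * Slow path for flushing a single key: drop the leaf's write lock and go
 * through a full transaction commit, reusing the journal seq of the
 * original write buffer insert (BCH_TRANS_COMMIT_no_journal_res).
 */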
116 static noinline int wb_flush_one_slowpath(struct btree_trans *trans,
117 					  struct btree_iter *iter,
118 					  struct btree_write_buffered_key *wb)
119 {
120 	struct btree_path *path = btree_iter_path(trans, iter);
121 
122 	bch2_btree_node_unlock_write(trans, path, path->l[0].b);
123 
124 	trans->journal_res.seq = wb->journal_seq;
125 
126 	return bch2_trans_update(trans, iter, &wb->k,
127 				 BTREE_UPDATE_internal_snapshot_node) ?:
128 		bch2_trans_commit(trans, NULL, NULL,
129 				  BCH_TRANS_COMMIT_no_enospc|
130 				  BCH_TRANS_COMMIT_no_check_rw|
131 				  BCH_TRANS_COMMIT_no_journal_res|
132 				  BCH_TRANS_COMMIT_journal_reclaim);
133 }
134 
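/*
 * Fast path: insert the key directly into the leaf node while holding its
 * write lock; accounting keys are first accumulated with the key already
 * present at that position. Falls back to wb_flush_one_slowpath() when the
 * key doesn't fit in the node.
 */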
135 static inline int wb_flush_one(struct btree_trans *trans, struct btree_iter *iter,
136 			       struct btree_write_buffered_key *wb,
137 			       bool *write_locked,
138 			       bool *accounting_accumulated,
139 			       size_t *fast)
140 {
141 	struct btree_path *path;
142 	int ret;
143 
144 	EBUG_ON(!wb->journal_seq);
145 	EBUG_ON(!trans->c->btree_write_buffer.flushing.pin.seq);
146 	EBUG_ON(trans->c->btree_write_buffer.flushing.pin.seq > wb->journal_seq);
147 
148 	ret = bch2_btree_iter_traverse(trans, iter);
149 	if (ret)
150 		return ret;
151 
152 	if (!*accounting_accumulated && wb->k.k.type == KEY_TYPE_accounting) {
153 		struct bkey u;
154 		struct bkey_s_c k = bch2_btree_path_peek_slot_exact(btree_iter_path(trans, iter), &u);
155 
156 		if (k.k->type == KEY_TYPE_accounting)
157 			bch2_accounting_accumulate(bkey_i_to_accounting(&wb->k),
158 						   bkey_s_c_to_accounting(k));
159 	}
160 	*accounting_accumulated = true;
161 
162 	/*
163 	 * We can't clone a path that has write locks: unshare it now, before
164 	 * set_pos and traverse():
165 	 */
166 	if (btree_iter_path(trans, iter)->ref > 1)
167 		iter->path = __bch2_btree_path_make_mut(trans, iter->path, true, _THIS_IP_);
168 
169 	path = btree_iter_path(trans, iter);
170 
171 	if (!*write_locked) {
172 		ret = bch2_btree_node_lock_write(trans, path, &path->l[0].b->c);
173 		if (ret)
174 			return ret;
175 
176 		bch2_btree_node_prep_for_write(trans, path, path->l[0].b);
177 		*write_locked = true;
178 	}
179 
180 	if (unlikely(!bch2_btree_node_insert_fits(path->l[0].b, wb->k.k.u64s))) {
181 		*write_locked = false;
182 		return wb_flush_one_slowpath(trans, iter, wb);
183 	}
184 
185 	EBUG_ON(!bpos_eq(wb->k.k.p, path->pos));
186 
187 	bch2_btree_insert_key_leaf(trans, path, &wb->k, wb->journal_seq);
188 	(*fast)++;
189 	return 0;
190 }
191 
192 /*
193  * Update a btree with a write buffered key using the journal seq of the
194  * original write buffer insert.
195  *
196  * It is not safe to rejournal the key once it has been inserted into the write
197  * buffer because that may break recovery ordering. For example, the key may
198  * have already been modified in the active write buffer in a seq that comes
199  * before the current transaction. If we were to journal this key again and
200  * crash, recovery would process updates in the wrong order.
201  */
202 static int
203 btree_write_buffered_insert(struct btree_trans *trans,
204 			  struct btree_write_buffered_key *wb)
205 {
206 	struct btree_iter iter;
207 	int ret;
208 
209 	bch2_trans_iter_init(trans, &iter, wb->btree, bkey_start_pos(&wb->k.k),
210 			     BTREE_ITER_cached|BTREE_ITER_intent);
211 
212 	trans->journal_res.seq = wb->journal_seq;
213 
214 	ret   = bch2_btree_iter_traverse(trans, &iter) ?:
215 		bch2_trans_update(trans, &iter, &wb->k,
216 				  BTREE_UPDATE_internal_snapshot_node);
217 	bch2_trans_iter_exit(trans, &iter);
218 	return ret;
219 }
220 
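/*
 * Move as many keys as will fit from wb->inc to wb->flushing, moving the
 * journal pin over with them; caller must hold both wb->inc.lock and
 * wb->flushing.lock.
 */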
221 static void move_keys_from_inc_to_flushing(struct btree_write_buffer *wb)
222 {
223 	struct bch_fs *c = container_of(wb, struct bch_fs, btree_write_buffer);
224 	struct journal *j = &c->journal;
225 
226 	if (!wb->inc.keys.nr)
227 		return;
228 
229 	bch2_journal_pin_add(j, wb->inc.keys.data[0].journal_seq, &wb->flushing.pin,
230 			     bch2_btree_write_buffer_journal_flush);
231 
232 	darray_resize(&wb->flushing.keys, min_t(size_t, 1U << 20, wb->flushing.keys.nr + wb->inc.keys.nr));
233 	darray_resize(&wb->sorted, wb->flushing.keys.size);
234 
235 	if (!wb->flushing.keys.nr && wb->sorted.size >= wb->inc.keys.nr) {
236 		swap(wb->flushing.keys, wb->inc.keys);
237 		goto out;
238 	}
239 
240 	size_t nr = min(darray_room(wb->flushing.keys),
241 			wb->sorted.size - wb->flushing.keys.nr);
242 	nr = min(nr, wb->inc.keys.nr);
243 
244 	memcpy(&darray_top(wb->flushing.keys),
245 	       wb->inc.keys.data,
246 	       sizeof(wb->inc.keys.data[0]) * nr);
247 
248 	memmove(wb->inc.keys.data,
249 		wb->inc.keys.data + nr,
250 	       sizeof(wb->inc.keys.data[0]) * (wb->inc.keys.nr - nr));
251 
252 	wb->flushing.keys.nr	+= nr;
253 	wb->inc.keys.nr		-= nr;
254 out:
255 	if (!wb->inc.keys.nr)
256 		bch2_journal_pin_drop(j, &wb->inc.pin);
257 	else
258 		bch2_journal_pin_update(j, wb->inc.keys.data[0].journal_seq, &wb->inc.pin,
259 					bch2_btree_write_buffer_journal_flush);
260 
261 	if (j->watermark) {
262 		spin_lock(&j->lock);
263 		bch2_journal_set_watermark(j);
264 		spin_unlock(&j->lock);
265 	}
266 
267 	BUG_ON(wb->sorted.size < wb->flushing.keys.nr);
268 }
269 
270 int bch2_btree_write_buffer_insert_err(struct btree_trans *trans,
271 				       enum btree_id btree, struct bkey_i *k)
272 {
273 	struct bch_fs *c = trans->c;
274 	struct printbuf buf = PRINTBUF;
275 
276 	prt_printf(&buf, "attempting to do write buffer update on non wb btree=");
277 	bch2_btree_id_to_text(&buf, btree);
278 	prt_str(&buf, "\n");
279 	bch2_bkey_val_to_text(&buf, c, bkey_i_to_s_c(k));
280 
281 	bch2_fs_inconsistent(c, "%s", buf.buf);
282 	printbuf_exit(&buf);
283 	return -EROFS;
284 }
285 
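/*
 * Flush the contents of wb->flushing (plus whatever can be moved over from
 * wb->inc) to the btrees; caller must hold wb->flushing.lock. Keys sorted by
 * btree and position are flushed via the in-place fast path where possible,
 * with the remainder retried in journal order so the journal pin can be
 * released.
 */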
286 static int bch2_btree_write_buffer_flush_locked(struct btree_trans *trans)
287 {
288 	struct bch_fs *c = trans->c;
289 	struct journal *j = &c->journal;
290 	struct btree_write_buffer *wb = &c->btree_write_buffer;
291 	struct btree_iter iter = {};
292 	size_t overwritten = 0, fast = 0, slowpath = 0, could_not_insert = 0;
293 	bool write_locked = false;
294 	bool accounting_replay_done = test_bit(BCH_FS_accounting_replay_done, &c->flags);
295 	int ret = 0;
296 
297 	ret = bch2_journal_error(&c->journal);
298 	if (ret)
299 		return ret;
300 
301 	bch2_trans_unlock(trans);
302 	bch2_trans_begin(trans);
303 
304 	mutex_lock(&wb->inc.lock);
305 	move_keys_from_inc_to_flushing(wb);
306 	mutex_unlock(&wb->inc.lock);
307 
308 	for (size_t i = 0; i < wb->flushing.keys.nr; i++) {
309 		wb->sorted.data[i].idx = i;
310 		wb->sorted.data[i].btree = wb->flushing.keys.data[i].btree;
311 		memcpy(&wb->sorted.data[i].pos, &wb->flushing.keys.data[i].k.k.p, sizeof(struct bpos));
312 	}
313 	wb->sorted.nr = wb->flushing.keys.nr;
314 
315 	/*
316 	 * We first sort so that we can detect and skip redundant updates, and
317 	 * then we attempt to flush in sorted btree order, as this is most
318 	 * efficient.
319 	 *
320 	 * However, since we're not flushing in the order they appear in the
321 	 * journal we won't be able to drop our journal pin until everything is
322 	 * flushed - which means this could deadlock the journal if we weren't
323 	 * passing BCH_TRANS_COMMIT_journal_reclaim. This causes the update to fail
324 	 * if it would block taking a journal reservation.
325 	 *
326 	 * If that happens, simply skip the key so we can optimistically insert
327 	 * as many keys as possible in the fast path.
328 	 */
329 	wb_sort(wb->sorted.data, wb->sorted.nr);
330 
331 	darray_for_each(wb->sorted, i) {
332 		struct btree_write_buffered_key *k = &wb->flushing.keys.data[i->idx];
333 
334 		if (unlikely(!btree_type_uses_write_buffer(k->btree))) {
335 			ret = bch2_btree_write_buffer_insert_err(trans, k->btree, &k->k);
336 			goto err;
337 		}
338 
339 		for (struct wb_key_ref *n = i + 1; n < min(i + 4, &darray_top(wb->sorted)); n++)
340 			prefetch(&wb->flushing.keys.data[n->idx]);
341 
342 		BUG_ON(!k->journal_seq);
343 
344 		if (!accounting_replay_done &&
345 		    k->k.k.type == KEY_TYPE_accounting) {
346 			slowpath++;
347 			continue;
348 		}
349 
350 		if (i + 1 < &darray_top(wb->sorted) &&
351 		    wb_key_eq(i, i + 1)) {
352 			struct btree_write_buffered_key *n = &wb->flushing.keys.data[i[1].idx];
353 
354 			if (k->k.k.type == KEY_TYPE_accounting &&
355 			    n->k.k.type == KEY_TYPE_accounting)
356 				bch2_accounting_accumulate(bkey_i_to_accounting(&n->k),
357 							   bkey_i_to_s_c_accounting(&k->k));
358 
359 			overwritten++;
360 			n->journal_seq = min_t(u64, n->journal_seq, k->journal_seq);
361 			k->journal_seq = 0;
362 			continue;
363 		}
364 
365 		if (write_locked) {
366 			struct btree_path *path = btree_iter_path(trans, &iter);
367 
368 			if (path->btree_id != i->btree ||
369 			    bpos_gt(k->k.k.p, path->l[0].b->key.k.p)) {
370 				bch2_btree_node_unlock_write(trans, path, path->l[0].b);
371 				write_locked = false;
372 
373 				ret = lockrestart_do(trans,
374 					bch2_btree_iter_traverse(trans, &iter) ?:
375 					bch2_foreground_maybe_merge(trans, iter.path, 0,
376 							BCH_WATERMARK_reclaim|
377 							BCH_TRANS_COMMIT_journal_reclaim|
378 							BCH_TRANS_COMMIT_no_check_rw|
379 							BCH_TRANS_COMMIT_no_enospc));
380 				if (ret)
381 					goto err;
382 			}
383 		}
384 
385 		if (!iter.path || iter.btree_id != k->btree) {
386 			bch2_trans_iter_exit(trans, &iter);
387 			bch2_trans_iter_init(trans, &iter, k->btree, k->k.k.p,
388 					     BTREE_ITER_intent|BTREE_ITER_all_snapshots);
389 		}
390 
391 		bch2_btree_iter_set_pos(trans, &iter, k->k.k.p);
392 		btree_iter_path(trans, &iter)->preserve = false;
393 
394 		bool accounting_accumulated = false;
395 		do {
396 			if (race_fault()) {
397 				ret = bch_err_throw(c, journal_reclaim_would_deadlock);
398 				break;
399 			}
400 
401 			ret = wb_flush_one(trans, &iter, k, &write_locked,
402 					   &accounting_accumulated, &fast);
403 			if (!write_locked)
404 				bch2_trans_begin(trans);
405 		} while (bch2_err_matches(ret, BCH_ERR_transaction_restart));
406 
407 		if (!ret) {
408 			k->journal_seq = 0;
409 		} else if (ret == -BCH_ERR_journal_reclaim_would_deadlock) {
410 			slowpath++;
411 			ret = 0;
412 		} else
413 			break;
414 	}
415 
416 	if (write_locked) {
417 		struct btree_path *path = btree_iter_path(trans, &iter);
418 		bch2_btree_node_unlock_write(trans, path, path->l[0].b);
419 	}
420 	bch2_trans_iter_exit(trans, &iter);
421 
422 	if (ret)
423 		goto err;
424 
425 	if (slowpath) {
426 		/*
427 		 * Flush in the order they were present in the journal, so that
428 		 * we can release journal pins:
429 		 * The fast path zeroed the journal seq of keys that were
430 		 * successfully flushed, so we can skip those here.
431 		 */
432 		trace_and_count(c, write_buffer_flush_slowpath, trans, slowpath, wb->flushing.keys.nr);
433 
434 		sort_nonatomic(wb->flushing.keys.data,
435 			       wb->flushing.keys.nr,
436 			       sizeof(wb->flushing.keys.data[0]),
437 			       wb_key_seq_cmp, NULL);
438 
439 		darray_for_each(wb->flushing.keys, i) {
440 			if (!i->journal_seq)
441 				continue;
442 
443 			if (!accounting_replay_done &&
444 			    i->k.k.type == KEY_TYPE_accounting) {
445 				could_not_insert++;
446 				continue;
447 			}
448 
449 			if (!could_not_insert)
450 				bch2_journal_pin_update(j, i->journal_seq, &wb->flushing.pin,
451 							bch2_btree_write_buffer_journal_flush);
452 
453 			bch2_trans_begin(trans);
454 
455 			ret = commit_do(trans, NULL, NULL,
456 					BCH_WATERMARK_reclaim|
457 					BCH_TRANS_COMMIT_journal_reclaim|
458 					BCH_TRANS_COMMIT_no_check_rw|
459 					BCH_TRANS_COMMIT_no_enospc|
460 					BCH_TRANS_COMMIT_no_journal_res,
461 					btree_write_buffered_insert(trans, i));
462 			if (ret)
463 				goto err;
464 
465 			i->journal_seq = 0;
466 		}
467 
468 		/*
469 		 * If journal replay hasn't finished with accounting keys we
470 		 * can't flush accounting keys at all - condense them and leave
471 		 * them for next time.
472 		 *
473 		 * Q: Can the write buffer overflow?
474 	 * A: There shouldn't be any actual risk. It's just new accounting
475 		 * updates that the write buffer can't flush, and those are only
476 		 * going to be generated by interior btree node updates as
477 		 * journal replay has to split/rewrite nodes to make room for
478 		 * its updates.
479 		 *
480 	 * And for those new accounting updates, updates to the same
481 		 * counters get accumulated as they're flushed from the journal
482 	 * to the write buffer - see the patch that added eytzinger tree
483 	 * accumulation. So we could only overflow if the number of
484 		 * distinct counters touched somehow was very large.
485 		 */
486 		if (could_not_insert) {
487 			struct btree_write_buffered_key *dst = wb->flushing.keys.data;
488 
489 			darray_for_each(wb->flushing.keys, i)
490 				if (i->journal_seq)
491 					*dst++ = *i;
492 			wb->flushing.keys.nr = dst - wb->flushing.keys.data;
493 		}
494 	}
495 err:
496 	if (ret || !could_not_insert) {
497 		bch2_journal_pin_drop(j, &wb->flushing.pin);
498 		wb->flushing.keys.nr = 0;
499 	}
500 
501 	bch2_fs_fatal_err_on(ret, c, "%s", bch2_err_str(ret));
502 	trace_write_buffer_flush(trans, wb->flushing.keys.nr, overwritten, fast, 0);
503 	return ret;
504 }
505 
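/*
 * Copy BCH_JSET_ENTRY_write_buffer_keys entries from a journal buffer into
 * the write buffer, retyping them as plain BCH_JSET_ENTRY_btree_keys
 * entries once copied.
 */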
506 static int bch2_journal_keys_to_write_buffer(struct bch_fs *c, struct journal_buf *buf)
507 {
508 	struct journal_keys_to_wb dst;
509 	int ret = 0;
510 
511 	bch2_journal_keys_to_write_buffer_start(c, &dst, le64_to_cpu(buf->data->seq));
512 
513 	for_each_jset_entry_type(entry, buf->data, BCH_JSET_ENTRY_write_buffer_keys) {
514 		jset_entry_for_each_key(entry, k) {
515 			ret = bch2_journal_key_to_wb(c, &dst, entry->btree_id, k);
516 			if (ret)
517 				goto out;
518 		}
519 
520 		entry->type = BCH_JSET_ENTRY_btree_keys;
521 	}
522 out:
523 	ret = bch2_journal_keys_to_write_buffer_end(c, &dst) ?: ret;
524 	return ret;
525 }
526 
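/*
 * Copy keys from any journal buffers that still need flushing to the write
 * buffer, up to @max_seq.
 */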
527 static int fetch_wb_keys_from_journal(struct bch_fs *c, u64 max_seq)
528 {
529 	struct journal *j = &c->journal;
530 	struct journal_buf *buf;
531 	bool blocked;
532 	int ret = 0;
533 
534 	while (!ret && (buf = bch2_next_write_buffer_flush_journal_buf(j, max_seq, &blocked))) {
535 		ret = bch2_journal_keys_to_write_buffer(c, buf);
536 
537 		if (!blocked && !ret) {
538 			spin_lock(&j->lock);
539 			buf->need_flush_to_write_buffer = false;
540 			spin_unlock(&j->lock);
541 		}
542 
543 		mutex_unlock(&j->buf_lock);
544 
545 		if (blocked) {
546 			bch2_journal_unblock(j);
547 			break;
548 		}
549 	}
550 
551 	return ret;
552 }
553 
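/*
 * Flush the write buffer until neither the incoming nor the flushing pin
 * holds a journal seq <= @max_seq, pulling more keys from the journal on
 * each pass.
 */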
554 static int btree_write_buffer_flush_seq(struct btree_trans *trans, u64 max_seq,
555 					bool *did_work)
556 {
557 	struct bch_fs *c = trans->c;
558 	struct btree_write_buffer *wb = &c->btree_write_buffer;
559 	int ret = 0, fetch_from_journal_err;
560 
561 	do {
562 		bch2_trans_unlock(trans);
563 
564 		fetch_from_journal_err = fetch_wb_keys_from_journal(c, max_seq);
565 
566 		*did_work |= wb->inc.keys.nr || wb->flushing.keys.nr;
567 
568 		/*
569 		 * On memory allocation failure, bch2_btree_write_buffer_flush_locked()
570 		 * is not guaranteed to empty wb->inc:
571 		 */
572 		mutex_lock(&wb->flushing.lock);
573 		ret = bch2_btree_write_buffer_flush_locked(trans);
574 		mutex_unlock(&wb->flushing.lock);
575 	} while (!ret &&
576 		 (fetch_from_journal_err ||
577 		  (wb->inc.pin.seq && wb->inc.pin.seq <= max_seq) ||
578 		  (wb->flushing.pin.seq && wb->flushing.pin.seq <= max_seq)));
579 
580 	return ret;
581 }
582 
583 static int bch2_btree_write_buffer_journal_flush(struct journal *j,
584 				struct journal_entry_pin *_pin, u64 seq)
585 {
586 	struct bch_fs *c = container_of(j, struct bch_fs, journal);
587 	bool did_work = false;
588 
589 	return bch2_trans_run(c, btree_write_buffer_flush_seq(trans, seq, &did_work));
590 }
591 
592 int bch2_btree_write_buffer_flush_sync(struct btree_trans *trans)
593 {
594 	struct bch_fs *c = trans->c;
595 	bool did_work = false;
596 
597 	trace_and_count(c, write_buffer_flush_sync, trans, _RET_IP_);
598 
599 	return btree_write_buffer_flush_seq(trans, journal_cur_seq(&c->journal), &did_work);
600 }
601 
602 /*
603  * The write buffer requires flushing when going RO: keys in the journal for the
604  * write buffer don't have a journal pin yet
605  */
606 bool bch2_btree_write_buffer_flush_going_ro(struct bch_fs *c)
607 {
608 	if (bch2_journal_error(&c->journal))
609 		return false;
610 
611 	bool did_work = false;
612 	bch2_trans_run(c, btree_write_buffer_flush_seq(trans,
613 				journal_cur_seq(&c->journal), &did_work));
614 	return did_work;
615 }
616 
617 int bch2_btree_write_buffer_flush_nocheck_rw(struct btree_trans *trans)
618 {
619 	struct bch_fs *c = trans->c;
620 	struct btree_write_buffer *wb = &c->btree_write_buffer;
621 	int ret = 0;
622 
623 	if (mutex_trylock(&wb->flushing.lock)) {
624 		ret = bch2_btree_write_buffer_flush_locked(trans);
625 		mutex_unlock(&wb->flushing.lock);
626 	}
627 
628 	return ret;
629 }
630 
631 int bch2_btree_write_buffer_tryflush(struct btree_trans *trans)
632 {
633 	struct bch_fs *c = trans->c;
634 
635 	if (!enumerated_ref_tryget(&c->writes, BCH_WRITE_REF_btree_write_buffer))
636 		return bch_err_throw(c, erofs_no_writes);
637 
638 	int ret = bch2_btree_write_buffer_flush_nocheck_rw(trans);
639 	enumerated_ref_put(&c->writes, BCH_WRITE_REF_btree_write_buffer);
640 	return ret;
641 }
642 
643 /*
644  * In check and repair code, when checking references to write buffer btrees we
645  * need to issue a flush before we have a definitive error: this issues a flush
646  * if this is a key we haven't yet checked.
647  */
648 int bch2_btree_write_buffer_maybe_flush(struct btree_trans *trans,
649 					struct bkey_s_c referring_k,
650 					struct bkey_buf *last_flushed)
651 {
652 	struct bch_fs *c = trans->c;
653 	struct bkey_buf tmp;
654 	int ret = 0;
655 
656 	bch2_bkey_buf_init(&tmp);
657 
658 	if (!bkey_and_val_eq(referring_k, bkey_i_to_s_c(last_flushed->k))) {
659 		if (trace_write_buffer_maybe_flush_enabled()) {
660 			struct printbuf buf = PRINTBUF;
661 
662 			bch2_bkey_val_to_text(&buf, c, referring_k);
663 			trace_write_buffer_maybe_flush(trans, _RET_IP_, buf.buf);
664 			printbuf_exit(&buf);
665 		}
666 
667 		bch2_bkey_buf_reassemble(&tmp, c, referring_k);
668 
669 		if (bkey_is_btree_ptr(referring_k.k)) {
670 			bch2_trans_unlock(trans);
671 			bch2_btree_interior_updates_flush(c);
672 		}
673 
674 		ret = bch2_btree_write_buffer_flush_sync(trans);
675 		if (ret)
676 			goto err;
677 
678 		bch2_bkey_buf_copy(last_flushed, c, tmp.k);
679 		ret = bch_err_throw(c, transaction_restart_write_buffer_flush);
680 	}
681 err:
682 	bch2_bkey_buf_exit(&tmp, c);
683 	return ret;
684 }
685 
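/*
 * Background flush worker, queued from bch2_journal_keys_to_write_buffer_end()
 * when the write buffer is getting full; drops the write ref taken when the
 * work was queued.
 */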
686 static void bch2_btree_write_buffer_flush_work(struct work_struct *work)
687 {
688 	struct bch_fs *c = container_of(work, struct bch_fs, btree_write_buffer.flush_work);
689 	struct btree_write_buffer *wb = &c->btree_write_buffer;
690 	int ret;
691 
692 	mutex_lock(&wb->flushing.lock);
693 	do {
694 		ret = bch2_trans_run(c, bch2_btree_write_buffer_flush_locked(trans));
695 	} while (!ret && bch2_btree_write_buffer_should_flush(c));
696 	mutex_unlock(&wb->flushing.lock);
697 
698 	enumerated_ref_put(&c->writes, BCH_WRITE_REF_btree_write_buffer);
699 }
700 
701 static void wb_accounting_sort(struct btree_write_buffer *wb)
702 {
703 	eytzinger0_sort(wb->accounting.data, wb->accounting.nr,
704 			sizeof(wb->accounting.data[0]),
705 			wb_key_cmp, NULL);
706 }
707 
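/*
 * Slow path for accounting updates: append a key that isn't yet in
 * wb->accounting and re-sort the array into eytzinger order.
 */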
708 int bch2_accounting_key_to_wb_slowpath(struct bch_fs *c, enum btree_id btree,
709 				       struct bkey_i_accounting *k)
710 {
711 	struct btree_write_buffer *wb = &c->btree_write_buffer;
712 	struct btree_write_buffered_key new = { .btree = btree };
713 
714 	bkey_copy(&new.k, &k->k_i);
715 
716 	int ret = darray_push(&wb->accounting, new);
717 	if (ret)
718 		return ret;
719 
720 	wb_accounting_sort(wb);
721 	return 0;
722 }
723 
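/*
 * Slow path for adding a key to the write buffer: make room in the
 * destination darray, falling back from wb->flushing to wb->inc (and
 * re-pinning the journal seq) if the flushing buffer can't be grown.
 */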
724 int bch2_journal_key_to_wb_slowpath(struct bch_fs *c,
725 			     struct journal_keys_to_wb *dst,
726 			     enum btree_id btree, struct bkey_i *k)
727 {
728 	struct btree_write_buffer *wb = &c->btree_write_buffer;
729 	int ret;
730 retry:
731 	ret = darray_make_room_gfp(&dst->wb->keys, 1, GFP_KERNEL);
732 	if (!ret && dst->wb == &wb->flushing)
733 		ret = darray_resize(&wb->sorted, wb->flushing.keys.size);
734 
735 	if (unlikely(ret)) {
736 		if (dst->wb == &c->btree_write_buffer.flushing) {
737 			mutex_unlock(&dst->wb->lock);
738 			dst->wb = &c->btree_write_buffer.inc;
739 			bch2_journal_pin_add(&c->journal, dst->seq, &dst->wb->pin,
740 					     bch2_btree_write_buffer_journal_flush);
741 			goto retry;
742 		}
743 
744 		return ret;
745 	}
746 
747 	dst->room = darray_room(dst->wb->keys);
748 	if (dst->wb == &wb->flushing)
749 		dst->room = min(dst->room, wb->sorted.size - wb->flushing.keys.nr);
750 	BUG_ON(!dst->room);
751 	BUG_ON(!dst->seq);
752 
753 	struct btree_write_buffered_key *wb_k = &darray_top(dst->wb->keys);
754 	wb_k->journal_seq	= dst->seq;
755 	wb_k->btree		= btree;
756 	bkey_copy(&wb_k->k, k);
757 	dst->wb->keys.nr++;
758 	dst->room--;
759 	return 0;
760 }
761 
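/*
 * Begin copying a journal buffer's keys into the write buffer: pick the
 * destination (wb->flushing if its lock is free and wb->inc is empty,
 * otherwise wb->inc), pin @seq in the journal, and zero the values of the
 * cached accounting keys so this buffer's updates accumulate from zero.
 */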
762 void bch2_journal_keys_to_write_buffer_start(struct bch_fs *c, struct journal_keys_to_wb *dst, u64 seq)
763 {
764 	struct btree_write_buffer *wb = &c->btree_write_buffer;
765 
766 	if (mutex_trylock(&wb->flushing.lock)) {
767 		mutex_lock(&wb->inc.lock);
768 		move_keys_from_inc_to_flushing(wb);
769 
770 		/*
771 		 * Attempt to skip wb->inc, and add keys directly to
772 		 * wb->flushing, saving us a copy later:
773 		 */
774 
775 		if (!wb->inc.keys.nr) {
776 			dst->wb = &wb->flushing;
777 		} else {
778 			mutex_unlock(&wb->flushing.lock);
779 			dst->wb = &wb->inc;
780 		}
781 	} else {
782 		mutex_lock(&wb->inc.lock);
783 		dst->wb = &wb->inc;
784 	}
785 
786 	dst->room = darray_room(dst->wb->keys);
787 	if (dst->wb == &wb->flushing)
788 		dst->room = min(dst->room, wb->sorted.size - wb->flushing.keys.nr);
789 	dst->seq = seq;
790 
791 	bch2_journal_pin_add(&c->journal, seq, &dst->wb->pin,
792 			     bch2_btree_write_buffer_journal_flush);
793 
794 	darray_for_each(wb->accounting, i)
795 		memset(&i->k.v, 0, bkey_val_bytes(&i->k.k));
796 }
797 
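/*
 * Finish copying a journal buffer's keys: push the nonzero accumulated
 * accounting keys into the destination, compact wb->accounting if most of
 * its entries are now zero, drop the journal pin if the destination buffer
 * is empty, kick off a background flush if the buffer is getting full, and
 * release the locks taken in _start().
 */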
798 int bch2_journal_keys_to_write_buffer_end(struct bch_fs *c, struct journal_keys_to_wb *dst)
799 {
800 	struct btree_write_buffer *wb = &c->btree_write_buffer;
801 	unsigned live_accounting_keys = 0;
802 	int ret = 0;
803 
804 	darray_for_each(wb->accounting, i)
805 		if (!bch2_accounting_key_is_zero(bkey_i_to_s_c_accounting(&i->k))) {
806 			i->journal_seq = dst->seq;
807 			live_accounting_keys++;
808 			ret = __bch2_journal_key_to_wb(c, dst, i->btree, &i->k);
809 			if (ret)
810 				break;
811 		}
812 
813 	if (live_accounting_keys * 2 < wb->accounting.nr) {
814 		struct btree_write_buffered_key *dst = wb->accounting.data;
815 
816 		darray_for_each(wb->accounting, src)
817 			if (!bch2_accounting_key_is_zero(bkey_i_to_s_c_accounting(&src->k)))
818 				*dst++ = *src;
819 		wb->accounting.nr = dst - wb->accounting.data;
820 		wb_accounting_sort(wb);
821 	}
822 
823 	if (!dst->wb->keys.nr)
824 		bch2_journal_pin_drop(&c->journal, &dst->wb->pin);
825 
826 	if (bch2_btree_write_buffer_should_flush(c) &&
827 	    __enumerated_ref_tryget(&c->writes, BCH_WRITE_REF_btree_write_buffer) &&
828 	    !queue_work(system_unbound_wq, &c->btree_write_buffer.flush_work))
829 		enumerated_ref_put(&c->writes, BCH_WRITE_REF_btree_write_buffer);
830 
831 	if (dst->wb == &wb->flushing)
832 		mutex_unlock(&wb->flushing.lock);
833 	mutex_unlock(&wb->inc.lock);
834 
835 	return ret;
836 }
837 
838 static int wb_keys_resize(struct btree_write_buffer_keys *wb, size_t new_size)
839 {
840 	if (wb->keys.size >= new_size)
841 		return 0;
842 
843 	if (!mutex_trylock(&wb->lock))
844 		return -EINTR;
845 
846 	int ret = darray_resize(&wb->keys, new_size);
847 	mutex_unlock(&wb->lock);
848 	return ret;
849 }
850 
851 int bch2_btree_write_buffer_resize(struct bch_fs *c, size_t new_size)
852 {
853 	struct btree_write_buffer *wb = &c->btree_write_buffer;
854 
855 	return wb_keys_resize(&wb->flushing, new_size) ?:
856 		wb_keys_resize(&wb->inc, new_size);
857 }
858 
859 void bch2_fs_btree_write_buffer_exit(struct bch_fs *c)
860 {
861 	struct btree_write_buffer *wb = &c->btree_write_buffer;
862 
863 	BUG_ON((wb->inc.keys.nr || wb->flushing.keys.nr) &&
864 	       !bch2_journal_error(&c->journal));
865 
866 	darray_exit(&wb->accounting);
867 	darray_exit(&wb->sorted);
868 	darray_exit(&wb->flushing.keys);
869 	darray_exit(&wb->inc.keys);
870 }
871 
872 void bch2_fs_btree_write_buffer_init_early(struct bch_fs *c)
873 {
874 	struct btree_write_buffer *wb = &c->btree_write_buffer;
875 
876 	mutex_init(&wb->inc.lock);
877 	mutex_init(&wb->flushing.lock);
878 	INIT_WORK(&wb->flush_work, bch2_btree_write_buffer_flush_work);
879 }
880 
881 int bch2_fs_btree_write_buffer_init(struct bch_fs *c)
882 {
883 	struct btree_write_buffer *wb = &c->btree_write_buffer;
884 
885 	/* Will be resized by journal as needed: */
886 	unsigned initial_size = 1 << 16;
887 
888 	return  darray_make_room(&wb->inc.keys, initial_size) ?:
889 		darray_make_room(&wb->flushing.keys, initial_size) ?:
890 		darray_make_room(&wb->sorted, initial_size);
891 }
892