/* fs/bcachefs/btree_write_buffer.c (revision 6f2a71a99ebd5dfaa7948a2e9c59eae94b741bd8) */
// SPDX-License-Identifier: GPL-2.0

#include "bcachefs.h"
#include "bkey_buf.h"
#include "btree_locking.h"
#include "btree_update.h"
#include "btree_update_interior.h"
#include "btree_write_buffer.h"
#include "disk_accounting.h"
#include "enumerated_ref.h"
#include "error.h"
#include "extents.h"
#include "journal.h"
#include "journal_io.h"
#include "journal_reclaim.h"

#include <linux/prefetch.h>
#include <linux/sort.h>

static int bch2_btree_write_buffer_journal_flush(struct journal *,
				struct journal_entry_pin *, u64);

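/*
 * Sort keys for flushing: struct wb_key_ref (see btree_write_buffer_types.h)
 * packs the target btree, the key's position and an index into the flushing
 * array so that (btree, pos, idx) can be compared as a single 192-bit value
 * split across the lo/mi/hi words. On x86-64 the comparison is done with a
 * sub/sbb chain; everywhere else we fall back to the portable version.
 */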
static inline bool __wb_key_ref_cmp(const struct wb_key_ref *l, const struct wb_key_ref *r)
{
	return (cmp_int(l->hi, r->hi) ?:
		cmp_int(l->mi, r->mi) ?:
		cmp_int(l->lo, r->lo)) >= 0;
}

static inline bool wb_key_ref_cmp(const struct wb_key_ref *l, const struct wb_key_ref *r)
{
#ifdef CONFIG_X86_64
	int cmp;

	asm("mov   (%[l]), %%rax;"
	    "sub   (%[r]), %%rax;"
	    "mov  8(%[l]), %%rax;"
	    "sbb  8(%[r]), %%rax;"
	    "mov 16(%[l]), %%rax;"
	    "sbb 16(%[r]), %%rax;"
	    : "=@ccae" (cmp)
	    : [l] "r" (l), [r] "r" (r)
	    : "rax", "cc");

	EBUG_ON(cmp != __wb_key_ref_cmp(l, r));
	return cmp;
#else
	return __wb_key_ref_cmp(l, r);
#endif
}

static int wb_key_seq_cmp(const void *_l, const void *_r)
{
	const struct btree_write_buffered_key *l = _l;
	const struct btree_write_buffered_key *r = _r;

	return cmp_int(l->journal_seq, r->journal_seq);
}

/* Compare excluding idx, the low 24 bits: */
static inline bool wb_key_eq(const void *_l, const void *_r)
{
	const struct wb_key_ref *l = _l;
	const struct wb_key_ref *r = _r;

	return !((l->hi ^ r->hi)|
		 (l->mi ^ r->mi)|
		 ((l->lo >> 24) ^ (r->lo >> 24)));
}

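/*
 * Bottom-up heapsort on the wb_key_ref array, open-coded so the comparison
 * above is inlined rather than reached through an indirect cmp_func callback
 * as with the generic sort().
 */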
static noinline void wb_sort(struct wb_key_ref *base, size_t num)
{
	size_t n = num, a = num / 2;

	if (!a)		/* num < 2 || size == 0 */
		return;

	for (;;) {
		size_t b, c, d;

		if (a)			/* Building heap: sift down --a */
			--a;
		else if (--n)		/* Sorting: Extract root to --n */
			swap(base[0], base[n]);
		else			/* Sort complete */
			break;

		/*
		 * Sift element at "a" down into heap.  This is the
		 * "bottom-up" variant, which significantly reduces
		 * calls to cmp_func(): we find the sift-down path all
		 * the way to the leaves (one compare per level), then
		 * backtrack to find where to insert the target element.
		 *
		 * Because elements tend to sift down close to the leaves,
		 * this uses fewer compares than doing two per level
		 * on the way down.  (A bit more than half as many on
		 * average, 3/4 worst-case.)
		 */
		for (b = a; c = 2*b + 1, (d = c + 1) < n;)
			b = wb_key_ref_cmp(base + c, base + d) ? c : d;
		if (d == n)		/* Special case last leaf with no sibling */
			b = c;

		/* Now backtrack from "b" to the correct location for "a" */
		while (b != a && wb_key_ref_cmp(base + a, base + b))
			b = (b - 1) / 2;
		c = b;			/* Where "a" belongs */
		while (b != a) {	/* Shift it into place */
			b = (b - 1) / 2;
			swap(base[b], base[c]);
		}
	}
}

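/*
 * Slowpath for one write buffered key: the key didn't fit in the leaf node,
 * so drop the node write lock and do a full transaction commit for it,
 * reusing the journal sequence number of the original write buffer insert.
 */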
static noinline int wb_flush_one_slowpath(struct btree_trans *trans,
					  struct btree_iter *iter,
					  struct btree_write_buffered_key *wb)
{
	struct btree_path *path = btree_iter_path(trans, iter);

	bch2_btree_node_unlock_write(trans, path, path->l[0].b);

	trans->journal_res.seq = wb->journal_seq;

	return bch2_trans_update(trans, iter, &wb->k,
				 BTREE_UPDATE_internal_snapshot_node) ?:
		bch2_trans_commit(trans, NULL, NULL,
				  BCH_TRANS_COMMIT_no_enospc|
				  BCH_TRANS_COMMIT_no_check_rw|
				  BCH_TRANS_COMMIT_no_journal_res|
				  BCH_TRANS_COMMIT_journal_reclaim);
}

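/*
 * Fast path for flushing one write buffered key: traverse to the leaf, take
 * the node write lock (and keep it across consecutive keys landing in the
 * same leaf), and insert directly into the leaf. Accounting keys are first
 * accumulated with the key already present in the btree; keys that don't fit
 * in the leaf fall back to wb_flush_one_slowpath().
 */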
static inline int wb_flush_one(struct btree_trans *trans, struct btree_iter *iter,
			       struct btree_write_buffered_key *wb,
			       bool *write_locked,
			       bool *accounting_accumulated,
			       size_t *fast)
{
	struct btree_path *path;
	int ret;

	EBUG_ON(!wb->journal_seq);
	EBUG_ON(!trans->c->btree_write_buffer.flushing.pin.seq);
	EBUG_ON(trans->c->btree_write_buffer.flushing.pin.seq > wb->journal_seq);

	ret = bch2_btree_iter_traverse(trans, iter);
	if (ret)
		return ret;

	if (!*accounting_accumulated && wb->k.k.type == KEY_TYPE_accounting) {
		struct bkey u;
		struct bkey_s_c k = bch2_btree_path_peek_slot_exact(btree_iter_path(trans, iter), &u);

		if (k.k->type == KEY_TYPE_accounting)
			bch2_accounting_accumulate(bkey_i_to_accounting(&wb->k),
						   bkey_s_c_to_accounting(k));
	}
	*accounting_accumulated = true;

	/*
	 * We can't clone a path that has write locks: unshare it now, before
	 * set_pos and traverse():
	 */
	if (btree_iter_path(trans, iter)->ref > 1)
		iter->path = __bch2_btree_path_make_mut(trans, iter->path, true, _THIS_IP_);

	path = btree_iter_path(trans, iter);

	if (!*write_locked) {
		ret = bch2_btree_node_lock_write(trans, path, &path->l[0].b->c);
		if (ret)
			return ret;

		bch2_btree_node_prep_for_write(trans, path, path->l[0].b);
		*write_locked = true;
	}

	if (unlikely(!bch2_btree_node_insert_fits(path->l[0].b, wb->k.k.u64s))) {
		*write_locked = false;
		return wb_flush_one_slowpath(trans, iter, wb);
	}

	EBUG_ON(!bpos_eq(wb->k.k.p, path->pos));

	bch2_btree_insert_key_leaf(trans, path, &wb->k, wb->journal_seq);
	(*fast)++;
	return 0;
}

/*
 * Update a btree with a write buffered key using the journal seq of the
 * original write buffer insert.
 *
 * It is not safe to rejournal the key once it has been inserted into the write
 * buffer because that may break recovery ordering. For example, the key may
 * have already been modified in the active write buffer in a seq that comes
 * before the current transaction. If we were to journal this key again and
 * crash, recovery would process updates in the wrong order.
 */
static int
btree_write_buffered_insert(struct btree_trans *trans,
			  struct btree_write_buffered_key *wb)
{
	struct btree_iter iter;
	int ret;

	bch2_trans_iter_init(trans, &iter, wb->btree, bkey_start_pos(&wb->k.k),
			     BTREE_ITER_cached|BTREE_ITER_intent);

	trans->journal_res.seq = wb->journal_seq;

	ret   = bch2_btree_iter_traverse(trans, &iter) ?:
		bch2_trans_update(trans, &iter, &wb->k,
				  BTREE_UPDATE_internal_snapshot_node);
	bch2_trans_iter_exit(trans, &iter);
	return ret;
}

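/*
 * Move keys from the incoming buffer (wb->inc) to the flushing buffer,
 * transferring the journal pin along with them: if the flushing buffer is
 * empty we can simply swap the two darrays, otherwise copy as many keys as
 * fit and shift the remainder down in wb->inc. Caller holds both
 * wb->inc.lock and wb->flushing.lock.
 */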
static void move_keys_from_inc_to_flushing(struct btree_write_buffer *wb)
{
	struct bch_fs *c = container_of(wb, struct bch_fs, btree_write_buffer);
	struct journal *j = &c->journal;

	if (!wb->inc.keys.nr)
		return;

	bch2_journal_pin_add(j, wb->inc.keys.data[0].journal_seq, &wb->flushing.pin,
			     bch2_btree_write_buffer_journal_flush);

	darray_resize(&wb->flushing.keys, min_t(size_t, 1U << 20, wb->flushing.keys.nr + wb->inc.keys.nr));
	darray_resize(&wb->sorted, wb->flushing.keys.size);

	if (!wb->flushing.keys.nr && wb->sorted.size >= wb->inc.keys.nr) {
		swap(wb->flushing.keys, wb->inc.keys);
		goto out;
	}

	size_t nr = min(darray_room(wb->flushing.keys),
			wb->sorted.size - wb->flushing.keys.nr);
	nr = min(nr, wb->inc.keys.nr);

	memcpy(&darray_top(wb->flushing.keys),
	       wb->inc.keys.data,
	       sizeof(wb->inc.keys.data[0]) * nr);

	memmove(wb->inc.keys.data,
		wb->inc.keys.data + nr,
	       sizeof(wb->inc.keys.data[0]) * (wb->inc.keys.nr - nr));

	wb->flushing.keys.nr	+= nr;
	wb->inc.keys.nr		-= nr;
out:
	if (!wb->inc.keys.nr)
		bch2_journal_pin_drop(j, &wb->inc.pin);
	else
		bch2_journal_pin_update(j, wb->inc.keys.data[0].journal_seq, &wb->inc.pin,
					bch2_btree_write_buffer_journal_flush);

	if (j->watermark) {
		spin_lock(&j->lock);
		bch2_journal_set_watermark(j);
		spin_unlock(&j->lock);
	}

	BUG_ON(wb->sorted.size < wb->flushing.keys.nr);
}

int bch2_btree_write_buffer_insert_err(struct bch_fs *c,
				       enum btree_id btree, struct bkey_i *k)
{
	struct printbuf buf = PRINTBUF;

	prt_printf(&buf, "attempting to do write buffer update on non wb btree=");
	bch2_btree_id_to_text(&buf, btree);
	prt_str(&buf, "\n");
	bch2_bkey_val_to_text(&buf, c, bkey_i_to_s_c(k));

	bch2_fs_inconsistent(c, "%s", buf.buf);
	printbuf_exit(&buf);
	return -EROFS;
}

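/*
 * Main flush path: move any incoming keys into the flushing buffer, sort them
 * by (btree, pos) so redundant updates can be dropped, then flush them in
 * btree order through the fast path above. Keys that would deadlock against
 * journal reclaim are deferred and flushed afterwards in journal-seq order so
 * the journal pin can be released incrementally. Called with
 * wb->flushing.lock held.
 */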
static int bch2_btree_write_buffer_flush_locked(struct btree_trans *trans)
{
	struct bch_fs *c = trans->c;
	struct journal *j = &c->journal;
	struct btree_write_buffer *wb = &c->btree_write_buffer;
	struct btree_iter iter = {};
	size_t overwritten = 0, fast = 0, slowpath = 0, could_not_insert = 0;
	bool write_locked = false;
	bool accounting_replay_done = test_bit(BCH_FS_accounting_replay_done, &c->flags);
	int ret = 0;

	ret = bch2_journal_error(&c->journal);
	if (ret)
		return ret;

	bch2_trans_unlock(trans);
	bch2_trans_begin(trans);

	mutex_lock(&wb->inc.lock);
	move_keys_from_inc_to_flushing(wb);
	mutex_unlock(&wb->inc.lock);

	for (size_t i = 0; i < wb->flushing.keys.nr; i++) {
		wb->sorted.data[i].idx = i;
		wb->sorted.data[i].btree = wb->flushing.keys.data[i].btree;
		memcpy(&wb->sorted.data[i].pos, &wb->flushing.keys.data[i].k.k.p, sizeof(struct bpos));
	}
	wb->sorted.nr = wb->flushing.keys.nr;

	/*
	 * We first sort so that we can detect and skip redundant updates, and
	 * then we attempt to flush in sorted btree order, as this is most
	 * efficient.
	 *
	 * However, since we're not flushing in the order they appear in the
	 * journal we won't be able to drop our journal pin until everything is
	 * flushed - which means this could deadlock the journal if we weren't
	 * passing BCH_TRANS_COMMIT_journal_reclaim. This causes the update to fail
	 * if it would block taking a journal reservation.
	 *
	 * If that happens, simply skip the key so we can optimistically insert
	 * as many keys as possible in the fast path.
	 */
	wb_sort(wb->sorted.data, wb->sorted.nr);

	darray_for_each(wb->sorted, i) {
		struct btree_write_buffered_key *k = &wb->flushing.keys.data[i->idx];

		if (unlikely(!btree_type_uses_write_buffer(k->btree))) {
			ret = bch2_btree_write_buffer_insert_err(trans->c, k->btree, &k->k);
			goto err;
		}

		for (struct wb_key_ref *n = i + 1; n < min(i + 4, &darray_top(wb->sorted)); n++)
			prefetch(&wb->flushing.keys.data[n->idx]);

		BUG_ON(!k->journal_seq);

		if (!accounting_replay_done &&
		    k->k.k.type == KEY_TYPE_accounting) {
			slowpath++;
			continue;
		}

		if (i + 1 < &darray_top(wb->sorted) &&
		    wb_key_eq(i, i + 1)) {
			struct btree_write_buffered_key *n = &wb->flushing.keys.data[i[1].idx];

			if (k->k.k.type == KEY_TYPE_accounting &&
			    n->k.k.type == KEY_TYPE_accounting)
				bch2_accounting_accumulate(bkey_i_to_accounting(&n->k),
							   bkey_i_to_s_c_accounting(&k->k));

			overwritten++;
			n->journal_seq = min_t(u64, n->journal_seq, k->journal_seq);
			k->journal_seq = 0;
			continue;
		}

		if (write_locked) {
			struct btree_path *path = btree_iter_path(trans, &iter);

			if (path->btree_id != i->btree ||
			    bpos_gt(k->k.k.p, path->l[0].b->key.k.p)) {
				bch2_btree_node_unlock_write(trans, path, path->l[0].b);
				write_locked = false;

				ret = lockrestart_do(trans,
					bch2_btree_iter_traverse(trans, &iter) ?:
					bch2_foreground_maybe_merge(trans, iter.path, 0,
							BCH_WATERMARK_reclaim|
							BCH_TRANS_COMMIT_journal_reclaim|
							BCH_TRANS_COMMIT_no_check_rw|
							BCH_TRANS_COMMIT_no_enospc));
				if (ret)
					goto err;
			}
		}

		if (!iter.path || iter.btree_id != k->btree) {
			bch2_trans_iter_exit(trans, &iter);
			bch2_trans_iter_init(trans, &iter, k->btree, k->k.k.p,
					     BTREE_ITER_intent|BTREE_ITER_all_snapshots);
		}

		bch2_btree_iter_set_pos(trans, &iter, k->k.k.p);
		btree_iter_path(trans, &iter)->preserve = false;

		bool accounting_accumulated = false;
		do {
			if (race_fault()) {
				ret = bch_err_throw(c, journal_reclaim_would_deadlock);
				break;
			}

			ret = wb_flush_one(trans, &iter, k, &write_locked,
					   &accounting_accumulated, &fast);
			if (!write_locked)
				bch2_trans_begin(trans);
		} while (bch2_err_matches(ret, BCH_ERR_transaction_restart));

		if (!ret) {
			k->journal_seq = 0;
		} else if (ret == -BCH_ERR_journal_reclaim_would_deadlock) {
			slowpath++;
			ret = 0;
		} else
			break;
	}

	if (write_locked) {
		struct btree_path *path = btree_iter_path(trans, &iter);
		bch2_btree_node_unlock_write(trans, path, path->l[0].b);
	}
	bch2_trans_iter_exit(trans, &iter);

	if (ret)
		goto err;

	if (slowpath) {
		/*
		 * Flush in the order they were present in the journal, so that
		 * we can release journal pins:
		 * The fast path zeroed the journal_seq of keys it flushed
		 * successfully, so we can skip those here.
		 */
		trace_and_count(c, write_buffer_flush_slowpath, trans, slowpath, wb->flushing.keys.nr);

		sort_nonatomic(wb->flushing.keys.data,
			       wb->flushing.keys.nr,
			       sizeof(wb->flushing.keys.data[0]),
			       wb_key_seq_cmp, NULL);

		darray_for_each(wb->flushing.keys, i) {
			if (!i->journal_seq)
				continue;

			if (!accounting_replay_done &&
			    i->k.k.type == KEY_TYPE_accounting) {
				could_not_insert++;
				continue;
			}

			if (!could_not_insert)
				bch2_journal_pin_update(j, i->journal_seq, &wb->flushing.pin,
							bch2_btree_write_buffer_journal_flush);

			bch2_trans_begin(trans);

			ret = commit_do(trans, NULL, NULL,
					BCH_WATERMARK_reclaim|
					BCH_TRANS_COMMIT_journal_reclaim|
					BCH_TRANS_COMMIT_no_check_rw|
					BCH_TRANS_COMMIT_no_enospc|
					BCH_TRANS_COMMIT_no_journal_res,
					btree_write_buffered_insert(trans, i));
			if (ret)
				goto err;

			i->journal_seq = 0;
		}

		/*
		 * If journal replay hasn't finished with accounting keys we
		 * can't flush accounting keys at all - condense them and leave
		 * them for next time.
		 *
		 * Q: Can the write buffer overflow?
		 * A: Shouldn't be any actual risk. It's just new accounting
		 * updates that the write buffer can't flush, and those are only
		 * going to be generated by interior btree node updates as
		 * journal replay has to split/rewrite nodes to make room for
		 * its updates.
		 *
		 * And for those new accounting updates, updates to the same
		 * counters get accumulated as they're flushed from the journal
		 * to the write buffer - see the patch for eytzinger tree
		 * accumulation. So we could only overflow if the number of
		 * distinct counters touched somehow was very large.
		 */
		if (could_not_insert) {
			struct btree_write_buffered_key *dst = wb->flushing.keys.data;

			darray_for_each(wb->flushing.keys, i)
				if (i->journal_seq)
					*dst++ = *i;
			wb->flushing.keys.nr = dst - wb->flushing.keys.data;
		}
	}
err:
	if (ret || !could_not_insert) {
		bch2_journal_pin_drop(j, &wb->flushing.pin);
		wb->flushing.keys.nr = 0;
	}

	bch2_fs_fatal_err_on(ret, c, "%s", bch2_err_str(ret));
	trace_write_buffer_flush(trans, wb->flushing.keys.nr, overwritten, fast, 0);
	return ret;
}

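/*
 * Copy the write buffer keys out of one journal buffer and into the write
 * buffer; once an entry has been copied it is retagged as
 * BCH_JSET_ENTRY_btree_keys so it won't be picked up as write buffer keys
 * again.
 */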
static int bch2_journal_keys_to_write_buffer(struct bch_fs *c, struct journal_buf *buf)
{
	struct journal_keys_to_wb dst;
	int ret = 0;

	bch2_journal_keys_to_write_buffer_start(c, &dst, le64_to_cpu(buf->data->seq));

	for_each_jset_entry_type(entry, buf->data, BCH_JSET_ENTRY_write_buffer_keys) {
		jset_entry_for_each_key(entry, k) {
			ret = bch2_journal_key_to_wb(c, &dst, entry->btree_id, k);
			if (ret)
				goto out;
		}

		entry->type = BCH_JSET_ENTRY_btree_keys;
	}
out:
	ret = bch2_journal_keys_to_write_buffer_end(c, &dst) ?: ret;
	return ret;
}

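/*
 * Walk journal buffers up to @max_seq and move their write buffer keys into
 * the write buffer, clearing need_flush_to_write_buffer once a buffer has
 * been fully copied.
 */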
static int fetch_wb_keys_from_journal(struct bch_fs *c, u64 max_seq)
{
	struct journal *j = &c->journal;
	struct journal_buf *buf;
	bool blocked;
	int ret = 0;

	while (!ret && (buf = bch2_next_write_buffer_flush_journal_buf(j, max_seq, &blocked))) {
		ret = bch2_journal_keys_to_write_buffer(c, buf);

		if (!blocked && !ret) {
			spin_lock(&j->lock);
			buf->need_flush_to_write_buffer = false;
			spin_unlock(&j->lock);
		}

		mutex_unlock(&j->buf_lock);

		if (blocked) {
			bch2_journal_unblock(j);
			break;
		}
	}

	return ret;
}

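/*
 * Flush the write buffer far enough that no keys with a journal sequence
 * number <= @max_seq remain pinned; loops because a single flush pass is not
 * guaranteed to drain wb->inc (e.g. on memory allocation failure).
 */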
static int btree_write_buffer_flush_seq(struct btree_trans *trans, u64 max_seq,
					bool *did_work)
{
	struct bch_fs *c = trans->c;
	struct btree_write_buffer *wb = &c->btree_write_buffer;
	int ret = 0, fetch_from_journal_err;

	do {
		bch2_trans_unlock(trans);

		fetch_from_journal_err = fetch_wb_keys_from_journal(c, max_seq);

		*did_work |= wb->inc.keys.nr || wb->flushing.keys.nr;

		/*
		 * On memory allocation failure, bch2_btree_write_buffer_flush_locked()
		 * is not guaranteed to empty wb->inc:
		 */
		mutex_lock(&wb->flushing.lock);
		ret = bch2_btree_write_buffer_flush_locked(trans);
		mutex_unlock(&wb->flushing.lock);
	} while (!ret &&
		 (fetch_from_journal_err ||
		  (wb->inc.pin.seq && wb->inc.pin.seq <= max_seq) ||
		  (wb->flushing.pin.seq && wb->flushing.pin.seq <= max_seq)));

	return ret;
}

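/*
 * Journal pin flush callback: called by journal reclaim when it needs the
 * write buffer flushed up to @seq so the corresponding pin can be released.
 */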
static int bch2_btree_write_buffer_journal_flush(struct journal *j,
				struct journal_entry_pin *_pin, u64 seq)
{
	struct bch_fs *c = container_of(j, struct bch_fs, journal);
	bool did_work = false;

	return bch2_trans_run(c, btree_write_buffer_flush_seq(trans, seq, &did_work));
}

int bch2_btree_write_buffer_flush_sync(struct btree_trans *trans)
{
	struct bch_fs *c = trans->c;
	bool did_work = false;

	trace_and_count(c, write_buffer_flush_sync, trans, _RET_IP_);

	return btree_write_buffer_flush_seq(trans, journal_cur_seq(&c->journal), &did_work);
}

/*
 * The write buffer requires flushing when going RO: keys in the journal for the
 * write buffer don't have a journal pin yet
 */
bool bch2_btree_write_buffer_flush_going_ro(struct bch_fs *c)
{
	if (bch2_journal_error(&c->journal))
		return false;

	bool did_work = false;
	bch2_trans_run(c, btree_write_buffer_flush_seq(trans,
				journal_cur_seq(&c->journal), &did_work));
	return did_work;
}

int bch2_btree_write_buffer_flush_nocheck_rw(struct btree_trans *trans)
{
	struct bch_fs *c = trans->c;
	struct btree_write_buffer *wb = &c->btree_write_buffer;
	int ret = 0;

	if (mutex_trylock(&wb->flushing.lock)) {
		ret = bch2_btree_write_buffer_flush_locked(trans);
		mutex_unlock(&wb->flushing.lock);
	}

	return ret;
}

int bch2_btree_write_buffer_tryflush(struct btree_trans *trans)
{
	struct bch_fs *c = trans->c;

	if (!enumerated_ref_tryget(&c->writes, BCH_WRITE_REF_btree_write_buffer))
		return bch_err_throw(c, erofs_no_writes);

	int ret = bch2_btree_write_buffer_flush_nocheck_rw(trans);
	enumerated_ref_put(&c->writes, BCH_WRITE_REF_btree_write_buffer);
	return ret;
}

/*
 * In check and repair code, when checking references to write buffer btrees we
 * need to issue a flush before we have a definitive error: this issues a flush
 * if this is a key we haven't yet checked.
 */
int bch2_btree_write_buffer_maybe_flush(struct btree_trans *trans,
					struct bkey_s_c referring_k,
					struct bkey_buf *last_flushed)
{
	struct bch_fs *c = trans->c;
	struct bkey_buf tmp;
	int ret = 0;

	bch2_bkey_buf_init(&tmp);

	if (!bkey_and_val_eq(referring_k, bkey_i_to_s_c(last_flushed->k))) {
		if (trace_write_buffer_maybe_flush_enabled()) {
			struct printbuf buf = PRINTBUF;

			bch2_bkey_val_to_text(&buf, c, referring_k);
			trace_write_buffer_maybe_flush(trans, _RET_IP_, buf.buf);
			printbuf_exit(&buf);
		}

		bch2_bkey_buf_reassemble(&tmp, c, referring_k);

		if (bkey_is_btree_ptr(referring_k.k)) {
			bch2_trans_unlock(trans);
			bch2_btree_interior_updates_flush(c);
		}

		ret = bch2_btree_write_buffer_flush_sync(trans);
		if (ret)
			goto err;

		bch2_bkey_buf_copy(last_flushed, c, tmp.k);

		/* can we avoid the unconditional restart? */
		trace_and_count(c, trans_restart_write_buffer_flush, trans, _RET_IP_);
		ret = bch_err_throw(c, transaction_restart_write_buffer_flush);
	}
err:
	bch2_bkey_buf_exit(&tmp, c);
	return ret;
}

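/*
 * Background flush worker, queued from bch2_journal_keys_to_write_buffer_end()
 * when the buffer crosses its flush threshold; keeps flushing until the
 * buffer is back under the threshold or an error occurs, then drops the
 * writes ref taken when the work was queued.
 */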
static void bch2_btree_write_buffer_flush_work(struct work_struct *work)
{
	struct bch_fs *c = container_of(work, struct bch_fs, btree_write_buffer.flush_work);
	struct btree_write_buffer *wb = &c->btree_write_buffer;
	int ret;

	mutex_lock(&wb->flushing.lock);
	do {
		ret = bch2_trans_run(c, bch2_btree_write_buffer_flush_locked(trans));
	} while (!ret && bch2_btree_write_buffer_should_flush(c));
	mutex_unlock(&wb->flushing.lock);

	enumerated_ref_put(&c->writes, BCH_WRITE_REF_btree_write_buffer);
}

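/*
 * Keep wb->accounting sorted as an eytzinger tree, so that accounting updates
 * to the same counter can be accumulated in place by the
 * bch2_accounting_key_to_wb() fast path.
 */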
static void wb_accounting_sort(struct btree_write_buffer *wb)
{
	eytzinger0_sort(wb->accounting.data, wb->accounting.nr,
			sizeof(wb->accounting.data[0]),
			wb_key_cmp, NULL);
}

int bch2_accounting_key_to_wb_slowpath(struct bch_fs *c, enum btree_id btree,
				       struct bkey_i_accounting *k)
{
	struct btree_write_buffer *wb = &c->btree_write_buffer;
	struct btree_write_buffered_key new = { .btree = btree };

	bkey_copy(&new.k, &k->k_i);

	int ret = darray_push(&wb->accounting, new);
	if (ret)
		return ret;

	wb_accounting_sort(wb);
	return 0;
}

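/*
 * Slowpath for adding a journal key to the write buffer: make room in the
 * destination darray, falling back from wb->flushing to wb->inc if the
 * flushing buffer (or its sorted array) can't be grown.
 */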
int bch2_journal_key_to_wb_slowpath(struct bch_fs *c,
			     struct journal_keys_to_wb *dst,
			     enum btree_id btree, struct bkey_i *k)
{
	struct btree_write_buffer *wb = &c->btree_write_buffer;
	int ret;
retry:
	ret = darray_make_room_gfp(&dst->wb->keys, 1, GFP_KERNEL);
	if (!ret && dst->wb == &wb->flushing)
		ret = darray_resize(&wb->sorted, wb->flushing.keys.size);

	if (unlikely(ret)) {
		if (dst->wb == &c->btree_write_buffer.flushing) {
			mutex_unlock(&dst->wb->lock);
			dst->wb = &c->btree_write_buffer.inc;
			bch2_journal_pin_add(&c->journal, dst->seq, &dst->wb->pin,
					     bch2_btree_write_buffer_journal_flush);
			goto retry;
		}

		return ret;
	}

	dst->room = darray_room(dst->wb->keys);
	if (dst->wb == &wb->flushing)
		dst->room = min(dst->room, wb->sorted.size - wb->flushing.keys.nr);
	BUG_ON(!dst->room);
	BUG_ON(!dst->seq);

	struct btree_write_buffered_key *wb_k = &darray_top(dst->wb->keys);
	wb_k->journal_seq	= dst->seq;
	wb_k->btree		= btree;
	bkey_copy(&wb_k->k, k);
	dst->wb->keys.nr++;
	dst->room--;
	return 0;
}

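/*
 * Begin copying keys from a journal buffer: pick the destination buffer
 * (wb->flushing if we can take its lock and wb->inc is empty, otherwise
 * wb->inc), pin the journal sequence number, and zero the values of the
 * accumulated accounting keys so that only deltas from this journal buffer
 * are flushed in bch2_journal_keys_to_write_buffer_end().
 */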
void bch2_journal_keys_to_write_buffer_start(struct bch_fs *c, struct journal_keys_to_wb *dst, u64 seq)
{
	struct btree_write_buffer *wb = &c->btree_write_buffer;

	if (mutex_trylock(&wb->flushing.lock)) {
		mutex_lock(&wb->inc.lock);
		move_keys_from_inc_to_flushing(wb);

		/*
		 * Attempt to skip wb->inc, and add keys directly to
		 * wb->flushing, saving us a copy later:
		 */

		if (!wb->inc.keys.nr) {
			dst->wb = &wb->flushing;
		} else {
			mutex_unlock(&wb->flushing.lock);
			dst->wb = &wb->inc;
		}
	} else {
		mutex_lock(&wb->inc.lock);
		dst->wb = &wb->inc;
	}

	dst->room = darray_room(dst->wb->keys);
	if (dst->wb == &wb->flushing)
		dst->room = min(dst->room, wb->sorted.size - wb->flushing.keys.nr);
	dst->seq = seq;

	bch2_journal_pin_add(&c->journal, seq, &dst->wb->pin,
			     bch2_btree_write_buffer_journal_flush);

	darray_for_each(wb->accounting, i)
		memset(&i->k.v, 0, bkey_val_bytes(&i->k.k));
}

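/*
 * Finish copying keys from a journal buffer: flush the non-zero accumulated
 * accounting keys into the destination buffer, drop dead accounting entries
 * if more than half of them are now zero, release the journal pin if nothing
 * was added, kick the background flush worker if needed, and drop the locks
 * taken in bch2_journal_keys_to_write_buffer_start().
 */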
int bch2_journal_keys_to_write_buffer_end(struct bch_fs *c, struct journal_keys_to_wb *dst)
{
	struct btree_write_buffer *wb = &c->btree_write_buffer;
	unsigned live_accounting_keys = 0;
	int ret = 0;

	darray_for_each(wb->accounting, i)
		if (!bch2_accounting_key_is_zero(bkey_i_to_s_c_accounting(&i->k))) {
			i->journal_seq = dst->seq;
			live_accounting_keys++;
			ret = __bch2_journal_key_to_wb(c, dst, i->btree, &i->k);
			if (ret)
				break;
		}

	if (live_accounting_keys * 2 < wb->accounting.nr) {
		struct btree_write_buffered_key *dst = wb->accounting.data;

		darray_for_each(wb->accounting, src)
			if (!bch2_accounting_key_is_zero(bkey_i_to_s_c_accounting(&src->k)))
				*dst++ = *src;
		wb->accounting.nr = dst - wb->accounting.data;
		wb_accounting_sort(wb);
	}

	if (!dst->wb->keys.nr)
		bch2_journal_pin_drop(&c->journal, &dst->wb->pin);

	if (bch2_btree_write_buffer_should_flush(c) &&
	    __enumerated_ref_tryget(&c->writes, BCH_WRITE_REF_btree_write_buffer) &&
	    !queue_work(system_unbound_wq, &c->btree_write_buffer.flush_work))
		enumerated_ref_put(&c->writes, BCH_WRITE_REF_btree_write_buffer);

	if (dst->wb == &wb->flushing)
		mutex_unlock(&wb->flushing.lock);
	mutex_unlock(&wb->inc.lock);

	return ret;
}

static int wb_keys_resize(struct btree_write_buffer_keys *wb, size_t new_size)
{
	if (wb->keys.size >= new_size)
		return 0;

	if (!mutex_trylock(&wb->lock))
		return -EINTR;

	int ret = darray_resize(&wb->keys, new_size);
	mutex_unlock(&wb->lock);
	return ret;
}

int bch2_btree_write_buffer_resize(struct bch_fs *c, size_t new_size)
{
	struct btree_write_buffer *wb = &c->btree_write_buffer;

	return wb_keys_resize(&wb->flushing, new_size) ?:
		wb_keys_resize(&wb->inc, new_size);
}

void bch2_fs_btree_write_buffer_exit(struct bch_fs *c)
{
	struct btree_write_buffer *wb = &c->btree_write_buffer;

	BUG_ON((wb->inc.keys.nr || wb->flushing.keys.nr) &&
	       !bch2_journal_error(&c->journal));

	darray_exit(&wb->accounting);
	darray_exit(&wb->sorted);
	darray_exit(&wb->flushing.keys);
	darray_exit(&wb->inc.keys);
}

void bch2_fs_btree_write_buffer_init_early(struct bch_fs *c)
{
	struct btree_write_buffer *wb = &c->btree_write_buffer;

	mutex_init(&wb->inc.lock);
	mutex_init(&wb->flushing.lock);
	INIT_WORK(&wb->flush_work, bch2_btree_write_buffer_flush_work);
}

int bch2_fs_btree_write_buffer_init(struct bch_fs *c)
{
	struct btree_write_buffer *wb = &c->btree_write_buffer;

	/* Will be resized by journal as needed: */
	unsigned initial_size = 1 << 16;

	return  darray_make_room(&wb->inc.keys, initial_size) ?:
		darray_make_room(&wb->flushing.keys, initial_size) ?:
		darray_make_room(&wb->sorted, initial_size);
}
894