// SPDX-License-Identifier: GPL-2.0

#include "bcachefs.h"
#include "bkey_buf.h"
#include "btree_locking.h"
#include "btree_update.h"
#include "btree_update_interior.h"
#include "btree_write_buffer.h"
#include "disk_accounting.h"
#include "error.h"
#include "extents.h"
#include "journal.h"
#include "journal_io.h"
#include "journal_reclaim.h"

#include <linux/prefetch.h>
#include <linux/sort.h>

static int bch2_btree_write_buffer_journal_flush(struct journal *,
				struct journal_entry_pin *, u64);

static inline bool __wb_key_ref_cmp(const struct wb_key_ref *l, const struct wb_key_ref *r)
{
	return (cmp_int(l->hi, r->hi) ?:
		cmp_int(l->mi, r->mi) ?:
		cmp_int(l->lo, r->lo)) >= 0;
}

static inline bool wb_key_ref_cmp(const struct wb_key_ref *l, const struct wb_key_ref *r)
{
#ifdef CONFIG_X86_64
	int cmp;

	asm("mov (%[l]), %%rax;"
	    "sub (%[r]), %%rax;"
	    "mov 8(%[l]), %%rax;"
	    "sbb 8(%[r]), %%rax;"
	    "mov 16(%[l]), %%rax;"
	    "sbb 16(%[r]), %%rax;"
	    : "=@ccae" (cmp)
	    : [l] "r" (l), [r] "r" (r)
	    : "rax", "cc");

	EBUG_ON(cmp != __wb_key_ref_cmp(l, r));
	return cmp;
#else
	return __wb_key_ref_cmp(l, r);
#endif
}

static int wb_key_seq_cmp(const void *_l, const void *_r)
{
	const struct btree_write_buffered_key *l = _l;
	const struct btree_write_buffered_key *r = _r;

	return cmp_int(l->journal_seq, r->journal_seq);
}

/* Compare excluding idx, the low 24 bits: */
static inline bool wb_key_eq(const void *_l, const void *_r)
{
	const struct wb_key_ref *l = _l;
	const struct wb_key_ref *r = _r;

	return !((l->hi ^ r->hi)|
		 (l->mi ^ r->mi)|
		 ((l->lo >> 24) ^ (r->lo >> 24)));
}
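
/*
 * Sort the wb_key_refs for a flush. This looks to be the bottom-up heapsort
 * from lib/sort.c, specialized for struct wb_key_ref so that wb_key_ref_cmp()
 * is inlined rather than called through a function pointer on every
 * comparison.
 */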
static noinline void wb_sort(struct wb_key_ref *base, size_t num)
{
	size_t n = num, a = num / 2;

	if (!a)		/* num < 2 || size == 0 */
		return;

	for (;;) {
		size_t b, c, d;

		if (a)			/* Building heap: sift down --a */
			--a;
		else if (--n)		/* Sorting: Extract root to --n */
			swap(base[0], base[n]);
		else			/* Sort complete */
			break;

		/*
		 * Sift element at "a" down into heap. This is the
		 * "bottom-up" variant, which significantly reduces
		 * calls to cmp_func(): we find the sift-down path all
		 * the way to the leaves (one compare per level), then
		 * backtrack to find where to insert the target element.
		 *
		 * Because elements tend to sift down close to the leaves,
		 * this uses fewer compares than doing two per level
		 * on the way down. (A bit more than half as many on
		 * average, 3/4 worst-case.)
		 */
		for (b = a; c = 2*b + 1, (d = c + 1) < n;)
			b = wb_key_ref_cmp(base + c, base + d) ? c : d;
		if (d == n)		/* Special case last leaf with no sibling */
			b = c;

		/* Now backtrack from "b" to the correct location for "a" */
		while (b != a && wb_key_ref_cmp(base + a, base + b))
			b = (b - 1) / 2;
		c = b;			/* Where "a" belongs */
		while (b != a) {	/* Shift it into place */
			b = (b - 1) / 2;
			swap(base[b], base[c]);
		}
	}
}

static noinline int wb_flush_one_slowpath(struct btree_trans *trans,
					  struct btree_iter *iter,
					  struct btree_write_buffered_key *wb)
{
	struct btree_path *path = btree_iter_path(trans, iter);

	bch2_btree_node_unlock_write(trans, path, path->l[0].b);

	trans->journal_res.seq = wb->journal_seq;

	return bch2_trans_update(trans, iter, &wb->k,
				 BTREE_UPDATE_internal_snapshot_node) ?:
		bch2_trans_commit(trans, NULL, NULL,
				  BCH_TRANS_COMMIT_no_enospc|
				  BCH_TRANS_COMMIT_no_check_rw|
				  BCH_TRANS_COMMIT_no_journal_res|
				  BCH_TRANS_COMMIT_journal_reclaim);
}
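
/*
 * Fast path for flushing a single write buffered key: insert directly into the
 * leaf node with its write lock held. If the key doesn't fit in the node, the
 * write lock is dropped and we fall back to wb_flush_one_slowpath(), which
 * does a full transaction commit.
 */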
static inline int wb_flush_one(struct btree_trans *trans, struct btree_iter *iter,
			       struct btree_write_buffered_key *wb,
			       bool *write_locked,
			       bool *accounting_accumulated,
			       size_t *fast)
{
	struct btree_path *path;
	int ret;

	EBUG_ON(!wb->journal_seq);
	EBUG_ON(!trans->c->btree_write_buffer.flushing.pin.seq);
	EBUG_ON(trans->c->btree_write_buffer.flushing.pin.seq > wb->journal_seq);

	ret = bch2_btree_iter_traverse(iter);
	if (ret)
		return ret;

	if (!*accounting_accumulated && wb->k.k.type == KEY_TYPE_accounting) {
		struct bkey u;
		struct bkey_s_c k = bch2_btree_path_peek_slot_exact(btree_iter_path(trans, iter), &u);

		if (k.k->type == KEY_TYPE_accounting)
			bch2_accounting_accumulate(bkey_i_to_accounting(&wb->k),
						   bkey_s_c_to_accounting(k));
	}
	*accounting_accumulated = true;

	/*
	 * We can't clone a path that has write locks: unshare it now, before
	 * set_pos and traverse():
	 */
	if (btree_iter_path(trans, iter)->ref > 1)
		iter->path = __bch2_btree_path_make_mut(trans, iter->path, true, _THIS_IP_);

	path = btree_iter_path(trans, iter);

	if (!*write_locked) {
		ret = bch2_btree_node_lock_write(trans, path, &path->l[0].b->c);
		if (ret)
			return ret;

		bch2_btree_node_prep_for_write(trans, path, path->l[0].b);
		*write_locked = true;
	}

	if (unlikely(!bch2_btree_node_insert_fits(path->l[0].b, wb->k.k.u64s))) {
		*write_locked = false;
		return wb_flush_one_slowpath(trans, iter, wb);
	}

	bch2_btree_insert_key_leaf(trans, path, &wb->k, wb->journal_seq);
	(*fast)++;
	return 0;
}

/*
 * Update a btree with a write buffered key using the journal seq of the
 * original write buffer insert.
 *
 * It is not safe to rejournal the key once it has been inserted into the write
 * buffer because that may break recovery ordering. For example, the key may
 * have already been modified in the active write buffer in a seq that comes
 * before the current transaction. If we were to journal this key again and
 * crash, recovery would process updates in the wrong order.
 */
static int
btree_write_buffered_insert(struct btree_trans *trans,
			    struct btree_write_buffered_key *wb)
{
	struct btree_iter iter;
	int ret;

	bch2_trans_iter_init(trans, &iter, wb->btree, bkey_start_pos(&wb->k.k),
			     BTREE_ITER_cached|BTREE_ITER_intent);

	trans->journal_res.seq = wb->journal_seq;

	ret = bch2_btree_iter_traverse(&iter) ?:
		bch2_trans_update(trans, &iter, &wb->k,
				  BTREE_UPDATE_internal_snapshot_node);
	bch2_trans_iter_exit(trans, &iter);
	return ret;
}

static void move_keys_from_inc_to_flushing(struct btree_write_buffer *wb)
{
	struct bch_fs *c = container_of(wb, struct bch_fs, btree_write_buffer);
	struct journal *j = &c->journal;

	if (!wb->inc.keys.nr)
		return;

	bch2_journal_pin_add(j, wb->inc.keys.data[0].journal_seq, &wb->flushing.pin,
			     bch2_btree_write_buffer_journal_flush);

	darray_resize(&wb->flushing.keys, min_t(size_t, 1U << 20, wb->flushing.keys.nr + wb->inc.keys.nr));
	darray_resize(&wb->sorted, wb->flushing.keys.size);

	if (!wb->flushing.keys.nr && wb->sorted.size >= wb->inc.keys.nr) {
		swap(wb->flushing.keys, wb->inc.keys);
		goto out;
	}

	size_t nr = min(darray_room(wb->flushing.keys),
			wb->sorted.size - wb->flushing.keys.nr);
	nr = min(nr, wb->inc.keys.nr);

	memcpy(&darray_top(wb->flushing.keys),
	       wb->inc.keys.data,
	       sizeof(wb->inc.keys.data[0]) * nr);

	memmove(wb->inc.keys.data,
		wb->inc.keys.data + nr,
		sizeof(wb->inc.keys.data[0]) * (wb->inc.keys.nr - nr));

	wb->flushing.keys.nr	+= nr;
	wb->inc.keys.nr		-= nr;
out:
	if (!wb->inc.keys.nr)
		bch2_journal_pin_drop(j, &wb->inc.pin);
	else
		bch2_journal_pin_update(j, wb->inc.keys.data[0].journal_seq, &wb->inc.pin,
					bch2_btree_write_buffer_journal_flush);

	if (j->watermark) {
		spin_lock(&j->lock);
		bch2_journal_set_watermark(j);
		spin_unlock(&j->lock);
	}

	BUG_ON(wb->sorted.size < wb->flushing.keys.nr);
}

int bch2_btree_write_buffer_insert_err(struct btree_trans *trans,
				       enum btree_id btree, struct bkey_i *k)
{
	struct bch_fs *c = trans->c;
	struct printbuf buf = PRINTBUF;

	prt_printf(&buf, "attempting to do write buffer update on non wb btree=");
	bch2_btree_id_to_text(&buf, btree);
	prt_str(&buf, "\n");
	bch2_bkey_val_to_text(&buf, c, bkey_i_to_s_c(k));

	bch2_fs_inconsistent(c, "%s", buf.buf);
	printbuf_exit(&buf);
	return -EROFS;
}
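
/*
 * Flush as much of the write buffer as we can: pull keys from wb->inc into
 * wb->flushing, then flush wb->flushing to the btrees. Caller must hold
 * wb->flushing.lock.
 */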
static int bch2_btree_write_buffer_flush_locked(struct btree_trans *trans)
{
	struct bch_fs *c = trans->c;
	struct journal *j = &c->journal;
	struct btree_write_buffer *wb = &c->btree_write_buffer;
	struct btree_iter iter = { NULL };
	size_t overwritten = 0, fast = 0, slowpath = 0, could_not_insert = 0;
	bool write_locked = false;
	bool accounting_replay_done = test_bit(BCH_FS_accounting_replay_done, &c->flags);
	int ret = 0;

	ret = bch2_journal_error(&c->journal);
	if (ret)
		return ret;

	bch2_trans_unlock(trans);
	bch2_trans_begin(trans);

	mutex_lock(&wb->inc.lock);
	move_keys_from_inc_to_flushing(wb);
	mutex_unlock(&wb->inc.lock);

	for (size_t i = 0; i < wb->flushing.keys.nr; i++) {
		wb->sorted.data[i].idx = i;
		wb->sorted.data[i].btree = wb->flushing.keys.data[i].btree;
		memcpy(&wb->sorted.data[i].pos, &wb->flushing.keys.data[i].k.k.p, sizeof(struct bpos));
	}
	wb->sorted.nr = wb->flushing.keys.nr;

	/*
	 * We first sort so that we can detect and skip redundant updates, and
	 * then we attempt to flush in sorted btree order, as this is most
	 * efficient.
	 *
	 * However, since we're not flushing in the order they appear in the
	 * journal we won't be able to drop our journal pin until everything is
	 * flushed - which means this could deadlock the journal if we weren't
	 * passing BCH_TRANS_COMMIT_journal_reclaim. This causes the update to fail
	 * if it would block taking a journal reservation.
	 *
	 * If that happens, simply skip the key so we can optimistically insert
	 * as many keys as possible in the fast path.
	 */
	wb_sort(wb->sorted.data, wb->sorted.nr);

	darray_for_each(wb->sorted, i) {
		struct btree_write_buffered_key *k = &wb->flushing.keys.data[i->idx];

		if (unlikely(!btree_type_uses_write_buffer(k->btree))) {
			ret = bch2_btree_write_buffer_insert_err(trans, k->btree, &k->k);
			goto err;
		}

		for (struct wb_key_ref *n = i + 1; n < min(i + 4, &darray_top(wb->sorted)); n++)
			prefetch(&wb->flushing.keys.data[n->idx]);

		BUG_ON(!k->journal_seq);

		if (!accounting_replay_done &&
		    k->k.k.type == KEY_TYPE_accounting) {
			slowpath++;
			continue;
		}

		if (i + 1 < &darray_top(wb->sorted) &&
		    wb_key_eq(i, i + 1)) {
			struct btree_write_buffered_key *n = &wb->flushing.keys.data[i[1].idx];

			if (k->k.k.type == KEY_TYPE_accounting &&
			    n->k.k.type == KEY_TYPE_accounting)
				bch2_accounting_accumulate(bkey_i_to_accounting(&n->k),
							   bkey_i_to_s_c_accounting(&k->k));

			overwritten++;
			n->journal_seq = min_t(u64, n->journal_seq, k->journal_seq);
			k->journal_seq = 0;
			continue;
		}

		if (write_locked) {
			struct btree_path *path = btree_iter_path(trans, &iter);

			if (path->btree_id != i->btree ||
			    bpos_gt(k->k.k.p, path->l[0].b->key.k.p)) {
				bch2_btree_node_unlock_write(trans, path, path->l[0].b);
				write_locked = false;

				ret = lockrestart_do(trans,
					bch2_btree_iter_traverse(&iter) ?:
					bch2_foreground_maybe_merge(trans, iter.path, 0,
							BCH_WATERMARK_reclaim|
							BCH_TRANS_COMMIT_journal_reclaim|
							BCH_TRANS_COMMIT_no_check_rw|
							BCH_TRANS_COMMIT_no_enospc));
				if (ret)
					goto err;
			}
		}

		if (!iter.path || iter.btree_id != k->btree) {
			bch2_trans_iter_exit(trans, &iter);
			bch2_trans_iter_init(trans, &iter, k->btree, k->k.k.p,
					     BTREE_ITER_intent|BTREE_ITER_all_snapshots);
		}

		bch2_btree_iter_set_pos(&iter, k->k.k.p);
		btree_iter_path(trans, &iter)->preserve = false;

		bool accounting_accumulated = false;
		do {
			if (race_fault()) {
				ret = -BCH_ERR_journal_reclaim_would_deadlock;
				break;
			}

			ret = wb_flush_one(trans, &iter, k, &write_locked,
					   &accounting_accumulated, &fast);
			if (!write_locked)
				bch2_trans_begin(trans);
		} while (bch2_err_matches(ret, BCH_ERR_transaction_restart));

		if (!ret) {
			k->journal_seq = 0;
		} else if (ret == -BCH_ERR_journal_reclaim_would_deadlock) {
			slowpath++;
			ret = 0;
		} else
			break;
	}

	if (write_locked) {
		struct btree_path *path = btree_iter_path(trans, &iter);
		bch2_btree_node_unlock_write(trans, path, path->l[0].b);
	}
	bch2_trans_iter_exit(trans, &iter);

	if (ret)
		goto err;

	if (slowpath) {
		/*
		 * Flush in the order they were present in the journal, so that
		 * we can release journal pins:
		 * The fast path zeroed journal_seq for keys that were
		 * successfully flushed, so we can skip those here.
		 */
		trace_and_count(c, write_buffer_flush_slowpath, trans, slowpath, wb->flushing.keys.nr);

		sort(wb->flushing.keys.data,
		     wb->flushing.keys.nr,
		     sizeof(wb->flushing.keys.data[0]),
		     wb_key_seq_cmp, NULL);

		darray_for_each(wb->flushing.keys, i) {
			if (!i->journal_seq)
				continue;

			if (!accounting_replay_done &&
			    i->k.k.type == KEY_TYPE_accounting) {
				could_not_insert++;
				continue;
			}

			if (!could_not_insert)
				bch2_journal_pin_update(j, i->journal_seq, &wb->flushing.pin,
							bch2_btree_write_buffer_journal_flush);

			bch2_trans_begin(trans);

			ret = commit_do(trans, NULL, NULL,
					BCH_WATERMARK_reclaim|
					BCH_TRANS_COMMIT_journal_reclaim|
					BCH_TRANS_COMMIT_no_check_rw|
					BCH_TRANS_COMMIT_no_enospc|
					BCH_TRANS_COMMIT_no_journal_res,
					btree_write_buffered_insert(trans, i));
			if (ret)
				goto err;

			i->journal_seq = 0;
		}

		/*
		 * If journal replay hasn't finished with accounting keys we
		 * can't flush accounting keys at all - condense them and leave
		 * them for next time.
		 *
		 * Q: Can the write buffer overflow?
		 * A: Shouldn't be any actual risk. It's just new accounting
		 * updates that the write buffer can't flush, and those are only
		 * going to be generated by interior btree node updates as
		 * journal replay has to split/rewrite nodes to make room for
		 * its updates.
		 *
		 * And for those new accounting updates, updates to the same
		 * counters get accumulated as they're flushed from the journal
		 * to the write buffer - see the patch for eytzinger tree
		 * accumulation. So we could only overflow if the number of
		 * distinct counters touched somehow was very large.
		 */
		if (could_not_insert) {
			struct btree_write_buffered_key *dst = wb->flushing.keys.data;

			darray_for_each(wb->flushing.keys, i)
				if (i->journal_seq)
					*dst++ = *i;
			wb->flushing.keys.nr = dst - wb->flushing.keys.data;
		}
	}
err:
	if (ret || !could_not_insert) {
		bch2_journal_pin_drop(j, &wb->flushing.pin);
		wb->flushing.keys.nr = 0;
	}

	bch2_fs_fatal_err_on(ret, c, "%s", bch2_err_str(ret));
	trace_write_buffer_flush(trans, wb->flushing.keys.nr, overwritten, fast, 0);
	return ret;
}
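
/*
 * Copy all BCH_JSET_ENTRY_write_buffer_keys entries from a journal buffer into
 * the in-memory write buffer, retagging the journal entries as ordinary btree
 * key entries once their keys have been moved.
 */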
static int bch2_journal_keys_to_write_buffer(struct bch_fs *c, struct journal_buf *buf)
{
	struct journal_keys_to_wb dst;
	int ret = 0;

	bch2_journal_keys_to_write_buffer_start(c, &dst, le64_to_cpu(buf->data->seq));

	for_each_jset_entry_type(entry, buf->data, BCH_JSET_ENTRY_write_buffer_keys) {
		jset_entry_for_each_key(entry, k) {
			ret = bch2_journal_key_to_wb(c, &dst, entry->btree_id, k);
			if (ret)
				goto out;
		}

		entry->type = BCH_JSET_ENTRY_btree_keys;
	}
out:
	ret = bch2_journal_keys_to_write_buffer_end(c, &dst) ?: ret;
	return ret;
}

static int fetch_wb_keys_from_journal(struct bch_fs *c, u64 max_seq)
{
	struct journal *j = &c->journal;
	struct journal_buf *buf;
	bool blocked;
	int ret = 0;

	while (!ret && (buf = bch2_next_write_buffer_flush_journal_buf(j, max_seq, &blocked))) {
		ret = bch2_journal_keys_to_write_buffer(c, buf);

		if (!blocked && !ret) {
			spin_lock(&j->lock);
			buf->need_flush_to_write_buffer = false;
			spin_unlock(&j->lock);
		}

		mutex_unlock(&j->buf_lock);

		if (blocked) {
			bch2_journal_unblock(j);
			break;
		}
	}

	return ret;
}
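
/*
 * Flush the write buffer for all keys pinned at or before max_seq: keep
 * pulling keys out of the journal and flushing until neither wb->inc nor
 * wb->flushing holds a journal pin at or before max_seq.
 */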
static int btree_write_buffer_flush_seq(struct btree_trans *trans, u64 max_seq,
					bool *did_work)
{
	struct bch_fs *c = trans->c;
	struct btree_write_buffer *wb = &c->btree_write_buffer;
	int ret = 0, fetch_from_journal_err;

	do {
		bch2_trans_unlock(trans);

		fetch_from_journal_err = fetch_wb_keys_from_journal(c, max_seq);

		*did_work |= wb->inc.keys.nr || wb->flushing.keys.nr;

		/*
		 * On memory allocation failure, bch2_btree_write_buffer_flush_locked()
		 * is not guaranteed to empty wb->inc:
		 */
		mutex_lock(&wb->flushing.lock);
		ret = bch2_btree_write_buffer_flush_locked(trans);
		mutex_unlock(&wb->flushing.lock);
	} while (!ret &&
		 (fetch_from_journal_err ||
		  (wb->inc.pin.seq && wb->inc.pin.seq <= max_seq) ||
		  (wb->flushing.pin.seq && wb->flushing.pin.seq <= max_seq)));

	return ret;
}

static int bch2_btree_write_buffer_journal_flush(struct journal *j,
				struct journal_entry_pin *_pin, u64 seq)
{
	struct bch_fs *c = container_of(j, struct bch_fs, journal);
	bool did_work = false;

	return bch2_trans_run(c, btree_write_buffer_flush_seq(trans, seq, &did_work));
}

int bch2_btree_write_buffer_flush_sync(struct btree_trans *trans)
{
	struct bch_fs *c = trans->c;
	bool did_work = false;

	trace_and_count(c, write_buffer_flush_sync, trans, _RET_IP_);

	return btree_write_buffer_flush_seq(trans, journal_cur_seq(&c->journal), &did_work);
}

/*
 * The write buffer requires flushing when going RO: keys in the journal for the
 * write buffer don't have a journal pin yet.
 */
bool bch2_btree_write_buffer_flush_going_ro(struct bch_fs *c)
{
	if (bch2_journal_error(&c->journal))
		return false;

	bool did_work = false;
	bch2_trans_run(c, btree_write_buffer_flush_seq(trans,
				journal_cur_seq(&c->journal), &did_work));
	return did_work;
}

int bch2_btree_write_buffer_flush_nocheck_rw(struct btree_trans *trans)
{
	struct bch_fs *c = trans->c;
	struct btree_write_buffer *wb = &c->btree_write_buffer;
	int ret = 0;

	if (mutex_trylock(&wb->flushing.lock)) {
		ret = bch2_btree_write_buffer_flush_locked(trans);
		mutex_unlock(&wb->flushing.lock);
	}

	return ret;
}

int bch2_btree_write_buffer_tryflush(struct btree_trans *trans)
{
	struct bch_fs *c = trans->c;

	if (!bch2_write_ref_tryget(c, BCH_WRITE_REF_btree_write_buffer))
		return -BCH_ERR_erofs_no_writes;

	int ret = bch2_btree_write_buffer_flush_nocheck_rw(trans);
	bch2_write_ref_put(c, BCH_WRITE_REF_btree_write_buffer);
	return ret;
}

/*
 * In check and repair code, when checking references to write buffer btrees we
 * need to issue a flush before we have a definitive error: this issues a flush
 * if this is a key we haven't yet checked.
 */
int bch2_btree_write_buffer_maybe_flush(struct btree_trans *trans,
					struct bkey_s_c referring_k,
					struct bkey_buf *last_flushed)
{
	struct bch_fs *c = trans->c;
	struct bkey_buf tmp;
	int ret = 0;

	bch2_bkey_buf_init(&tmp);

	if (!bkey_and_val_eq(referring_k, bkey_i_to_s_c(last_flushed->k))) {
		if (trace_write_buffer_maybe_flush_enabled()) {
			struct printbuf buf = PRINTBUF;

			bch2_bkey_val_to_text(&buf, c, referring_k);
			trace_write_buffer_maybe_flush(trans, _RET_IP_, buf.buf);
			printbuf_exit(&buf);
		}

		bch2_bkey_buf_reassemble(&tmp, c, referring_k);

		if (bkey_is_btree_ptr(referring_k.k)) {
			bch2_trans_unlock(trans);
			bch2_btree_interior_updates_flush(c);
		}

		ret = bch2_btree_write_buffer_flush_sync(trans);
		if (ret)
			goto err;

		bch2_bkey_buf_copy(last_flushed, c, tmp.k);
		ret = -BCH_ERR_transaction_restart_write_buffer_flush;
	}
err:
	bch2_bkey_buf_exit(&tmp, c);
	return ret;
}

static void bch2_btree_write_buffer_flush_work(struct work_struct *work)
{
	struct bch_fs *c = container_of(work, struct bch_fs, btree_write_buffer.flush_work);
	struct btree_write_buffer *wb = &c->btree_write_buffer;
	int ret;

	mutex_lock(&wb->flushing.lock);
	do {
		ret = bch2_trans_run(c, bch2_btree_write_buffer_flush_locked(trans));
	} while (!ret && bch2_btree_write_buffer_should_flush(c));
	mutex_unlock(&wb->flushing.lock);

	bch2_write_ref_put(c, BCH_WRITE_REF_btree_write_buffer);
}

static void wb_accounting_sort(struct btree_write_buffer *wb)
{
	eytzinger0_sort(wb->accounting.data, wb->accounting.nr,
			sizeof(wb->accounting.data[0]),
			wb_key_cmp, NULL);
}
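
/*
 * Slowpath for adding an accounting key to the write buffer: append a new
 * entry to wb->accounting and re-sort the eytzinger array. The fast path is
 * expected to accumulate updates into an existing entry found by eytzinger
 * lookup, so this only runs the first time a given counter is touched.
 */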
int bch2_accounting_key_to_wb_slowpath(struct bch_fs *c, enum btree_id btree,
				       struct bkey_i_accounting *k)
{
	struct btree_write_buffer *wb = &c->btree_write_buffer;
	struct btree_write_buffered_key new = { .btree = btree };

	bkey_copy(&new.k, &k->k_i);

	int ret = darray_push(&wb->accounting, new);
	if (ret)
		return ret;

	wb_accounting_sort(wb);
	return 0;
}

int bch2_journal_key_to_wb_slowpath(struct bch_fs *c,
				    struct journal_keys_to_wb *dst,
				    enum btree_id btree, struct bkey_i *k)
{
	struct btree_write_buffer *wb = &c->btree_write_buffer;
	int ret;
retry:
	ret = darray_make_room_gfp(&dst->wb->keys, 1, GFP_KERNEL);
	if (!ret && dst->wb == &wb->flushing)
		ret = darray_resize(&wb->sorted, wb->flushing.keys.size);

	if (unlikely(ret)) {
		if (dst->wb == &c->btree_write_buffer.flushing) {
			mutex_unlock(&dst->wb->lock);
			dst->wb = &c->btree_write_buffer.inc;
			bch2_journal_pin_add(&c->journal, dst->seq, &dst->wb->pin,
					     bch2_btree_write_buffer_journal_flush);
			goto retry;
		}

		return ret;
	}

	dst->room = darray_room(dst->wb->keys);
	if (dst->wb == &wb->flushing)
		dst->room = min(dst->room, wb->sorted.size - wb->flushing.keys.nr);
	BUG_ON(!dst->room);
	BUG_ON(!dst->seq);

	struct btree_write_buffered_key *wb_k = &darray_top(dst->wb->keys);
	wb_k->journal_seq	= dst->seq;
	wb_k->btree		= btree;
	bkey_copy(&wb_k->k, k);
	dst->wb->keys.nr++;
	dst->room--;
	return 0;
}

void bch2_journal_keys_to_write_buffer_start(struct bch_fs *c, struct journal_keys_to_wb *dst, u64 seq)
{
	struct btree_write_buffer *wb = &c->btree_write_buffer;

	if (mutex_trylock(&wb->flushing.lock)) {
		mutex_lock(&wb->inc.lock);
		move_keys_from_inc_to_flushing(wb);

		/*
		 * Attempt to skip wb->inc, and add keys directly to
		 * wb->flushing, saving us a copy later:
		 */

		if (!wb->inc.keys.nr) {
			dst->wb = &wb->flushing;
		} else {
			mutex_unlock(&wb->flushing.lock);
			dst->wb = &wb->inc;
		}
	} else {
		mutex_lock(&wb->inc.lock);
		dst->wb = &wb->inc;
	}

	dst->room = darray_room(dst->wb->keys);
	if (dst->wb == &wb->flushing)
		dst->room = min(dst->room, wb->sorted.size - wb->flushing.keys.nr);
	dst->seq = seq;

	bch2_journal_pin_add(&c->journal, seq, &dst->wb->pin,
			     bch2_btree_write_buffer_journal_flush);

	darray_for_each(wb->accounting, i)
		memset(&i->k.v, 0, bkey_val_bytes(&i->k.k));
}
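
/*
 * Finish copying keys from a journal write: push any accounting keys that
 * accumulated a nonzero delta into the destination buffer, condense
 * wb->accounting if most of its entries went unused, drop the journal pin if
 * nothing was added, and kick off a background flush if the buffer is getting
 * full.
 */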
int bch2_journal_keys_to_write_buffer_end(struct bch_fs *c, struct journal_keys_to_wb *dst)
{
	struct btree_write_buffer *wb = &c->btree_write_buffer;
	unsigned live_accounting_keys = 0;
	int ret = 0;

	darray_for_each(wb->accounting, i)
		if (!bch2_accounting_key_is_zero(bkey_i_to_s_c_accounting(&i->k))) {
			i->journal_seq = dst->seq;
			live_accounting_keys++;
			ret = __bch2_journal_key_to_wb(c, dst, i->btree, &i->k);
			if (ret)
				break;
		}

	if (live_accounting_keys * 2 < wb->accounting.nr) {
		struct btree_write_buffered_key *dst = wb->accounting.data;

		darray_for_each(wb->accounting, src)
			if (!bch2_accounting_key_is_zero(bkey_i_to_s_c_accounting(&src->k)))
				*dst++ = *src;
		wb->accounting.nr = dst - wb->accounting.data;
		wb_accounting_sort(wb);
	}

	if (!dst->wb->keys.nr)
		bch2_journal_pin_drop(&c->journal, &dst->wb->pin);

	if (bch2_btree_write_buffer_should_flush(c) &&
	    __bch2_write_ref_tryget(c, BCH_WRITE_REF_btree_write_buffer) &&
	    !queue_work(system_unbound_wq, &c->btree_write_buffer.flush_work))
		bch2_write_ref_put(c, BCH_WRITE_REF_btree_write_buffer);

	if (dst->wb == &wb->flushing)
		mutex_unlock(&wb->flushing.lock);
	mutex_unlock(&wb->inc.lock);

	return ret;
}

static int wb_keys_resize(struct btree_write_buffer_keys *wb, size_t new_size)
{
	if (wb->keys.size >= new_size)
		return 0;

	if (!mutex_trylock(&wb->lock))
		return -EINTR;

	int ret = darray_resize(&wb->keys, new_size);
	mutex_unlock(&wb->lock);
	return ret;
}

int bch2_btree_write_buffer_resize(struct bch_fs *c, size_t new_size)
{
	struct btree_write_buffer *wb = &c->btree_write_buffer;

	return wb_keys_resize(&wb->flushing, new_size) ?:
		wb_keys_resize(&wb->inc, new_size);
}

void bch2_fs_btree_write_buffer_exit(struct bch_fs *c)
{
	struct btree_write_buffer *wb = &c->btree_write_buffer;

	BUG_ON((wb->inc.keys.nr || wb->flushing.keys.nr) &&
	       !bch2_journal_error(&c->journal));

	darray_exit(&wb->accounting);
	darray_exit(&wb->sorted);
	darray_exit(&wb->flushing.keys);
	darray_exit(&wb->inc.keys);
}

int bch2_fs_btree_write_buffer_init(struct bch_fs *c)
{
	struct btree_write_buffer *wb = &c->btree_write_buffer;

	mutex_init(&wb->inc.lock);
	mutex_init(&wb->flushing.lock);
	INIT_WORK(&wb->flush_work, bch2_btree_write_buffer_flush_work);

	/* Will be resized by journal as needed: */
	unsigned initial_size = 1 << 16;

	return darray_make_room(&wb->inc.keys, initial_size) ?:
		darray_make_room(&wb->flushing.keys, initial_size) ?:
		darray_make_room(&wb->sorted, initial_size);
}