1 // SPDX-License-Identifier: GPL-2.0 2 /* 3 * bcache journalling code, for btree insertions 4 * 5 * Copyright 2012 Google, Inc. 6 */ 7 8 #include "bcache.h" 9 #include "btree.h" 10 #include "debug.h" 11 #include "extents.h" 12 13 #include <trace/events/bcache.h> 14 15 /* 16 * Journal replay/recovery: 17 * 18 * This code is all driven from run_cache_set(); we first read the journal 19 * entries, do some other stuff, then we mark all the keys in the journal 20 * entries (same as garbage collection would), then we replay them - reinserting 21 * them into the cache in precisely the same order as they appear in the 22 * journal. 23 * 24 * We only journal keys that go in leaf nodes, which simplifies things quite a 25 * bit. 26 */ 27 28 static void journal_read_endio(struct bio *bio) 29 { 30 struct closure *cl = bio->bi_private; 31 closure_put(cl); 32 } 33 34 static int journal_read_bucket(struct cache *ca, struct list_head *list, 35 unsigned bucket_index) 36 { 37 struct journal_device *ja = &ca->journal; 38 struct bio *bio = &ja->bio; 39 40 struct journal_replay *i; 41 struct jset *j, *data = ca->set->journal.w[0].data; 42 struct closure cl; 43 unsigned len, left, offset = 0; 44 int ret = 0; 45 sector_t bucket = bucket_to_sector(ca->set, ca->sb.d[bucket_index]); 46 47 closure_init_stack(&cl); 48 49 pr_debug("reading %u", bucket_index); 50 51 while (offset < ca->sb.bucket_size) { 52 reread: left = ca->sb.bucket_size - offset; 53 len = min_t(unsigned, left, PAGE_SECTORS << JSET_BITS); 54 55 bio_reset(bio); 56 bio->bi_iter.bi_sector = bucket + offset; 57 bio_set_dev(bio, ca->bdev); 58 bio->bi_iter.bi_size = len << 9; 59 60 bio->bi_end_io = journal_read_endio; 61 bio->bi_private = &cl; 62 bio_set_op_attrs(bio, REQ_OP_READ, 0); 63 bch_bio_map(bio, data); 64 65 closure_bio_submit(bio, &cl); 66 closure_sync(&cl); 67 68 /* This function could be simpler now since we no longer write 69 * journal entries that overlap bucket boundaries; this means 70 * the start of a bucket will always have a valid journal entry 71 * if it has any journal entries at all. 72 */ 73 74 j = data; 75 while (len) { 76 struct list_head *where; 77 size_t blocks, bytes = set_bytes(j); 78 79 if (j->magic != jset_magic(&ca->sb)) { 80 pr_debug("%u: bad magic", bucket_index); 81 return ret; 82 } 83 84 if (bytes > left << 9 || 85 bytes > PAGE_SIZE << JSET_BITS) { 86 pr_info("%u: too big, %zu bytes, offset %u", 87 bucket_index, bytes, offset); 88 return ret; 89 } 90 91 if (bytes > len << 9) 92 goto reread; 93 94 if (j->csum != csum_set(j)) { 95 pr_info("%u: bad csum, %zu bytes, offset %u", 96 bucket_index, bytes, offset); 97 return ret; 98 } 99 100 blocks = set_blocks(j, block_bytes(ca->set)); 101 102 while (!list_empty(list)) { 103 i = list_first_entry(list, 104 struct journal_replay, list); 105 if (i->j.seq >= j->last_seq) 106 break; 107 list_del(&i->list); 108 kfree(i); 109 } 110 111 list_for_each_entry_reverse(i, list, list) { 112 if (j->seq == i->j.seq) 113 goto next_set; 114 115 if (j->seq < i->j.last_seq) 116 goto next_set; 117 118 if (j->seq > i->j.seq) { 119 where = &i->list; 120 goto add; 121 } 122 } 123 124 where = list; 125 add: 126 i = kmalloc(offsetof(struct journal_replay, j) + 127 bytes, GFP_KERNEL); 128 if (!i) 129 return -ENOMEM; 130 memcpy(&i->j, j, bytes); 131 list_add(&i->list, where); 132 ret = 1; 133 134 ja->seq[bucket_index] = j->seq; 135 next_set: 136 offset += blocks * ca->sb.block_size; 137 len -= blocks * ca->sb.block_size; 138 j = ((void *) j) + blocks * block_bytes(ca); 139 } 140 } 141 142 return ret; 143 } 144 145 int bch_journal_read(struct cache_set *c, struct list_head *list) 146 { 147 #define read_bucket(b) \ 148 ({ \ 149 int ret = journal_read_bucket(ca, list, b); \ 150 __set_bit(b, bitmap); \ 151 if (ret < 0) \ 152 return ret; \ 153 ret; \ 154 }) 155 156 struct cache *ca; 157 unsigned iter; 158 159 for_each_cache(ca, c, iter) { 160 struct journal_device *ja = &ca->journal; 161 DECLARE_BITMAP(bitmap, SB_JOURNAL_BUCKETS); 162 unsigned i, l, r, m; 163 uint64_t seq; 164 165 bitmap_zero(bitmap, SB_JOURNAL_BUCKETS); 166 pr_debug("%u journal buckets", ca->sb.njournal_buckets); 167 168 /* 169 * Read journal buckets ordered by golden ratio hash to quickly 170 * find a sequence of buckets with valid journal entries 171 */ 172 for (i = 0; i < ca->sb.njournal_buckets; i++) { 173 l = (i * 2654435769U) % ca->sb.njournal_buckets; 174 175 if (test_bit(l, bitmap)) 176 break; 177 178 if (read_bucket(l)) 179 goto bsearch; 180 } 181 182 /* 183 * If that fails, check all the buckets we haven't checked 184 * already 185 */ 186 pr_debug("falling back to linear search"); 187 188 for (l = find_first_zero_bit(bitmap, ca->sb.njournal_buckets); 189 l < ca->sb.njournal_buckets; 190 l = find_next_zero_bit(bitmap, ca->sb.njournal_buckets, l + 1)) 191 if (read_bucket(l)) 192 goto bsearch; 193 194 /* no journal entries on this device? */ 195 if (l == ca->sb.njournal_buckets) 196 continue; 197 bsearch: 198 BUG_ON(list_empty(list)); 199 200 /* Binary search */ 201 m = l; 202 r = find_next_bit(bitmap, ca->sb.njournal_buckets, l + 1); 203 pr_debug("starting binary search, l %u r %u", l, r); 204 205 while (l + 1 < r) { 206 seq = list_entry(list->prev, struct journal_replay, 207 list)->j.seq; 208 209 m = (l + r) >> 1; 210 read_bucket(m); 211 212 if (seq != list_entry(list->prev, struct journal_replay, 213 list)->j.seq) 214 l = m; 215 else 216 r = m; 217 } 218 219 /* 220 * Read buckets in reverse order until we stop finding more 221 * journal entries 222 */ 223 pr_debug("finishing up: m %u njournal_buckets %u", 224 m, ca->sb.njournal_buckets); 225 l = m; 226 227 while (1) { 228 if (!l--) 229 l = ca->sb.njournal_buckets - 1; 230 231 if (l == m) 232 break; 233 234 if (test_bit(l, bitmap)) 235 continue; 236 237 if (!read_bucket(l)) 238 break; 239 } 240 241 seq = 0; 242 243 for (i = 0; i < ca->sb.njournal_buckets; i++) 244 if (ja->seq[i] > seq) { 245 seq = ja->seq[i]; 246 /* 247 * When journal_reclaim() goes to allocate for 248 * the first time, it'll use the bucket after 249 * ja->cur_idx 250 */ 251 ja->cur_idx = i; 252 ja->last_idx = ja->discard_idx = (i + 1) % 253 ca->sb.njournal_buckets; 254 255 } 256 } 257 258 if (!list_empty(list)) 259 c->journal.seq = list_entry(list->prev, 260 struct journal_replay, 261 list)->j.seq; 262 263 return 0; 264 #undef read_bucket 265 } 266 267 void bch_journal_mark(struct cache_set *c, struct list_head *list) 268 { 269 atomic_t p = { 0 }; 270 struct bkey *k; 271 struct journal_replay *i; 272 struct journal *j = &c->journal; 273 uint64_t last = j->seq; 274 275 /* 276 * journal.pin should never fill up - we never write a journal 277 * entry when it would fill up. But if for some reason it does, we 278 * iterate over the list in reverse order so that we can just skip that 279 * refcount instead of bugging. 280 */ 281 282 list_for_each_entry_reverse(i, list, list) { 283 BUG_ON(last < i->j.seq); 284 i->pin = NULL; 285 286 while (last-- != i->j.seq) 287 if (fifo_free(&j->pin) > 1) { 288 fifo_push_front(&j->pin, p); 289 atomic_set(&fifo_front(&j->pin), 0); 290 } 291 292 if (fifo_free(&j->pin) > 1) { 293 fifo_push_front(&j->pin, p); 294 i->pin = &fifo_front(&j->pin); 295 atomic_set(i->pin, 1); 296 } 297 298 for (k = i->j.start; 299 k < bset_bkey_last(&i->j); 300 k = bkey_next(k)) 301 if (!__bch_extent_invalid(c, k)) { 302 unsigned j; 303 304 for (j = 0; j < KEY_PTRS(k); j++) 305 if (ptr_available(c, k, j)) 306 atomic_inc(&PTR_BUCKET(c, k, j)->pin); 307 308 bch_initial_mark_key(c, 0, k); 309 } 310 } 311 } 312 313 int bch_journal_replay(struct cache_set *s, struct list_head *list) 314 { 315 int ret = 0, keys = 0, entries = 0; 316 struct bkey *k; 317 struct journal_replay *i = 318 list_entry(list->prev, struct journal_replay, list); 319 320 uint64_t start = i->j.last_seq, end = i->j.seq, n = start; 321 struct keylist keylist; 322 323 list_for_each_entry(i, list, list) { 324 BUG_ON(i->pin && atomic_read(i->pin) != 1); 325 326 cache_set_err_on(n != i->j.seq, s, 327 "bcache: journal entries %llu-%llu missing! (replaying %llu-%llu)", 328 n, i->j.seq - 1, start, end); 329 330 for (k = i->j.start; 331 k < bset_bkey_last(&i->j); 332 k = bkey_next(k)) { 333 trace_bcache_journal_replay_key(k); 334 335 bch_keylist_init_single(&keylist, k); 336 337 ret = bch_btree_insert(s, &keylist, i->pin, NULL); 338 if (ret) 339 goto err; 340 341 BUG_ON(!bch_keylist_empty(&keylist)); 342 keys++; 343 344 cond_resched(); 345 } 346 347 if (i->pin) 348 atomic_dec(i->pin); 349 n = i->j.seq + 1; 350 entries++; 351 } 352 353 pr_info("journal replay done, %i keys in %i entries, seq %llu", 354 keys, entries, end); 355 err: 356 while (!list_empty(list)) { 357 i = list_first_entry(list, struct journal_replay, list); 358 list_del(&i->list); 359 kfree(i); 360 } 361 362 return ret; 363 } 364 365 /* Journalling */ 366 367 static void btree_flush_write(struct cache_set *c) 368 { 369 /* 370 * Try to find the btree node with that references the oldest journal 371 * entry, best is our current candidate and is locked if non NULL: 372 */ 373 struct btree *b, *best; 374 unsigned i; 375 retry: 376 best = NULL; 377 378 for_each_cached_btree(b, c, i) 379 if (btree_current_write(b)->journal) { 380 if (!best) 381 best = b; 382 else if (journal_pin_cmp(c, 383 btree_current_write(best)->journal, 384 btree_current_write(b)->journal)) { 385 best = b; 386 } 387 } 388 389 b = best; 390 if (b) { 391 mutex_lock(&b->write_lock); 392 if (!btree_current_write(b)->journal) { 393 mutex_unlock(&b->write_lock); 394 /* We raced */ 395 goto retry; 396 } 397 398 __bch_btree_node_write(b, NULL); 399 mutex_unlock(&b->write_lock); 400 } 401 } 402 403 #define last_seq(j) ((j)->seq - fifo_used(&(j)->pin) + 1) 404 405 static void journal_discard_endio(struct bio *bio) 406 { 407 struct journal_device *ja = 408 container_of(bio, struct journal_device, discard_bio); 409 struct cache *ca = container_of(ja, struct cache, journal); 410 411 atomic_set(&ja->discard_in_flight, DISCARD_DONE); 412 413 closure_wake_up(&ca->set->journal.wait); 414 closure_put(&ca->set->cl); 415 } 416 417 static void journal_discard_work(struct work_struct *work) 418 { 419 struct journal_device *ja = 420 container_of(work, struct journal_device, discard_work); 421 422 submit_bio(&ja->discard_bio); 423 } 424 425 static void do_journal_discard(struct cache *ca) 426 { 427 struct journal_device *ja = &ca->journal; 428 struct bio *bio = &ja->discard_bio; 429 430 if (!ca->discard) { 431 ja->discard_idx = ja->last_idx; 432 return; 433 } 434 435 switch (atomic_read(&ja->discard_in_flight)) { 436 case DISCARD_IN_FLIGHT: 437 return; 438 439 case DISCARD_DONE: 440 ja->discard_idx = (ja->discard_idx + 1) % 441 ca->sb.njournal_buckets; 442 443 atomic_set(&ja->discard_in_flight, DISCARD_READY); 444 /* fallthrough */ 445 446 case DISCARD_READY: 447 if (ja->discard_idx == ja->last_idx) 448 return; 449 450 atomic_set(&ja->discard_in_flight, DISCARD_IN_FLIGHT); 451 452 bio_init(bio, bio->bi_inline_vecs, 1); 453 bio_set_op_attrs(bio, REQ_OP_DISCARD, 0); 454 bio->bi_iter.bi_sector = bucket_to_sector(ca->set, 455 ca->sb.d[ja->discard_idx]); 456 bio_set_dev(bio, ca->bdev); 457 bio->bi_iter.bi_size = bucket_bytes(ca); 458 bio->bi_end_io = journal_discard_endio; 459 460 closure_get(&ca->set->cl); 461 INIT_WORK(&ja->discard_work, journal_discard_work); 462 schedule_work(&ja->discard_work); 463 } 464 } 465 466 static void journal_reclaim(struct cache_set *c) 467 { 468 struct bkey *k = &c->journal.key; 469 struct cache *ca; 470 uint64_t last_seq; 471 unsigned iter, n = 0; 472 atomic_t p; 473 474 while (!atomic_read(&fifo_front(&c->journal.pin))) 475 fifo_pop(&c->journal.pin, p); 476 477 last_seq = last_seq(&c->journal); 478 479 /* Update last_idx */ 480 481 for_each_cache(ca, c, iter) { 482 struct journal_device *ja = &ca->journal; 483 484 while (ja->last_idx != ja->cur_idx && 485 ja->seq[ja->last_idx] < last_seq) 486 ja->last_idx = (ja->last_idx + 1) % 487 ca->sb.njournal_buckets; 488 } 489 490 for_each_cache(ca, c, iter) 491 do_journal_discard(ca); 492 493 if (c->journal.blocks_free) 494 goto out; 495 496 /* 497 * Allocate: 498 * XXX: Sort by free journal space 499 */ 500 501 for_each_cache(ca, c, iter) { 502 struct journal_device *ja = &ca->journal; 503 unsigned next = (ja->cur_idx + 1) % ca->sb.njournal_buckets; 504 505 /* No space available on this device */ 506 if (next == ja->discard_idx) 507 continue; 508 509 ja->cur_idx = next; 510 k->ptr[n++] = PTR(0, 511 bucket_to_sector(c, ca->sb.d[ja->cur_idx]), 512 ca->sb.nr_this_dev); 513 } 514 515 bkey_init(k); 516 SET_KEY_PTRS(k, n); 517 518 if (n) 519 c->journal.blocks_free = c->sb.bucket_size >> c->block_bits; 520 out: 521 if (!journal_full(&c->journal)) 522 __closure_wake_up(&c->journal.wait); 523 } 524 525 void bch_journal_next(struct journal *j) 526 { 527 atomic_t p = { 1 }; 528 529 j->cur = (j->cur == j->w) 530 ? &j->w[1] 531 : &j->w[0]; 532 533 /* 534 * The fifo_push() needs to happen at the same time as j->seq is 535 * incremented for last_seq() to be calculated correctly 536 */ 537 BUG_ON(!fifo_push(&j->pin, p)); 538 atomic_set(&fifo_back(&j->pin), 1); 539 540 j->cur->data->seq = ++j->seq; 541 j->cur->dirty = false; 542 j->cur->need_write = false; 543 j->cur->data->keys = 0; 544 545 if (fifo_full(&j->pin)) 546 pr_debug("journal_pin full (%zu)", fifo_used(&j->pin)); 547 } 548 549 static void journal_write_endio(struct bio *bio) 550 { 551 struct journal_write *w = bio->bi_private; 552 553 cache_set_err_on(bio->bi_status, w->c, "journal io error"); 554 closure_put(&w->c->journal.io); 555 } 556 557 static void journal_write(struct closure *); 558 559 static void journal_write_done(struct closure *cl) 560 { 561 struct journal *j = container_of(cl, struct journal, io); 562 struct journal_write *w = (j->cur == j->w) 563 ? &j->w[1] 564 : &j->w[0]; 565 566 __closure_wake_up(&w->wait); 567 continue_at_nobarrier(cl, journal_write, system_wq); 568 } 569 570 static void journal_write_unlock(struct closure *cl) 571 { 572 struct cache_set *c = container_of(cl, struct cache_set, journal.io); 573 574 c->journal.io_in_flight = 0; 575 spin_unlock(&c->journal.lock); 576 } 577 578 static void journal_write_unlocked(struct closure *cl) 579 __releases(c->journal.lock) 580 { 581 struct cache_set *c = container_of(cl, struct cache_set, journal.io); 582 struct cache *ca; 583 struct journal_write *w = c->journal.cur; 584 struct bkey *k = &c->journal.key; 585 unsigned i, sectors = set_blocks(w->data, block_bytes(c)) * 586 c->sb.block_size; 587 588 struct bio *bio; 589 struct bio_list list; 590 bio_list_init(&list); 591 592 if (!w->need_write) { 593 closure_return_with_destructor(cl, journal_write_unlock); 594 return; 595 } else if (journal_full(&c->journal)) { 596 journal_reclaim(c); 597 spin_unlock(&c->journal.lock); 598 599 btree_flush_write(c); 600 continue_at(cl, journal_write, system_wq); 601 return; 602 } 603 604 c->journal.blocks_free -= set_blocks(w->data, block_bytes(c)); 605 606 w->data->btree_level = c->root->level; 607 608 bkey_copy(&w->data->btree_root, &c->root->key); 609 bkey_copy(&w->data->uuid_bucket, &c->uuid_bucket); 610 611 for_each_cache(ca, c, i) 612 w->data->prio_bucket[ca->sb.nr_this_dev] = ca->prio_buckets[0]; 613 614 w->data->magic = jset_magic(&c->sb); 615 w->data->version = BCACHE_JSET_VERSION; 616 w->data->last_seq = last_seq(&c->journal); 617 w->data->csum = csum_set(w->data); 618 619 for (i = 0; i < KEY_PTRS(k); i++) { 620 ca = PTR_CACHE(c, k, i); 621 bio = &ca->journal.bio; 622 623 atomic_long_add(sectors, &ca->meta_sectors_written); 624 625 bio_reset(bio); 626 bio->bi_iter.bi_sector = PTR_OFFSET(k, i); 627 bio_set_dev(bio, ca->bdev); 628 bio->bi_iter.bi_size = sectors << 9; 629 630 bio->bi_end_io = journal_write_endio; 631 bio->bi_private = w; 632 bio_set_op_attrs(bio, REQ_OP_WRITE, 633 REQ_SYNC|REQ_META|REQ_PREFLUSH|REQ_FUA); 634 bch_bio_map(bio, w->data); 635 636 trace_bcache_journal_write(bio); 637 bio_list_add(&list, bio); 638 639 SET_PTR_OFFSET(k, i, PTR_OFFSET(k, i) + sectors); 640 641 ca->journal.seq[ca->journal.cur_idx] = w->data->seq; 642 } 643 644 atomic_dec_bug(&fifo_back(&c->journal.pin)); 645 bch_journal_next(&c->journal); 646 journal_reclaim(c); 647 648 spin_unlock(&c->journal.lock); 649 650 while ((bio = bio_list_pop(&list))) 651 closure_bio_submit(bio, cl); 652 653 continue_at(cl, journal_write_done, NULL); 654 } 655 656 static void journal_write(struct closure *cl) 657 { 658 struct cache_set *c = container_of(cl, struct cache_set, journal.io); 659 660 spin_lock(&c->journal.lock); 661 journal_write_unlocked(cl); 662 } 663 664 static void journal_try_write(struct cache_set *c) 665 __releases(c->journal.lock) 666 { 667 struct closure *cl = &c->journal.io; 668 struct journal_write *w = c->journal.cur; 669 670 w->need_write = true; 671 672 if (!c->journal.io_in_flight) { 673 c->journal.io_in_flight = 1; 674 closure_call(cl, journal_write_unlocked, NULL, &c->cl); 675 } else { 676 spin_unlock(&c->journal.lock); 677 } 678 } 679 680 static struct journal_write *journal_wait_for_write(struct cache_set *c, 681 unsigned nkeys) 682 { 683 size_t sectors; 684 struct closure cl; 685 bool wait = false; 686 687 closure_init_stack(&cl); 688 689 spin_lock(&c->journal.lock); 690 691 while (1) { 692 struct journal_write *w = c->journal.cur; 693 694 sectors = __set_blocks(w->data, w->data->keys + nkeys, 695 block_bytes(c)) * c->sb.block_size; 696 697 if (sectors <= min_t(size_t, 698 c->journal.blocks_free * c->sb.block_size, 699 PAGE_SECTORS << JSET_BITS)) 700 return w; 701 702 if (wait) 703 closure_wait(&c->journal.wait, &cl); 704 705 if (!journal_full(&c->journal)) { 706 if (wait) 707 trace_bcache_journal_entry_full(c); 708 709 /* 710 * XXX: If we were inserting so many keys that they 711 * won't fit in an _empty_ journal write, we'll 712 * deadlock. For now, handle this in 713 * bch_keylist_realloc() - but something to think about. 714 */ 715 BUG_ON(!w->data->keys); 716 717 journal_try_write(c); /* unlocks */ 718 } else { 719 if (wait) 720 trace_bcache_journal_full(c); 721 722 journal_reclaim(c); 723 spin_unlock(&c->journal.lock); 724 725 btree_flush_write(c); 726 } 727 728 closure_sync(&cl); 729 spin_lock(&c->journal.lock); 730 wait = true; 731 } 732 } 733 734 static void journal_write_work(struct work_struct *work) 735 { 736 struct cache_set *c = container_of(to_delayed_work(work), 737 struct cache_set, 738 journal.work); 739 spin_lock(&c->journal.lock); 740 if (c->journal.cur->dirty) 741 journal_try_write(c); 742 else 743 spin_unlock(&c->journal.lock); 744 } 745 746 /* 747 * Entry point to the journalling code - bio_insert() and btree_invalidate() 748 * pass bch_journal() a list of keys to be journalled, and then 749 * bch_journal() hands those same keys off to btree_insert_async() 750 */ 751 752 atomic_t *bch_journal(struct cache_set *c, 753 struct keylist *keys, 754 struct closure *parent) 755 { 756 struct journal_write *w; 757 atomic_t *ret; 758 759 if (!CACHE_SYNC(&c->sb)) 760 return NULL; 761 762 w = journal_wait_for_write(c, bch_keylist_nkeys(keys)); 763 764 memcpy(bset_bkey_last(w->data), keys->keys, bch_keylist_bytes(keys)); 765 w->data->keys += bch_keylist_nkeys(keys); 766 767 ret = &fifo_back(&c->journal.pin); 768 atomic_inc(ret); 769 770 if (parent) { 771 closure_wait(&w->wait, parent); 772 journal_try_write(c); 773 } else if (!w->dirty) { 774 w->dirty = true; 775 schedule_delayed_work(&c->journal.work, 776 msecs_to_jiffies(c->journal_delay_ms)); 777 spin_unlock(&c->journal.lock); 778 } else { 779 spin_unlock(&c->journal.lock); 780 } 781 782 783 return ret; 784 } 785 786 void bch_journal_meta(struct cache_set *c, struct closure *cl) 787 { 788 struct keylist keys; 789 atomic_t *ref; 790 791 bch_keylist_init(&keys); 792 793 ref = bch_journal(c, &keys, cl); 794 if (ref) 795 atomic_dec_bug(ref); 796 } 797 798 void bch_journal_free(struct cache_set *c) 799 { 800 free_pages((unsigned long) c->journal.w[1].data, JSET_BITS); 801 free_pages((unsigned long) c->journal.w[0].data, JSET_BITS); 802 free_fifo(&c->journal.pin); 803 } 804 805 int bch_journal_alloc(struct cache_set *c) 806 { 807 struct journal *j = &c->journal; 808 809 spin_lock_init(&j->lock); 810 INIT_DELAYED_WORK(&j->work, journal_write_work); 811 812 c->journal_delay_ms = 100; 813 814 j->w[0].c = c; 815 j->w[1].c = c; 816 817 if (!(init_fifo(&j->pin, JOURNAL_PIN, GFP_KERNEL)) || 818 !(j->w[0].data = (void *) __get_free_pages(GFP_KERNEL, JSET_BITS)) || 819 !(j->w[1].data = (void *) __get_free_pages(GFP_KERNEL, JSET_BITS))) 820 return -ENOMEM; 821 822 return 0; 823 } 824