// SPDX-License-Identifier: GPL-2.0-only
/*
 * Copyright 2023 Red Hat
 */

#include "block-map.h"

#include <linux/bio.h>
#include <linux/ratelimit.h>

#include "errors.h"
#include "logger.h"
#include "memory-alloc.h"
#include "permassert.h"

#include "action-manager.h"
#include "admin-state.h"
#include "completion.h"
#include "constants.h"
#include "data-vio.h"
#include "encodings.h"
#include "io-submitter.h"
#include "physical-zone.h"
#include "recovery-journal.h"
#include "slab-depot.h"
#include "status-codes.h"
#include "types.h"
#include "vdo.h"
#include "vio.h"
#include "wait-queue.h"

/**
 * DOC: Block map eras
 *
 * The block map era, or maximum age, is used as follows:
 *
 * Each block map page, when dirty, records the earliest recovery journal block sequence number of
 * the changes reflected in that dirty block. Sequence numbers are classified into eras: every
 * @maximum_age sequence numbers, we switch to a new era. Block map pages are assigned to eras
 * according to the sequence number they record.
 *
 * In the current (newest) era, block map pages are not written unless there is cache pressure. In
 * the next oldest era, each time a new journal block is written 1/@maximum_age of the pages in
 * this era are issued for write. In all older eras, pages are issued for write immediately.
 */
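
/*
 * A worked illustration of the eras described above, with hypothetical numbers chosen only for
 * readability (not defaults): if @maximum_age were 10, journal sequence numbers 0-9 would form
 * one era, 10-19 the next, and so on. While the journal is writing blocks 20-29, a dirty page
 * recording sequence number 25 is in the newest era and is written only under cache pressure; a
 * page recording sequence number 14 is in the next oldest era, from which roughly 1/10 of the
 * pages are issued each time a new journal block is written; a page still recording sequence
 * number 3 is in an older era and is issued for write immediately.
 */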

struct page_descriptor {
	root_count_t root_index;
	height_t height;
	page_number_t page_index;
	slot_number_t slot;
} __packed;

union page_key {
	struct page_descriptor descriptor;
	u64 key;
};

struct write_if_not_dirtied_context {
	struct block_map_zone *zone;
	u8 generation;
};

struct block_map_tree_segment {
	struct tree_page *levels[VDO_BLOCK_MAP_TREE_HEIGHT];
};

struct block_map_tree {
	struct block_map_tree_segment *segments;
};

struct forest {
	struct block_map *map;
	size_t segments;
	struct boundary *boundaries;
	struct tree_page **pages;
	struct block_map_tree trees[];
};

struct cursor_level {
	page_number_t page_index;
	slot_number_t slot;
};

struct cursors;

struct cursor {
	struct vdo_waiter waiter;
	struct block_map_tree *tree;
	height_t height;
	struct cursors *parent;
	struct boundary boundary;
	struct cursor_level levels[VDO_BLOCK_MAP_TREE_HEIGHT];
	struct pooled_vio *vio;
};

struct cursors {
	struct block_map_zone *zone;
	struct vio_pool *pool;
	vdo_entry_callback_fn entry_callback;
	struct vdo_completion *completion;
	root_count_t active_roots;
	struct cursor cursors[];
};

static const physical_block_number_t NO_PAGE = 0xFFFFFFFFFFFFFFFF;

/* Used to indicate that the page holding the location of a tree root has been "loaded". */
static const physical_block_number_t VDO_INVALID_PBN = 0xFFFFFFFFFFFFFFFF;

const struct block_map_entry UNMAPPED_BLOCK_MAP_ENTRY = {
	.mapping_state = VDO_MAPPING_STATE_UNMAPPED & 0x0F,
	.pbn_high_nibble = 0,
	.pbn_low_word = __cpu_to_le32(VDO_ZERO_BLOCK & UINT_MAX),
};

#define LOG_INTERVAL 4000
#define DISPLAY_INTERVAL 100000

/*
 * For adjusting VDO page cache statistic fields which are only mutated on the logical zone thread.
 * Prevents any compiler shenanigans from affecting other threads reading those stats.
 */
#define ADD_ONCE(value, delta) WRITE_ONCE(value, (value) + (delta))

static inline bool is_dirty(const struct page_info *info)
{
	return info->state == PS_DIRTY;
}

static inline bool is_present(const struct page_info *info)
{
	return (info->state == PS_RESIDENT) || (info->state == PS_DIRTY);
}

static inline bool is_in_flight(const struct page_info *info)
{
	return (info->state == PS_INCOMING) || (info->state == PS_OUTGOING);
}

static inline bool is_incoming(const struct page_info *info)
{
	return info->state == PS_INCOMING;
}

static inline bool is_outgoing(const struct page_info *info)
{
	return info->state == PS_OUTGOING;
}

static inline bool is_valid(const struct page_info *info)
{
	return is_present(info) || is_outgoing(info);
}

static char *get_page_buffer(struct page_info *info)
{
	struct vdo_page_cache *cache = info->cache;

	return &cache->pages[(info - cache->infos) * VDO_BLOCK_SIZE];
}

static inline struct vdo_page_completion *page_completion_from_waiter(struct vdo_waiter *waiter)
{
	struct vdo_page_completion *completion;

	if (waiter == NULL)
		return NULL;

	completion = container_of(waiter, struct vdo_page_completion, waiter);
	vdo_assert_completion_type(&completion->completion, VDO_PAGE_COMPLETION);
	return completion;
}

/**
 * initialize_info() - Initialize all page info structures and put them on the free list.
 *
 * Return: VDO_SUCCESS or an error.
 */
static int initialize_info(struct vdo_page_cache *cache)
{
	struct page_info *info;

	INIT_LIST_HEAD(&cache->free_list);
	for (info = cache->infos; info < cache->infos + cache->page_count; info++) {
		int result;

		info->cache = cache;
		info->state = PS_FREE;
		info->pbn = NO_PAGE;

		result = create_metadata_vio(cache->vdo, VIO_TYPE_BLOCK_MAP,
					     VIO_PRIORITY_METADATA, info,
					     get_page_buffer(info), &info->vio);
		if (result != VDO_SUCCESS)
			return result;

		/* The thread ID should never change. */
		info->vio->completion.callback_thread_id = cache->zone->thread_id;

		INIT_LIST_HEAD(&info->state_entry);
		list_add_tail(&info->state_entry, &cache->free_list);
		INIT_LIST_HEAD(&info->lru_entry);
	}

	return VDO_SUCCESS;
}

/**
 * allocate_cache_components() - Allocate components of the cache which require their own
 *                               allocation.
 * @cache: The cache to allocate components for.
 *
 * The caller is responsible for all cleanup on errors.
 *
 * Return: VDO_SUCCESS or an error code.
 */
static int __must_check allocate_cache_components(struct vdo_page_cache *cache)
{
	u64 size = cache->page_count * (u64) VDO_BLOCK_SIZE;
	int result;

	result = vdo_allocate(cache->page_count, struct page_info, "page infos",
			      &cache->infos);
	if (result != VDO_SUCCESS)
		return result;

	result = vdo_allocate_memory(size, VDO_BLOCK_SIZE, "cache pages", &cache->pages);
	if (result != VDO_SUCCESS)
		return result;

	result = vdo_int_map_create(cache->page_count, &cache->page_map);
	if (result != VDO_SUCCESS)
		return result;

	return initialize_info(cache);
}

/**
 * assert_on_cache_thread() - Assert that a function has been called on the VDO page cache's
 *                            thread.
243 */ 244 static inline void assert_on_cache_thread(struct vdo_page_cache *cache, 245 const char *function_name) 246 { 247 thread_id_t thread_id = vdo_get_callback_thread_id(); 248 249 VDO_ASSERT_LOG_ONLY((thread_id == cache->zone->thread_id), 250 "%s() must only be called on cache thread %d, not thread %d", 251 function_name, cache->zone->thread_id, thread_id); 252 } 253 254 /** assert_io_allowed() - Assert that a page cache may issue I/O. */ 255 static inline void assert_io_allowed(struct vdo_page_cache *cache) 256 { 257 VDO_ASSERT_LOG_ONLY(!vdo_is_state_quiescent(&cache->zone->state), 258 "VDO page cache may issue I/O"); 259 } 260 261 /** report_cache_pressure() - Log and, if enabled, report cache pressure. */ 262 static void report_cache_pressure(struct vdo_page_cache *cache) 263 { 264 ADD_ONCE(cache->stats.cache_pressure, 1); 265 if (cache->waiter_count > cache->page_count) { 266 if ((cache->pressure_report % LOG_INTERVAL) == 0) 267 vdo_log_info("page cache pressure %u", cache->stats.cache_pressure); 268 269 if (++cache->pressure_report >= DISPLAY_INTERVAL) 270 cache->pressure_report = 0; 271 } 272 } 273 274 /** 275 * get_page_state_name() - Return the name of a page state. 276 * 277 * If the page state is invalid a static string is returned and the invalid state is logged. 278 * 279 * Return: A pointer to a static page state name. 280 */ 281 static const char * __must_check get_page_state_name(enum vdo_page_buffer_state state) 282 { 283 int result; 284 static const char * const state_names[] = { 285 "FREE", "INCOMING", "FAILED", "RESIDENT", "DIRTY", "OUTGOING" 286 }; 287 288 BUILD_BUG_ON(ARRAY_SIZE(state_names) != PAGE_STATE_COUNT); 289 290 result = VDO_ASSERT(state < ARRAY_SIZE(state_names), 291 "Unknown page_state value %d", state); 292 if (result != VDO_SUCCESS) 293 return "[UNKNOWN PAGE STATE]"; 294 295 return state_names[state]; 296 } 297 298 /** 299 * update_counter() - Update the counter associated with a given state. 300 * @info: The page info to count. 301 * @delta: The delta to apply to the counter. 302 */ 303 static void update_counter(struct page_info *info, s32 delta) 304 { 305 struct block_map_statistics *stats = &info->cache->stats; 306 307 switch (info->state) { 308 case PS_FREE: 309 ADD_ONCE(stats->free_pages, delta); 310 return; 311 312 case PS_INCOMING: 313 ADD_ONCE(stats->incoming_pages, delta); 314 return; 315 316 case PS_OUTGOING: 317 ADD_ONCE(stats->outgoing_pages, delta); 318 return; 319 320 case PS_FAILED: 321 ADD_ONCE(stats->failed_pages, delta); 322 return; 323 324 case PS_RESIDENT: 325 ADD_ONCE(stats->clean_pages, delta); 326 return; 327 328 case PS_DIRTY: 329 ADD_ONCE(stats->dirty_pages, delta); 330 return; 331 332 default: 333 return; 334 } 335 } 336 337 /** update_lru() - Update the lru information for an active page. */ 338 static void update_lru(struct page_info *info) 339 { 340 if (info->cache->lru_list.prev != &info->lru_entry) 341 list_move_tail(&info->lru_entry, &info->cache->lru_list); 342 } 343 344 /** 345 * set_info_state() - Set the state of a page_info and put it on the right list, adjusting 346 * counters. 
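 *
 * A sketch of the typical lifecycle, as driven by the rest of this file: PS_FREE ->
 * PS_INCOMING (launch_page_load()) -> PS_RESIDENT (page_is_loaded()) -> PS_DIRTY
 * (vdo_request_page_write()) -> PS_OUTGOING (schedule_page_save()) -> back to PS_RESIDENT
 * (page_is_written_out()). PS_FAILED is entered from handle_load_error().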
347 */ 348 static void set_info_state(struct page_info *info, enum vdo_page_buffer_state new_state) 349 { 350 if (new_state == info->state) 351 return; 352 353 update_counter(info, -1); 354 info->state = new_state; 355 update_counter(info, 1); 356 357 switch (info->state) { 358 case PS_FREE: 359 case PS_FAILED: 360 list_move_tail(&info->state_entry, &info->cache->free_list); 361 return; 362 363 case PS_OUTGOING: 364 list_move_tail(&info->state_entry, &info->cache->outgoing_list); 365 return; 366 367 case PS_DIRTY: 368 return; 369 370 default: 371 list_del_init(&info->state_entry); 372 } 373 } 374 375 /** set_info_pbn() - Set the pbn for an info, updating the map as needed. */ 376 static int __must_check set_info_pbn(struct page_info *info, physical_block_number_t pbn) 377 { 378 struct vdo_page_cache *cache = info->cache; 379 380 /* Either the new or the old page number must be NO_PAGE. */ 381 int result = VDO_ASSERT((pbn == NO_PAGE) || (info->pbn == NO_PAGE), 382 "Must free a page before reusing it."); 383 if (result != VDO_SUCCESS) 384 return result; 385 386 if (info->pbn != NO_PAGE) 387 vdo_int_map_remove(cache->page_map, info->pbn); 388 389 info->pbn = pbn; 390 391 if (pbn != NO_PAGE) { 392 result = vdo_int_map_put(cache->page_map, pbn, info, true, NULL); 393 if (result != VDO_SUCCESS) 394 return result; 395 } 396 return VDO_SUCCESS; 397 } 398 399 /** reset_page_info() - Reset page info to represent an unallocated page. */ 400 static int reset_page_info(struct page_info *info) 401 { 402 int result; 403 404 result = VDO_ASSERT(info->busy == 0, "VDO Page must not be busy"); 405 if (result != VDO_SUCCESS) 406 return result; 407 408 result = VDO_ASSERT(!vdo_waitq_has_waiters(&info->waiting), 409 "VDO Page must not have waiters"); 410 if (result != VDO_SUCCESS) 411 return result; 412 413 result = set_info_pbn(info, NO_PAGE); 414 set_info_state(info, PS_FREE); 415 list_del_init(&info->lru_entry); 416 return result; 417 } 418 419 /** 420 * find_free_page() - Find a free page. 421 * 422 * Return: A pointer to the page info structure (if found), NULL otherwise. 423 */ 424 static struct page_info * __must_check find_free_page(struct vdo_page_cache *cache) 425 { 426 struct page_info *info; 427 428 info = list_first_entry_or_null(&cache->free_list, struct page_info, 429 state_entry); 430 if (info != NULL) 431 list_del_init(&info->state_entry); 432 433 return info; 434 } 435 436 /** 437 * find_page() - Find the page info (if any) associated with a given pbn. 438 * @pbn: The absolute physical block number of the page. 439 * 440 * Return: The page info for the page if available, or NULL if not. 441 */ 442 static struct page_info * __must_check find_page(struct vdo_page_cache *cache, 443 physical_block_number_t pbn) 444 { 445 if ((cache->last_found != NULL) && (cache->last_found->pbn == pbn)) 446 return cache->last_found; 447 448 cache->last_found = vdo_int_map_get(cache->page_map, pbn); 449 return cache->last_found; 450 } 451 452 /** 453 * select_lru_page() - Determine which page is least recently used. 454 * 455 * Picks the least recently used from among the non-busy entries at the front of each of the lru 456 * ring. Since whenever we mark a page busy we also put it to the end of the ring it is unlikely 457 * that the entries at the front are busy unless the queue is very short, but not impossible. 458 * 459 * Return: A pointer to the info structure for a relevant page, or NULL if no such page can be 460 * found. The page can be dirty or resident. 
461 */ 462 static struct page_info * __must_check select_lru_page(struct vdo_page_cache *cache) 463 { 464 struct page_info *info; 465 466 list_for_each_entry(info, &cache->lru_list, lru_entry) 467 if ((info->busy == 0) && !is_in_flight(info)) 468 return info; 469 470 return NULL; 471 } 472 473 /* ASYNCHRONOUS INTERFACE BEYOND THIS POINT */ 474 475 /** 476 * complete_with_page() - Helper to complete the VDO Page Completion request successfully. 477 * @info: The page info representing the result page. 478 * @vdo_page_comp: The VDO page completion to complete. 479 */ 480 static void complete_with_page(struct page_info *info, 481 struct vdo_page_completion *vdo_page_comp) 482 { 483 bool available = vdo_page_comp->writable ? is_present(info) : is_valid(info); 484 485 if (!available) { 486 vdo_log_error_strerror(VDO_BAD_PAGE, 487 "Requested cache page %llu in state %s is not %s", 488 (unsigned long long) info->pbn, 489 get_page_state_name(info->state), 490 vdo_page_comp->writable ? "present" : "valid"); 491 vdo_fail_completion(&vdo_page_comp->completion, VDO_BAD_PAGE); 492 return; 493 } 494 495 vdo_page_comp->info = info; 496 vdo_page_comp->ready = true; 497 vdo_finish_completion(&vdo_page_comp->completion); 498 } 499 500 /** 501 * complete_waiter_with_error() - Complete a page completion with an error code. 502 * @waiter: The page completion, as a waiter. 503 * @result_ptr: A pointer to the error code. 504 * 505 * Implements waiter_callback_fn. 506 */ 507 static void complete_waiter_with_error(struct vdo_waiter *waiter, void *result_ptr) 508 { 509 int *result = result_ptr; 510 511 vdo_fail_completion(&page_completion_from_waiter(waiter)->completion, *result); 512 } 513 514 /** 515 * complete_waiter_with_page() - Complete a page completion with a page. 516 * @waiter: The page completion, as a waiter. 517 * @page_info: The page info to complete with. 518 * 519 * Implements waiter_callback_fn. 520 */ 521 static void complete_waiter_with_page(struct vdo_waiter *waiter, void *page_info) 522 { 523 complete_with_page(page_info, page_completion_from_waiter(waiter)); 524 } 525 526 /** 527 * distribute_page_over_waitq() - Complete a waitq of VDO page completions with a page result. 528 * 529 * Upon completion the waitq will be empty. 530 * 531 * Return: The number of pages distributed. 532 */ 533 static unsigned int distribute_page_over_waitq(struct page_info *info, 534 struct vdo_wait_queue *waitq) 535 { 536 size_t num_pages; 537 538 update_lru(info); 539 num_pages = vdo_waitq_num_waiters(waitq); 540 541 /* 542 * Increment the busy count once for each pending completion so that this page does not 543 * stop being busy until all completions have been processed. 544 */ 545 info->busy += num_pages; 546 547 vdo_waitq_notify_all_waiters(waitq, complete_waiter_with_page, info); 548 return num_pages; 549 } 550 551 /** 552 * set_persistent_error() - Set a persistent error which all requests will receive in the future. 553 * @context: A string describing what triggered the error. 554 * 555 * Once triggered, all enqueued completions will get this error. Any future requests will result in 556 * this error as well. 557 */ 558 static void set_persistent_error(struct vdo_page_cache *cache, const char *context, 559 int result) 560 { 561 struct page_info *info; 562 /* If we're already read-only, there's no need to log. 
 */
	struct vdo *vdo = cache->vdo;

	if ((result != VDO_READ_ONLY) && !vdo_is_read_only(vdo)) {
		vdo_log_error_strerror(result, "VDO Page Cache persistent error: %s",
				       context);
		vdo_enter_read_only_mode(vdo, result);
	}

	assert_on_cache_thread(cache, __func__);

	vdo_waitq_notify_all_waiters(&cache->free_waiters,
				     complete_waiter_with_error, &result);
	cache->waiter_count = 0;

	for (info = cache->infos; info < cache->infos + cache->page_count; info++) {
		vdo_waitq_notify_all_waiters(&info->waiting,
					     complete_waiter_with_error, &result);
	}
}

/**
 * validate_completed_page() - Check that a page completion which is being freed to the cache
 *                             referred to a valid page and is in a valid state.
 * @writable: Whether a writable page is required.
 *
 * Return: VDO_SUCCESS if the page was valid, otherwise an error.
 */
static int __must_check validate_completed_page(struct vdo_page_completion *completion,
						bool writable)
{
	int result;

	result = VDO_ASSERT(completion->ready, "VDO Page completion not ready");
	if (result != VDO_SUCCESS)
		return result;

	result = VDO_ASSERT(completion->info != NULL,
			    "VDO Page Completion must be complete");
	if (result != VDO_SUCCESS)
		return result;

	result = VDO_ASSERT(completion->info->pbn == completion->pbn,
			    "VDO Page Completion pbn must be consistent");
	if (result != VDO_SUCCESS)
		return result;

	result = VDO_ASSERT(is_valid(completion->info),
			    "VDO Page Completion page must be valid");
	if (result != VDO_SUCCESS)
		return result;

	if (writable) {
		result = VDO_ASSERT(completion->writable,
				    "VDO Page Completion must be writable");
		if (result != VDO_SUCCESS)
			return result;
	}

	return VDO_SUCCESS;
}

static void check_for_drain_complete(struct block_map_zone *zone)
{
	if (vdo_is_state_draining(&zone->state) &&
	    (zone->active_lookups == 0) &&
	    !vdo_waitq_has_waiters(&zone->flush_waiters) &&
	    !is_vio_pool_busy(zone->vio_pool) &&
	    (zone->page_cache.outstanding_reads == 0) &&
	    (zone->page_cache.outstanding_writes == 0)) {
		vdo_finish_draining_with_result(&zone->state,
						(vdo_is_read_only(zone->block_map->vdo) ?
						 VDO_READ_ONLY : VDO_SUCCESS));
	}
}

static void enter_zone_read_only_mode(struct block_map_zone *zone, int result)
{
	vdo_enter_read_only_mode(zone->block_map->vdo, result);

	/*
	 * We are in read-only mode, so we won't ever write any page out.
	 * Just take all waiters off the waitq so the zone can drain.
	 */
	vdo_waitq_init(&zone->flush_waiters);
	check_for_drain_complete(zone);
}

static bool __must_check
validate_completed_page_or_enter_read_only_mode(struct vdo_page_completion *completion,
						bool writable)
{
	int result = validate_completed_page(completion, writable);

	if (result == VDO_SUCCESS)
		return true;

	enter_zone_read_only_mode(completion->info->cache->zone, result);
	return false;
}

/**
 * handle_load_error() - Handle page load errors.
 * @completion: The page read vio.
666 */ 667 static void handle_load_error(struct vdo_completion *completion) 668 { 669 int result = completion->result; 670 struct page_info *info = completion->parent; 671 struct vdo_page_cache *cache = info->cache; 672 673 assert_on_cache_thread(cache, __func__); 674 vio_record_metadata_io_error(as_vio(completion)); 675 vdo_enter_read_only_mode(cache->zone->block_map->vdo, result); 676 ADD_ONCE(cache->stats.failed_reads, 1); 677 set_info_state(info, PS_FAILED); 678 vdo_waitq_notify_all_waiters(&info->waiting, complete_waiter_with_error, &result); 679 reset_page_info(info); 680 681 /* 682 * Don't decrement until right before calling check_for_drain_complete() to 683 * ensure that the above work can't cause the page cache to be freed out from under us. 684 */ 685 cache->outstanding_reads--; 686 check_for_drain_complete(cache->zone); 687 } 688 689 /** 690 * page_is_loaded() - Callback used when a page has been loaded. 691 * @completion: The vio which has loaded the page. Its parent is the page_info. 692 */ 693 static void page_is_loaded(struct vdo_completion *completion) 694 { 695 struct page_info *info = completion->parent; 696 struct vdo_page_cache *cache = info->cache; 697 nonce_t nonce = info->cache->zone->block_map->nonce; 698 struct block_map_page *page; 699 enum block_map_page_validity validity; 700 701 assert_on_cache_thread(cache, __func__); 702 703 page = (struct block_map_page *) get_page_buffer(info); 704 validity = vdo_validate_block_map_page(page, nonce, info->pbn); 705 if (validity == VDO_BLOCK_MAP_PAGE_BAD) { 706 physical_block_number_t pbn = vdo_get_block_map_page_pbn(page); 707 int result = vdo_log_error_strerror(VDO_BAD_PAGE, 708 "Expected page %llu but got page %llu instead", 709 (unsigned long long) info->pbn, 710 (unsigned long long) pbn); 711 712 vdo_continue_completion(completion, result); 713 return; 714 } 715 716 if (validity == VDO_BLOCK_MAP_PAGE_INVALID) 717 vdo_format_block_map_page(page, nonce, info->pbn, false); 718 719 info->recovery_lock = 0; 720 set_info_state(info, PS_RESIDENT); 721 distribute_page_over_waitq(info, &info->waiting); 722 723 /* 724 * Don't decrement until right before calling check_for_drain_complete() to 725 * ensure that the above work can't cause the page cache to be freed out from under us. 726 */ 727 cache->outstanding_reads--; 728 check_for_drain_complete(cache->zone); 729 } 730 731 /** 732 * handle_rebuild_read_error() - Handle a read error during a read-only rebuild. 733 * @completion: The page load completion. 734 */ 735 static void handle_rebuild_read_error(struct vdo_completion *completion) 736 { 737 struct page_info *info = completion->parent; 738 struct vdo_page_cache *cache = info->cache; 739 740 assert_on_cache_thread(cache, __func__); 741 742 /* 743 * We are doing a read-only rebuild, so treat this as a successful read 744 * of an uninitialized page. 745 */ 746 vio_record_metadata_io_error(as_vio(completion)); 747 ADD_ONCE(cache->stats.failed_reads, 1); 748 memset(get_page_buffer(info), 0, VDO_BLOCK_SIZE); 749 vdo_reset_completion(completion); 750 page_is_loaded(completion); 751 } 752 753 static void load_cache_page_endio(struct bio *bio) 754 { 755 struct vio *vio = bio->bi_private; 756 struct page_info *info = vio->completion.parent; 757 758 continue_vio_after_io(vio, page_is_loaded, info->cache->zone->thread_id); 759 } 760 761 /** 762 * launch_page_load() - Begin the process of loading a page. 763 * 764 * Return: VDO_SUCCESS or an error code. 
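 *
 * (On success the page is marked PS_INCOMING and a metadata read is submitted;
 * load_cache_page_endio() then continues on the cache thread in page_is_loaded(). Read errors go
 * to handle_load_error(), or to handle_rebuild_read_error() while rebuilding.)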
765 */ 766 static int __must_check launch_page_load(struct page_info *info, 767 physical_block_number_t pbn) 768 { 769 int result; 770 vdo_action_fn callback; 771 struct vdo_page_cache *cache = info->cache; 772 773 assert_io_allowed(cache); 774 775 result = set_info_pbn(info, pbn); 776 if (result != VDO_SUCCESS) 777 return result; 778 779 result = VDO_ASSERT((info->busy == 0), "Page is not busy before loading."); 780 if (result != VDO_SUCCESS) 781 return result; 782 783 set_info_state(info, PS_INCOMING); 784 cache->outstanding_reads++; 785 ADD_ONCE(cache->stats.pages_loaded, 1); 786 callback = (cache->rebuilding ? handle_rebuild_read_error : handle_load_error); 787 vdo_submit_metadata_vio(info->vio, pbn, load_cache_page_endio, 788 callback, REQ_OP_READ | REQ_PRIO); 789 return VDO_SUCCESS; 790 } 791 792 static void write_pages(struct vdo_completion *completion); 793 794 /** handle_flush_error() - Handle errors flushing the layer. */ 795 static void handle_flush_error(struct vdo_completion *completion) 796 { 797 struct page_info *info = completion->parent; 798 799 vio_record_metadata_io_error(as_vio(completion)); 800 set_persistent_error(info->cache, "flush failed", completion->result); 801 write_pages(completion); 802 } 803 804 static void flush_endio(struct bio *bio) 805 { 806 struct vio *vio = bio->bi_private; 807 struct page_info *info = vio->completion.parent; 808 809 continue_vio_after_io(vio, write_pages, info->cache->zone->thread_id); 810 } 811 812 /** save_pages() - Attempt to save the outgoing pages by first flushing the layer. */ 813 static void save_pages(struct vdo_page_cache *cache) 814 { 815 struct page_info *info; 816 struct vio *vio; 817 818 if ((cache->pages_in_flush > 0) || (cache->pages_to_flush == 0)) 819 return; 820 821 assert_io_allowed(cache); 822 823 info = list_first_entry(&cache->outgoing_list, struct page_info, state_entry); 824 825 cache->pages_in_flush = cache->pages_to_flush; 826 cache->pages_to_flush = 0; 827 ADD_ONCE(cache->stats.flush_count, 1); 828 829 vio = info->vio; 830 831 /* 832 * We must make sure that the recovery journal entries that changed these pages were 833 * successfully persisted, and thus must issue a flush before each batch of pages is 834 * written to ensure this. 835 */ 836 vdo_submit_flush_vio(vio, flush_endio, handle_flush_error); 837 } 838 839 /** 840 * schedule_page_save() - Add a page to the outgoing list of pages waiting to be saved. 841 * 842 * Once in the list, a page may not be used until it has been written out. 843 */ 844 static void schedule_page_save(struct page_info *info) 845 { 846 if (info->busy > 0) { 847 info->write_status = WRITE_STATUS_DEFERRED; 848 return; 849 } 850 851 info->cache->pages_to_flush++; 852 info->cache->outstanding_writes++; 853 set_info_state(info, PS_OUTGOING); 854 } 855 856 /** 857 * launch_page_save() - Add a page to outgoing pages waiting to be saved, and then start saving 858 * pages if another save is not in progress. 859 */ 860 static void launch_page_save(struct page_info *info) 861 { 862 schedule_page_save(info); 863 save_pages(info->cache); 864 } 865 866 /** 867 * completion_needs_page() - Determine whether a given vdo_page_completion (as a waiter) is 868 * requesting a given page number. 869 * @context: A pointer to the pbn of the desired page. 870 * 871 * Implements waiter_match_fn. 872 * 873 * Return: true if the page completion is for the desired page number. 
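 *
 * For example, allocate_free_page() below passes this as the match function so that every waiter
 * asking for the same physical block number is moved onto one page's waitq:
 *
 *   vdo_waitq_dequeue_matching_waiters(&cache->free_waiters, completion_needs_page,
 *                                      &pbn, &info->waiting);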
874 */ 875 static bool completion_needs_page(struct vdo_waiter *waiter, void *context) 876 { 877 physical_block_number_t *pbn = context; 878 879 return (page_completion_from_waiter(waiter)->pbn == *pbn); 880 } 881 882 /** 883 * allocate_free_page() - Allocate a free page to the first completion in the waiting queue, and 884 * any other completions that match it in page number. 885 */ 886 static void allocate_free_page(struct page_info *info) 887 { 888 int result; 889 struct vdo_waiter *oldest_waiter; 890 physical_block_number_t pbn; 891 struct vdo_page_cache *cache = info->cache; 892 893 assert_on_cache_thread(cache, __func__); 894 895 if (!vdo_waitq_has_waiters(&cache->free_waiters)) { 896 if (cache->stats.cache_pressure > 0) { 897 vdo_log_info("page cache pressure relieved"); 898 WRITE_ONCE(cache->stats.cache_pressure, 0); 899 } 900 901 return; 902 } 903 904 result = reset_page_info(info); 905 if (result != VDO_SUCCESS) { 906 set_persistent_error(cache, "cannot reset page info", result); 907 return; 908 } 909 910 oldest_waiter = vdo_waitq_get_first_waiter(&cache->free_waiters); 911 pbn = page_completion_from_waiter(oldest_waiter)->pbn; 912 913 /* 914 * Remove all entries which match the page number in question and push them onto the page 915 * info's waitq. 916 */ 917 vdo_waitq_dequeue_matching_waiters(&cache->free_waiters, completion_needs_page, 918 &pbn, &info->waiting); 919 cache->waiter_count -= vdo_waitq_num_waiters(&info->waiting); 920 921 result = launch_page_load(info, pbn); 922 if (result != VDO_SUCCESS) { 923 vdo_waitq_notify_all_waiters(&info->waiting, 924 complete_waiter_with_error, &result); 925 } 926 } 927 928 /** 929 * discard_a_page() - Begin the process of discarding a page. 930 * 931 * If no page is discardable, increments a count of deferred frees so that the next release of a 932 * page which is no longer busy will kick off another discard cycle. This is an indication that the 933 * cache is not big enough. 934 * 935 * If the selected page is not dirty, immediately allocates the page to the oldest completion 936 * waiting for a free page. 937 */ 938 static void discard_a_page(struct vdo_page_cache *cache) 939 { 940 struct page_info *info = select_lru_page(cache); 941 942 if (info == NULL) { 943 report_cache_pressure(cache); 944 return; 945 } 946 947 if (!is_dirty(info)) { 948 allocate_free_page(info); 949 return; 950 } 951 952 VDO_ASSERT_LOG_ONLY(!is_in_flight(info), 953 "page selected for discard is not in flight"); 954 955 cache->discard_count++; 956 info->write_status = WRITE_STATUS_DISCARD; 957 launch_page_save(info); 958 } 959 960 /** 961 * discard_page_for_completion() - Helper used to trigger a discard so that the completion can get 962 * a different page. 963 */ 964 static void discard_page_for_completion(struct vdo_page_completion *vdo_page_comp) 965 { 966 struct vdo_page_cache *cache = vdo_page_comp->cache; 967 968 cache->waiter_count++; 969 vdo_waitq_enqueue_waiter(&cache->free_waiters, &vdo_page_comp->waiter); 970 discard_a_page(cache); 971 } 972 973 /** 974 * discard_page_if_needed() - Helper used to trigger a discard if the cache needs another free 975 * page. 976 * @cache: The page cache. 977 */ 978 static void discard_page_if_needed(struct vdo_page_cache *cache) 979 { 980 if (cache->waiter_count > cache->discard_count) 981 discard_a_page(cache); 982 } 983 984 /** 985 * write_has_finished() - Inform the cache that a write has finished (possibly with an error). 986 * @info: The info structure for the page whose write just completed. 
987 * 988 * Return: true if the page write was a discard. 989 */ 990 static bool write_has_finished(struct page_info *info) 991 { 992 bool was_discard = (info->write_status == WRITE_STATUS_DISCARD); 993 994 assert_on_cache_thread(info->cache, __func__); 995 info->cache->outstanding_writes--; 996 997 info->write_status = WRITE_STATUS_NORMAL; 998 return was_discard; 999 } 1000 1001 /** 1002 * handle_page_write_error() - Handler for page write errors. 1003 * @completion: The page write vio. 1004 */ 1005 static void handle_page_write_error(struct vdo_completion *completion) 1006 { 1007 int result = completion->result; 1008 struct page_info *info = completion->parent; 1009 struct vdo_page_cache *cache = info->cache; 1010 1011 vio_record_metadata_io_error(as_vio(completion)); 1012 1013 /* If we're already read-only, write failures are to be expected. */ 1014 if (result != VDO_READ_ONLY) { 1015 vdo_log_ratelimit(vdo_log_error, 1016 "failed to write block map page %llu", 1017 (unsigned long long) info->pbn); 1018 } 1019 1020 set_info_state(info, PS_DIRTY); 1021 ADD_ONCE(cache->stats.failed_writes, 1); 1022 set_persistent_error(cache, "cannot write page", result); 1023 1024 if (!write_has_finished(info)) 1025 discard_page_if_needed(cache); 1026 1027 check_for_drain_complete(cache->zone); 1028 } 1029 1030 static void page_is_written_out(struct vdo_completion *completion); 1031 1032 static void write_cache_page_endio(struct bio *bio) 1033 { 1034 struct vio *vio = bio->bi_private; 1035 struct page_info *info = vio->completion.parent; 1036 1037 continue_vio_after_io(vio, page_is_written_out, info->cache->zone->thread_id); 1038 } 1039 1040 /** 1041 * page_is_written_out() - Callback used when a page has been written out. 1042 * @completion: The vio which wrote the page. Its parent is a page_info. 1043 */ 1044 static void page_is_written_out(struct vdo_completion *completion) 1045 { 1046 bool was_discard, reclaimed; 1047 u32 reclamations; 1048 struct page_info *info = completion->parent; 1049 struct vdo_page_cache *cache = info->cache; 1050 struct block_map_page *page = (struct block_map_page *) get_page_buffer(info); 1051 1052 if (!page->header.initialized) { 1053 page->header.initialized = true; 1054 vdo_submit_metadata_vio(info->vio, info->pbn, 1055 write_cache_page_endio, 1056 handle_page_write_error, 1057 REQ_OP_WRITE | REQ_PRIO | REQ_PREFLUSH); 1058 return; 1059 } 1060 1061 /* Handle journal updates and torn write protection. */ 1062 vdo_release_recovery_journal_block_reference(cache->zone->block_map->journal, 1063 info->recovery_lock, 1064 VDO_ZONE_TYPE_LOGICAL, 1065 cache->zone->zone_number); 1066 info->recovery_lock = 0; 1067 was_discard = write_has_finished(info); 1068 reclaimed = (!was_discard || (info->busy > 0) || vdo_waitq_has_waiters(&info->waiting)); 1069 1070 set_info_state(info, PS_RESIDENT); 1071 1072 reclamations = distribute_page_over_waitq(info, &info->waiting); 1073 ADD_ONCE(cache->stats.reclaimed, reclamations); 1074 1075 if (was_discard) 1076 cache->discard_count--; 1077 1078 if (reclaimed) 1079 discard_page_if_needed(cache); 1080 else 1081 allocate_free_page(info); 1082 1083 check_for_drain_complete(cache->zone); 1084 } 1085 1086 /** 1087 * write_pages() - Write the batch of pages which were covered by the layer flush which just 1088 * completed. 1089 * @flush_completion: The flush vio. 1090 * 1091 * This callback is registered in save_pages(). 
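 *
 * The overall sequence is: schedule_page_save() moves dirty pages onto the outgoing list,
 * save_pages() issues a single flush VIO for the whole batch (so that the recovery journal
 * entries covering those pages are durable first), and once that flush completes this function
 * submits the individual page writes.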
1092 */ 1093 static void write_pages(struct vdo_completion *flush_completion) 1094 { 1095 struct vdo_page_cache *cache = ((struct page_info *) flush_completion->parent)->cache; 1096 1097 /* 1098 * We need to cache these two values on the stack since it is possible for the last 1099 * page info to cause the page cache to get freed. Hence once we launch the last page, 1100 * it may be unsafe to dereference the cache. 1101 */ 1102 bool has_unflushed_pages = (cache->pages_to_flush > 0); 1103 page_count_t pages_in_flush = cache->pages_in_flush; 1104 1105 cache->pages_in_flush = 0; 1106 while (pages_in_flush-- > 0) { 1107 struct page_info *info = 1108 list_first_entry(&cache->outgoing_list, struct page_info, 1109 state_entry); 1110 1111 list_del_init(&info->state_entry); 1112 if (vdo_is_read_only(info->cache->vdo)) { 1113 struct vdo_completion *completion = &info->vio->completion; 1114 1115 vdo_reset_completion(completion); 1116 completion->callback = page_is_written_out; 1117 completion->error_handler = handle_page_write_error; 1118 vdo_fail_completion(completion, VDO_READ_ONLY); 1119 continue; 1120 } 1121 ADD_ONCE(info->cache->stats.pages_saved, 1); 1122 vdo_submit_metadata_vio(info->vio, info->pbn, write_cache_page_endio, 1123 handle_page_write_error, REQ_OP_WRITE | REQ_PRIO); 1124 } 1125 1126 if (has_unflushed_pages) { 1127 /* 1128 * If there are unflushed pages, the cache can't have been freed, so this call is 1129 * safe. 1130 */ 1131 save_pages(cache); 1132 } 1133 } 1134 1135 /** 1136 * vdo_release_page_completion() - Release a VDO Page Completion. 1137 * 1138 * The page referenced by this completion (if any) will no longer be held busy by this completion. 1139 * If a page becomes discardable and there are completions awaiting free pages then a new round of 1140 * page discarding is started. 1141 */ 1142 void vdo_release_page_completion(struct vdo_completion *completion) 1143 { 1144 struct page_info *discard_info = NULL; 1145 struct vdo_page_completion *page_completion = as_vdo_page_completion(completion); 1146 struct vdo_page_cache *cache; 1147 1148 if (completion->result == VDO_SUCCESS) { 1149 if (!validate_completed_page_or_enter_read_only_mode(page_completion, false)) 1150 return; 1151 1152 if (--page_completion->info->busy == 0) 1153 discard_info = page_completion->info; 1154 } 1155 1156 VDO_ASSERT_LOG_ONLY((page_completion->waiter.next_waiter == NULL), 1157 "Page being released after leaving all queues"); 1158 1159 page_completion->info = NULL; 1160 cache = page_completion->cache; 1161 assert_on_cache_thread(cache, __func__); 1162 1163 if (discard_info != NULL) { 1164 if (discard_info->write_status == WRITE_STATUS_DEFERRED) { 1165 discard_info->write_status = WRITE_STATUS_NORMAL; 1166 launch_page_save(discard_info); 1167 } 1168 1169 /* 1170 * if there are excess requests for pages (that have not already started discards) 1171 * we need to discard some page (which may be this one) 1172 */ 1173 discard_page_if_needed(cache); 1174 } 1175 } 1176 1177 /** 1178 * load_page_for_completion() - Helper function to load a page as described by a VDO Page 1179 * Completion. 
1180 */ 1181 static void load_page_for_completion(struct page_info *info, 1182 struct vdo_page_completion *vdo_page_comp) 1183 { 1184 int result; 1185 1186 vdo_waitq_enqueue_waiter(&info->waiting, &vdo_page_comp->waiter); 1187 result = launch_page_load(info, vdo_page_comp->pbn); 1188 if (result != VDO_SUCCESS) { 1189 vdo_waitq_notify_all_waiters(&info->waiting, 1190 complete_waiter_with_error, &result); 1191 } 1192 } 1193 1194 /** 1195 * vdo_get_page() - Initialize a page completion and get a block map page. 1196 * @page_completion: The vdo_page_completion to initialize. 1197 * @zone: The block map zone of the desired page. 1198 * @pbn: The absolute physical block of the desired page. 1199 * @writable: Whether the page can be modified. 1200 * @parent: The object to notify when the fetch is complete. 1201 * @callback: The notification callback. 1202 * @error_handler: The handler for fetch errors. 1203 * @requeue: Whether we must requeue when notifying the parent. 1204 * 1205 * May cause another page to be discarded (potentially writing a dirty page) and the one nominated 1206 * by the completion to be loaded from disk. When the callback is invoked, the page will be 1207 * resident in the cache and marked busy. All callers must call vdo_release_page_completion() 1208 * when they are done with the page to clear the busy mark. 1209 */ 1210 void vdo_get_page(struct vdo_page_completion *page_completion, 1211 struct block_map_zone *zone, physical_block_number_t pbn, 1212 bool writable, void *parent, vdo_action_fn callback, 1213 vdo_action_fn error_handler, bool requeue) 1214 { 1215 struct vdo_page_cache *cache = &zone->page_cache; 1216 struct vdo_completion *completion = &page_completion->completion; 1217 struct page_info *info; 1218 1219 assert_on_cache_thread(cache, __func__); 1220 VDO_ASSERT_LOG_ONLY((page_completion->waiter.next_waiter == NULL), 1221 "New page completion was not already on a wait queue"); 1222 1223 *page_completion = (struct vdo_page_completion) { 1224 .pbn = pbn, 1225 .writable = writable, 1226 .cache = cache, 1227 }; 1228 1229 vdo_initialize_completion(completion, cache->vdo, VDO_PAGE_COMPLETION); 1230 vdo_prepare_completion(completion, callback, error_handler, 1231 cache->zone->thread_id, parent); 1232 completion->requeue = requeue; 1233 1234 if (page_completion->writable && vdo_is_read_only(cache->vdo)) { 1235 vdo_fail_completion(completion, VDO_READ_ONLY); 1236 return; 1237 } 1238 1239 if (page_completion->writable) 1240 ADD_ONCE(cache->stats.write_count, 1); 1241 else 1242 ADD_ONCE(cache->stats.read_count, 1); 1243 1244 info = find_page(cache, page_completion->pbn); 1245 if (info != NULL) { 1246 /* The page is in the cache already. */ 1247 if ((info->write_status == WRITE_STATUS_DEFERRED) || 1248 is_incoming(info) || 1249 (is_outgoing(info) && page_completion->writable)) { 1250 /* The page is unusable until it has finished I/O. */ 1251 ADD_ONCE(cache->stats.wait_for_page, 1); 1252 vdo_waitq_enqueue_waiter(&info->waiting, &page_completion->waiter); 1253 return; 1254 } 1255 1256 if (is_valid(info)) { 1257 /* The page is usable. */ 1258 ADD_ONCE(cache->stats.found_in_cache, 1); 1259 if (!is_present(info)) 1260 ADD_ONCE(cache->stats.read_outgoing, 1); 1261 update_lru(info); 1262 info->busy++; 1263 complete_with_page(info, page_completion); 1264 return; 1265 } 1266 1267 /* Something horrible has gone wrong. */ 1268 VDO_ASSERT_LOG_ONLY(false, "Info found in a usable state."); 1269 } 1270 1271 /* The page must be fetched. 
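 * Either a free page_info is claimed just below, or, if none is free, a resident page must first
 * be discarded (possibly writing it out) before this completion can be serviced; see
 * discard_page_for_completion() and discard_a_page().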
*/ 1272 info = find_free_page(cache); 1273 if (info != NULL) { 1274 ADD_ONCE(cache->stats.fetch_required, 1); 1275 load_page_for_completion(info, page_completion); 1276 return; 1277 } 1278 1279 /* The page must wait for a page to be discarded. */ 1280 ADD_ONCE(cache->stats.discard_required, 1); 1281 discard_page_for_completion(page_completion); 1282 } 1283 1284 /** 1285 * vdo_request_page_write() - Request that a VDO page be written out as soon as it is not busy. 1286 * @completion: The vdo_page_completion containing the page. 1287 */ 1288 void vdo_request_page_write(struct vdo_completion *completion) 1289 { 1290 struct page_info *info; 1291 struct vdo_page_completion *vdo_page_comp = as_vdo_page_completion(completion); 1292 1293 if (!validate_completed_page_or_enter_read_only_mode(vdo_page_comp, true)) 1294 return; 1295 1296 info = vdo_page_comp->info; 1297 set_info_state(info, PS_DIRTY); 1298 launch_page_save(info); 1299 } 1300 1301 /** 1302 * vdo_get_cached_page() - Get the block map page from a page completion. 1303 * @completion: A vdo page completion whose callback has been called. 1304 * @page_ptr: A pointer to hold the page 1305 * 1306 * Return: VDO_SUCCESS or an error 1307 */ 1308 int vdo_get_cached_page(struct vdo_completion *completion, 1309 struct block_map_page **page_ptr) 1310 { 1311 int result; 1312 struct vdo_page_completion *vpc; 1313 1314 vpc = as_vdo_page_completion(completion); 1315 result = validate_completed_page(vpc, true); 1316 if (result == VDO_SUCCESS) 1317 *page_ptr = (struct block_map_page *) get_page_buffer(vpc->info); 1318 1319 return result; 1320 } 1321 1322 /** 1323 * vdo_invalidate_page_cache() - Invalidate all entries in the VDO page cache. 1324 * 1325 * There must not be any dirty pages in the cache. 1326 * 1327 * Return: A success or error code. 1328 */ 1329 int vdo_invalidate_page_cache(struct vdo_page_cache *cache) 1330 { 1331 struct page_info *info; 1332 1333 assert_on_cache_thread(cache, __func__); 1334 1335 /* Make sure we don't throw away any dirty pages. */ 1336 for (info = cache->infos; info < cache->infos + cache->page_count; info++) { 1337 int result = VDO_ASSERT(!is_dirty(info), "cache must have no dirty pages"); 1338 1339 if (result != VDO_SUCCESS) 1340 return result; 1341 } 1342 1343 /* Reset the page map by re-allocating it. */ 1344 vdo_int_map_free(vdo_forget(cache->page_map)); 1345 return vdo_int_map_create(cache->page_count, &cache->page_map); 1346 } 1347 1348 /** 1349 * get_tree_page_by_index() - Get the tree page for a given height and page index. 1350 * 1351 * Return: The requested page. 1352 */ 1353 static struct tree_page * __must_check get_tree_page_by_index(struct forest *forest, 1354 root_count_t root_index, 1355 height_t height, 1356 page_number_t page_index) 1357 { 1358 page_number_t offset = 0; 1359 size_t segment; 1360 1361 for (segment = 0; segment < forest->segments; segment++) { 1362 page_number_t border = forest->boundaries[segment].levels[height - 1]; 1363 1364 if (page_index < border) { 1365 struct block_map_tree *tree = &forest->trees[root_index]; 1366 1367 return &(tree->segments[segment].levels[height - 1][page_index - offset]); 1368 } 1369 1370 offset = border; 1371 } 1372 1373 return NULL; 1374 } 1375 1376 /* Get the page referred to by the lock's tree slot at its current height. 
*/ 1377 static inline struct tree_page *get_tree_page(const struct block_map_zone *zone, 1378 const struct tree_lock *lock) 1379 { 1380 return get_tree_page_by_index(zone->block_map->forest, lock->root_index, 1381 lock->height, 1382 lock->tree_slots[lock->height].page_index); 1383 } 1384 1385 /** vdo_copy_valid_page() - Validate and copy a buffer to a page. */ 1386 bool vdo_copy_valid_page(char *buffer, nonce_t nonce, 1387 physical_block_number_t pbn, 1388 struct block_map_page *page) 1389 { 1390 struct block_map_page *loaded = (struct block_map_page *) buffer; 1391 enum block_map_page_validity validity = 1392 vdo_validate_block_map_page(loaded, nonce, pbn); 1393 1394 if (validity == VDO_BLOCK_MAP_PAGE_VALID) { 1395 memcpy(page, loaded, VDO_BLOCK_SIZE); 1396 return true; 1397 } 1398 1399 if (validity == VDO_BLOCK_MAP_PAGE_BAD) { 1400 vdo_log_error_strerror(VDO_BAD_PAGE, 1401 "Expected page %llu but got page %llu instead", 1402 (unsigned long long) pbn, 1403 (unsigned long long) vdo_get_block_map_page_pbn(loaded)); 1404 } 1405 1406 return false; 1407 } 1408 1409 /** 1410 * in_cyclic_range() - Check whether the given value is between the lower and upper bounds, within 1411 * a cyclic range of values from 0 to (modulus - 1). 1412 * @lower: The lowest value to accept. 1413 * @value: The value to check. 1414 * @upper: The highest value to accept. 1415 * @modulus: The size of the cyclic space, no more than 2^15. 1416 * 1417 * The value and both bounds must be smaller than the modulus. 1418 * 1419 * Return: true if the value is in range. 1420 */ 1421 static bool in_cyclic_range(u16 lower, u16 value, u16 upper, u16 modulus) 1422 { 1423 if (value < lower) 1424 value += modulus; 1425 if (upper < lower) 1426 upper += modulus; 1427 return (value <= upper); 1428 } 1429 1430 /** 1431 * is_not_older() - Check whether a generation is strictly older than some other generation in the 1432 * context of a zone's current generation range. 1433 * @zone: The zone in which to do the comparison. 1434 * @a: The generation in question. 1435 * @b: The generation to compare to. 
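 *
 * For example (hypothetical values): if the zone's generation window has wrapped so that
 * oldest_generation is 250 and generation is 3, then is_not_older(zone, 2, 252) is true, since
 * generation 2 is newer than 252 within that window, while is_not_older(zone, 252, 2) is false,
 * since 252 is strictly older than 2.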
1436 * 1437 * Return: true if generation @a is not strictly older than generation @b in the context of @zone 1438 */ 1439 static bool __must_check is_not_older(struct block_map_zone *zone, u8 a, u8 b) 1440 { 1441 int result; 1442 1443 result = VDO_ASSERT((in_cyclic_range(zone->oldest_generation, a, zone->generation, 1 << 8) && 1444 in_cyclic_range(zone->oldest_generation, b, zone->generation, 1 << 8)), 1445 "generation(s) %u, %u are out of range [%u, %u]", 1446 a, b, zone->oldest_generation, zone->generation); 1447 if (result != VDO_SUCCESS) { 1448 enter_zone_read_only_mode(zone, result); 1449 return true; 1450 } 1451 1452 return in_cyclic_range(b, a, zone->generation, 1 << 8); 1453 } 1454 1455 static void release_generation(struct block_map_zone *zone, u8 generation) 1456 { 1457 int result; 1458 1459 result = VDO_ASSERT((zone->dirty_page_counts[generation] > 0), 1460 "dirty page count underflow for generation %u", generation); 1461 if (result != VDO_SUCCESS) { 1462 enter_zone_read_only_mode(zone, result); 1463 return; 1464 } 1465 1466 zone->dirty_page_counts[generation]--; 1467 while ((zone->dirty_page_counts[zone->oldest_generation] == 0) && 1468 (zone->oldest_generation != zone->generation)) 1469 zone->oldest_generation++; 1470 } 1471 1472 static void set_generation(struct block_map_zone *zone, struct tree_page *page, 1473 u8 new_generation) 1474 { 1475 u32 new_count; 1476 int result; 1477 bool decrement_old = vdo_waiter_is_waiting(&page->waiter); 1478 u8 old_generation = page->generation; 1479 1480 if (decrement_old && (old_generation == new_generation)) 1481 return; 1482 1483 page->generation = new_generation; 1484 new_count = ++zone->dirty_page_counts[new_generation]; 1485 result = VDO_ASSERT((new_count != 0), "dirty page count overflow for generation %u", 1486 new_generation); 1487 if (result != VDO_SUCCESS) { 1488 enter_zone_read_only_mode(zone, result); 1489 return; 1490 } 1491 1492 if (decrement_old) 1493 release_generation(zone, old_generation); 1494 } 1495 1496 static void write_page(struct tree_page *tree_page, struct pooled_vio *vio); 1497 1498 /* Implements waiter_callback_fn */ 1499 static void write_page_callback(struct vdo_waiter *waiter, void *context) 1500 { 1501 write_page(container_of(waiter, struct tree_page, waiter), context); 1502 } 1503 1504 static void acquire_vio(struct vdo_waiter *waiter, struct block_map_zone *zone) 1505 { 1506 waiter->callback = write_page_callback; 1507 acquire_vio_from_pool(zone->vio_pool, waiter); 1508 } 1509 1510 /* Return: true if all possible generations were not already active */ 1511 static bool attempt_increment(struct block_map_zone *zone) 1512 { 1513 u8 generation = zone->generation + 1; 1514 1515 if (zone->oldest_generation == generation) 1516 return false; 1517 1518 zone->generation = generation; 1519 return true; 1520 } 1521 1522 /* Launches a flush if one is not already in progress. 
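 * If a flush is already in progress, the page is parked on zone->flush_waiters instead; once the
 * current flusher's write completes, finish_page_write() drains those waiters through
 * write_page_if_not_dirtied(), which either writes each page or re-enqueues it for the next
 * generation.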
*/ 1523 static void enqueue_page(struct tree_page *page, struct block_map_zone *zone) 1524 { 1525 if ((zone->flusher == NULL) && attempt_increment(zone)) { 1526 zone->flusher = page; 1527 acquire_vio(&page->waiter, zone); 1528 return; 1529 } 1530 1531 vdo_waitq_enqueue_waiter(&zone->flush_waiters, &page->waiter); 1532 } 1533 1534 static void write_page_if_not_dirtied(struct vdo_waiter *waiter, void *context) 1535 { 1536 struct tree_page *page = container_of(waiter, struct tree_page, waiter); 1537 struct write_if_not_dirtied_context *write_context = context; 1538 1539 if (page->generation == write_context->generation) { 1540 acquire_vio(waiter, write_context->zone); 1541 return; 1542 } 1543 1544 enqueue_page(page, write_context->zone); 1545 } 1546 1547 static void return_to_pool(struct block_map_zone *zone, struct pooled_vio *vio) 1548 { 1549 return_vio_to_pool(zone->vio_pool, vio); 1550 check_for_drain_complete(zone); 1551 } 1552 1553 /* This callback is registered in write_initialized_page(). */ 1554 static void finish_page_write(struct vdo_completion *completion) 1555 { 1556 bool dirty; 1557 struct vio *vio = as_vio(completion); 1558 struct pooled_vio *pooled = container_of(vio, struct pooled_vio, vio); 1559 struct tree_page *page = completion->parent; 1560 struct block_map_zone *zone = pooled->context; 1561 1562 vdo_release_recovery_journal_block_reference(zone->block_map->journal, 1563 page->writing_recovery_lock, 1564 VDO_ZONE_TYPE_LOGICAL, 1565 zone->zone_number); 1566 1567 dirty = (page->writing_generation != page->generation); 1568 release_generation(zone, page->writing_generation); 1569 page->writing = false; 1570 1571 if (zone->flusher == page) { 1572 struct write_if_not_dirtied_context context = { 1573 .zone = zone, 1574 .generation = page->writing_generation, 1575 }; 1576 1577 vdo_waitq_notify_all_waiters(&zone->flush_waiters, 1578 write_page_if_not_dirtied, &context); 1579 if (dirty && attempt_increment(zone)) { 1580 write_page(page, pooled); 1581 return; 1582 } 1583 1584 zone->flusher = NULL; 1585 } 1586 1587 if (dirty) { 1588 enqueue_page(page, zone); 1589 } else if ((zone->flusher == NULL) && vdo_waitq_has_waiters(&zone->flush_waiters) && 1590 attempt_increment(zone)) { 1591 zone->flusher = container_of(vdo_waitq_dequeue_waiter(&zone->flush_waiters), 1592 struct tree_page, waiter); 1593 write_page(zone->flusher, pooled); 1594 return; 1595 } 1596 1597 return_to_pool(zone, pooled); 1598 } 1599 1600 static void handle_write_error(struct vdo_completion *completion) 1601 { 1602 int result = completion->result; 1603 struct vio *vio = as_vio(completion); 1604 struct pooled_vio *pooled = container_of(vio, struct pooled_vio, vio); 1605 struct block_map_zone *zone = pooled->context; 1606 1607 vio_record_metadata_io_error(vio); 1608 enter_zone_read_only_mode(zone, result); 1609 return_to_pool(zone, pooled); 1610 } 1611 1612 static void write_page_endio(struct bio *bio); 1613 1614 static void write_initialized_page(struct vdo_completion *completion) 1615 { 1616 struct vio *vio = as_vio(completion); 1617 struct pooled_vio *pooled = container_of(vio, struct pooled_vio, vio); 1618 struct block_map_zone *zone = pooled->context; 1619 struct tree_page *tree_page = completion->parent; 1620 struct block_map_page *page = (struct block_map_page *) vio->data; 1621 blk_opf_t operation = REQ_OP_WRITE | REQ_PRIO; 1622 1623 /* 1624 * Now that we know the page has been written at least once, mark the copy we are writing 1625 * as initialized. 
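 *
 * (This point is reached either directly from write_page(), when the in-memory copy was already
 * initialized, or from write_page_endio() after the first, uninitialized copy of a brand-new page
 * has been persisted; see the torn-write comment in write_page().)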
1626 */ 1627 page->header.initialized = true; 1628 1629 if (zone->flusher == tree_page) 1630 operation |= REQ_PREFLUSH; 1631 1632 vdo_submit_metadata_vio(vio, vdo_get_block_map_page_pbn(page), 1633 write_page_endio, handle_write_error, 1634 operation); 1635 } 1636 1637 static void write_page_endio(struct bio *bio) 1638 { 1639 struct pooled_vio *vio = bio->bi_private; 1640 struct block_map_zone *zone = vio->context; 1641 struct block_map_page *page = (struct block_map_page *) vio->vio.data; 1642 1643 continue_vio_after_io(&vio->vio, 1644 (page->header.initialized ? 1645 finish_page_write : write_initialized_page), 1646 zone->thread_id); 1647 } 1648 1649 static void write_page(struct tree_page *tree_page, struct pooled_vio *vio) 1650 { 1651 struct vdo_completion *completion = &vio->vio.completion; 1652 struct block_map_zone *zone = vio->context; 1653 struct block_map_page *page = vdo_as_block_map_page(tree_page); 1654 1655 if ((zone->flusher != tree_page) && 1656 is_not_older(zone, tree_page->generation, zone->generation)) { 1657 /* 1658 * This page was re-dirtied after the last flush was issued, hence we need to do 1659 * another flush. 1660 */ 1661 enqueue_page(tree_page, zone); 1662 return_to_pool(zone, vio); 1663 return; 1664 } 1665 1666 completion->parent = tree_page; 1667 memcpy(vio->vio.data, tree_page->page_buffer, VDO_BLOCK_SIZE); 1668 completion->callback_thread_id = zone->thread_id; 1669 1670 tree_page->writing = true; 1671 tree_page->writing_generation = tree_page->generation; 1672 tree_page->writing_recovery_lock = tree_page->recovery_lock; 1673 1674 /* Clear this now so that we know this page is not on any dirty list. */ 1675 tree_page->recovery_lock = 0; 1676 1677 /* 1678 * We've already copied the page into the vio which will write it, so if it was not yet 1679 * initialized, the first write will indicate that (for torn write protection). It is now 1680 * safe to mark it as initialized in memory since if the write fails, the in memory state 1681 * will become irrelevant. 1682 */ 1683 if (page->header.initialized) { 1684 write_initialized_page(completion); 1685 return; 1686 } 1687 1688 page->header.initialized = true; 1689 vdo_submit_metadata_vio(&vio->vio, vdo_get_block_map_page_pbn(page), 1690 write_page_endio, handle_write_error, 1691 REQ_OP_WRITE | REQ_PRIO); 1692 } 1693 1694 /* Release a lock on a page which was being loaded or allocated. 
*/ 1695 static void release_page_lock(struct data_vio *data_vio, char *what) 1696 { 1697 struct block_map_zone *zone; 1698 struct tree_lock *lock_holder; 1699 struct tree_lock *lock = &data_vio->tree_lock; 1700 1701 VDO_ASSERT_LOG_ONLY(lock->locked, 1702 "release of unlocked block map page %s for key %llu in tree %u", 1703 what, (unsigned long long) lock->key, lock->root_index); 1704 1705 zone = data_vio->logical.zone->block_map_zone; 1706 lock_holder = vdo_int_map_remove(zone->loading_pages, lock->key); 1707 VDO_ASSERT_LOG_ONLY((lock_holder == lock), 1708 "block map page %s mismatch for key %llu in tree %u", 1709 what, (unsigned long long) lock->key, lock->root_index); 1710 lock->locked = false; 1711 } 1712 1713 static void finish_lookup(struct data_vio *data_vio, int result) 1714 { 1715 data_vio->tree_lock.height = 0; 1716 1717 --data_vio->logical.zone->block_map_zone->active_lookups; 1718 1719 set_data_vio_logical_callback(data_vio, continue_data_vio_with_block_map_slot); 1720 data_vio->vio.completion.error_handler = handle_data_vio_error; 1721 continue_data_vio_with_error(data_vio, result); 1722 } 1723 1724 static void abort_lookup_for_waiter(struct vdo_waiter *waiter, void *context) 1725 { 1726 struct data_vio *data_vio = vdo_waiter_as_data_vio(waiter); 1727 int result = *((int *) context); 1728 1729 if (!data_vio->write) { 1730 if (result == VDO_NO_SPACE) 1731 result = VDO_SUCCESS; 1732 } else if (result != VDO_NO_SPACE) { 1733 result = VDO_READ_ONLY; 1734 } 1735 1736 finish_lookup(data_vio, result); 1737 } 1738 1739 static void abort_lookup(struct data_vio *data_vio, int result, char *what) 1740 { 1741 if (result != VDO_NO_SPACE) 1742 enter_zone_read_only_mode(data_vio->logical.zone->block_map_zone, result); 1743 1744 if (data_vio->tree_lock.locked) { 1745 release_page_lock(data_vio, what); 1746 vdo_waitq_notify_all_waiters(&data_vio->tree_lock.waiters, 1747 abort_lookup_for_waiter, 1748 &result); 1749 } 1750 1751 finish_lookup(data_vio, result); 1752 } 1753 1754 static void abort_load(struct data_vio *data_vio, int result) 1755 { 1756 abort_lookup(data_vio, result, "load"); 1757 } 1758 1759 static bool __must_check is_invalid_tree_entry(const struct vdo *vdo, 1760 const struct data_location *mapping, 1761 height_t height) 1762 { 1763 if (!vdo_is_valid_location(mapping) || 1764 vdo_is_state_compressed(mapping->state) || 1765 (vdo_is_mapped_location(mapping) && (mapping->pbn == VDO_ZERO_BLOCK))) 1766 return true; 1767 1768 /* Roots aren't physical data blocks, so we can't check their PBNs. 
*/ 1769 if (height == VDO_BLOCK_MAP_TREE_HEIGHT) 1770 return false; 1771 1772 return !vdo_is_physical_data_block(vdo->depot, mapping->pbn); 1773 } 1774 1775 static void load_block_map_page(struct block_map_zone *zone, struct data_vio *data_vio); 1776 static void allocate_block_map_page(struct block_map_zone *zone, 1777 struct data_vio *data_vio); 1778 1779 static void continue_with_loaded_page(struct data_vio *data_vio, 1780 struct block_map_page *page) 1781 { 1782 struct tree_lock *lock = &data_vio->tree_lock; 1783 struct block_map_tree_slot slot = lock->tree_slots[lock->height]; 1784 struct data_location mapping = 1785 vdo_unpack_block_map_entry(&page->entries[slot.block_map_slot.slot]); 1786 1787 if (is_invalid_tree_entry(vdo_from_data_vio(data_vio), &mapping, lock->height)) { 1788 vdo_log_error_strerror(VDO_BAD_MAPPING, 1789 "Invalid block map tree PBN: %llu with state %u for page index %u at height %u", 1790 (unsigned long long) mapping.pbn, mapping.state, 1791 lock->tree_slots[lock->height - 1].page_index, 1792 lock->height - 1); 1793 abort_load(data_vio, VDO_BAD_MAPPING); 1794 return; 1795 } 1796 1797 if (!vdo_is_mapped_location(&mapping)) { 1798 /* The page we need is unallocated */ 1799 allocate_block_map_page(data_vio->logical.zone->block_map_zone, 1800 data_vio); 1801 return; 1802 } 1803 1804 lock->tree_slots[lock->height - 1].block_map_slot.pbn = mapping.pbn; 1805 if (lock->height == 1) { 1806 finish_lookup(data_vio, VDO_SUCCESS); 1807 return; 1808 } 1809 1810 /* We know what page we need to load next */ 1811 load_block_map_page(data_vio->logical.zone->block_map_zone, data_vio); 1812 } 1813 1814 static void continue_load_for_waiter(struct vdo_waiter *waiter, void *context) 1815 { 1816 struct data_vio *data_vio = vdo_waiter_as_data_vio(waiter); 1817 1818 data_vio->tree_lock.height--; 1819 continue_with_loaded_page(data_vio, context); 1820 } 1821 1822 static void finish_block_map_page_load(struct vdo_completion *completion) 1823 { 1824 physical_block_number_t pbn; 1825 struct tree_page *tree_page; 1826 struct block_map_page *page; 1827 nonce_t nonce; 1828 struct vio *vio = as_vio(completion); 1829 struct pooled_vio *pooled = vio_as_pooled_vio(vio); 1830 struct data_vio *data_vio = completion->parent; 1831 struct block_map_zone *zone = pooled->context; 1832 struct tree_lock *tree_lock = &data_vio->tree_lock; 1833 1834 tree_lock->height--; 1835 pbn = tree_lock->tree_slots[tree_lock->height].block_map_slot.pbn; 1836 tree_page = get_tree_page(zone, tree_lock); 1837 page = (struct block_map_page *) tree_page->page_buffer; 1838 nonce = zone->block_map->nonce; 1839 1840 if (!vdo_copy_valid_page(vio->data, nonce, pbn, page)) 1841 vdo_format_block_map_page(page, nonce, pbn, false); 1842 return_vio_to_pool(zone->vio_pool, pooled); 1843 1844 /* Release our claim to the load and wake any waiters */ 1845 release_page_lock(data_vio, "load"); 1846 vdo_waitq_notify_all_waiters(&tree_lock->waiters, continue_load_for_waiter, page); 1847 continue_with_loaded_page(data_vio, page); 1848 } 1849 1850 static void handle_io_error(struct vdo_completion *completion) 1851 { 1852 int result = completion->result; 1853 struct vio *vio = as_vio(completion); 1854 struct pooled_vio *pooled = container_of(vio, struct pooled_vio, vio); 1855 struct data_vio *data_vio = completion->parent; 1856 struct block_map_zone *zone = pooled->context; 1857 1858 vio_record_metadata_io_error(vio); 1859 return_vio_to_pool(zone->vio_pool, pooled); 1860 abort_load(data_vio, result); 1861 } 1862 1863 static void 
load_page_endio(struct bio *bio) 1864 { 1865 struct vio *vio = bio->bi_private; 1866 struct data_vio *data_vio = vio->completion.parent; 1867 1868 continue_vio_after_io(vio, finish_block_map_page_load, 1869 data_vio->logical.zone->thread_id); 1870 } 1871 1872 static void load_page(struct vdo_waiter *waiter, void *context) 1873 { 1874 struct pooled_vio *pooled = context; 1875 struct data_vio *data_vio = vdo_waiter_as_data_vio(waiter); 1876 struct tree_lock *lock = &data_vio->tree_lock; 1877 physical_block_number_t pbn = lock->tree_slots[lock->height - 1].block_map_slot.pbn; 1878 1879 pooled->vio.completion.parent = data_vio; 1880 vdo_submit_metadata_vio(&pooled->vio, pbn, load_page_endio, 1881 handle_io_error, REQ_OP_READ | REQ_PRIO); 1882 } 1883 1884 /* 1885 * If the page is already locked, queue up to wait for the lock to be released. If the lock is 1886 * acquired, @data_vio->tree_lock.locked will be true. 1887 */ 1888 static int attempt_page_lock(struct block_map_zone *zone, struct data_vio *data_vio) 1889 { 1890 int result; 1891 struct tree_lock *lock_holder; 1892 struct tree_lock *lock = &data_vio->tree_lock; 1893 height_t height = lock->height; 1894 struct block_map_tree_slot tree_slot = lock->tree_slots[height]; 1895 union page_key key; 1896 1897 key.descriptor = (struct page_descriptor) { 1898 .root_index = lock->root_index, 1899 .height = height, 1900 .page_index = tree_slot.page_index, 1901 .slot = tree_slot.block_map_slot.slot, 1902 }; 1903 lock->key = key.key; 1904 1905 result = vdo_int_map_put(zone->loading_pages, lock->key, 1906 lock, false, (void **) &lock_holder); 1907 if (result != VDO_SUCCESS) 1908 return result; 1909 1910 if (lock_holder == NULL) { 1911 /* We got the lock */ 1912 data_vio->tree_lock.locked = true; 1913 return VDO_SUCCESS; 1914 } 1915 1916 /* Someone else is loading or allocating the page we need */ 1917 vdo_waitq_enqueue_waiter(&lock_holder->waiters, &data_vio->waiter); 1918 return VDO_SUCCESS; 1919 } 1920 1921 /* Load a block map tree page from disk, for the next level in the data vio tree lock. */ 1922 static void load_block_map_page(struct block_map_zone *zone, struct data_vio *data_vio) 1923 { 1924 int result; 1925 1926 result = attempt_page_lock(zone, data_vio); 1927 if (result != VDO_SUCCESS) { 1928 abort_load(data_vio, result); 1929 return; 1930 } 1931 1932 if (data_vio->tree_lock.locked) { 1933 data_vio->waiter.callback = load_page; 1934 acquire_vio_from_pool(zone->vio_pool, &data_vio->waiter); 1935 } 1936 } 1937 1938 static void allocation_failure(struct vdo_completion *completion) 1939 { 1940 struct data_vio *data_vio = as_data_vio(completion); 1941 1942 if (vdo_requeue_completion_if_needed(completion, 1943 data_vio->logical.zone->thread_id)) 1944 return; 1945 1946 abort_lookup(data_vio, completion->result, "allocation"); 1947 } 1948 1949 static void continue_allocation_for_waiter(struct vdo_waiter *waiter, void *context) 1950 { 1951 struct data_vio *data_vio = vdo_waiter_as_data_vio(waiter); 1952 struct tree_lock *tree_lock = &data_vio->tree_lock; 1953 physical_block_number_t pbn = *((physical_block_number_t *) context); 1954 1955 tree_lock->height--; 1956 data_vio->tree_lock.tree_slots[tree_lock->height].block_map_slot.pbn = pbn; 1957 1958 if (tree_lock->height == 0) { 1959 finish_lookup(data_vio, VDO_SUCCESS); 1960 return; 1961 } 1962 1963 allocate_block_map_page(data_vio->logical.zone->block_map_zone, data_vio); 1964 } 1965 1966 /** expire_oldest_list() - Expire the oldest list. 
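 *
 * Splices the oldest era's dirty tree and cache pages onto the zone's expired lists and
 * advances the oldest period, wrapping the era offset at maximum_age.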
*/ 1967 static void expire_oldest_list(struct dirty_lists *dirty_lists) 1968 { 1969 block_count_t i = dirty_lists->offset++; 1970 1971 dirty_lists->oldest_period++; 1972 if (!list_empty(&dirty_lists->eras[i][VDO_TREE_PAGE])) { 1973 list_splice_tail_init(&dirty_lists->eras[i][VDO_TREE_PAGE], 1974 &dirty_lists->expired[VDO_TREE_PAGE]); 1975 } 1976 1977 if (!list_empty(&dirty_lists->eras[i][VDO_CACHE_PAGE])) { 1978 list_splice_tail_init(&dirty_lists->eras[i][VDO_CACHE_PAGE], 1979 &dirty_lists->expired[VDO_CACHE_PAGE]); 1980 } 1981 1982 if (dirty_lists->offset == dirty_lists->maximum_age) 1983 dirty_lists->offset = 0; 1984 } 1985 1986 1987 /** update_period() - Update the dirty_lists period if necessary. */ 1988 static void update_period(struct dirty_lists *dirty, sequence_number_t period) 1989 { 1990 while (dirty->next_period <= period) { 1991 if ((dirty->next_period - dirty->oldest_period) == dirty->maximum_age) 1992 expire_oldest_list(dirty); 1993 dirty->next_period++; 1994 } 1995 } 1996 1997 /** write_expired_elements() - Write out the expired list. */ 1998 static void write_expired_elements(struct block_map_zone *zone) 1999 { 2000 struct tree_page *page, *ttmp; 2001 struct page_info *info, *ptmp; 2002 struct list_head *expired; 2003 u8 generation = zone->generation; 2004 2005 expired = &zone->dirty_lists->expired[VDO_TREE_PAGE]; 2006 list_for_each_entry_safe(page, ttmp, expired, entry) { 2007 int result; 2008 2009 list_del_init(&page->entry); 2010 2011 result = VDO_ASSERT(!vdo_waiter_is_waiting(&page->waiter), 2012 "Newly expired page not already waiting to write"); 2013 if (result != VDO_SUCCESS) { 2014 enter_zone_read_only_mode(zone, result); 2015 continue; 2016 } 2017 2018 set_generation(zone, page, generation); 2019 if (!page->writing) 2020 enqueue_page(page, zone); 2021 } 2022 2023 expired = &zone->dirty_lists->expired[VDO_CACHE_PAGE]; 2024 list_for_each_entry_safe(info, ptmp, expired, state_entry) { 2025 list_del_init(&info->state_entry); 2026 schedule_page_save(info); 2027 } 2028 2029 save_pages(&zone->page_cache); 2030 } 2031 2032 /** 2033 * add_to_dirty_lists() - Add an element to the dirty lists. 2034 * @zone: The zone in which we are operating. 2035 * @entry: The list entry of the element to add. 2036 * @type: The type of page. 2037 * @old_period: The period in which the element was previously dirtied, or 0 if it was not dirty. 2038 * @new_period: The period in which the element has now been dirtied, or 0 if it does not hold a 2039 * lock. 2040 */ 2041 static void add_to_dirty_lists(struct block_map_zone *zone, 2042 struct list_head *entry, 2043 enum block_map_page_type type, 2044 sequence_number_t old_period, 2045 sequence_number_t new_period) 2046 { 2047 struct dirty_lists *dirty_lists = zone->dirty_lists; 2048 2049 if ((old_period == new_period) || ((old_period != 0) && (old_period < new_period))) 2050 return; 2051 2052 if (new_period < dirty_lists->oldest_period) { 2053 list_move_tail(entry, &dirty_lists->expired[type]); 2054 } else { 2055 update_period(dirty_lists, new_period); 2056 list_move_tail(entry, 2057 &dirty_lists->eras[new_period % dirty_lists->maximum_age][type]); 2058 } 2059 2060 write_expired_elements(zone); 2061 } 2062 2063 /* 2064 * Record the allocation in the tree and wake any waiters now that the write lock has been 2065 * released. 
 * released.
2066 */ 2067 static void finish_block_map_allocation(struct vdo_completion *completion) 2068 { 2069 physical_block_number_t pbn; 2070 struct tree_page *tree_page; 2071 struct block_map_page *page; 2072 sequence_number_t old_lock; 2073 struct data_vio *data_vio = as_data_vio(completion); 2074 struct block_map_zone *zone = data_vio->logical.zone->block_map_zone; 2075 struct tree_lock *tree_lock = &data_vio->tree_lock; 2076 height_t height = tree_lock->height; 2077 2078 assert_data_vio_in_logical_zone(data_vio); 2079 2080 tree_page = get_tree_page(zone, tree_lock); 2081 pbn = tree_lock->tree_slots[height - 1].block_map_slot.pbn; 2082 2083 /* Record the allocation. */ 2084 page = (struct block_map_page *) tree_page->page_buffer; 2085 old_lock = tree_page->recovery_lock; 2086 vdo_update_block_map_page(page, data_vio, pbn, 2087 VDO_MAPPING_STATE_UNCOMPRESSED, 2088 &tree_page->recovery_lock); 2089 2090 if (vdo_waiter_is_waiting(&tree_page->waiter)) { 2091 /* This page is waiting to be written out. */ 2092 if (zone->flusher != tree_page) { 2093 /* 2094 * The outstanding flush won't cover the update we just made, 2095 * so mark the page as needing another flush. 2096 */ 2097 set_generation(zone, tree_page, zone->generation); 2098 } 2099 } else { 2100 /* Put the page on a dirty list */ 2101 if (old_lock == 0) 2102 INIT_LIST_HEAD(&tree_page->entry); 2103 add_to_dirty_lists(zone, &tree_page->entry, VDO_TREE_PAGE, 2104 old_lock, tree_page->recovery_lock); 2105 } 2106 2107 tree_lock->height--; 2108 if (height > 1) { 2109 /* Format the interior node we just allocated (in memory). */ 2110 tree_page = get_tree_page(zone, tree_lock); 2111 vdo_format_block_map_page(tree_page->page_buffer, 2112 zone->block_map->nonce, 2113 pbn, false); 2114 } 2115 2116 /* Release our claim to the allocation and wake any waiters */ 2117 release_page_lock(data_vio, "allocation"); 2118 vdo_waitq_notify_all_waiters(&tree_lock->waiters, 2119 continue_allocation_for_waiter, &pbn); 2120 if (tree_lock->height == 0) { 2121 finish_lookup(data_vio, VDO_SUCCESS); 2122 return; 2123 } 2124 2125 allocate_block_map_page(zone, data_vio); 2126 } 2127 2128 static void release_block_map_write_lock(struct vdo_completion *completion) 2129 { 2130 struct data_vio *data_vio = as_data_vio(completion); 2131 2132 assert_data_vio_in_allocated_zone(data_vio); 2133 2134 release_data_vio_allocation_lock(data_vio, true); 2135 launch_data_vio_logical_callback(data_vio, finish_block_map_allocation); 2136 } 2137 2138 /* 2139 * Newly allocated block map pages are set to have to MAXIMUM_REFERENCES after they are journaled, 2140 * to prevent deduplication against the block after we release the write lock on it, but before we 2141 * write out the page. 
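 * The sequence is: add the recovery journal entry, update the reference count in the
 * allocated zone, release the write lock, then record the mapping in the tree on the logical
 * zone thread.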
2142 */ 2143 static void set_block_map_page_reference_count(struct vdo_completion *completion) 2144 { 2145 struct data_vio *data_vio = as_data_vio(completion); 2146 2147 assert_data_vio_in_allocated_zone(data_vio); 2148 2149 completion->callback = release_block_map_write_lock; 2150 vdo_modify_reference_count(completion, &data_vio->increment_updater); 2151 } 2152 2153 static void journal_block_map_allocation(struct vdo_completion *completion) 2154 { 2155 struct data_vio *data_vio = as_data_vio(completion); 2156 2157 assert_data_vio_in_journal_zone(data_vio); 2158 2159 set_data_vio_allocated_zone_callback(data_vio, 2160 set_block_map_page_reference_count); 2161 vdo_add_recovery_journal_entry(completion->vdo->recovery_journal, data_vio); 2162 } 2163 2164 static void allocate_block(struct vdo_completion *completion) 2165 { 2166 struct data_vio *data_vio = as_data_vio(completion); 2167 struct tree_lock *lock = &data_vio->tree_lock; 2168 physical_block_number_t pbn; 2169 2170 assert_data_vio_in_allocated_zone(data_vio); 2171 2172 if (!vdo_allocate_block_in_zone(data_vio)) 2173 return; 2174 2175 pbn = data_vio->allocation.pbn; 2176 lock->tree_slots[lock->height - 1].block_map_slot.pbn = pbn; 2177 data_vio->increment_updater = (struct reference_updater) { 2178 .operation = VDO_JOURNAL_BLOCK_MAP_REMAPPING, 2179 .increment = true, 2180 .zpbn = { 2181 .pbn = pbn, 2182 .state = VDO_MAPPING_STATE_UNCOMPRESSED, 2183 }, 2184 .lock = data_vio->allocation.lock, 2185 }; 2186 2187 launch_data_vio_journal_callback(data_vio, journal_block_map_allocation); 2188 } 2189 2190 static void allocate_block_map_page(struct block_map_zone *zone, 2191 struct data_vio *data_vio) 2192 { 2193 int result; 2194 2195 if (!data_vio->write || data_vio->is_discard) { 2196 /* This is a pure read or a discard, so there's nothing left to do here. */ 2197 finish_lookup(data_vio, VDO_SUCCESS); 2198 return; 2199 } 2200 2201 result = attempt_page_lock(zone, data_vio); 2202 if (result != VDO_SUCCESS) { 2203 abort_lookup(data_vio, result, "allocation"); 2204 return; 2205 } 2206 2207 if (!data_vio->tree_lock.locked) 2208 return; 2209 2210 data_vio_allocate_data_block(data_vio, VIO_BLOCK_MAP_WRITE_LOCK, 2211 allocate_block, allocation_failure); 2212 } 2213 2214 /** 2215 * vdo_find_block_map_slot() - Find the block map slot in which the block map entry for a data_vio 2216 * resides and cache that result in the data_vio. 2217 * 2218 * All ancestors in the tree will be allocated or loaded, as needed. 
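 *
 * In outline: the leaf page number is lbn / VDO_BLOCK_MAP_ENTRIES_PER_PAGE and the leaf slot
 * is lbn % VDO_BLOCK_MAP_ENTRIES_PER_PAGE. That leaf is page (page_number / root_count) of
 * tree (page_number % root_count), and each higher level derives its slot and next page index
 * by taking the current page index modulo, then dividing it by, VDO_BLOCK_MAP_ENTRIES_PER_PAGE.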
2219 */ 2220 void vdo_find_block_map_slot(struct data_vio *data_vio) 2221 { 2222 page_number_t page_index; 2223 struct block_map_tree_slot tree_slot; 2224 struct data_location mapping; 2225 struct block_map_page *page = NULL; 2226 struct tree_lock *lock = &data_vio->tree_lock; 2227 struct block_map_zone *zone = data_vio->logical.zone->block_map_zone; 2228 2229 zone->active_lookups++; 2230 if (vdo_is_state_draining(&zone->state)) { 2231 finish_lookup(data_vio, VDO_SHUTTING_DOWN); 2232 return; 2233 } 2234 2235 lock->tree_slots[0].block_map_slot.slot = 2236 data_vio->logical.lbn % VDO_BLOCK_MAP_ENTRIES_PER_PAGE; 2237 page_index = (lock->tree_slots[0].page_index / zone->block_map->root_count); 2238 tree_slot = (struct block_map_tree_slot) { 2239 .page_index = page_index / VDO_BLOCK_MAP_ENTRIES_PER_PAGE, 2240 .block_map_slot = { 2241 .pbn = 0, 2242 .slot = page_index % VDO_BLOCK_MAP_ENTRIES_PER_PAGE, 2243 }, 2244 }; 2245 2246 for (lock->height = 1; lock->height <= VDO_BLOCK_MAP_TREE_HEIGHT; lock->height++) { 2247 physical_block_number_t pbn; 2248 2249 lock->tree_slots[lock->height] = tree_slot; 2250 page = (struct block_map_page *) (get_tree_page(zone, lock)->page_buffer); 2251 pbn = vdo_get_block_map_page_pbn(page); 2252 if (pbn != VDO_ZERO_BLOCK) { 2253 lock->tree_slots[lock->height].block_map_slot.pbn = pbn; 2254 break; 2255 } 2256 2257 /* Calculate the index and slot for the next level. */ 2258 tree_slot.block_map_slot.slot = 2259 tree_slot.page_index % VDO_BLOCK_MAP_ENTRIES_PER_PAGE; 2260 tree_slot.page_index = tree_slot.page_index / VDO_BLOCK_MAP_ENTRIES_PER_PAGE; 2261 } 2262 2263 /* The page at this height has been allocated and loaded. */ 2264 mapping = vdo_unpack_block_map_entry(&page->entries[tree_slot.block_map_slot.slot]); 2265 if (is_invalid_tree_entry(vdo_from_data_vio(data_vio), &mapping, lock->height)) { 2266 vdo_log_error_strerror(VDO_BAD_MAPPING, 2267 "Invalid block map tree PBN: %llu with state %u for page index %u at height %u", 2268 (unsigned long long) mapping.pbn, mapping.state, 2269 lock->tree_slots[lock->height - 1].page_index, 2270 lock->height - 1); 2271 abort_load(data_vio, VDO_BAD_MAPPING); 2272 return; 2273 } 2274 2275 if (!vdo_is_mapped_location(&mapping)) { 2276 /* The page we want one level down has not been allocated, so allocate it. */ 2277 allocate_block_map_page(zone, data_vio); 2278 return; 2279 } 2280 2281 lock->tree_slots[lock->height - 1].block_map_slot.pbn = mapping.pbn; 2282 if (lock->height == 1) { 2283 /* This is the ultimate block map page, so we're done */ 2284 finish_lookup(data_vio, VDO_SUCCESS); 2285 return; 2286 } 2287 2288 /* We know what page we need to load. */ 2289 load_block_map_page(zone, data_vio); 2290 } 2291 2292 /* 2293 * Find the PBN of a leaf block map page. This method may only be used after all allocated tree 2294 * pages have been loaded, otherwise, it may give the wrong answer (0). 
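 * Only the in-memory interior page one level above the leaves is consulted; an uninitialized
 * page or an invalid entry yields VDO_ZERO_BLOCK.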
2295 */ 2296 physical_block_number_t vdo_find_block_map_page_pbn(struct block_map *map, 2297 page_number_t page_number) 2298 { 2299 struct data_location mapping; 2300 struct tree_page *tree_page; 2301 struct block_map_page *page; 2302 root_count_t root_index = page_number % map->root_count; 2303 page_number_t page_index = page_number / map->root_count; 2304 slot_number_t slot = page_index % VDO_BLOCK_MAP_ENTRIES_PER_PAGE; 2305 2306 page_index /= VDO_BLOCK_MAP_ENTRIES_PER_PAGE; 2307 2308 tree_page = get_tree_page_by_index(map->forest, root_index, 1, page_index); 2309 page = (struct block_map_page *) tree_page->page_buffer; 2310 if (!page->header.initialized) 2311 return VDO_ZERO_BLOCK; 2312 2313 mapping = vdo_unpack_block_map_entry(&page->entries[slot]); 2314 if (!vdo_is_valid_location(&mapping) || vdo_is_state_compressed(mapping.state)) 2315 return VDO_ZERO_BLOCK; 2316 return mapping.pbn; 2317 } 2318 2319 /* 2320 * Write a tree page or indicate that it has been re-dirtied if it is already being written. This 2321 * method is used when correcting errors in the tree during read-only rebuild. 2322 */ 2323 void vdo_write_tree_page(struct tree_page *page, struct block_map_zone *zone) 2324 { 2325 bool waiting = vdo_waiter_is_waiting(&page->waiter); 2326 2327 if (waiting && (zone->flusher == page)) 2328 return; 2329 2330 set_generation(zone, page, zone->generation); 2331 if (waiting || page->writing) 2332 return; 2333 2334 enqueue_page(page, zone); 2335 } 2336 2337 static int make_segment(struct forest *old_forest, block_count_t new_pages, 2338 struct boundary *new_boundary, struct forest *forest) 2339 { 2340 size_t index = (old_forest == NULL) ? 0 : old_forest->segments; 2341 struct tree_page *page_ptr; 2342 page_count_t segment_sizes[VDO_BLOCK_MAP_TREE_HEIGHT]; 2343 height_t height; 2344 root_count_t root; 2345 int result; 2346 2347 forest->segments = index + 1; 2348 2349 result = vdo_allocate(forest->segments, struct boundary, 2350 "forest boundary array", &forest->boundaries); 2351 if (result != VDO_SUCCESS) 2352 return result; 2353 2354 result = vdo_allocate(forest->segments, struct tree_page *, 2355 "forest page pointers", &forest->pages); 2356 if (result != VDO_SUCCESS) 2357 return result; 2358 2359 result = vdo_allocate(new_pages, struct tree_page, 2360 "new forest pages", &forest->pages[index]); 2361 if (result != VDO_SUCCESS) 2362 return result; 2363 2364 if (index > 0) { 2365 memcpy(forest->boundaries, old_forest->boundaries, 2366 index * sizeof(struct boundary)); 2367 memcpy(forest->pages, old_forest->pages, 2368 index * sizeof(struct tree_page *)); 2369 } 2370 2371 memcpy(&(forest->boundaries[index]), new_boundary, sizeof(struct boundary)); 2372 2373 for (height = 0; height < VDO_BLOCK_MAP_TREE_HEIGHT; height++) { 2374 segment_sizes[height] = new_boundary->levels[height]; 2375 if (index > 0) 2376 segment_sizes[height] -= old_forest->boundaries[index - 1].levels[height]; 2377 } 2378 2379 page_ptr = forest->pages[index]; 2380 for (root = 0; root < forest->map->root_count; root++) { 2381 struct block_map_tree_segment *segment; 2382 struct block_map_tree *tree = &(forest->trees[root]); 2383 height_t height; 2384 2385 int result = vdo_allocate(forest->segments, 2386 struct block_map_tree_segment, 2387 "tree root segments", &tree->segments); 2388 if (result != VDO_SUCCESS) 2389 return result; 2390 2391 if (index > 0) { 2392 memcpy(tree->segments, old_forest->trees[root].segments, 2393 index * sizeof(struct block_map_tree_segment)); 2394 } 2395 2396 segment = &(tree->segments[index]); 2397 
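		/*
		 * Carve this root's share of the new pages out of the contiguous array, one level
		 * at a time. The top level is formatted immediately so that its first entry points
		 * at this root's on-disk location.
		 */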
for (height = 0; height < VDO_BLOCK_MAP_TREE_HEIGHT; height++) { 2398 if (segment_sizes[height] == 0) 2399 continue; 2400 2401 segment->levels[height] = page_ptr; 2402 if (height == (VDO_BLOCK_MAP_TREE_HEIGHT - 1)) { 2403 /* Record the root. */ 2404 struct block_map_page *page = 2405 vdo_format_block_map_page(page_ptr->page_buffer, 2406 forest->map->nonce, 2407 VDO_INVALID_PBN, true); 2408 page->entries[0] = 2409 vdo_pack_block_map_entry(forest->map->root_origin + root, 2410 VDO_MAPPING_STATE_UNCOMPRESSED); 2411 } 2412 page_ptr += segment_sizes[height]; 2413 } 2414 } 2415 2416 return VDO_SUCCESS; 2417 } 2418 2419 static void deforest(struct forest *forest, size_t first_page_segment) 2420 { 2421 root_count_t root; 2422 2423 if (forest->pages != NULL) { 2424 size_t segment; 2425 2426 for (segment = first_page_segment; segment < forest->segments; segment++) 2427 vdo_free(forest->pages[segment]); 2428 vdo_free(forest->pages); 2429 } 2430 2431 for (root = 0; root < forest->map->root_count; root++) 2432 vdo_free(forest->trees[root].segments); 2433 2434 vdo_free(forest->boundaries); 2435 vdo_free(forest); 2436 } 2437 2438 /** 2439 * make_forest() - Make a collection of trees for a block_map, expanding the existing forest if 2440 * there is one. 2441 * @entries: The number of entries the block map will hold. 2442 * 2443 * Return: VDO_SUCCESS or an error. 2444 */ 2445 static int make_forest(struct block_map *map, block_count_t entries) 2446 { 2447 struct forest *forest, *old_forest = map->forest; 2448 struct boundary new_boundary, *old_boundary = NULL; 2449 block_count_t new_pages; 2450 int result; 2451 2452 if (old_forest != NULL) 2453 old_boundary = &(old_forest->boundaries[old_forest->segments - 1]); 2454 2455 new_pages = vdo_compute_new_forest_pages(map->root_count, old_boundary, 2456 entries, &new_boundary); 2457 if (new_pages == 0) { 2458 map->next_entry_count = entries; 2459 return VDO_SUCCESS; 2460 } 2461 2462 result = vdo_allocate_extended(struct forest, map->root_count, 2463 struct block_map_tree, __func__, 2464 &forest); 2465 if (result != VDO_SUCCESS) 2466 return result; 2467 2468 forest->map = map; 2469 result = make_segment(old_forest, new_pages, &new_boundary, forest); 2470 if (result != VDO_SUCCESS) { 2471 deforest(forest, forest->segments - 1); 2472 return result; 2473 } 2474 2475 map->next_forest = forest; 2476 map->next_entry_count = entries; 2477 return VDO_SUCCESS; 2478 } 2479 2480 /** 2481 * replace_forest() - Replace a block_map's forest with the already-prepared larger forest. 2482 */ 2483 static void replace_forest(struct block_map *map) 2484 { 2485 if (map->next_forest != NULL) { 2486 if (map->forest != NULL) 2487 deforest(map->forest, map->forest->segments); 2488 map->forest = vdo_forget(map->next_forest); 2489 } 2490 2491 map->entry_count = map->next_entry_count; 2492 map->next_entry_count = 0; 2493 } 2494 2495 /** 2496 * finish_cursor() - Finish the traversal of a single tree. If it was the last cursor, finish the 2497 * traversal. 2498 */ 2499 static void finish_cursor(struct cursor *cursor) 2500 { 2501 struct cursors *cursors = cursor->parent; 2502 struct vdo_completion *completion = cursors->completion; 2503 2504 return_vio_to_pool(cursors->pool, vdo_forget(cursor->vio)); 2505 if (--cursors->active_roots > 0) 2506 return; 2507 2508 vdo_free(cursors); 2509 2510 vdo_finish_completion(completion); 2511 } 2512 2513 static void traverse(struct cursor *cursor); 2514 2515 /** 2516 * continue_traversal() - Continue traversing a block map tree. 
2517 * @completion: The VIO doing a read or write. 2518 */ 2519 static void continue_traversal(struct vdo_completion *completion) 2520 { 2521 vio_record_metadata_io_error(as_vio(completion)); 2522 traverse(completion->parent); 2523 } 2524 2525 /** 2526 * finish_traversal_load() - Continue traversing a block map tree now that a page has been loaded. 2527 * @completion: The VIO doing the read. 2528 */ 2529 static void finish_traversal_load(struct vdo_completion *completion) 2530 { 2531 struct cursor *cursor = completion->parent; 2532 height_t height = cursor->height; 2533 struct cursor_level *level = &cursor->levels[height]; 2534 struct tree_page *tree_page = 2535 &(cursor->tree->segments[0].levels[height][level->page_index]); 2536 struct block_map_page *page = (struct block_map_page *) tree_page->page_buffer; 2537 2538 vdo_copy_valid_page(cursor->vio->vio.data, 2539 cursor->parent->zone->block_map->nonce, 2540 pbn_from_vio_bio(cursor->vio->vio.bio), page); 2541 traverse(cursor); 2542 } 2543 2544 static void traversal_endio(struct bio *bio) 2545 { 2546 struct vio *vio = bio->bi_private; 2547 struct cursor *cursor = vio->completion.parent; 2548 2549 continue_vio_after_io(vio, finish_traversal_load, 2550 cursor->parent->zone->thread_id); 2551 } 2552 2553 /** 2554 * traverse() - Traverse a single block map tree. 2555 * 2556 * This is the recursive heart of the traversal process. 2557 */ 2558 static void traverse(struct cursor *cursor) 2559 { 2560 for (; cursor->height < VDO_BLOCK_MAP_TREE_HEIGHT; cursor->height++) { 2561 height_t height = cursor->height; 2562 struct cursor_level *level = &cursor->levels[height]; 2563 struct tree_page *tree_page = 2564 &(cursor->tree->segments[0].levels[height][level->page_index]); 2565 struct block_map_page *page = (struct block_map_page *) tree_page->page_buffer; 2566 2567 if (!page->header.initialized) 2568 continue; 2569 2570 for (; level->slot < VDO_BLOCK_MAP_ENTRIES_PER_PAGE; level->slot++) { 2571 struct cursor_level *next_level; 2572 page_number_t entry_index = 2573 (VDO_BLOCK_MAP_ENTRIES_PER_PAGE * level->page_index) + level->slot; 2574 struct data_location location = 2575 vdo_unpack_block_map_entry(&page->entries[level->slot]); 2576 2577 if (!vdo_is_valid_location(&location)) { 2578 /* This entry is invalid, so remove it from the page. */ 2579 page->entries[level->slot] = UNMAPPED_BLOCK_MAP_ENTRY; 2580 vdo_write_tree_page(tree_page, cursor->parent->zone); 2581 continue; 2582 } 2583 2584 if (!vdo_is_mapped_location(&location)) 2585 continue; 2586 2587 /* Erase mapped entries past the end of the logical space. 
*/ 2588 if (entry_index >= cursor->boundary.levels[height]) { 2589 page->entries[level->slot] = UNMAPPED_BLOCK_MAP_ENTRY; 2590 vdo_write_tree_page(tree_page, cursor->parent->zone); 2591 continue; 2592 } 2593 2594 if (cursor->height < VDO_BLOCK_MAP_TREE_HEIGHT - 1) { 2595 int result = cursor->parent->entry_callback(location.pbn, 2596 cursor->parent->completion); 2597 if (result != VDO_SUCCESS) { 2598 page->entries[level->slot] = UNMAPPED_BLOCK_MAP_ENTRY; 2599 vdo_write_tree_page(tree_page, cursor->parent->zone); 2600 continue; 2601 } 2602 } 2603 2604 if (cursor->height == 0) 2605 continue; 2606 2607 cursor->height--; 2608 next_level = &cursor->levels[cursor->height]; 2609 next_level->page_index = entry_index; 2610 next_level->slot = 0; 2611 level->slot++; 2612 vdo_submit_metadata_vio(&cursor->vio->vio, location.pbn, 2613 traversal_endio, continue_traversal, 2614 REQ_OP_READ | REQ_PRIO); 2615 return; 2616 } 2617 } 2618 2619 finish_cursor(cursor); 2620 } 2621 2622 /** 2623 * launch_cursor() - Start traversing a single block map tree now that the cursor has a VIO with 2624 * which to load pages. 2625 * @context: The pooled_vio just acquired. 2626 * 2627 * Implements waiter_callback_fn. 2628 */ 2629 static void launch_cursor(struct vdo_waiter *waiter, void *context) 2630 { 2631 struct cursor *cursor = container_of(waiter, struct cursor, waiter); 2632 struct pooled_vio *pooled = context; 2633 2634 cursor->vio = pooled; 2635 pooled->vio.completion.parent = cursor; 2636 pooled->vio.completion.callback_thread_id = cursor->parent->zone->thread_id; 2637 traverse(cursor); 2638 } 2639 2640 /** 2641 * compute_boundary() - Compute the number of pages used at each level of the given root's tree. 2642 * 2643 * Return: The list of page counts as a boundary structure. 2644 */ 2645 static struct boundary compute_boundary(struct block_map *map, root_count_t root_index) 2646 { 2647 struct boundary boundary; 2648 height_t height; 2649 page_count_t leaf_pages = vdo_compute_block_map_page_count(map->entry_count); 2650 /* 2651 * Compute the leaf pages for this root. If the number of leaf pages does not distribute 2652 * evenly, we must determine if this root gets an extra page. Extra pages are assigned to 2653 * roots starting from tree 0. 2654 */ 2655 page_count_t last_tree_root = (leaf_pages - 1) % map->root_count; 2656 page_count_t level_pages = leaf_pages / map->root_count; 2657 2658 if (root_index <= last_tree_root) 2659 level_pages++; 2660 2661 for (height = 0; height < VDO_BLOCK_MAP_TREE_HEIGHT - 1; height++) { 2662 boundary.levels[height] = level_pages; 2663 level_pages = DIV_ROUND_UP(level_pages, VDO_BLOCK_MAP_ENTRIES_PER_PAGE); 2664 } 2665 2666 /* The root node always exists, even if the root is otherwise unused. */ 2667 boundary.levels[VDO_BLOCK_MAP_TREE_HEIGHT - 1] = 1; 2668 2669 return boundary; 2670 } 2671 2672 /** 2673 * vdo_traverse_forest() - Walk the entire forest of a block map. 2674 * @callback: A function to call with the pbn of each allocated node in the forest. 2675 * @completion: The completion to notify on each traversed PBN, and when traversal completes. 
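 *
 * Each root is walked depth-first by its own cursor using a vio borrowed from the first
 * zone's pool; the completion is finished once every cursor has returned its vio.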
2676 */ 2677 void vdo_traverse_forest(struct block_map *map, vdo_entry_callback_fn callback, 2678 struct vdo_completion *completion) 2679 { 2680 root_count_t root; 2681 struct cursors *cursors; 2682 int result; 2683 2684 result = vdo_allocate_extended(struct cursors, map->root_count, 2685 struct cursor, __func__, &cursors); 2686 if (result != VDO_SUCCESS) { 2687 vdo_fail_completion(completion, result); 2688 return; 2689 } 2690 2691 cursors->zone = &map->zones[0]; 2692 cursors->pool = cursors->zone->vio_pool; 2693 cursors->entry_callback = callback; 2694 cursors->completion = completion; 2695 cursors->active_roots = map->root_count; 2696 for (root = 0; root < map->root_count; root++) { 2697 struct cursor *cursor = &cursors->cursors[root]; 2698 2699 *cursor = (struct cursor) { 2700 .tree = &map->forest->trees[root], 2701 .height = VDO_BLOCK_MAP_TREE_HEIGHT - 1, 2702 .parent = cursors, 2703 .boundary = compute_boundary(map, root), 2704 }; 2705 2706 cursor->waiter.callback = launch_cursor; 2707 acquire_vio_from_pool(cursors->pool, &cursor->waiter); 2708 } 2709 } 2710 2711 /** 2712 * initialize_block_map_zone() - Initialize the per-zone portions of the block map. 2713 * @maximum_age: The number of journal blocks before a dirtied page is considered old and must be 2714 * written out. 2715 */ 2716 static int __must_check initialize_block_map_zone(struct block_map *map, 2717 zone_count_t zone_number, 2718 page_count_t cache_size, 2719 block_count_t maximum_age) 2720 { 2721 int result; 2722 block_count_t i; 2723 struct vdo *vdo = map->vdo; 2724 struct block_map_zone *zone = &map->zones[zone_number]; 2725 2726 BUILD_BUG_ON(sizeof(struct page_descriptor) != sizeof(u64)); 2727 2728 zone->zone_number = zone_number; 2729 zone->thread_id = vdo->thread_config.logical_threads[zone_number]; 2730 zone->block_map = map; 2731 2732 result = vdo_allocate_extended(struct dirty_lists, maximum_age, 2733 dirty_era_t, __func__, 2734 &zone->dirty_lists); 2735 if (result != VDO_SUCCESS) 2736 return result; 2737 2738 zone->dirty_lists->maximum_age = maximum_age; 2739 INIT_LIST_HEAD(&zone->dirty_lists->expired[VDO_TREE_PAGE]); 2740 INIT_LIST_HEAD(&zone->dirty_lists->expired[VDO_CACHE_PAGE]); 2741 2742 for (i = 0; i < maximum_age; i++) { 2743 INIT_LIST_HEAD(&zone->dirty_lists->eras[i][VDO_TREE_PAGE]); 2744 INIT_LIST_HEAD(&zone->dirty_lists->eras[i][VDO_CACHE_PAGE]); 2745 } 2746 2747 result = vdo_int_map_create(VDO_LOCK_MAP_CAPACITY, &zone->loading_pages); 2748 if (result != VDO_SUCCESS) 2749 return result; 2750 2751 result = make_vio_pool(vdo, BLOCK_MAP_VIO_POOL_SIZE, 2752 zone->thread_id, VIO_TYPE_BLOCK_MAP_INTERIOR, 2753 VIO_PRIORITY_METADATA, zone, &zone->vio_pool); 2754 if (result != VDO_SUCCESS) 2755 return result; 2756 2757 vdo_set_admin_state_code(&zone->state, VDO_ADMIN_STATE_NORMAL_OPERATION); 2758 2759 zone->page_cache.zone = zone; 2760 zone->page_cache.vdo = vdo; 2761 zone->page_cache.page_count = cache_size / map->zone_count; 2762 zone->page_cache.stats.free_pages = zone->page_cache.page_count; 2763 2764 result = allocate_cache_components(&zone->page_cache); 2765 if (result != VDO_SUCCESS) 2766 return result; 2767 2768 /* initialize empty circular queues */ 2769 INIT_LIST_HEAD(&zone->page_cache.lru_list); 2770 INIT_LIST_HEAD(&zone->page_cache.outgoing_list); 2771 2772 return VDO_SUCCESS; 2773 } 2774 2775 /* Implements vdo_zone_thread_getter_fn */ 2776 static thread_id_t get_block_map_zone_thread_id(void *context, zone_count_t zone_number) 2777 { 2778 struct block_map *map = context; 2779 2780 return 
map->zones[zone_number].thread_id; 2781 } 2782 2783 /* Implements vdo_action_preamble_fn */ 2784 static void prepare_for_era_advance(void *context, struct vdo_completion *parent) 2785 { 2786 struct block_map *map = context; 2787 2788 map->current_era_point = map->pending_era_point; 2789 vdo_finish_completion(parent); 2790 } 2791 2792 /* Implements vdo_zone_action_fn */ 2793 static void advance_block_map_zone_era(void *context, zone_count_t zone_number, 2794 struct vdo_completion *parent) 2795 { 2796 struct block_map *map = context; 2797 struct block_map_zone *zone = &map->zones[zone_number]; 2798 2799 update_period(zone->dirty_lists, map->current_era_point); 2800 write_expired_elements(zone); 2801 vdo_finish_completion(parent); 2802 } 2803 2804 /* 2805 * Schedule an era advance if necessary. This method should not be called directly. Rather, call 2806 * vdo_schedule_default_action() on the block map's action manager. 2807 * 2808 * Implements vdo_action_scheduler_fn. 2809 */ 2810 static bool schedule_era_advance(void *context) 2811 { 2812 struct block_map *map = context; 2813 2814 if (map->current_era_point == map->pending_era_point) 2815 return false; 2816 2817 return vdo_schedule_action(map->action_manager, prepare_for_era_advance, 2818 advance_block_map_zone_era, NULL, NULL); 2819 } 2820 2821 static void uninitialize_block_map_zone(struct block_map_zone *zone) 2822 { 2823 struct vdo_page_cache *cache = &zone->page_cache; 2824 2825 vdo_free(vdo_forget(zone->dirty_lists)); 2826 free_vio_pool(vdo_forget(zone->vio_pool)); 2827 vdo_int_map_free(vdo_forget(zone->loading_pages)); 2828 if (cache->infos != NULL) { 2829 struct page_info *info; 2830 2831 for (info = cache->infos; info < cache->infos + cache->page_count; info++) 2832 free_vio(vdo_forget(info->vio)); 2833 } 2834 2835 vdo_int_map_free(vdo_forget(cache->page_map)); 2836 vdo_free(vdo_forget(cache->infos)); 2837 vdo_free(vdo_forget(cache->pages)); 2838 } 2839 2840 void vdo_free_block_map(struct block_map *map) 2841 { 2842 zone_count_t zone; 2843 2844 if (map == NULL) 2845 return; 2846 2847 for (zone = 0; zone < map->zone_count; zone++) 2848 uninitialize_block_map_zone(&map->zones[zone]); 2849 2850 vdo_abandon_block_map_growth(map); 2851 if (map->forest != NULL) 2852 deforest(vdo_forget(map->forest), 0); 2853 vdo_free(vdo_forget(map->action_manager)); 2854 vdo_free(map); 2855 } 2856 2857 /* @journal may be NULL. 
*/ 2858 int vdo_decode_block_map(struct block_map_state_2_0 state, block_count_t logical_blocks, 2859 struct vdo *vdo, struct recovery_journal *journal, 2860 nonce_t nonce, page_count_t cache_size, block_count_t maximum_age, 2861 struct block_map **map_ptr) 2862 { 2863 struct block_map *map; 2864 int result; 2865 zone_count_t zone = 0; 2866 2867 BUILD_BUG_ON(VDO_BLOCK_MAP_ENTRIES_PER_PAGE != 2868 ((VDO_BLOCK_SIZE - sizeof(struct block_map_page)) / 2869 sizeof(struct block_map_entry))); 2870 result = VDO_ASSERT(cache_size > 0, "block map cache size is specified"); 2871 if (result != VDO_SUCCESS) 2872 return result; 2873 2874 result = vdo_allocate_extended(struct block_map, 2875 vdo->thread_config.logical_zone_count, 2876 struct block_map_zone, __func__, &map); 2877 if (result != VDO_SUCCESS) 2878 return result; 2879 2880 map->vdo = vdo; 2881 map->root_origin = state.root_origin; 2882 map->root_count = state.root_count; 2883 map->entry_count = logical_blocks; 2884 map->journal = journal; 2885 map->nonce = nonce; 2886 2887 result = make_forest(map, map->entry_count); 2888 if (result != VDO_SUCCESS) { 2889 vdo_free_block_map(map); 2890 return result; 2891 } 2892 2893 replace_forest(map); 2894 2895 map->zone_count = vdo->thread_config.logical_zone_count; 2896 for (zone = 0; zone < map->zone_count; zone++) { 2897 result = initialize_block_map_zone(map, zone, cache_size, maximum_age); 2898 if (result != VDO_SUCCESS) { 2899 vdo_free_block_map(map); 2900 return result; 2901 } 2902 } 2903 2904 result = vdo_make_action_manager(map->zone_count, get_block_map_zone_thread_id, 2905 vdo_get_recovery_journal_thread_id(journal), 2906 map, schedule_era_advance, vdo, 2907 &map->action_manager); 2908 if (result != VDO_SUCCESS) { 2909 vdo_free_block_map(map); 2910 return result; 2911 } 2912 2913 *map_ptr = map; 2914 return VDO_SUCCESS; 2915 } 2916 2917 struct block_map_state_2_0 vdo_record_block_map(const struct block_map *map) 2918 { 2919 return (struct block_map_state_2_0) { 2920 .flat_page_origin = VDO_BLOCK_MAP_FLAT_PAGE_ORIGIN, 2921 /* This is the flat page count, which has turned out to always be 0. */ 2922 .flat_page_count = 0, 2923 .root_origin = map->root_origin, 2924 .root_count = map->root_count, 2925 }; 2926 } 2927 2928 /* The block map needs to know the journals' sequence number to initialize the eras. */ 2929 void vdo_initialize_block_map_from_journal(struct block_map *map, 2930 struct recovery_journal *journal) 2931 { 2932 zone_count_t z = 0; 2933 2934 map->current_era_point = vdo_get_recovery_journal_current_sequence_number(journal); 2935 map->pending_era_point = map->current_era_point; 2936 2937 for (z = 0; z < map->zone_count; z++) { 2938 struct dirty_lists *dirty_lists = map->zones[z].dirty_lists; 2939 2940 VDO_ASSERT_LOG_ONLY(dirty_lists->next_period == 0, "current period not set"); 2941 dirty_lists->oldest_period = map->current_era_point; 2942 dirty_lists->next_period = map->current_era_point + 1; 2943 dirty_lists->offset = map->current_era_point % dirty_lists->maximum_age; 2944 } 2945 } 2946 2947 /* Compute the logical zone for the LBN of a data vio. 
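 * The LBN's block map page number selects the root tree (page_number % root_count), and the
 * root index modulo the logical zone count selects the zone. The page number is cached in the
 * data_vio's tree_lock for the later slot lookup.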
*/ 2948 zone_count_t vdo_compute_logical_zone(struct data_vio *data_vio) 2949 { 2950 struct block_map *map = vdo_from_data_vio(data_vio)->block_map; 2951 struct tree_lock *tree_lock = &data_vio->tree_lock; 2952 page_number_t page_number = data_vio->logical.lbn / VDO_BLOCK_MAP_ENTRIES_PER_PAGE; 2953 2954 tree_lock->tree_slots[0].page_index = page_number; 2955 tree_lock->root_index = page_number % map->root_count; 2956 return (tree_lock->root_index % map->zone_count); 2957 } 2958 2959 void vdo_advance_block_map_era(struct block_map *map, 2960 sequence_number_t recovery_block_number) 2961 { 2962 if (map == NULL) 2963 return; 2964 2965 map->pending_era_point = recovery_block_number; 2966 vdo_schedule_default_action(map->action_manager); 2967 } 2968 2969 /* Implements vdo_admin_initiator_fn */ 2970 static void initiate_drain(struct admin_state *state) 2971 { 2972 struct block_map_zone *zone = container_of(state, struct block_map_zone, state); 2973 2974 VDO_ASSERT_LOG_ONLY((zone->active_lookups == 0), 2975 "%s() called with no active lookups", __func__); 2976 2977 if (!vdo_is_state_suspending(state)) { 2978 while (zone->dirty_lists->oldest_period < zone->dirty_lists->next_period) 2979 expire_oldest_list(zone->dirty_lists); 2980 write_expired_elements(zone); 2981 } 2982 2983 check_for_drain_complete(zone); 2984 } 2985 2986 /* Implements vdo_zone_action_fn. */ 2987 static void drain_zone(void *context, zone_count_t zone_number, 2988 struct vdo_completion *parent) 2989 { 2990 struct block_map *map = context; 2991 struct block_map_zone *zone = &map->zones[zone_number]; 2992 2993 vdo_start_draining(&zone->state, 2994 vdo_get_current_manager_operation(map->action_manager), 2995 parent, initiate_drain); 2996 } 2997 2998 void vdo_drain_block_map(struct block_map *map, const struct admin_state_code *operation, 2999 struct vdo_completion *parent) 3000 { 3001 vdo_schedule_operation(map->action_manager, operation, NULL, drain_zone, NULL, 3002 parent); 3003 } 3004 3005 /* Implements vdo_zone_action_fn. */ 3006 static void resume_block_map_zone(void *context, zone_count_t zone_number, 3007 struct vdo_completion *parent) 3008 { 3009 struct block_map *map = context; 3010 struct block_map_zone *zone = &map->zones[zone_number]; 3011 3012 vdo_fail_completion(parent, vdo_resume_if_quiescent(&zone->state)); 3013 } 3014 3015 void vdo_resume_block_map(struct block_map *map, struct vdo_completion *parent) 3016 { 3017 vdo_schedule_operation(map->action_manager, VDO_ADMIN_STATE_RESUMING, 3018 NULL, resume_block_map_zone, NULL, parent); 3019 } 3020 3021 /* Allocate an expanded collection of trees, for a future growth. */ 3022 int vdo_prepare_to_grow_block_map(struct block_map *map, 3023 block_count_t new_logical_blocks) 3024 { 3025 if (map->next_entry_count == new_logical_blocks) 3026 return VDO_SUCCESS; 3027 3028 if (map->next_entry_count > 0) 3029 vdo_abandon_block_map_growth(map); 3030 3031 if (new_logical_blocks < map->entry_count) { 3032 map->next_entry_count = map->entry_count; 3033 return VDO_SUCCESS; 3034 } 3035 3036 return make_forest(map, new_logical_blocks); 3037 } 3038 3039 /* Implements vdo_action_preamble_fn */ 3040 static void grow_forest(void *context, struct vdo_completion *completion) 3041 { 3042 replace_forest(context); 3043 vdo_finish_completion(completion); 3044 } 3045 3046 /* Requires vdo_prepare_to_grow_block_map() to have been previously called. 
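 * The prepared forest is swapped in by grow_forest() while the block map is suspended.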
*/ 3047 void vdo_grow_block_map(struct block_map *map, struct vdo_completion *parent) 3048 { 3049 vdo_schedule_operation(map->action_manager, 3050 VDO_ADMIN_STATE_SUSPENDED_OPERATION, 3051 grow_forest, NULL, NULL, parent); 3052 } 3053 3054 void vdo_abandon_block_map_growth(struct block_map *map) 3055 { 3056 struct forest *forest = vdo_forget(map->next_forest); 3057 3058 if (forest != NULL) 3059 deforest(forest, forest->segments - 1); 3060 3061 map->next_entry_count = 0; 3062 } 3063 3064 /* Release the page completion and then continue the requester. */ 3065 static inline void finish_processing_page(struct vdo_completion *completion, int result) 3066 { 3067 struct vdo_completion *parent = completion->parent; 3068 3069 vdo_release_page_completion(completion); 3070 vdo_continue_completion(parent, result); 3071 } 3072 3073 static void handle_page_error(struct vdo_completion *completion) 3074 { 3075 finish_processing_page(completion, completion->result); 3076 } 3077 3078 /* Fetch the mapping page for a block map update, and call the provided handler when fetched. */ 3079 static void fetch_mapping_page(struct data_vio *data_vio, bool modifiable, 3080 vdo_action_fn action) 3081 { 3082 struct block_map_zone *zone = data_vio->logical.zone->block_map_zone; 3083 3084 if (vdo_is_state_draining(&zone->state)) { 3085 continue_data_vio_with_error(data_vio, VDO_SHUTTING_DOWN); 3086 return; 3087 } 3088 3089 vdo_get_page(&data_vio->page_completion, zone, 3090 data_vio->tree_lock.tree_slots[0].block_map_slot.pbn, 3091 modifiable, &data_vio->vio.completion, 3092 action, handle_page_error, false); 3093 } 3094 3095 /** 3096 * clear_mapped_location() - Clear a data_vio's mapped block location, setting it to be unmapped. 3097 * 3098 * This indicates the block map entry for the logical block is either unmapped or corrupted. 3099 */ 3100 static void clear_mapped_location(struct data_vio *data_vio) 3101 { 3102 data_vio->mapped = (struct zoned_pbn) { 3103 .state = VDO_MAPPING_STATE_UNMAPPED, 3104 }; 3105 } 3106 3107 /** 3108 * set_mapped_location() - Decode and validate a block map entry, and set the mapped location of a 3109 * data_vio. 3110 * 3111 * Return: VDO_SUCCESS or VDO_BAD_MAPPING if the map entry is invalid or an error code for any 3112 * other failure 3113 */ 3114 static int __must_check set_mapped_location(struct data_vio *data_vio, 3115 const struct block_map_entry *entry) 3116 { 3117 /* Unpack the PBN for logging purposes even if the entry is invalid. */ 3118 struct data_location mapped = vdo_unpack_block_map_entry(entry); 3119 3120 if (vdo_is_valid_location(&mapped)) { 3121 int result; 3122 3123 result = vdo_get_physical_zone(vdo_from_data_vio(data_vio), 3124 mapped.pbn, &data_vio->mapped.zone); 3125 if (result == VDO_SUCCESS) { 3126 data_vio->mapped.pbn = mapped.pbn; 3127 data_vio->mapped.state = mapped.state; 3128 return VDO_SUCCESS; 3129 } 3130 3131 /* 3132 * Return all errors not specifically known to be errors from validating the 3133 * location. 3134 */ 3135 if ((result != VDO_OUT_OF_RANGE) && (result != VDO_BAD_MAPPING)) 3136 return result; 3137 } 3138 3139 /* 3140 * Log the corruption even if we wind up ignoring it for write VIOs, converting all cases 3141 * to VDO_BAD_MAPPING. 3142 */ 3143 vdo_log_error_strerror(VDO_BAD_MAPPING, 3144 "PBN %llu with state %u read from the block map was invalid", 3145 (unsigned long long) mapped.pbn, mapped.state); 3146 3147 /* 3148 * A read VIO has no option but to report the bad mapping--reading zeros would be hiding 3149 * known data loss. 
3150 */ 3151 if (!data_vio->write) 3152 return VDO_BAD_MAPPING; 3153 3154 /* 3155 * A write VIO only reads this mapping to decref the old block. Treat this as an unmapped 3156 * entry rather than fail the write. 3157 */ 3158 clear_mapped_location(data_vio); 3159 return VDO_SUCCESS; 3160 } 3161 3162 /* This callback is registered in vdo_get_mapped_block(). */ 3163 static void get_mapping_from_fetched_page(struct vdo_completion *completion) 3164 { 3165 int result; 3166 struct vdo_page_completion *vpc = as_vdo_page_completion(completion); 3167 const struct block_map_page *page; 3168 const struct block_map_entry *entry; 3169 struct data_vio *data_vio = as_data_vio(completion->parent); 3170 struct block_map_tree_slot *tree_slot; 3171 3172 if (completion->result != VDO_SUCCESS) { 3173 finish_processing_page(completion, completion->result); 3174 return; 3175 } 3176 3177 result = validate_completed_page(vpc, false); 3178 if (result != VDO_SUCCESS) { 3179 finish_processing_page(completion, result); 3180 return; 3181 } 3182 3183 page = (const struct block_map_page *) get_page_buffer(vpc->info); 3184 tree_slot = &data_vio->tree_lock.tree_slots[0]; 3185 entry = &page->entries[tree_slot->block_map_slot.slot]; 3186 3187 result = set_mapped_location(data_vio, entry); 3188 finish_processing_page(completion, result); 3189 } 3190 3191 void vdo_update_block_map_page(struct block_map_page *page, struct data_vio *data_vio, 3192 physical_block_number_t pbn, 3193 enum block_mapping_state mapping_state, 3194 sequence_number_t *recovery_lock) 3195 { 3196 struct block_map_zone *zone = data_vio->logical.zone->block_map_zone; 3197 struct block_map *block_map = zone->block_map; 3198 struct recovery_journal *journal = block_map->journal; 3199 sequence_number_t old_locked, new_locked; 3200 struct tree_lock *tree_lock = &data_vio->tree_lock; 3201 3202 /* Encode the new mapping. */ 3203 page->entries[tree_lock->tree_slots[tree_lock->height].block_map_slot.slot] = 3204 vdo_pack_block_map_entry(pbn, mapping_state); 3205 3206 /* Adjust references on the recovery journal blocks. */ 3207 old_locked = *recovery_lock; 3208 new_locked = data_vio->recovery_sequence_number; 3209 3210 if ((old_locked == 0) || (old_locked > new_locked)) { 3211 vdo_acquire_recovery_journal_block_reference(journal, new_locked, 3212 VDO_ZONE_TYPE_LOGICAL, 3213 zone->zone_number); 3214 3215 if (old_locked > 0) { 3216 vdo_release_recovery_journal_block_reference(journal, old_locked, 3217 VDO_ZONE_TYPE_LOGICAL, 3218 zone->zone_number); 3219 } 3220 3221 *recovery_lock = new_locked; 3222 } 3223 3224 /* 3225 * FIXME: explain this more 3226 * Release the transferred lock from the data_vio. 
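 * (The page already holds a journal block reference at or before this entry's block, acquired
 * above or retained from an earlier update, so the data_vio's per-entry lock appears to be
 * redundant once the mapping is recorded.)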
3227 */ 3228 vdo_release_journal_entry_lock(journal, new_locked); 3229 data_vio->recovery_sequence_number = 0; 3230 } 3231 3232 static void put_mapping_in_fetched_page(struct vdo_completion *completion) 3233 { 3234 struct data_vio *data_vio = as_data_vio(completion->parent); 3235 sequence_number_t old_lock; 3236 struct vdo_page_completion *vpc; 3237 struct page_info *info; 3238 int result; 3239 3240 if (completion->result != VDO_SUCCESS) { 3241 finish_processing_page(completion, completion->result); 3242 return; 3243 } 3244 3245 vpc = as_vdo_page_completion(completion); 3246 result = validate_completed_page(vpc, true); 3247 if (result != VDO_SUCCESS) { 3248 finish_processing_page(completion, result); 3249 return; 3250 } 3251 3252 info = vpc->info; 3253 old_lock = info->recovery_lock; 3254 vdo_update_block_map_page((struct block_map_page *) get_page_buffer(info), 3255 data_vio, data_vio->new_mapped.pbn, 3256 data_vio->new_mapped.state, &info->recovery_lock); 3257 set_info_state(info, PS_DIRTY); 3258 add_to_dirty_lists(info->cache->zone, &info->state_entry, 3259 VDO_CACHE_PAGE, old_lock, info->recovery_lock); 3260 finish_processing_page(completion, VDO_SUCCESS); 3261 } 3262 3263 /* Read a stored block mapping into a data_vio. */ 3264 void vdo_get_mapped_block(struct data_vio *data_vio) 3265 { 3266 if (data_vio->tree_lock.tree_slots[0].block_map_slot.pbn == VDO_ZERO_BLOCK) { 3267 /* 3268 * We know that the block map page for this LBN has not been allocated, so the 3269 * block must be unmapped. 3270 */ 3271 clear_mapped_location(data_vio); 3272 continue_data_vio(data_vio); 3273 return; 3274 } 3275 3276 fetch_mapping_page(data_vio, false, get_mapping_from_fetched_page); 3277 } 3278 3279 /* Update a stored block mapping to reflect a data_vio's new mapping. 
*/ 3280 void vdo_put_mapped_block(struct data_vio *data_vio) 3281 { 3282 fetch_mapping_page(data_vio, true, put_mapping_in_fetched_page); 3283 } 3284 3285 struct block_map_statistics vdo_get_block_map_statistics(struct block_map *map) 3286 { 3287 zone_count_t zone = 0; 3288 struct block_map_statistics totals; 3289 3290 memset(&totals, 0, sizeof(struct block_map_statistics)); 3291 for (zone = 0; zone < map->zone_count; zone++) { 3292 const struct block_map_statistics *stats = 3293 &(map->zones[zone].page_cache.stats); 3294 3295 totals.dirty_pages += READ_ONCE(stats->dirty_pages); 3296 totals.clean_pages += READ_ONCE(stats->clean_pages); 3297 totals.free_pages += READ_ONCE(stats->free_pages); 3298 totals.failed_pages += READ_ONCE(stats->failed_pages); 3299 totals.incoming_pages += READ_ONCE(stats->incoming_pages); 3300 totals.outgoing_pages += READ_ONCE(stats->outgoing_pages); 3301 totals.cache_pressure += READ_ONCE(stats->cache_pressure); 3302 totals.read_count += READ_ONCE(stats->read_count); 3303 totals.write_count += READ_ONCE(stats->write_count); 3304 totals.failed_reads += READ_ONCE(stats->failed_reads); 3305 totals.failed_writes += READ_ONCE(stats->failed_writes); 3306 totals.reclaimed += READ_ONCE(stats->reclaimed); 3307 totals.read_outgoing += READ_ONCE(stats->read_outgoing); 3308 totals.found_in_cache += READ_ONCE(stats->found_in_cache); 3309 totals.discard_required += READ_ONCE(stats->discard_required); 3310 totals.wait_for_page += READ_ONCE(stats->wait_for_page); 3311 totals.fetch_required += READ_ONCE(stats->fetch_required); 3312 totals.pages_loaded += READ_ONCE(stats->pages_loaded); 3313 totals.pages_saved += READ_ONCE(stats->pages_saved); 3314 totals.flush_count += READ_ONCE(stats->flush_count); 3315 } 3316 3317 return totals; 3318 } 3319