// SPDX-License-Identifier: GPL-2.0-only
/*
 * Copyright 2023 Red Hat
 */

#include "block-map.h"

#include <linux/bio.h>
#include <linux/ratelimit.h>

#include "errors.h"
#include "logger.h"
#include "memory-alloc.h"
#include "permassert.h"

#include "action-manager.h"
#include "admin-state.h"
#include "completion.h"
#include "constants.h"
#include "data-vio.h"
#include "encodings.h"
#include "io-submitter.h"
#include "physical-zone.h"
#include "recovery-journal.h"
#include "slab-depot.h"
#include "status-codes.h"
#include "types.h"
#include "vdo.h"
#include "vio.h"
#include "wait-queue.h"

/**
 * DOC: Block map eras
 *
 * The block map era, or maximum age, is used as follows:
 *
 * Each block map page, when dirty, records the earliest recovery journal block sequence number of
 * the changes reflected in that dirty block. Sequence numbers are classified into eras: every
 * @maximum_age sequence numbers, we switch to a new era. Block map pages are assigned to eras
 * according to the sequence number they record.
 *
 * In the current (newest) era, block map pages are not written unless there is cache pressure. In
 * the next oldest era, each time a new journal block is written, 1/@maximum_age of the pages in
 * this era are issued for write. In all older eras, pages are issued for write immediately.
 */

struct page_descriptor {
	root_count_t root_index;
	height_t height;
	page_number_t page_index;
	slot_number_t slot;
} __packed;

union page_key {
	struct page_descriptor descriptor;
	u64 key;
};

struct write_if_not_dirtied_context {
	struct block_map_zone *zone;
	u8 generation;
};

struct block_map_tree_segment {
	struct tree_page *levels[VDO_BLOCK_MAP_TREE_HEIGHT];
};

struct block_map_tree {
	struct block_map_tree_segment *segments;
};

struct forest {
	struct block_map *map;
	size_t segments;
	struct boundary *boundaries;
	struct tree_page **pages;
	struct block_map_tree trees[];
};

struct cursor_level {
	page_number_t page_index;
	slot_number_t slot;
};

struct cursors;

struct cursor {
	struct vdo_waiter waiter;
	struct block_map_tree *tree;
	height_t height;
	struct cursors *parent;
	struct boundary boundary;
	struct cursor_level levels[VDO_BLOCK_MAP_TREE_HEIGHT];
	struct pooled_vio *vio;
};

struct cursors {
	struct block_map_zone *zone;
	struct vio_pool *pool;
	vdo_entry_callback_fn entry_callback;
	struct vdo_completion *completion;
	root_count_t active_roots;
	struct cursor cursors[];
};

static const physical_block_number_t NO_PAGE = 0xFFFFFFFFFFFFFFFF;

/* Used to indicate that the page holding the location of a tree root has been "loaded". */
static const physical_block_number_t VDO_INVALID_PBN = 0xFFFFFFFFFFFFFFFF;

const struct block_map_entry UNMAPPED_BLOCK_MAP_ENTRY = {
	.mapping_state = VDO_MAPPING_STATE_UNMAPPED & 0x0F,
	.pbn_high_nibble = 0,
	.pbn_low_word = __cpu_to_le32(VDO_ZERO_BLOCK & UINT_MAX),
};

#define LOG_INTERVAL 4000
#define DISPLAY_INTERVAL 100000

/*
 * For adjusting VDO page cache statistic fields which are only mutated on the logical zone thread.
 * Prevents any compiler shenanigans from affecting other threads reading those stats.
 */
#define ADD_ONCE(value, delta) WRITE_ONCE(value, (value) + (delta))
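
/*
 * As an illustration of how the macro is used by the statistics code below (a sketch, not
 * additional driver logic): a counter bump such as
 *
 *	ADD_ONCE(cache->stats.pages_loaded, 1);
 *
 * expands to
 *
 *	WRITE_ONCE(cache->stats.pages_loaded, (cache->stats.pages_loaded) + (1));
 *
 * The field is still only mutated on the owning logical zone thread; WRITE_ONCE() only protects
 * threads that read the statistics concurrently from seeing a torn or re-ordered update.
 */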

static inline bool is_dirty(const struct page_info *info)
{
	return info->state == PS_DIRTY;
}

static inline bool is_present(const struct page_info *info)
{
	return (info->state == PS_RESIDENT) || (info->state == PS_DIRTY);
}

static inline bool is_in_flight(const struct page_info *info)
{
	return (info->state == PS_INCOMING) || (info->state == PS_OUTGOING);
}

static inline bool is_incoming(const struct page_info *info)
{
	return info->state == PS_INCOMING;
}

static inline bool is_outgoing(const struct page_info *info)
{
	return info->state == PS_OUTGOING;
}

static inline bool is_valid(const struct page_info *info)
{
	return is_present(info) || is_outgoing(info);
}

static char *get_page_buffer(struct page_info *info)
{
	struct vdo_page_cache *cache = info->cache;

	return &cache->pages[(info - cache->infos) * VDO_BLOCK_SIZE];
}

static inline struct vdo_page_completion *page_completion_from_waiter(struct vdo_waiter *waiter)
{
	struct vdo_page_completion *completion;

	if (waiter == NULL)
		return NULL;

	completion = container_of(waiter, struct vdo_page_completion, waiter);
	vdo_assert_completion_type(&completion->completion, VDO_PAGE_COMPLETION);
	return completion;
}

/**
 * initialize_info() - Initialize all page info structures and put them on the free list.
 *
 * Return: VDO_SUCCESS or an error.
 */
static int initialize_info(struct vdo_page_cache *cache)
{
	struct page_info *info;

	INIT_LIST_HEAD(&cache->free_list);
	for (info = cache->infos; info < cache->infos + cache->page_count; info++) {
		int result;

		info->cache = cache;
		info->state = PS_FREE;
		info->pbn = NO_PAGE;

		result = create_metadata_vio(cache->vdo, VIO_TYPE_BLOCK_MAP,
					     VIO_PRIORITY_METADATA, info,
					     get_page_buffer(info), &info->vio);
		if (result != VDO_SUCCESS)
			return result;

		/* The thread ID should never change. */
		info->vio->completion.callback_thread_id = cache->zone->thread_id;

		INIT_LIST_HEAD(&info->state_entry);
		list_add_tail(&info->state_entry, &cache->free_list);
		INIT_LIST_HEAD(&info->lru_entry);
	}

	return VDO_SUCCESS;
}

/**
 * allocate_cache_components() - Allocate components of the cache which require their own
 *                               allocation.
 *
 * The caller is responsible for all clean up on errors.
 *
 * Return: VDO_SUCCESS or an error code.
 */
static int __must_check allocate_cache_components(struct vdo_page_cache *cache)
{
	u64 size = cache->page_count * (u64) VDO_BLOCK_SIZE;
	int result;

	result = vdo_allocate(cache->page_count, struct page_info, "page infos",
			      &cache->infos);
	if (result != VDO_SUCCESS)
		return result;

	result = vdo_allocate_memory(size, VDO_BLOCK_SIZE, "cache pages", &cache->pages);
	if (result != VDO_SUCCESS)
		return result;

	result = vdo_int_map_create(cache->page_count, &cache->page_map);
	if (result != VDO_SUCCESS)
		return result;

	return initialize_info(cache);
}
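
/*
 * A brief sketch of the resulting layout (using a hypothetical four-page cache): the single
 * "cache pages" allocation is one contiguous, VDO_BLOCK_SIZE-aligned buffer of page_count
 * blocks, and each page_info's data lives at the slice computed by get_page_buffer():
 *
 *	cache->pages:  [ block 0 ][ block 1 ][ block 2 ][ block 3 ]
 *	cache->infos:  [ info 0  ][ info 1  ][ info 2  ][ info 3  ]
 *	buffer(info)   = &cache->pages[(info - cache->infos) * VDO_BLOCK_SIZE]
 *
 * Each info's metadata vio is created pointing at its slice, so no further buffer management is
 * needed when pages are loaded or saved.
 */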

/**
 * assert_on_cache_thread() - Assert that a function has been called on the VDO page cache's
 *                            thread.
 */
static inline void assert_on_cache_thread(struct vdo_page_cache *cache,
					  const char *function_name)
{
	thread_id_t thread_id = vdo_get_callback_thread_id();

	VDO_ASSERT_LOG_ONLY((thread_id == cache->zone->thread_id),
			    "%s() must only be called on cache thread %d, not thread %d",
			    function_name, cache->zone->thread_id, thread_id);
}

/** assert_io_allowed() - Assert that a page cache may issue I/O. */
static inline void assert_io_allowed(struct vdo_page_cache *cache)
{
	VDO_ASSERT_LOG_ONLY(!vdo_is_state_quiescent(&cache->zone->state),
			    "VDO page cache may issue I/O");
}

/** report_cache_pressure() - Log and, if enabled, report cache pressure. */
static void report_cache_pressure(struct vdo_page_cache *cache)
{
	ADD_ONCE(cache->stats.cache_pressure, 1);
	if (cache->waiter_count > cache->page_count) {
		if ((cache->pressure_report % LOG_INTERVAL) == 0)
			vdo_log_info("page cache pressure %u", cache->stats.cache_pressure);

		if (++cache->pressure_report >= DISPLAY_INTERVAL)
			cache->pressure_report = 0;
	}
}

/**
 * get_page_state_name() - Return the name of a page state.
 *
 * If the page state is invalid a static string is returned and the invalid state is logged.
 *
 * Return: A pointer to a static page state name.
 */
static const char * __must_check get_page_state_name(enum vdo_page_buffer_state state)
{
	int result;
	static const char * const state_names[] = {
		"FREE", "INCOMING", "FAILED", "RESIDENT", "DIRTY", "OUTGOING"
	};

	BUILD_BUG_ON(ARRAY_SIZE(state_names) != PAGE_STATE_COUNT);

	result = VDO_ASSERT(state < ARRAY_SIZE(state_names),
			    "Unknown page_state value %d", state);
	if (result != VDO_SUCCESS)
		return "[UNKNOWN PAGE STATE]";

	return state_names[state];
}

/**
 * update_counter() - Update the counter associated with a given state.
 * @info: The page info to count.
 * @delta: The delta to apply to the counter.
 */
static void update_counter(struct page_info *info, s32 delta)
{
	struct block_map_statistics *stats = &info->cache->stats;

	switch (info->state) {
	case PS_FREE:
		ADD_ONCE(stats->free_pages, delta);
		return;

	case PS_INCOMING:
		ADD_ONCE(stats->incoming_pages, delta);
		return;

	case PS_OUTGOING:
		ADD_ONCE(stats->outgoing_pages, delta);
		return;

	case PS_FAILED:
		ADD_ONCE(stats->failed_pages, delta);
		return;

	case PS_RESIDENT:
		ADD_ONCE(stats->clean_pages, delta);
		return;

	case PS_DIRTY:
		ADD_ONCE(stats->dirty_pages, delta);
		return;

	default:
		return;
	}
}

/** update_lru() - Update the lru information for an active page. */
static void update_lru(struct page_info *info)
{
	if (info->cache->lru_list.prev != &info->lru_entry)
		list_move_tail(&info->lru_entry, &info->cache->lru_list);
}

/**
 * set_info_state() - Set the state of a page_info and put it on the right list, adjusting
 *                    counters.
 */
static void set_info_state(struct page_info *info, enum vdo_page_buffer_state new_state)
{
	if (new_state == info->state)
		return;

	update_counter(info, -1);
	info->state = new_state;
	update_counter(info, 1);

	switch (info->state) {
	case PS_FREE:
	case PS_FAILED:
		list_move_tail(&info->state_entry, &info->cache->free_list);
		return;

	case PS_OUTGOING:
		list_move_tail(&info->state_entry, &info->cache->outgoing_list);
		return;

	case PS_DIRTY:
		return;

	default:
		list_del_init(&info->state_entry);
	}
}

/** set_info_pbn() - Set the pbn for an info, updating the map as needed. */
static int __must_check set_info_pbn(struct page_info *info, physical_block_number_t pbn)
{
	struct vdo_page_cache *cache = info->cache;

	/* Either the new or the old page number must be NO_PAGE. */
	int result = VDO_ASSERT((pbn == NO_PAGE) || (info->pbn == NO_PAGE),
				"Must free a page before reusing it.");
	if (result != VDO_SUCCESS)
		return result;

	if (info->pbn != NO_PAGE)
		vdo_int_map_remove(cache->page_map, info->pbn);

	info->pbn = pbn;

	if (pbn != NO_PAGE) {
		result = vdo_int_map_put(cache->page_map, pbn, info, true, NULL);
		if (result != VDO_SUCCESS)
			return result;
	}
	return VDO_SUCCESS;
}

/** reset_page_info() - Reset page info to represent an unallocated page. */
static int reset_page_info(struct page_info *info)
{
	int result;

	result = VDO_ASSERT(info->busy == 0, "VDO Page must not be busy");
	if (result != VDO_SUCCESS)
		return result;

	result = VDO_ASSERT(!vdo_waitq_has_waiters(&info->waiting),
			    "VDO Page must not have waiters");
	if (result != VDO_SUCCESS)
		return result;

	result = set_info_pbn(info, NO_PAGE);
	set_info_state(info, PS_FREE);
	list_del_init(&info->lru_entry);
	return result;
}

/**
 * find_free_page() - Find a free page.
 *
 * Return: A pointer to the page info structure (if found), NULL otherwise.
 */
static struct page_info * __must_check find_free_page(struct vdo_page_cache *cache)
{
	struct page_info *info;

	info = list_first_entry_or_null(&cache->free_list, struct page_info,
					state_entry);
	if (info != NULL)
		list_del_init(&info->state_entry);

	return info;
}

/**
 * find_page() - Find the page info (if any) associated with a given pbn.
 * @pbn: The absolute physical block number of the page.
 *
 * Return: The page info for the page if available, or NULL if not.
 */
static struct page_info * __must_check find_page(struct vdo_page_cache *cache,
						 physical_block_number_t pbn)
{
	if ((cache->last_found != NULL) && (cache->last_found->pbn == pbn))
		return cache->last_found;

	cache->last_found = vdo_int_map_get(cache->page_map, pbn);
	return cache->last_found;
}

/**
 * select_lru_page() - Determine which page is least recently used.
 *
 * Picks the least recently used page from among the non-busy entries at the front of the LRU
 * ring. Since we move a page to the end of the ring whenever we mark it busy, the entries at the
 * front are unlikely to be busy unless the queue is very short, but it is not impossible.
 *
 * Return: A pointer to the info structure for a relevant page, or NULL if no such page can be
 *         found. The page can be dirty or resident.
 */
static struct page_info * __must_check select_lru_page(struct vdo_page_cache *cache)
{
	struct page_info *info;

	list_for_each_entry(info, &cache->lru_list, lru_entry)
		if ((info->busy == 0) && !is_in_flight(info))
			return info;

	return NULL;
}

/* ASYNCHRONOUS INTERFACE BEYOND THIS POINT */

/**
 * complete_with_page() - Helper to complete the VDO Page Completion request successfully.
 * @info: The page info representing the result page.
 * @vdo_page_comp: The VDO page completion to complete.
 */
static void complete_with_page(struct page_info *info,
			       struct vdo_page_completion *vdo_page_comp)
{
	bool available = vdo_page_comp->writable ? is_present(info) : is_valid(info);

	if (!available) {
		vdo_log_error_strerror(VDO_BAD_PAGE,
				       "Requested cache page %llu in state %s is not %s",
				       (unsigned long long) info->pbn,
				       get_page_state_name(info->state),
				       vdo_page_comp->writable ? "present" : "valid");
		vdo_fail_completion(&vdo_page_comp->completion, VDO_BAD_PAGE);
		return;
	}

	vdo_page_comp->info = info;
	vdo_page_comp->ready = true;
	vdo_finish_completion(&vdo_page_comp->completion);
}

/**
 * complete_waiter_with_error() - Complete a page completion with an error code.
 * @waiter: The page completion, as a waiter.
 * @result_ptr: A pointer to the error code.
 *
 * Implements waiter_callback_fn.
 */
static void complete_waiter_with_error(struct vdo_waiter *waiter, void *result_ptr)
{
	int *result = result_ptr;

	vdo_fail_completion(&page_completion_from_waiter(waiter)->completion, *result);
}

/**
 * complete_waiter_with_page() - Complete a page completion with a page.
 * @waiter: The page completion, as a waiter.
 * @page_info: The page info to complete with.
 *
 * Implements waiter_callback_fn.
 */
static void complete_waiter_with_page(struct vdo_waiter *waiter, void *page_info)
{
	complete_with_page(page_info, page_completion_from_waiter(waiter));
}

/**
 * distribute_page_over_waitq() - Complete a waitq of VDO page completions with a page result.
 *
 * Upon completion the waitq will be empty.
 *
 * Return: The number of pages distributed.
 */
static unsigned int distribute_page_over_waitq(struct page_info *info,
					       struct vdo_wait_queue *waitq)
{
	size_t num_pages;

	update_lru(info);
	num_pages = vdo_waitq_num_waiters(waitq);

	/*
	 * Increment the busy count once for each pending completion so that this page does not
	 * stop being busy until all completions have been processed.
	 */
	info->busy += num_pages;

	vdo_waitq_notify_all_waiters(waitq, complete_waiter_with_page, info);
	return num_pages;
}
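
/*
 * A small worked example of the busy accounting above (hypothetical numbers): if three
 * completions are waiting for the same page when it becomes available, info->busy is raised from
 * 0 to 3 before any waiter is notified. Each waiter must eventually call
 * vdo_release_page_completion(), and only the release that drops info->busy back to 0 makes the
 * page eligible for discard again.
 */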

/**
 * set_persistent_error() - Set a persistent error which all requests will receive in the future.
 * @context: A string describing what triggered the error.
 *
 * Once triggered, all enqueued completions will get this error. Any future requests will result in
 * this error as well.
 */
static void set_persistent_error(struct vdo_page_cache *cache, const char *context,
				 int result)
{
	struct page_info *info;
	/* If we're already read-only, there's no need to log. */
	struct vdo *vdo = cache->vdo;

	if ((result != VDO_READ_ONLY) && !vdo_is_read_only(vdo)) {
		vdo_log_error_strerror(result, "VDO Page Cache persistent error: %s",
				       context);
		vdo_enter_read_only_mode(vdo, result);
	}

	assert_on_cache_thread(cache, __func__);

	vdo_waitq_notify_all_waiters(&cache->free_waiters,
				     complete_waiter_with_error, &result);
	cache->waiter_count = 0;

	for (info = cache->infos; info < cache->infos + cache->page_count; info++) {
		vdo_waitq_notify_all_waiters(&info->waiting,
					     complete_waiter_with_error, &result);
	}
}

/**
 * validate_completed_page() - Check that a page completion which is being freed to the cache
 *                             referred to a valid page and is in a valid state.
 * @writable: Whether a writable page is required.
 *
 * Return: VDO_SUCCESS if the page was valid, otherwise an error.
 */
static int __must_check validate_completed_page(struct vdo_page_completion *completion,
						bool writable)
{
	int result;

	result = VDO_ASSERT(completion->ready, "VDO Page completion not ready");
	if (result != VDO_SUCCESS)
		return result;

	result = VDO_ASSERT(completion->info != NULL,
			    "VDO Page Completion must be complete");
	if (result != VDO_SUCCESS)
		return result;

	result = VDO_ASSERT(completion->info->pbn == completion->pbn,
			    "VDO Page Completion pbn must be consistent");
	if (result != VDO_SUCCESS)
		return result;

	result = VDO_ASSERT(is_valid(completion->info),
			    "VDO Page Completion page must be valid");
	if (result != VDO_SUCCESS)
		return result;

	if (writable) {
		result = VDO_ASSERT(completion->writable,
				    "VDO Page Completion must be writable");
		if (result != VDO_SUCCESS)
			return result;
	}

	return VDO_SUCCESS;
}

static void check_for_drain_complete(struct block_map_zone *zone)
{
	if (vdo_is_state_draining(&zone->state) &&
	    (zone->active_lookups == 0) &&
	    !vdo_waitq_has_waiters(&zone->flush_waiters) &&
	    !is_vio_pool_busy(zone->vio_pool) &&
	    (zone->page_cache.outstanding_reads == 0) &&
	    (zone->page_cache.outstanding_writes == 0)) {
		vdo_finish_draining_with_result(&zone->state,
						(vdo_is_read_only(zone->block_map->vdo) ?
						 VDO_READ_ONLY : VDO_SUCCESS));
	}
}

static void enter_zone_read_only_mode(struct block_map_zone *zone, int result)
{
	vdo_enter_read_only_mode(zone->block_map->vdo, result);

	/*
	 * We are in read-only mode, so we won't ever write any page out.
	 * Just take all waiters off the waitq so the zone can drain.
	 */
	vdo_waitq_init(&zone->flush_waiters);
	check_for_drain_complete(zone);
}

static bool __must_check
validate_completed_page_or_enter_read_only_mode(struct vdo_page_completion *completion,
						bool writable)
{
	int result = validate_completed_page(completion, writable);

	if (result == VDO_SUCCESS)
		return true;

	enter_zone_read_only_mode(completion->info->cache->zone, result);
	return false;
}

/**
 * handle_load_error() - Handle page load errors.
 * @completion: The page read vio.
 */
static void handle_load_error(struct vdo_completion *completion)
{
	int result = completion->result;
	struct page_info *info = completion->parent;
	struct vdo_page_cache *cache = info->cache;

	assert_on_cache_thread(cache, __func__);
	vio_record_metadata_io_error(as_vio(completion));
	vdo_enter_read_only_mode(cache->zone->block_map->vdo, result);
	ADD_ONCE(cache->stats.failed_reads, 1);
	set_info_state(info, PS_FAILED);
	vdo_waitq_notify_all_waiters(&info->waiting, complete_waiter_with_error, &result);
	reset_page_info(info);

	/*
	 * Don't decrement until right before calling check_for_drain_complete() to
	 * ensure that the above work can't cause the page cache to be freed out from under us.
	 */
	cache->outstanding_reads--;
	check_for_drain_complete(cache->zone);
}

/**
 * page_is_loaded() - Callback used when a page has been loaded.
 * @completion: The vio which has loaded the page. Its parent is the page_info.
 */
static void page_is_loaded(struct vdo_completion *completion)
{
	struct page_info *info = completion->parent;
	struct vdo_page_cache *cache = info->cache;
	nonce_t nonce = info->cache->zone->block_map->nonce;
	struct block_map_page *page;
	enum block_map_page_validity validity;

	assert_on_cache_thread(cache, __func__);

	page = (struct block_map_page *) get_page_buffer(info);
	validity = vdo_validate_block_map_page(page, nonce, info->pbn);
	if (validity == VDO_BLOCK_MAP_PAGE_BAD) {
		physical_block_number_t pbn = vdo_get_block_map_page_pbn(page);
		int result = vdo_log_error_strerror(VDO_BAD_PAGE,
						    "Expected page %llu but got page %llu instead",
						    (unsigned long long) info->pbn,
						    (unsigned long long) pbn);

		vdo_continue_completion(completion, result);
		return;
	}

	if (validity == VDO_BLOCK_MAP_PAGE_INVALID)
		vdo_format_block_map_page(page, nonce, info->pbn, false);

	info->recovery_lock = 0;
	set_info_state(info, PS_RESIDENT);
	distribute_page_over_waitq(info, &info->waiting);

	/*
	 * Don't decrement until right before calling check_for_drain_complete() to
	 * ensure that the above work can't cause the page cache to be freed out from under us.
	 */
	cache->outstanding_reads--;
	check_for_drain_complete(cache->zone);
}

/**
 * handle_rebuild_read_error() - Handle a read error during a read-only rebuild.
 * @completion: The page load completion.
 */
static void handle_rebuild_read_error(struct vdo_completion *completion)
{
	struct page_info *info = completion->parent;
	struct vdo_page_cache *cache = info->cache;

	assert_on_cache_thread(cache, __func__);

	/*
	 * We are doing a read-only rebuild, so treat this as a successful read
	 * of an uninitialized page.
	 */
	vio_record_metadata_io_error(as_vio(completion));
	ADD_ONCE(cache->stats.failed_reads, 1);
	memset(get_page_buffer(info), 0, VDO_BLOCK_SIZE);
	vdo_reset_completion(completion);
	page_is_loaded(completion);
}

static void load_cache_page_endio(struct bio *bio)
{
	struct vio *vio = bio->bi_private;
	struct page_info *info = vio->completion.parent;

	continue_vio_after_io(vio, page_is_loaded, info->cache->zone->thread_id);
}
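
/*
 * A brief sketch of the cache page load path (launch_page_load() below kicks it off): the info
 * moves from PS_FREE to PS_INCOMING and a metadata read is submitted. The bio completes in
 * interrupt context, so load_cache_page_endio() uses continue_vio_after_io() to hop back to the
 * cache's zone thread, where page_is_loaded() validates the page, marks it PS_RESIDENT, and hands
 * it to every waiter; on error, handle_load_error() (or handle_rebuild_read_error() during a
 * read-only rebuild) marks it PS_FAILED or zero-fills it instead.
 *
 *	PS_FREE --launch_page_load()--> PS_INCOMING --page_is_loaded()----> PS_RESIDENT
 *	                                      \------handle_load_error()--> PS_FAILED -> PS_FREE
 */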

/**
 * launch_page_load() - Begin the process of loading a page.
 *
 * Return: VDO_SUCCESS or an error code.
 */
static int __must_check launch_page_load(struct page_info *info,
					 physical_block_number_t pbn)
{
	int result;
	vdo_action_fn callback;
	struct vdo_page_cache *cache = info->cache;

	assert_io_allowed(cache);

	result = set_info_pbn(info, pbn);
	if (result != VDO_SUCCESS)
		return result;

	result = VDO_ASSERT((info->busy == 0), "Page is not busy before loading.");
	if (result != VDO_SUCCESS)
		return result;

	set_info_state(info, PS_INCOMING);
	cache->outstanding_reads++;
	ADD_ONCE(cache->stats.pages_loaded, 1);
	callback = (cache->rebuilding ? handle_rebuild_read_error : handle_load_error);
	vdo_submit_metadata_vio(info->vio, pbn, load_cache_page_endio,
				callback, REQ_OP_READ | REQ_PRIO);
	return VDO_SUCCESS;
}

static void write_pages(struct vdo_completion *completion);

/** handle_flush_error() - Handle errors flushing the layer. */
static void handle_flush_error(struct vdo_completion *completion)
{
	struct page_info *info = completion->parent;

	vio_record_metadata_io_error(as_vio(completion));
	set_persistent_error(info->cache, "flush failed", completion->result);
	write_pages(completion);
}

static void flush_endio(struct bio *bio)
{
	struct vio *vio = bio->bi_private;
	struct page_info *info = vio->completion.parent;

	continue_vio_after_io(vio, write_pages, info->cache->zone->thread_id);
}

/** save_pages() - Attempt to save the outgoing pages by first flushing the layer. */
static void save_pages(struct vdo_page_cache *cache)
{
	struct page_info *info;
	struct vio *vio;

	if ((cache->pages_in_flush > 0) || (cache->pages_to_flush == 0))
		return;

	assert_io_allowed(cache);

	info = list_first_entry(&cache->outgoing_list, struct page_info, state_entry);

	cache->pages_in_flush = cache->pages_to_flush;
	cache->pages_to_flush = 0;
	ADD_ONCE(cache->stats.flush_count, 1);

	vio = info->vio;

	/*
	 * We must make sure that the recovery journal entries that changed these pages were
	 * successfully persisted, and thus must issue a flush before each batch of pages is
	 * written to ensure this.
	 */
	vdo_submit_flush_vio(vio, flush_endio, handle_flush_error);
}

/**
 * schedule_page_save() - Add a page to the outgoing list of pages waiting to be saved.
 *
 * Once in the list, a page may not be used until it has been written out.
 */
static void schedule_page_save(struct page_info *info)
{
	if (info->busy > 0) {
		info->write_status = WRITE_STATUS_DEFERRED;
		return;
	}

	info->cache->pages_to_flush++;
	info->cache->outstanding_writes++;
	set_info_state(info, PS_OUTGOING);
}

/**
 * launch_page_save() - Add a page to outgoing pages waiting to be saved, and then start saving
 *                      pages if another save is not in progress.
 */
static void launch_page_save(struct page_info *info)
{
	schedule_page_save(info);
	save_pages(info->cache);
}
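
/*
 * The save path above enforces a strict ordering (a descriptive sketch, not additional logic):
 *
 *	1. schedule_page_save() moves a non-busy dirty page onto the outgoing list (PS_OUTGOING).
 *	2. save_pages() issues a single preceding flush (vdo_submit_flush_vio()) for the whole
 *	   batch, guaranteeing that the recovery journal entries which dirtied these pages are
 *	   durable before the pages themselves can land on disk.
 *	3. flush_endio()/write_pages() then submit the actual page writes for the batch.
 *
 * Pages scheduled while a batch is in flight simply accumulate in pages_to_flush and are picked
 * up by the next save_pages() call made from write_pages().
 */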

/**
 * completion_needs_page() - Determine whether a given vdo_page_completion (as a waiter) is
 *                           requesting a given page number.
 * @context: A pointer to the pbn of the desired page.
 *
 * Implements waiter_match_fn.
 *
 * Return: true if the page completion is for the desired page number.
 */
static bool completion_needs_page(struct vdo_waiter *waiter, void *context)
{
	physical_block_number_t *pbn = context;

	return (page_completion_from_waiter(waiter)->pbn == *pbn);
}

/**
 * allocate_free_page() - Allocate a free page to the first completion in the waiting queue, and
 *                        any other completions that match it in page number.
 */
static void allocate_free_page(struct page_info *info)
{
	int result;
	struct vdo_waiter *oldest_waiter;
	physical_block_number_t pbn;
	struct vdo_page_cache *cache = info->cache;

	assert_on_cache_thread(cache, __func__);

	if (!vdo_waitq_has_waiters(&cache->free_waiters)) {
		if (cache->stats.cache_pressure > 0) {
			vdo_log_info("page cache pressure relieved");
			WRITE_ONCE(cache->stats.cache_pressure, 0);
		}

		return;
	}

	result = reset_page_info(info);
	if (result != VDO_SUCCESS) {
		set_persistent_error(cache, "cannot reset page info", result);
		return;
	}

	oldest_waiter = vdo_waitq_get_first_waiter(&cache->free_waiters);
	pbn = page_completion_from_waiter(oldest_waiter)->pbn;

	/*
	 * Remove all entries which match the page number in question and push them onto the page
	 * info's waitq.
	 */
	vdo_waitq_dequeue_matching_waiters(&cache->free_waiters, completion_needs_page,
					   &pbn, &info->waiting);
	cache->waiter_count -= vdo_waitq_num_waiters(&info->waiting);

	result = launch_page_load(info, pbn);
	if (result != VDO_SUCCESS) {
		vdo_waitq_notify_all_waiters(&info->waiting,
					     complete_waiter_with_error, &result);
	}
}

/**
 * discard_a_page() - Begin the process of discarding a page.
 *
 * If no page is discardable, increments a count of deferred frees so that the next release of a
 * page which is no longer busy will kick off another discard cycle. This is an indication that the
 * cache is not big enough.
 *
 * If the selected page is not dirty, immediately allocates the page to the oldest completion
 * waiting for a free page.
 */
static void discard_a_page(struct vdo_page_cache *cache)
{
	struct page_info *info = select_lru_page(cache);

	if (info == NULL) {
		report_cache_pressure(cache);
		return;
	}

	if (!is_dirty(info)) {
		allocate_free_page(info);
		return;
	}

	VDO_ASSERT_LOG_ONLY(!is_in_flight(info),
			    "page selected for discard is not in flight");

	cache->discard_count++;
	info->write_status = WRITE_STATUS_DISCARD;
	launch_page_save(info);
}

/**
 * discard_page_for_completion() - Helper used to trigger a discard so that the completion can get
 *                                 a different page.
 */
static void discard_page_for_completion(struct vdo_page_completion *vdo_page_comp)
{
	struct vdo_page_cache *cache = vdo_page_comp->cache;

	cache->waiter_count++;
	vdo_waitq_enqueue_waiter(&cache->free_waiters, &vdo_page_comp->waiter);
	discard_a_page(cache);
}

/**
 * discard_page_if_needed() - Helper used to trigger a discard if the cache needs another free
 *                            page.
 * @cache: The page cache.
 */
static void discard_page_if_needed(struct vdo_page_cache *cache)
{
	if (cache->waiter_count > cache->discard_count)
		discard_a_page(cache);
}
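
/*
 * The discard bookkeeping above can be summarized with a hypothetical example: if five
 * completions are waiting for free pages (waiter_count == 5) while two dirty pages are already
 * being written out for discard (discard_count == 2), discard_page_if_needed() starts another
 * discard because 5 > 2. Once a discarded page finishes writing, page_is_written_out() below
 * decrements discard_count and either hands that page to waiters or triggers a further discard.
 */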

/**
 * write_has_finished() - Inform the cache that a write has finished (possibly with an error).
 * @info: The info structure for the page whose write just completed.
 *
 * Return: true if the page write was a discard.
 */
static bool write_has_finished(struct page_info *info)
{
	bool was_discard = (info->write_status == WRITE_STATUS_DISCARD);

	assert_on_cache_thread(info->cache, __func__);
	info->cache->outstanding_writes--;

	info->write_status = WRITE_STATUS_NORMAL;
	return was_discard;
}

/**
 * handle_page_write_error() - Handler for page write errors.
 * @completion: The page write vio.
 */
static void handle_page_write_error(struct vdo_completion *completion)
{
	int result = completion->result;
	struct page_info *info = completion->parent;
	struct vdo_page_cache *cache = info->cache;

	vio_record_metadata_io_error(as_vio(completion));

	/* If we're already read-only, write failures are to be expected. */
	if (result != VDO_READ_ONLY) {
		vdo_log_ratelimit(vdo_log_error,
				  "failed to write block map page %llu",
				  (unsigned long long) info->pbn);
	}

	set_info_state(info, PS_DIRTY);
	ADD_ONCE(cache->stats.failed_writes, 1);
	set_persistent_error(cache, "cannot write page", result);

	if (!write_has_finished(info))
		discard_page_if_needed(cache);

	check_for_drain_complete(cache->zone);
}

static void page_is_written_out(struct vdo_completion *completion);

static void write_cache_page_endio(struct bio *bio)
{
	struct vio *vio = bio->bi_private;
	struct page_info *info = vio->completion.parent;

	continue_vio_after_io(vio, page_is_written_out, info->cache->zone->thread_id);
}

/**
 * page_is_written_out() - Callback used when a page has been written out.
 * @completion: The vio which wrote the page. Its parent is a page_info.
 */
static void page_is_written_out(struct vdo_completion *completion)
{
	bool was_discard, reclaimed;
	u32 reclamations;
	struct page_info *info = completion->parent;
	struct vdo_page_cache *cache = info->cache;
	struct block_map_page *page = (struct block_map_page *) get_page_buffer(info);

	if (!page->header.initialized) {
		page->header.initialized = true;
		vdo_submit_metadata_vio(info->vio, info->pbn,
					write_cache_page_endio,
					handle_page_write_error,
					REQ_OP_WRITE | REQ_PRIO | REQ_PREFLUSH);
		return;
	}

	/* Handle journal updates and torn write protection. */
	vdo_release_recovery_journal_block_reference(cache->zone->block_map->journal,
						     info->recovery_lock,
						     VDO_ZONE_TYPE_LOGICAL,
						     cache->zone->zone_number);
	info->recovery_lock = 0;
	was_discard = write_has_finished(info);
	reclaimed = (!was_discard || (info->busy > 0) || vdo_waitq_has_waiters(&info->waiting));

	set_info_state(info, PS_RESIDENT);

	reclamations = distribute_page_over_waitq(info, &info->waiting);
	ADD_ONCE(cache->stats.reclaimed, reclamations);

	if (was_discard)
		cache->discard_count--;

	if (reclaimed)
		discard_page_if_needed(cache);
	else
		allocate_free_page(info);

	check_for_drain_complete(cache->zone);
}
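
/*
 * Torn-write protection for cache pages, sketched (the logic lives in page_is_written_out()
 * above): the first time a page is persisted it is written with header.initialized still false;
 * only after that write completes is the flag set and the page rewritten with a preceding flush.
 * A crash between the two writes therefore leaves an on-disk page that is still marked
 * uninitialized, so a torn first write can never be mistaken for valid data:
 *
 *	write #1: initialized == false              -> completes -> set initialized = true
 *	write #2: initialized == true, REQ_PREFLUSH -> page is now trusted on disk
 */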

/**
 * write_pages() - Write the batch of pages which were covered by the layer flush which just
 *                 completed.
 * @flush_completion: The flush vio.
 *
 * This callback is registered in save_pages().
 */
static void write_pages(struct vdo_completion *flush_completion)
{
	struct vdo_page_cache *cache = ((struct page_info *) flush_completion->parent)->cache;

	/*
	 * We need to cache these two values on the stack since it is possible for the last
	 * page info to cause the page cache to get freed. Hence once we launch the last page,
	 * it may be unsafe to dereference the cache.
	 */
	bool has_unflushed_pages = (cache->pages_to_flush > 0);
	page_count_t pages_in_flush = cache->pages_in_flush;

	cache->pages_in_flush = 0;
	while (pages_in_flush-- > 0) {
		struct page_info *info =
			list_first_entry(&cache->outgoing_list, struct page_info,
					 state_entry);

		list_del_init(&info->state_entry);
		if (vdo_is_read_only(info->cache->vdo)) {
			struct vdo_completion *completion = &info->vio->completion;

			vdo_reset_completion(completion);
			completion->callback = page_is_written_out;
			completion->error_handler = handle_page_write_error;
			vdo_fail_completion(completion, VDO_READ_ONLY);
			continue;
		}
		ADD_ONCE(info->cache->stats.pages_saved, 1);
		vdo_submit_metadata_vio(info->vio, info->pbn, write_cache_page_endio,
					handle_page_write_error, REQ_OP_WRITE | REQ_PRIO);
	}

	if (has_unflushed_pages) {
		/*
		 * If there are unflushed pages, the cache can't have been freed, so this call is
		 * safe.
		 */
		save_pages(cache);
	}
}

/**
 * vdo_release_page_completion() - Release a VDO Page Completion.
 *
 * The page referenced by this completion (if any) will no longer be held busy by this completion.
 * If a page becomes discardable and there are completions awaiting free pages then a new round of
 * page discarding is started.
 */
void vdo_release_page_completion(struct vdo_completion *completion)
{
	struct page_info *discard_info = NULL;
	struct vdo_page_completion *page_completion = as_vdo_page_completion(completion);
	struct vdo_page_cache *cache;

	if (completion->result == VDO_SUCCESS) {
		if (!validate_completed_page_or_enter_read_only_mode(page_completion, false))
			return;

		if (--page_completion->info->busy == 0)
			discard_info = page_completion->info;
	}

	VDO_ASSERT_LOG_ONLY((page_completion->waiter.next_waiter == NULL),
			    "Page being released after leaving all queues");

	page_completion->info = NULL;
	cache = page_completion->cache;
	assert_on_cache_thread(cache, __func__);

	if (discard_info != NULL) {
		if (discard_info->write_status == WRITE_STATUS_DEFERRED) {
			discard_info->write_status = WRITE_STATUS_NORMAL;
			launch_page_save(discard_info);
		}

		/*
		 * If there are excess requests for pages (that have not already started
		 * discards), we need to discard some page (which may be this one).
		 */
		discard_page_if_needed(cache);
	}
}

/**
 * load_page_for_completion() - Helper function to load a page as described by a VDO Page
 *                              Completion.
 */
static void load_page_for_completion(struct page_info *info,
				     struct vdo_page_completion *vdo_page_comp)
{
	int result;

	vdo_waitq_enqueue_waiter(&info->waiting, &vdo_page_comp->waiter);
	result = launch_page_load(info, vdo_page_comp->pbn);
	if (result != VDO_SUCCESS) {
		vdo_waitq_notify_all_waiters(&info->waiting,
					     complete_waiter_with_error, &result);
	}
}

/**
 * vdo_get_page() - Initialize a page completion and get a block map page.
 * @page_completion: The vdo_page_completion to initialize.
 * @zone: The block map zone of the desired page.
 * @pbn: The absolute physical block of the desired page.
 * @writable: Whether the page can be modified.
 * @parent: The object to notify when the fetch is complete.
 * @callback: The notification callback.
 * @error_handler: The handler for fetch errors.
 * @requeue: Whether we must requeue when notifying the parent.
 *
 * May cause another page to be discarded (potentially writing a dirty page) and the one nominated
 * by the completion to be loaded from disk. When the callback is invoked, the page will be
 * resident in the cache and marked busy. All callers must call vdo_release_page_completion()
 * when they are done with the page to clear the busy mark.
 */
void vdo_get_page(struct vdo_page_completion *page_completion,
		  struct block_map_zone *zone, physical_block_number_t pbn,
		  bool writable, void *parent, vdo_action_fn callback,
		  vdo_action_fn error_handler, bool requeue)
{
	struct vdo_page_cache *cache = &zone->page_cache;
	struct vdo_completion *completion = &page_completion->completion;
	struct page_info *info;

	assert_on_cache_thread(cache, __func__);
	VDO_ASSERT_LOG_ONLY((page_completion->waiter.next_waiter == NULL),
			    "New page completion was not already on a wait queue");

	*page_completion = (struct vdo_page_completion) {
		.pbn = pbn,
		.writable = writable,
		.cache = cache,
	};

	vdo_initialize_completion(completion, cache->vdo, VDO_PAGE_COMPLETION);
	vdo_prepare_completion(completion, callback, error_handler,
			       cache->zone->thread_id, parent);
	completion->requeue = requeue;

	if (page_completion->writable && vdo_is_read_only(cache->vdo)) {
		vdo_fail_completion(completion, VDO_READ_ONLY);
		return;
	}

	if (page_completion->writable)
		ADD_ONCE(cache->stats.write_count, 1);
	else
		ADD_ONCE(cache->stats.read_count, 1);

	info = find_page(cache, page_completion->pbn);
	if (info != NULL) {
		/* The page is in the cache already. */
		if ((info->write_status == WRITE_STATUS_DEFERRED) ||
		    is_incoming(info) ||
		    (is_outgoing(info) && page_completion->writable)) {
			/* The page is unusable until it has finished I/O. */
			ADD_ONCE(cache->stats.wait_for_page, 1);
			vdo_waitq_enqueue_waiter(&info->waiting, &page_completion->waiter);
			return;
		}

		if (is_valid(info)) {
			/* The page is usable. */
			ADD_ONCE(cache->stats.found_in_cache, 1);
			if (!is_present(info))
				ADD_ONCE(cache->stats.read_outgoing, 1);
			update_lru(info);
			info->busy++;
			complete_with_page(info, page_completion);
			return;
		}

		/* Something horrible has gone wrong. */
		VDO_ASSERT_LOG_ONLY(false, "Info found in a usable state.");
	}

	/* The page must be fetched. */
	info = find_free_page(cache);
	if (info != NULL) {
		ADD_ONCE(cache->stats.fetch_required, 1);
		load_page_for_completion(info, page_completion);
		return;
	}

	/* The page must wait for a page to be discarded. */
	ADD_ONCE(cache->stats.discard_required, 1);
	discard_page_for_completion(page_completion);
}
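
/*
 * A minimal usage sketch of the page-fetch API (hypothetical caller code and names; the real
 * callers live elsewhere in the VDO code):
 *
 *	static void my_page_ready(struct vdo_completion *completion)
 *	{
 *		struct block_map_page *page;
 *		int result = vdo_get_cached_page(completion, &page);
 *
 *		if (result == VDO_SUCCESS) {
 *			// Read or modify entries; call vdo_request_page_write() if dirtied.
 *		}
 *
 *		vdo_release_page_completion(completion);
 *	}
 *
 *	vdo_get_page(&my_page_completion, zone, pbn, true, parent, my_page_ready,
 *		     my_error_handler, false);
 *
 * Every successful fetch holds the page busy until the matching vdo_release_page_completion().
 */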

/**
 * vdo_request_page_write() - Request that a VDO page be written out as soon as it is not busy.
 * @completion: The vdo_page_completion containing the page.
 */
void vdo_request_page_write(struct vdo_completion *completion)
{
	struct page_info *info;
	struct vdo_page_completion *vdo_page_comp = as_vdo_page_completion(completion);

	if (!validate_completed_page_or_enter_read_only_mode(vdo_page_comp, true))
		return;

	info = vdo_page_comp->info;
	set_info_state(info, PS_DIRTY);
	launch_page_save(info);
}

/**
 * vdo_get_cached_page() - Get the block map page from a page completion.
 * @completion: A vdo page completion whose callback has been called.
 * @page_ptr: A pointer to hold the page
 *
 * Return: VDO_SUCCESS or an error
 */
int vdo_get_cached_page(struct vdo_completion *completion,
			struct block_map_page **page_ptr)
{
	int result;
	struct vdo_page_completion *vpc;

	vpc = as_vdo_page_completion(completion);
	result = validate_completed_page(vpc, true);
	if (result == VDO_SUCCESS)
		*page_ptr = (struct block_map_page *) get_page_buffer(vpc->info);

	return result;
}

/**
 * vdo_invalidate_page_cache() - Invalidate all entries in the VDO page cache.
 *
 * There must not be any dirty pages in the cache.
 *
 * Return: A success or error code.
 */
int vdo_invalidate_page_cache(struct vdo_page_cache *cache)
{
	struct page_info *info;

	assert_on_cache_thread(cache, __func__);

	/* Make sure we don't throw away any dirty pages. */
	for (info = cache->infos; info < cache->infos + cache->page_count; info++) {
		int result = VDO_ASSERT(!is_dirty(info), "cache must have no dirty pages");

		if (result != VDO_SUCCESS)
			return result;
	}

	/* Reset the page map by re-allocating it. */
	vdo_int_map_free(vdo_forget(cache->page_map));
	return vdo_int_map_create(cache->page_count, &cache->page_map);
}

/**
 * get_tree_page_by_index() - Get the tree page for a given height and page index.
 *
 * Return: The requested page.
 */
static struct tree_page * __must_check get_tree_page_by_index(struct forest *forest,
							      root_count_t root_index,
							      height_t height,
							      page_number_t page_index)
{
	page_number_t offset = 0;
	size_t segment;

	for (segment = 0; segment < forest->segments; segment++) {
		page_number_t border = forest->boundaries[segment].levels[height - 1];

		if (page_index < border) {
			struct block_map_tree *tree = &forest->trees[root_index];

			return &(tree->segments[segment].levels[height - 1][page_index - offset]);
		}

		offset = border;
	}

	return NULL;
}

/* Get the page referred to by the lock's tree slot at its current height. */
static inline struct tree_page *get_tree_page(const struct block_map_zone *zone,
					      const struct tree_lock *lock)
{
	return get_tree_page_by_index(zone->block_map->forest, lock->root_index,
				      lock->height,
				      lock->tree_slots[lock->height].page_index);
}

/** vdo_copy_valid_page() - Validate and copy a buffer to a page. */
bool vdo_copy_valid_page(char *buffer, nonce_t nonce,
			 physical_block_number_t pbn,
			 struct block_map_page *page)
{
	struct block_map_page *loaded = (struct block_map_page *) buffer;
	enum block_map_page_validity validity =
		vdo_validate_block_map_page(loaded, nonce, pbn);

	if (validity == VDO_BLOCK_MAP_PAGE_VALID) {
		memcpy(page, loaded, VDO_BLOCK_SIZE);
		return true;
	}

	if (validity == VDO_BLOCK_MAP_PAGE_BAD) {
		vdo_log_error_strerror(VDO_BAD_PAGE,
				       "Expected page %llu but got page %llu instead",
				       (unsigned long long) pbn,
				       (unsigned long long) vdo_get_block_map_page_pbn(loaded));
	}

	return false;
}

/**
 * in_cyclic_range() - Check whether the given value is between the lower and upper bounds, within
 *                     a cyclic range of values from 0 to (modulus - 1).
 * @lower: The lowest value to accept.
 * @value: The value to check.
 * @upper: The highest value to accept.
 * @modulus: The size of the cyclic space, no more than 2^15.
 *
 * The value and both bounds must be smaller than the modulus.
 *
 * Return: true if the value is in range.
 */
static bool in_cyclic_range(u16 lower, u16 value, u16 upper, u16 modulus)
{
	if (value < lower)
		value += modulus;
	if (upper < lower)
		upper += modulus;
	return (value <= upper);
}
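
/*
 * A worked example of the cyclic check above (hypothetical generation numbers, modulus 256): with
 * lower == 250 and upper == 5, a value of 2 first becomes 2 + 256 == 258 because it is below the
 * lower bound, and upper becomes 5 + 256 == 261 for the same reason, so 258 <= 261 and the value
 * is accepted. A value of 100 becomes 356, which exceeds 261, so it is rejected as lying outside
 * the window that wraps from 250 around to 5.
 */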
1434 * 1435 * Return: true if generation @a is not strictly older than generation @b in the context of @zone 1436 */ 1437 static bool __must_check is_not_older(struct block_map_zone *zone, u8 a, u8 b) 1438 { 1439 int result; 1440 1441 result = VDO_ASSERT((in_cyclic_range(zone->oldest_generation, a, zone->generation, 1 << 8) && 1442 in_cyclic_range(zone->oldest_generation, b, zone->generation, 1 << 8)), 1443 "generation(s) %u, %u are out of range [%u, %u]", 1444 a, b, zone->oldest_generation, zone->generation); 1445 if (result != VDO_SUCCESS) { 1446 enter_zone_read_only_mode(zone, result); 1447 return true; 1448 } 1449 1450 return in_cyclic_range(b, a, zone->generation, 1 << 8); 1451 } 1452 1453 static void release_generation(struct block_map_zone *zone, u8 generation) 1454 { 1455 int result; 1456 1457 result = VDO_ASSERT((zone->dirty_page_counts[generation] > 0), 1458 "dirty page count underflow for generation %u", generation); 1459 if (result != VDO_SUCCESS) { 1460 enter_zone_read_only_mode(zone, result); 1461 return; 1462 } 1463 1464 zone->dirty_page_counts[generation]--; 1465 while ((zone->dirty_page_counts[zone->oldest_generation] == 0) && 1466 (zone->oldest_generation != zone->generation)) 1467 zone->oldest_generation++; 1468 } 1469 1470 static void set_generation(struct block_map_zone *zone, struct tree_page *page, 1471 u8 new_generation) 1472 { 1473 u32 new_count; 1474 int result; 1475 bool decrement_old = vdo_waiter_is_waiting(&page->waiter); 1476 u8 old_generation = page->generation; 1477 1478 if (decrement_old && (old_generation == new_generation)) 1479 return; 1480 1481 page->generation = new_generation; 1482 new_count = ++zone->dirty_page_counts[new_generation]; 1483 result = VDO_ASSERT((new_count != 0), "dirty page count overflow for generation %u", 1484 new_generation); 1485 if (result != VDO_SUCCESS) { 1486 enter_zone_read_only_mode(zone, result); 1487 return; 1488 } 1489 1490 if (decrement_old) 1491 release_generation(zone, old_generation); 1492 } 1493 1494 static void write_page(struct tree_page *tree_page, struct pooled_vio *vio); 1495 1496 /* Implements waiter_callback_fn */ 1497 static void write_page_callback(struct vdo_waiter *waiter, void *context) 1498 { 1499 write_page(container_of(waiter, struct tree_page, waiter), context); 1500 } 1501 1502 static void acquire_vio(struct vdo_waiter *waiter, struct block_map_zone *zone) 1503 { 1504 waiter->callback = write_page_callback; 1505 acquire_vio_from_pool(zone->vio_pool, waiter); 1506 } 1507 1508 /* Return: true if all possible generations were not already active */ 1509 static bool attempt_increment(struct block_map_zone *zone) 1510 { 1511 u8 generation = zone->generation + 1; 1512 1513 if (zone->oldest_generation == generation) 1514 return false; 1515 1516 zone->generation = generation; 1517 return true; 1518 } 1519 1520 /* Launches a flush if one is not already in progress. 

/* Launches a flush if one is not already in progress. */
static void enqueue_page(struct tree_page *page, struct block_map_zone *zone)
{
	if ((zone->flusher == NULL) && attempt_increment(zone)) {
		zone->flusher = page;
		acquire_vio(&page->waiter, zone);
		return;
	}

	vdo_waitq_enqueue_waiter(&zone->flush_waiters, &page->waiter);
}

static void write_page_if_not_dirtied(struct vdo_waiter *waiter, void *context)
{
	struct tree_page *page = container_of(waiter, struct tree_page, waiter);
	struct write_if_not_dirtied_context *write_context = context;

	if (page->generation == write_context->generation) {
		acquire_vio(waiter, write_context->zone);
		return;
	}

	enqueue_page(page, write_context->zone);
}

static void return_to_pool(struct block_map_zone *zone, struct pooled_vio *vio)
{
	return_vio_to_pool(zone->vio_pool, vio);
	check_for_drain_complete(zone);
}

/* This callback is registered in write_initialized_page(). */
static void finish_page_write(struct vdo_completion *completion)
{
	bool dirty;
	struct vio *vio = as_vio(completion);
	struct pooled_vio *pooled = container_of(vio, struct pooled_vio, vio);
	struct tree_page *page = completion->parent;
	struct block_map_zone *zone = pooled->context;

	vdo_release_recovery_journal_block_reference(zone->block_map->journal,
						     page->writing_recovery_lock,
						     VDO_ZONE_TYPE_LOGICAL,
						     zone->zone_number);

	dirty = (page->writing_generation != page->generation);
	release_generation(zone, page->writing_generation);
	page->writing = false;

	if (zone->flusher == page) {
		struct write_if_not_dirtied_context context = {
			.zone = zone,
			.generation = page->writing_generation,
		};

		vdo_waitq_notify_all_waiters(&zone->flush_waiters,
					     write_page_if_not_dirtied, &context);
		if (dirty && attempt_increment(zone)) {
			write_page(page, pooled);
			return;
		}

		zone->flusher = NULL;
	}

	if (dirty) {
		enqueue_page(page, zone);
	} else if ((zone->flusher == NULL) && vdo_waitq_has_waiters(&zone->flush_waiters) &&
		   attempt_increment(zone)) {
		zone->flusher = container_of(vdo_waitq_dequeue_waiter(&zone->flush_waiters),
					     struct tree_page, waiter);
		write_page(zone->flusher, pooled);
		return;
	}

	return_to_pool(zone, pooled);
}

static void handle_write_error(struct vdo_completion *completion)
{
	int result = completion->result;
	struct vio *vio = as_vio(completion);
	struct pooled_vio *pooled = container_of(vio, struct pooled_vio, vio);
	struct block_map_zone *zone = pooled->context;

	vio_record_metadata_io_error(vio);
	enter_zone_read_only_mode(zone, result);
	return_to_pool(zone, pooled);
}

static void write_page_endio(struct bio *bio);

static void write_initialized_page(struct vdo_completion *completion)
{
	struct vio *vio = as_vio(completion);
	struct pooled_vio *pooled = container_of(vio, struct pooled_vio, vio);
	struct block_map_zone *zone = pooled->context;
	struct tree_page *tree_page = completion->parent;
	struct block_map_page *page = (struct block_map_page *) vio->data;
	blk_opf_t operation = REQ_OP_WRITE | REQ_PRIO;

	/*
	 * Now that we know the page has been written at least once, mark the copy we are writing
	 * as initialized.
	 */
	page->header.initialized = true;

	if (zone->flusher == tree_page)
		operation |= REQ_PREFLUSH;

	vdo_submit_metadata_vio(vio, vdo_get_block_map_page_pbn(page),
				write_page_endio, handle_write_error,
				operation);
}

static void write_page_endio(struct bio *bio)
{
	struct pooled_vio *vio = bio->bi_private;
	struct block_map_zone *zone = vio->context;
	struct block_map_page *page = (struct block_map_page *) vio->vio.data;

	continue_vio_after_io(&vio->vio,
			      (page->header.initialized ?
			       finish_page_write : write_initialized_page),
			      zone->thread_id);
}

static void write_page(struct tree_page *tree_page, struct pooled_vio *vio)
{
	struct vdo_completion *completion = &vio->vio.completion;
	struct block_map_zone *zone = vio->context;
	struct block_map_page *page = vdo_as_block_map_page(tree_page);

	if ((zone->flusher != tree_page) &&
	    is_not_older(zone, tree_page->generation, zone->generation)) {
		/*
		 * This page was re-dirtied after the last flush was issued, hence we need to do
		 * another flush.
		 */
		enqueue_page(tree_page, zone);
		return_to_pool(zone, vio);
		return;
	}

	completion->parent = tree_page;
	memcpy(vio->vio.data, tree_page->page_buffer, VDO_BLOCK_SIZE);
	completion->callback_thread_id = zone->thread_id;

	tree_page->writing = true;
	tree_page->writing_generation = tree_page->generation;
	tree_page->writing_recovery_lock = tree_page->recovery_lock;

	/* Clear this now so that we know this page is not on any dirty list. */
	tree_page->recovery_lock = 0;

	/*
	 * We've already copied the page into the vio which will write it, so if it was not yet
	 * initialized, the first write will indicate that (for torn write protection). It is now
	 * safe to mark it as initialized in memory since if the write fails, the in memory state
	 * will become irrelevant.
	 */
	if (page->header.initialized) {
		write_initialized_page(completion);
		return;
	}

	page->header.initialized = true;
	vdo_submit_metadata_vio(&vio->vio, vdo_get_block_map_page_pbn(page),
				write_page_endio, handle_write_error,
				REQ_OP_WRITE | REQ_PRIO);
}

/* Release a lock on a page which was being loaded or allocated. */
static void release_page_lock(struct data_vio *data_vio, char *what)
{
	struct block_map_zone *zone;
	struct tree_lock *lock_holder;
	struct tree_lock *lock = &data_vio->tree_lock;

	VDO_ASSERT_LOG_ONLY(lock->locked,
			    "release of unlocked block map page %s for key %llu in tree %u",
			    what, (unsigned long long) lock->key, lock->root_index);

	zone = data_vio->logical.zone->block_map_zone;
	lock_holder = vdo_int_map_remove(zone->loading_pages, lock->key);
	VDO_ASSERT_LOG_ONLY((lock_holder == lock),
			    "block map page %s mismatch for key %llu in tree %u",
			    what, (unsigned long long) lock->key, lock->root_index);
	lock->locked = false;
}

static void finish_lookup(struct data_vio *data_vio, int result)
{
	data_vio->tree_lock.height = 0;

	--data_vio->logical.zone->block_map_zone->active_lookups;

	set_data_vio_logical_callback(data_vio, continue_data_vio_with_block_map_slot);
	data_vio->vio.completion.error_handler = handle_data_vio_error;
	continue_data_vio_with_error(data_vio, result);
}

static void abort_lookup_for_waiter(struct vdo_waiter *waiter, void *context)
{
	struct data_vio *data_vio = vdo_waiter_as_data_vio(waiter);
	int result = *((int *) context);

	if (!data_vio->write) {
		if (result == VDO_NO_SPACE)
			result = VDO_SUCCESS;
	} else if (result != VDO_NO_SPACE) {
		result = VDO_READ_ONLY;
	}

	finish_lookup(data_vio, result);
}

static void abort_lookup(struct data_vio *data_vio, int result, char *what)
{
	if (result != VDO_NO_SPACE)
		enter_zone_read_only_mode(data_vio->logical.zone->block_map_zone, result);

	if (data_vio->tree_lock.locked) {
		release_page_lock(data_vio, what);
		vdo_waitq_notify_all_waiters(&data_vio->tree_lock.waiters,
					     abort_lookup_for_waiter,
					     &result);
	}

	finish_lookup(data_vio, result);
}

static void abort_load(struct data_vio *data_vio, int result)
{
	abort_lookup(data_vio, result, "load");
}

static bool __must_check is_invalid_tree_entry(const struct vdo *vdo,
					       const struct data_location *mapping,
					       height_t height)
{
	if (!vdo_is_valid_location(mapping) ||
	    vdo_is_state_compressed(mapping->state) ||
	    (vdo_is_mapped_location(mapping) && (mapping->pbn == VDO_ZERO_BLOCK)))
		return true;

	/* Roots aren't physical data blocks, so we can't check their PBNs. */
*/ 1767 if (height == VDO_BLOCK_MAP_TREE_HEIGHT) 1768 return false; 1769 1770 return !vdo_is_physical_data_block(vdo->depot, mapping->pbn); 1771 } 1772 1773 static void load_block_map_page(struct block_map_zone *zone, struct data_vio *data_vio); 1774 static void allocate_block_map_page(struct block_map_zone *zone, 1775 struct data_vio *data_vio); 1776 1777 static void continue_with_loaded_page(struct data_vio *data_vio, 1778 struct block_map_page *page) 1779 { 1780 struct tree_lock *lock = &data_vio->tree_lock; 1781 struct block_map_tree_slot slot = lock->tree_slots[lock->height]; 1782 struct data_location mapping = 1783 vdo_unpack_block_map_entry(&page->entries[slot.block_map_slot.slot]); 1784 1785 if (is_invalid_tree_entry(vdo_from_data_vio(data_vio), &mapping, lock->height)) { 1786 vdo_log_error_strerror(VDO_BAD_MAPPING, 1787 "Invalid block map tree PBN: %llu with state %u for page index %u at height %u", 1788 (unsigned long long) mapping.pbn, mapping.state, 1789 lock->tree_slots[lock->height - 1].page_index, 1790 lock->height - 1); 1791 abort_load(data_vio, VDO_BAD_MAPPING); 1792 return; 1793 } 1794 1795 if (!vdo_is_mapped_location(&mapping)) { 1796 /* The page we need is unallocated */ 1797 allocate_block_map_page(data_vio->logical.zone->block_map_zone, 1798 data_vio); 1799 return; 1800 } 1801 1802 lock->tree_slots[lock->height - 1].block_map_slot.pbn = mapping.pbn; 1803 if (lock->height == 1) { 1804 finish_lookup(data_vio, VDO_SUCCESS); 1805 return; 1806 } 1807 1808 /* We know what page we need to load next */ 1809 load_block_map_page(data_vio->logical.zone->block_map_zone, data_vio); 1810 } 1811 1812 static void continue_load_for_waiter(struct vdo_waiter *waiter, void *context) 1813 { 1814 struct data_vio *data_vio = vdo_waiter_as_data_vio(waiter); 1815 1816 data_vio->tree_lock.height--; 1817 continue_with_loaded_page(data_vio, context); 1818 } 1819 1820 static void finish_block_map_page_load(struct vdo_completion *completion) 1821 { 1822 physical_block_number_t pbn; 1823 struct tree_page *tree_page; 1824 struct block_map_page *page; 1825 nonce_t nonce; 1826 struct vio *vio = as_vio(completion); 1827 struct pooled_vio *pooled = vio_as_pooled_vio(vio); 1828 struct data_vio *data_vio = completion->parent; 1829 struct block_map_zone *zone = pooled->context; 1830 struct tree_lock *tree_lock = &data_vio->tree_lock; 1831 1832 tree_lock->height--; 1833 pbn = tree_lock->tree_slots[tree_lock->height].block_map_slot.pbn; 1834 tree_page = get_tree_page(zone, tree_lock); 1835 page = (struct block_map_page *) tree_page->page_buffer; 1836 nonce = zone->block_map->nonce; 1837 1838 if (!vdo_copy_valid_page(vio->data, nonce, pbn, page)) 1839 vdo_format_block_map_page(page, nonce, pbn, false); 1840 return_vio_to_pool(zone->vio_pool, pooled); 1841 1842 /* Release our claim to the load and wake any waiters */ 1843 release_page_lock(data_vio, "load"); 1844 vdo_waitq_notify_all_waiters(&tree_lock->waiters, continue_load_for_waiter, page); 1845 continue_with_loaded_page(data_vio, page); 1846 } 1847 1848 static void handle_io_error(struct vdo_completion *completion) 1849 { 1850 int result = completion->result; 1851 struct vio *vio = as_vio(completion); 1852 struct pooled_vio *pooled = container_of(vio, struct pooled_vio, vio); 1853 struct data_vio *data_vio = completion->parent; 1854 struct block_map_zone *zone = pooled->context; 1855 1856 vio_record_metadata_io_error(vio); 1857 return_vio_to_pool(zone->vio_pool, pooled); 1858 abort_load(data_vio, result); 1859 } 1860 1861 static void 
load_page_endio(struct bio *bio) 1862 { 1863 struct vio *vio = bio->bi_private; 1864 struct data_vio *data_vio = vio->completion.parent; 1865 1866 continue_vio_after_io(vio, finish_block_map_page_load, 1867 data_vio->logical.zone->thread_id); 1868 } 1869 1870 static void load_page(struct vdo_waiter *waiter, void *context) 1871 { 1872 struct pooled_vio *pooled = context; 1873 struct data_vio *data_vio = vdo_waiter_as_data_vio(waiter); 1874 struct tree_lock *lock = &data_vio->tree_lock; 1875 physical_block_number_t pbn = lock->tree_slots[lock->height - 1].block_map_slot.pbn; 1876 1877 pooled->vio.completion.parent = data_vio; 1878 vdo_submit_metadata_vio(&pooled->vio, pbn, load_page_endio, 1879 handle_io_error, REQ_OP_READ | REQ_PRIO); 1880 } 1881 1882 /* 1883 * If the page is already locked, queue up to wait for the lock to be released. If the lock is 1884 * acquired, @data_vio->tree_lock.locked will be true. 1885 */ 1886 static int attempt_page_lock(struct block_map_zone *zone, struct data_vio *data_vio) 1887 { 1888 int result; 1889 struct tree_lock *lock_holder; 1890 struct tree_lock *lock = &data_vio->tree_lock; 1891 height_t height = lock->height; 1892 struct block_map_tree_slot tree_slot = lock->tree_slots[height]; 1893 union page_key key; 1894 1895 key.descriptor = (struct page_descriptor) { 1896 .root_index = lock->root_index, 1897 .height = height, 1898 .page_index = tree_slot.page_index, 1899 .slot = tree_slot.block_map_slot.slot, 1900 }; 1901 lock->key = key.key; 1902 1903 result = vdo_int_map_put(zone->loading_pages, lock->key, 1904 lock, false, (void **) &lock_holder); 1905 if (result != VDO_SUCCESS) 1906 return result; 1907 1908 if (lock_holder == NULL) { 1909 /* We got the lock */ 1910 data_vio->tree_lock.locked = true; 1911 return VDO_SUCCESS; 1912 } 1913 1914 /* Someone else is loading or allocating the page we need */ 1915 vdo_waitq_enqueue_waiter(&lock_holder->waiters, &data_vio->waiter); 1916 return VDO_SUCCESS; 1917 } 1918 1919 /* Load a block map tree page from disk, for the next level in the data vio tree lock. */ 1920 static void load_block_map_page(struct block_map_zone *zone, struct data_vio *data_vio) 1921 { 1922 int result; 1923 1924 result = attempt_page_lock(zone, data_vio); 1925 if (result != VDO_SUCCESS) { 1926 abort_load(data_vio, result); 1927 return; 1928 } 1929 1930 if (data_vio->tree_lock.locked) { 1931 data_vio->waiter.callback = load_page; 1932 acquire_vio_from_pool(zone->vio_pool, &data_vio->waiter); 1933 } 1934 } 1935 1936 static void allocation_failure(struct vdo_completion *completion) 1937 { 1938 struct data_vio *data_vio = as_data_vio(completion); 1939 1940 if (vdo_requeue_completion_if_needed(completion, 1941 data_vio->logical.zone->thread_id)) 1942 return; 1943 1944 abort_lookup(data_vio, completion->result, "allocation"); 1945 } 1946 1947 static void continue_allocation_for_waiter(struct vdo_waiter *waiter, void *context) 1948 { 1949 struct data_vio *data_vio = vdo_waiter_as_data_vio(waiter); 1950 struct tree_lock *tree_lock = &data_vio->tree_lock; 1951 physical_block_number_t pbn = *((physical_block_number_t *) context); 1952 1953 tree_lock->height--; 1954 data_vio->tree_lock.tree_slots[tree_lock->height].block_map_slot.pbn = pbn; 1955 1956 if (tree_lock->height == 0) { 1957 finish_lookup(data_vio, VDO_SUCCESS); 1958 return; 1959 } 1960 1961 allocate_block_map_page(data_vio->logical.zone->block_map_zone, data_vio); 1962 } 1963 1964 /** expire_oldest_list() - Expire the oldest list. 
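 *
 * Expiring splices the oldest era's tree and cache pages onto the zone's expired lists and
 * advances oldest_period; write_expired_elements() then issues writes for those pages.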
*/ 1965 static void expire_oldest_list(struct dirty_lists *dirty_lists) 1966 { 1967 block_count_t i = dirty_lists->offset++; 1968 1969 dirty_lists->oldest_period++; 1970 if (!list_empty(&dirty_lists->eras[i][VDO_TREE_PAGE])) { 1971 list_splice_tail_init(&dirty_lists->eras[i][VDO_TREE_PAGE], 1972 &dirty_lists->expired[VDO_TREE_PAGE]); 1973 } 1974 1975 if (!list_empty(&dirty_lists->eras[i][VDO_CACHE_PAGE])) { 1976 list_splice_tail_init(&dirty_lists->eras[i][VDO_CACHE_PAGE], 1977 &dirty_lists->expired[VDO_CACHE_PAGE]); 1978 } 1979 1980 if (dirty_lists->offset == dirty_lists->maximum_age) 1981 dirty_lists->offset = 0; 1982 } 1983 1984 1985 /** update_period() - Update the dirty_lists period if necessary. */ 1986 static void update_period(struct dirty_lists *dirty, sequence_number_t period) 1987 { 1988 while (dirty->next_period <= period) { 1989 if ((dirty->next_period - dirty->oldest_period) == dirty->maximum_age) 1990 expire_oldest_list(dirty); 1991 dirty->next_period++; 1992 } 1993 } 1994 1995 /** write_expired_elements() - Write out the expired list. */ 1996 static void write_expired_elements(struct block_map_zone *zone) 1997 { 1998 struct tree_page *page, *ttmp; 1999 struct page_info *info, *ptmp; 2000 struct list_head *expired; 2001 u8 generation = zone->generation; 2002 2003 expired = &zone->dirty_lists->expired[VDO_TREE_PAGE]; 2004 list_for_each_entry_safe(page, ttmp, expired, entry) { 2005 int result; 2006 2007 list_del_init(&page->entry); 2008 2009 result = VDO_ASSERT(!vdo_waiter_is_waiting(&page->waiter), 2010 "Newly expired page not already waiting to write"); 2011 if (result != VDO_SUCCESS) { 2012 enter_zone_read_only_mode(zone, result); 2013 continue; 2014 } 2015 2016 set_generation(zone, page, generation); 2017 if (!page->writing) 2018 enqueue_page(page, zone); 2019 } 2020 2021 expired = &zone->dirty_lists->expired[VDO_CACHE_PAGE]; 2022 list_for_each_entry_safe(info, ptmp, expired, state_entry) { 2023 list_del_init(&info->state_entry); 2024 schedule_page_save(info); 2025 } 2026 2027 save_pages(&zone->page_cache); 2028 } 2029 2030 /** 2031 * add_to_dirty_lists() - Add an element to the dirty lists. 2032 * @zone: The zone in which we are operating. 2033 * @entry: The list entry of the element to add. 2034 * @type: The type of page. 2035 * @old_period: The period in which the element was previously dirtied, or 0 if it was not dirty. 2036 * @new_period: The period in which the element has now been dirtied, or 0 if it does not hold a 2037 * lock. 2038 */ 2039 static void add_to_dirty_lists(struct block_map_zone *zone, 2040 struct list_head *entry, 2041 enum block_map_page_type type, 2042 sequence_number_t old_period, 2043 sequence_number_t new_period) 2044 { 2045 struct dirty_lists *dirty_lists = zone->dirty_lists; 2046 2047 if ((old_period == new_period) || ((old_period != 0) && (old_period < new_period))) 2048 return; 2049 2050 if (new_period < dirty_lists->oldest_period) { 2051 list_move_tail(entry, &dirty_lists->expired[type]); 2052 } else { 2053 update_period(dirty_lists, new_period); 2054 list_move_tail(entry, 2055 &dirty_lists->eras[new_period % dirty_lists->maximum_age][type]); 2056 } 2057 2058 write_expired_elements(zone); 2059 } 2060 2061 /* 2062 * Record the allocation in the tree and wake any waiters now that the write lock has been 2063 * released. 
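 *
 * This runs on the logical zone thread: the newly allocated page's PBN is recorded in its
 * parent tree page (dirtying that page), and then either the lookup finishes or the next
 * level down is allocated in turn.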
2064 */
2065 static void finish_block_map_allocation(struct vdo_completion *completion)
2066 {
2067 physical_block_number_t pbn;
2068 struct tree_page *tree_page;
2069 struct block_map_page *page;
2070 sequence_number_t old_lock;
2071 struct data_vio *data_vio = as_data_vio(completion);
2072 struct block_map_zone *zone = data_vio->logical.zone->block_map_zone;
2073 struct tree_lock *tree_lock = &data_vio->tree_lock;
2074 height_t height = tree_lock->height;
2075 
2076 assert_data_vio_in_logical_zone(data_vio);
2077 
2078 tree_page = get_tree_page(zone, tree_lock);
2079 pbn = tree_lock->tree_slots[height - 1].block_map_slot.pbn;
2080 
2081 /* Record the allocation. */
2082 page = (struct block_map_page *) tree_page->page_buffer;
2083 old_lock = tree_page->recovery_lock;
2084 vdo_update_block_map_page(page, data_vio, pbn,
2085 VDO_MAPPING_STATE_UNCOMPRESSED,
2086 &tree_page->recovery_lock);
2087 
2088 if (vdo_waiter_is_waiting(&tree_page->waiter)) {
2089 /* This page is waiting to be written out. */
2090 if (zone->flusher != tree_page) {
2091 /*
2092 * The outstanding flush won't cover the update we just made,
2093 * so mark the page as needing another flush.
2094 */
2095 set_generation(zone, tree_page, zone->generation);
2096 }
2097 } else {
2098 /* Put the page on a dirty list */
2099 if (old_lock == 0)
2100 INIT_LIST_HEAD(&tree_page->entry);
2101 add_to_dirty_lists(zone, &tree_page->entry, VDO_TREE_PAGE,
2102 old_lock, tree_page->recovery_lock);
2103 }
2104 
2105 tree_lock->height--;
2106 if (height > 1) {
2107 /* Format the interior node we just allocated (in memory). */
2108 tree_page = get_tree_page(zone, tree_lock);
2109 vdo_format_block_map_page(tree_page->page_buffer,
2110 zone->block_map->nonce,
2111 pbn, false);
2112 }
2113 
2114 /* Release our claim to the allocation and wake any waiters */
2115 release_page_lock(data_vio, "allocation");
2116 vdo_waitq_notify_all_waiters(&tree_lock->waiters,
2117 continue_allocation_for_waiter, &pbn);
2118 if (tree_lock->height == 0) {
2119 finish_lookup(data_vio, VDO_SUCCESS);
2120 return;
2121 }
2122 
2123 allocate_block_map_page(zone, data_vio);
2124 }
2125 
2126 static void release_block_map_write_lock(struct vdo_completion *completion)
2127 {
2128 struct data_vio *data_vio = as_data_vio(completion);
2129 
2130 assert_data_vio_in_allocated_zone(data_vio);
2131 
2132 release_data_vio_allocation_lock(data_vio, true);
2133 launch_data_vio_logical_callback(data_vio, finish_block_map_allocation);
2134 }
2135 
2136 /*
2137 * Newly allocated block map pages are set to have MAXIMUM_REFERENCES after they are journaled,
2138 * to prevent deduplication against the block after we release the write lock on it, but before we
2139 * write out the page.
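 *
 * The increment_updater built in allocate_block() is journaled as a
 * VDO_JOURNAL_BLOCK_MAP_REMAPPING entry and then applied by
 * set_block_map_page_reference_count() before release_block_map_write_lock() drops the lock.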
2140 */ 2141 static void set_block_map_page_reference_count(struct vdo_completion *completion) 2142 { 2143 struct data_vio *data_vio = as_data_vio(completion); 2144 2145 assert_data_vio_in_allocated_zone(data_vio); 2146 2147 completion->callback = release_block_map_write_lock; 2148 vdo_modify_reference_count(completion, &data_vio->increment_updater); 2149 } 2150 2151 static void journal_block_map_allocation(struct vdo_completion *completion) 2152 { 2153 struct data_vio *data_vio = as_data_vio(completion); 2154 2155 assert_data_vio_in_journal_zone(data_vio); 2156 2157 set_data_vio_allocated_zone_callback(data_vio, 2158 set_block_map_page_reference_count); 2159 vdo_add_recovery_journal_entry(completion->vdo->recovery_journal, data_vio); 2160 } 2161 2162 static void allocate_block(struct vdo_completion *completion) 2163 { 2164 struct data_vio *data_vio = as_data_vio(completion); 2165 struct tree_lock *lock = &data_vio->tree_lock; 2166 physical_block_number_t pbn; 2167 2168 assert_data_vio_in_allocated_zone(data_vio); 2169 2170 if (!vdo_allocate_block_in_zone(data_vio)) 2171 return; 2172 2173 pbn = data_vio->allocation.pbn; 2174 lock->tree_slots[lock->height - 1].block_map_slot.pbn = pbn; 2175 data_vio->increment_updater = (struct reference_updater) { 2176 .operation = VDO_JOURNAL_BLOCK_MAP_REMAPPING, 2177 .increment = true, 2178 .zpbn = { 2179 .pbn = pbn, 2180 .state = VDO_MAPPING_STATE_UNCOMPRESSED, 2181 }, 2182 .lock = data_vio->allocation.lock, 2183 }; 2184 2185 launch_data_vio_journal_callback(data_vio, journal_block_map_allocation); 2186 } 2187 2188 static void allocate_block_map_page(struct block_map_zone *zone, 2189 struct data_vio *data_vio) 2190 { 2191 int result; 2192 2193 if (!data_vio->write || data_vio->is_discard) { 2194 /* This is a pure read or a discard, so there's nothing left to do here. */ 2195 finish_lookup(data_vio, VDO_SUCCESS); 2196 return; 2197 } 2198 2199 result = attempt_page_lock(zone, data_vio); 2200 if (result != VDO_SUCCESS) { 2201 abort_lookup(data_vio, result, "allocation"); 2202 return; 2203 } 2204 2205 if (!data_vio->tree_lock.locked) 2206 return; 2207 2208 data_vio_allocate_data_block(data_vio, VIO_BLOCK_MAP_WRITE_LOCK, 2209 allocate_block, allocation_failure); 2210 } 2211 2212 /** 2213 * vdo_find_block_map_slot() - Find the block map slot in which the block map entry for a data_vio 2214 * resides and cache that result in the data_vio. 2215 * 2216 * All ancestors in the tree will be allocated or loaded, as needed. 
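 *
 * The walk first ascends from the leaf level through the in-memory tree pages until it finds
 * one whose PBN is already known, then loads or allocates pages back down toward the leaf,
 * recording the slot for each level in the data_vio's tree_lock.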
2217 */ 2218 void vdo_find_block_map_slot(struct data_vio *data_vio) 2219 { 2220 page_number_t page_index; 2221 struct block_map_tree_slot tree_slot; 2222 struct data_location mapping; 2223 struct block_map_page *page = NULL; 2224 struct tree_lock *lock = &data_vio->tree_lock; 2225 struct block_map_zone *zone = data_vio->logical.zone->block_map_zone; 2226 2227 zone->active_lookups++; 2228 if (vdo_is_state_draining(&zone->state)) { 2229 finish_lookup(data_vio, VDO_SHUTTING_DOWN); 2230 return; 2231 } 2232 2233 lock->tree_slots[0].block_map_slot.slot = 2234 data_vio->logical.lbn % VDO_BLOCK_MAP_ENTRIES_PER_PAGE; 2235 page_index = (lock->tree_slots[0].page_index / zone->block_map->root_count); 2236 tree_slot = (struct block_map_tree_slot) { 2237 .page_index = page_index / VDO_BLOCK_MAP_ENTRIES_PER_PAGE, 2238 .block_map_slot = { 2239 .pbn = 0, 2240 .slot = page_index % VDO_BLOCK_MAP_ENTRIES_PER_PAGE, 2241 }, 2242 }; 2243 2244 for (lock->height = 1; lock->height <= VDO_BLOCK_MAP_TREE_HEIGHT; lock->height++) { 2245 physical_block_number_t pbn; 2246 2247 lock->tree_slots[lock->height] = tree_slot; 2248 page = (struct block_map_page *) (get_tree_page(zone, lock)->page_buffer); 2249 pbn = vdo_get_block_map_page_pbn(page); 2250 if (pbn != VDO_ZERO_BLOCK) { 2251 lock->tree_slots[lock->height].block_map_slot.pbn = pbn; 2252 break; 2253 } 2254 2255 /* Calculate the index and slot for the next level. */ 2256 tree_slot.block_map_slot.slot = 2257 tree_slot.page_index % VDO_BLOCK_MAP_ENTRIES_PER_PAGE; 2258 tree_slot.page_index = tree_slot.page_index / VDO_BLOCK_MAP_ENTRIES_PER_PAGE; 2259 } 2260 2261 /* The page at this height has been allocated and loaded. */ 2262 mapping = vdo_unpack_block_map_entry(&page->entries[tree_slot.block_map_slot.slot]); 2263 if (is_invalid_tree_entry(vdo_from_data_vio(data_vio), &mapping, lock->height)) { 2264 vdo_log_error_strerror(VDO_BAD_MAPPING, 2265 "Invalid block map tree PBN: %llu with state %u for page index %u at height %u", 2266 (unsigned long long) mapping.pbn, mapping.state, 2267 lock->tree_slots[lock->height - 1].page_index, 2268 lock->height - 1); 2269 abort_load(data_vio, VDO_BAD_MAPPING); 2270 return; 2271 } 2272 2273 if (!vdo_is_mapped_location(&mapping)) { 2274 /* The page we want one level down has not been allocated, so allocate it. */ 2275 allocate_block_map_page(zone, data_vio); 2276 return; 2277 } 2278 2279 lock->tree_slots[lock->height - 1].block_map_slot.pbn = mapping.pbn; 2280 if (lock->height == 1) { 2281 /* This is the ultimate block map page, so we're done */ 2282 finish_lookup(data_vio, VDO_SUCCESS); 2283 return; 2284 } 2285 2286 /* We know what page we need to load. */ 2287 load_block_map_page(zone, data_vio); 2288 } 2289 2290 /* 2291 * Find the PBN of a leaf block map page. This method may only be used after all allocated tree 2292 * pages have been loaded, otherwise, it may give the wrong answer (0). 
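 * Leaf pages are striped across the roots round-robin: for example, with root_count == 4,
 * leaf page 10 belongs to tree 10 % 4 == 2 and is recorded in slot 10 / 4 == 2 of that
 * tree's first page at height 1.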
2293 */ 2294 physical_block_number_t vdo_find_block_map_page_pbn(struct block_map *map, 2295 page_number_t page_number) 2296 { 2297 struct data_location mapping; 2298 struct tree_page *tree_page; 2299 struct block_map_page *page; 2300 root_count_t root_index = page_number % map->root_count; 2301 page_number_t page_index = page_number / map->root_count; 2302 slot_number_t slot = page_index % VDO_BLOCK_MAP_ENTRIES_PER_PAGE; 2303 2304 page_index /= VDO_BLOCK_MAP_ENTRIES_PER_PAGE; 2305 2306 tree_page = get_tree_page_by_index(map->forest, root_index, 1, page_index); 2307 page = (struct block_map_page *) tree_page->page_buffer; 2308 if (!page->header.initialized) 2309 return VDO_ZERO_BLOCK; 2310 2311 mapping = vdo_unpack_block_map_entry(&page->entries[slot]); 2312 if (!vdo_is_valid_location(&mapping) || vdo_is_state_compressed(mapping.state)) 2313 return VDO_ZERO_BLOCK; 2314 return mapping.pbn; 2315 } 2316 2317 /* 2318 * Write a tree page or indicate that it has been re-dirtied if it is already being written. This 2319 * method is used when correcting errors in the tree during read-only rebuild. 2320 */ 2321 void vdo_write_tree_page(struct tree_page *page, struct block_map_zone *zone) 2322 { 2323 bool waiting = vdo_waiter_is_waiting(&page->waiter); 2324 2325 if (waiting && (zone->flusher == page)) 2326 return; 2327 2328 set_generation(zone, page, zone->generation); 2329 if (waiting || page->writing) 2330 return; 2331 2332 enqueue_page(page, zone); 2333 } 2334 2335 static int make_segment(struct forest *old_forest, block_count_t new_pages, 2336 struct boundary *new_boundary, struct forest *forest) 2337 { 2338 size_t index = (old_forest == NULL) ? 0 : old_forest->segments; 2339 struct tree_page *page_ptr; 2340 page_count_t segment_sizes[VDO_BLOCK_MAP_TREE_HEIGHT]; 2341 height_t height; 2342 root_count_t root; 2343 int result; 2344 2345 forest->segments = index + 1; 2346 2347 result = vdo_allocate(forest->segments, struct boundary, 2348 "forest boundary array", &forest->boundaries); 2349 if (result != VDO_SUCCESS) 2350 return result; 2351 2352 result = vdo_allocate(forest->segments, struct tree_page *, 2353 "forest page pointers", &forest->pages); 2354 if (result != VDO_SUCCESS) 2355 return result; 2356 2357 result = vdo_allocate(new_pages, struct tree_page, 2358 "new forest pages", &forest->pages[index]); 2359 if (result != VDO_SUCCESS) 2360 return result; 2361 2362 if (index > 0) { 2363 memcpy(forest->boundaries, old_forest->boundaries, 2364 index * sizeof(struct boundary)); 2365 memcpy(forest->pages, old_forest->pages, 2366 index * sizeof(struct tree_page *)); 2367 } 2368 2369 memcpy(&(forest->boundaries[index]), new_boundary, sizeof(struct boundary)); 2370 2371 for (height = 0; height < VDO_BLOCK_MAP_TREE_HEIGHT; height++) { 2372 segment_sizes[height] = new_boundary->levels[height]; 2373 if (index > 0) 2374 segment_sizes[height] -= old_forest->boundaries[index - 1].levels[height]; 2375 } 2376 2377 page_ptr = forest->pages[index]; 2378 for (root = 0; root < forest->map->root_count; root++) { 2379 struct block_map_tree_segment *segment; 2380 struct block_map_tree *tree = &(forest->trees[root]); 2381 height_t height; 2382 2383 int result = vdo_allocate(forest->segments, 2384 struct block_map_tree_segment, 2385 "tree root segments", &tree->segments); 2386 if (result != VDO_SUCCESS) 2387 return result; 2388 2389 if (index > 0) { 2390 memcpy(tree->segments, old_forest->trees[root].segments, 2391 index * sizeof(struct block_map_tree_segment)); 2392 } 2393 2394 segment = &(tree->segments[index]); 2395 
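/*
 * Carve this segment's pages up level by level. At the top level, format the root page in
 * memory and point its first entry at this root's on-disk location (root_origin + root).
 */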
for (height = 0; height < VDO_BLOCK_MAP_TREE_HEIGHT; height++) { 2396 if (segment_sizes[height] == 0) 2397 continue; 2398 2399 segment->levels[height] = page_ptr; 2400 if (height == (VDO_BLOCK_MAP_TREE_HEIGHT - 1)) { 2401 /* Record the root. */ 2402 struct block_map_page *page = 2403 vdo_format_block_map_page(page_ptr->page_buffer, 2404 forest->map->nonce, 2405 VDO_INVALID_PBN, true); 2406 page->entries[0] = 2407 vdo_pack_block_map_entry(forest->map->root_origin + root, 2408 VDO_MAPPING_STATE_UNCOMPRESSED); 2409 } 2410 page_ptr += segment_sizes[height]; 2411 } 2412 } 2413 2414 return VDO_SUCCESS; 2415 } 2416 2417 static void deforest(struct forest *forest, size_t first_page_segment) 2418 { 2419 root_count_t root; 2420 2421 if (forest->pages != NULL) { 2422 size_t segment; 2423 2424 for (segment = first_page_segment; segment < forest->segments; segment++) 2425 vdo_free(forest->pages[segment]); 2426 vdo_free(forest->pages); 2427 } 2428 2429 for (root = 0; root < forest->map->root_count; root++) 2430 vdo_free(forest->trees[root].segments); 2431 2432 vdo_free(forest->boundaries); 2433 vdo_free(forest); 2434 } 2435 2436 /** 2437 * make_forest() - Make a collection of trees for a block_map, expanding the existing forest if 2438 * there is one. 2439 * @entries: The number of entries the block map will hold. 2440 * 2441 * Return: VDO_SUCCESS or an error. 2442 */ 2443 static int make_forest(struct block_map *map, block_count_t entries) 2444 { 2445 struct forest *forest, *old_forest = map->forest; 2446 struct boundary new_boundary, *old_boundary = NULL; 2447 block_count_t new_pages; 2448 int result; 2449 2450 if (old_forest != NULL) 2451 old_boundary = &(old_forest->boundaries[old_forest->segments - 1]); 2452 2453 new_pages = vdo_compute_new_forest_pages(map->root_count, old_boundary, 2454 entries, &new_boundary); 2455 if (new_pages == 0) { 2456 map->next_entry_count = entries; 2457 return VDO_SUCCESS; 2458 } 2459 2460 result = vdo_allocate_extended(struct forest, map->root_count, 2461 struct block_map_tree, __func__, 2462 &forest); 2463 if (result != VDO_SUCCESS) 2464 return result; 2465 2466 forest->map = map; 2467 result = make_segment(old_forest, new_pages, &new_boundary, forest); 2468 if (result != VDO_SUCCESS) { 2469 deforest(forest, forest->segments - 1); 2470 return result; 2471 } 2472 2473 map->next_forest = forest; 2474 map->next_entry_count = entries; 2475 return VDO_SUCCESS; 2476 } 2477 2478 /** 2479 * replace_forest() - Replace a block_map's forest with the already-prepared larger forest. 2480 */ 2481 static void replace_forest(struct block_map *map) 2482 { 2483 if (map->next_forest != NULL) { 2484 if (map->forest != NULL) 2485 deforest(map->forest, map->forest->segments); 2486 map->forest = vdo_forget(map->next_forest); 2487 } 2488 2489 map->entry_count = map->next_entry_count; 2490 map->next_entry_count = 0; 2491 } 2492 2493 /** 2494 * finish_cursor() - Finish the traversal of a single tree. If it was the last cursor, finish the 2495 * traversal. 2496 */ 2497 static void finish_cursor(struct cursor *cursor) 2498 { 2499 struct cursors *cursors = cursor->parent; 2500 struct vdo_completion *completion = cursors->completion; 2501 2502 return_vio_to_pool(cursors->pool, vdo_forget(cursor->vio)); 2503 if (--cursors->active_roots > 0) 2504 return; 2505 2506 vdo_free(cursors); 2507 2508 vdo_finish_completion(completion); 2509 } 2510 2511 static void traverse(struct cursor *cursor); 2512 2513 /** 2514 * continue_traversal() - Continue traversing a block map tree. 
2515 * @completion: The VIO doing a read or write. 2516 */ 2517 static void continue_traversal(struct vdo_completion *completion) 2518 { 2519 vio_record_metadata_io_error(as_vio(completion)); 2520 traverse(completion->parent); 2521 } 2522 2523 /** 2524 * finish_traversal_load() - Continue traversing a block map tree now that a page has been loaded. 2525 * @completion: The VIO doing the read. 2526 */ 2527 static void finish_traversal_load(struct vdo_completion *completion) 2528 { 2529 struct cursor *cursor = completion->parent; 2530 height_t height = cursor->height; 2531 struct cursor_level *level = &cursor->levels[height]; 2532 struct tree_page *tree_page = 2533 &(cursor->tree->segments[0].levels[height][level->page_index]); 2534 struct block_map_page *page = (struct block_map_page *) tree_page->page_buffer; 2535 2536 vdo_copy_valid_page(cursor->vio->vio.data, 2537 cursor->parent->zone->block_map->nonce, 2538 pbn_from_vio_bio(cursor->vio->vio.bio), page); 2539 traverse(cursor); 2540 } 2541 2542 static void traversal_endio(struct bio *bio) 2543 { 2544 struct vio *vio = bio->bi_private; 2545 struct cursor *cursor = vio->completion.parent; 2546 2547 continue_vio_after_io(vio, finish_traversal_load, 2548 cursor->parent->zone->thread_id); 2549 } 2550 2551 /** 2552 * traverse() - Traverse a single block map tree. 2553 * 2554 * This is the recursive heart of the traversal process. 2555 */ 2556 static void traverse(struct cursor *cursor) 2557 { 2558 for (; cursor->height < VDO_BLOCK_MAP_TREE_HEIGHT; cursor->height++) { 2559 height_t height = cursor->height; 2560 struct cursor_level *level = &cursor->levels[height]; 2561 struct tree_page *tree_page = 2562 &(cursor->tree->segments[0].levels[height][level->page_index]); 2563 struct block_map_page *page = (struct block_map_page *) tree_page->page_buffer; 2564 2565 if (!page->header.initialized) 2566 continue; 2567 2568 for (; level->slot < VDO_BLOCK_MAP_ENTRIES_PER_PAGE; level->slot++) { 2569 struct cursor_level *next_level; 2570 page_number_t entry_index = 2571 (VDO_BLOCK_MAP_ENTRIES_PER_PAGE * level->page_index) + level->slot; 2572 struct data_location location = 2573 vdo_unpack_block_map_entry(&page->entries[level->slot]); 2574 2575 if (!vdo_is_valid_location(&location)) { 2576 /* This entry is invalid, so remove it from the page. */ 2577 page->entries[level->slot] = UNMAPPED_BLOCK_MAP_ENTRY; 2578 vdo_write_tree_page(tree_page, cursor->parent->zone); 2579 continue; 2580 } 2581 2582 if (!vdo_is_mapped_location(&location)) 2583 continue; 2584 2585 /* Erase mapped entries past the end of the logical space. 
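 * The boundary computed for this root records how many entries are in use at each level, so
 * a mapped entry indexed at or beyond it refers to a page beyond the current logical space.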
*/ 2586 if (entry_index >= cursor->boundary.levels[height]) { 2587 page->entries[level->slot] = UNMAPPED_BLOCK_MAP_ENTRY; 2588 vdo_write_tree_page(tree_page, cursor->parent->zone); 2589 continue; 2590 } 2591 2592 if (cursor->height < VDO_BLOCK_MAP_TREE_HEIGHT - 1) { 2593 int result = cursor->parent->entry_callback(location.pbn, 2594 cursor->parent->completion); 2595 if (result != VDO_SUCCESS) { 2596 page->entries[level->slot] = UNMAPPED_BLOCK_MAP_ENTRY; 2597 vdo_write_tree_page(tree_page, cursor->parent->zone); 2598 continue; 2599 } 2600 } 2601 2602 if (cursor->height == 0) 2603 continue; 2604 2605 cursor->height--; 2606 next_level = &cursor->levels[cursor->height]; 2607 next_level->page_index = entry_index; 2608 next_level->slot = 0; 2609 level->slot++; 2610 vdo_submit_metadata_vio(&cursor->vio->vio, location.pbn, 2611 traversal_endio, continue_traversal, 2612 REQ_OP_READ | REQ_PRIO); 2613 return; 2614 } 2615 } 2616 2617 finish_cursor(cursor); 2618 } 2619 2620 /** 2621 * launch_cursor() - Start traversing a single block map tree now that the cursor has a VIO with 2622 * which to load pages. 2623 * @context: The pooled_vio just acquired. 2624 * 2625 * Implements waiter_callback_fn. 2626 */ 2627 static void launch_cursor(struct vdo_waiter *waiter, void *context) 2628 { 2629 struct cursor *cursor = container_of(waiter, struct cursor, waiter); 2630 struct pooled_vio *pooled = context; 2631 2632 cursor->vio = pooled; 2633 pooled->vio.completion.parent = cursor; 2634 pooled->vio.completion.callback_thread_id = cursor->parent->zone->thread_id; 2635 traverse(cursor); 2636 } 2637 2638 /** 2639 * compute_boundary() - Compute the number of pages used at each level of the given root's tree. 2640 * 2641 * Return: The list of page counts as a boundary structure. 2642 */ 2643 static struct boundary compute_boundary(struct block_map *map, root_count_t root_index) 2644 { 2645 struct boundary boundary; 2646 height_t height; 2647 page_count_t leaf_pages = vdo_compute_block_map_page_count(map->entry_count); 2648 /* 2649 * Compute the leaf pages for this root. If the number of leaf pages does not distribute 2650 * evenly, we must determine if this root gets an extra page. Extra pages are assigned to 2651 * roots starting from tree 0. 2652 */ 2653 page_count_t last_tree_root = (leaf_pages - 1) % map->root_count; 2654 page_count_t level_pages = leaf_pages / map->root_count; 2655 2656 if (root_index <= last_tree_root) 2657 level_pages++; 2658 2659 for (height = 0; height < VDO_BLOCK_MAP_TREE_HEIGHT - 1; height++) { 2660 boundary.levels[height] = level_pages; 2661 level_pages = DIV_ROUND_UP(level_pages, VDO_BLOCK_MAP_ENTRIES_PER_PAGE); 2662 } 2663 2664 /* The root node always exists, even if the root is otherwise unused. */ 2665 boundary.levels[VDO_BLOCK_MAP_TREE_HEIGHT - 1] = 1; 2666 2667 return boundary; 2668 } 2669 2670 /** 2671 * vdo_traverse_forest() - Walk the entire forest of a block map. 2672 * @callback: A function to call with the pbn of each allocated node in the forest. 2673 * @completion: The completion to notify on each traversed PBN, and when traversal completes. 
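 *
 * One cursor is created per root; all of them run on block map zone 0's thread, and each
 * borrows a vio from that zone's pool to read interior tree pages.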
2674 */ 2675 void vdo_traverse_forest(struct block_map *map, vdo_entry_callback_fn callback, 2676 struct vdo_completion *completion) 2677 { 2678 root_count_t root; 2679 struct cursors *cursors; 2680 int result; 2681 2682 result = vdo_allocate_extended(struct cursors, map->root_count, 2683 struct cursor, __func__, &cursors); 2684 if (result != VDO_SUCCESS) { 2685 vdo_fail_completion(completion, result); 2686 return; 2687 } 2688 2689 cursors->zone = &map->zones[0]; 2690 cursors->pool = cursors->zone->vio_pool; 2691 cursors->entry_callback = callback; 2692 cursors->completion = completion; 2693 cursors->active_roots = map->root_count; 2694 for (root = 0; root < map->root_count; root++) { 2695 struct cursor *cursor = &cursors->cursors[root]; 2696 2697 *cursor = (struct cursor) { 2698 .tree = &map->forest->trees[root], 2699 .height = VDO_BLOCK_MAP_TREE_HEIGHT - 1, 2700 .parent = cursors, 2701 .boundary = compute_boundary(map, root), 2702 }; 2703 2704 cursor->waiter.callback = launch_cursor; 2705 acquire_vio_from_pool(cursors->pool, &cursor->waiter); 2706 } 2707 } 2708 2709 /** 2710 * initialize_block_map_zone() - Initialize the per-zone portions of the block map. 2711 * @maximum_age: The number of journal blocks before a dirtied page is considered old and must be 2712 * written out. 2713 */ 2714 static int __must_check initialize_block_map_zone(struct block_map *map, 2715 zone_count_t zone_number, 2716 page_count_t cache_size, 2717 block_count_t maximum_age) 2718 { 2719 int result; 2720 block_count_t i; 2721 struct vdo *vdo = map->vdo; 2722 struct block_map_zone *zone = &map->zones[zone_number]; 2723 2724 BUILD_BUG_ON(sizeof(struct page_descriptor) != sizeof(u64)); 2725 2726 zone->zone_number = zone_number; 2727 zone->thread_id = vdo->thread_config.logical_threads[zone_number]; 2728 zone->block_map = map; 2729 2730 result = vdo_allocate_extended(struct dirty_lists, maximum_age, 2731 dirty_era_t, __func__, 2732 &zone->dirty_lists); 2733 if (result != VDO_SUCCESS) 2734 return result; 2735 2736 zone->dirty_lists->maximum_age = maximum_age; 2737 INIT_LIST_HEAD(&zone->dirty_lists->expired[VDO_TREE_PAGE]); 2738 INIT_LIST_HEAD(&zone->dirty_lists->expired[VDO_CACHE_PAGE]); 2739 2740 for (i = 0; i < maximum_age; i++) { 2741 INIT_LIST_HEAD(&zone->dirty_lists->eras[i][VDO_TREE_PAGE]); 2742 INIT_LIST_HEAD(&zone->dirty_lists->eras[i][VDO_CACHE_PAGE]); 2743 } 2744 2745 result = vdo_int_map_create(VDO_LOCK_MAP_CAPACITY, &zone->loading_pages); 2746 if (result != VDO_SUCCESS) 2747 return result; 2748 2749 result = make_vio_pool(vdo, BLOCK_MAP_VIO_POOL_SIZE, 2750 zone->thread_id, VIO_TYPE_BLOCK_MAP_INTERIOR, 2751 VIO_PRIORITY_METADATA, zone, &zone->vio_pool); 2752 if (result != VDO_SUCCESS) 2753 return result; 2754 2755 vdo_set_admin_state_code(&zone->state, VDO_ADMIN_STATE_NORMAL_OPERATION); 2756 2757 zone->page_cache.zone = zone; 2758 zone->page_cache.vdo = vdo; 2759 zone->page_cache.page_count = cache_size / map->zone_count; 2760 zone->page_cache.stats.free_pages = zone->page_cache.page_count; 2761 2762 result = allocate_cache_components(&zone->page_cache); 2763 if (result != VDO_SUCCESS) 2764 return result; 2765 2766 /* initialize empty circular queues */ 2767 INIT_LIST_HEAD(&zone->page_cache.lru_list); 2768 INIT_LIST_HEAD(&zone->page_cache.outgoing_list); 2769 2770 return VDO_SUCCESS; 2771 } 2772 2773 /* Implements vdo_zone_thread_getter_fn */ 2774 static thread_id_t get_block_map_zone_thread_id(void *context, zone_count_t zone_number) 2775 { 2776 struct block_map *map = context; 2777 2778 return 
map->zones[zone_number].thread_id; 2779 } 2780 2781 /* Implements vdo_action_preamble_fn */ 2782 static void prepare_for_era_advance(void *context, struct vdo_completion *parent) 2783 { 2784 struct block_map *map = context; 2785 2786 map->current_era_point = map->pending_era_point; 2787 vdo_finish_completion(parent); 2788 } 2789 2790 /* Implements vdo_zone_action_fn */ 2791 static void advance_block_map_zone_era(void *context, zone_count_t zone_number, 2792 struct vdo_completion *parent) 2793 { 2794 struct block_map *map = context; 2795 struct block_map_zone *zone = &map->zones[zone_number]; 2796 2797 update_period(zone->dirty_lists, map->current_era_point); 2798 write_expired_elements(zone); 2799 vdo_finish_completion(parent); 2800 } 2801 2802 /* 2803 * Schedule an era advance if necessary. This method should not be called directly. Rather, call 2804 * vdo_schedule_default_action() on the block map's action manager. 2805 * 2806 * Implements vdo_action_scheduler_fn. 2807 */ 2808 static bool schedule_era_advance(void *context) 2809 { 2810 struct block_map *map = context; 2811 2812 if (map->current_era_point == map->pending_era_point) 2813 return false; 2814 2815 return vdo_schedule_action(map->action_manager, prepare_for_era_advance, 2816 advance_block_map_zone_era, NULL, NULL); 2817 } 2818 2819 static void uninitialize_block_map_zone(struct block_map_zone *zone) 2820 { 2821 struct vdo_page_cache *cache = &zone->page_cache; 2822 2823 vdo_free(vdo_forget(zone->dirty_lists)); 2824 free_vio_pool(vdo_forget(zone->vio_pool)); 2825 vdo_int_map_free(vdo_forget(zone->loading_pages)); 2826 if (cache->infos != NULL) { 2827 struct page_info *info; 2828 2829 for (info = cache->infos; info < cache->infos + cache->page_count; info++) 2830 free_vio(vdo_forget(info->vio)); 2831 } 2832 2833 vdo_int_map_free(vdo_forget(cache->page_map)); 2834 vdo_free(vdo_forget(cache->infos)); 2835 vdo_free(vdo_forget(cache->pages)); 2836 } 2837 2838 void vdo_free_block_map(struct block_map *map) 2839 { 2840 zone_count_t zone; 2841 2842 if (map == NULL) 2843 return; 2844 2845 for (zone = 0; zone < map->zone_count; zone++) 2846 uninitialize_block_map_zone(&map->zones[zone]); 2847 2848 vdo_abandon_block_map_growth(map); 2849 if (map->forest != NULL) 2850 deforest(vdo_forget(map->forest), 0); 2851 vdo_free(vdo_forget(map->action_manager)); 2852 vdo_free(map); 2853 } 2854 2855 /* @journal may be NULL. 
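 *
 * Decodes the saved state into a new block map, builds its forest of trees, and initializes
 * one block_map_zone (dirty lists, loading-page map, vio pool, and page cache) per logical
 * zone.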
*/ 2856 int vdo_decode_block_map(struct block_map_state_2_0 state, block_count_t logical_blocks, 2857 struct vdo *vdo, struct recovery_journal *journal, 2858 nonce_t nonce, page_count_t cache_size, block_count_t maximum_age, 2859 struct block_map **map_ptr) 2860 { 2861 struct block_map *map; 2862 int result; 2863 zone_count_t zone = 0; 2864 2865 BUILD_BUG_ON(VDO_BLOCK_MAP_ENTRIES_PER_PAGE != 2866 ((VDO_BLOCK_SIZE - sizeof(struct block_map_page)) / 2867 sizeof(struct block_map_entry))); 2868 result = VDO_ASSERT(cache_size > 0, "block map cache size is specified"); 2869 if (result != VDO_SUCCESS) 2870 return result; 2871 2872 result = vdo_allocate_extended(struct block_map, 2873 vdo->thread_config.logical_zone_count, 2874 struct block_map_zone, __func__, &map); 2875 if (result != VDO_SUCCESS) 2876 return result; 2877 2878 map->vdo = vdo; 2879 map->root_origin = state.root_origin; 2880 map->root_count = state.root_count; 2881 map->entry_count = logical_blocks; 2882 map->journal = journal; 2883 map->nonce = nonce; 2884 2885 result = make_forest(map, map->entry_count); 2886 if (result != VDO_SUCCESS) { 2887 vdo_free_block_map(map); 2888 return result; 2889 } 2890 2891 replace_forest(map); 2892 2893 map->zone_count = vdo->thread_config.logical_zone_count; 2894 for (zone = 0; zone < map->zone_count; zone++) { 2895 result = initialize_block_map_zone(map, zone, cache_size, maximum_age); 2896 if (result != VDO_SUCCESS) { 2897 vdo_free_block_map(map); 2898 return result; 2899 } 2900 } 2901 2902 result = vdo_make_action_manager(map->zone_count, get_block_map_zone_thread_id, 2903 vdo_get_recovery_journal_thread_id(journal), 2904 map, schedule_era_advance, vdo, 2905 &map->action_manager); 2906 if (result != VDO_SUCCESS) { 2907 vdo_free_block_map(map); 2908 return result; 2909 } 2910 2911 *map_ptr = map; 2912 return VDO_SUCCESS; 2913 } 2914 2915 struct block_map_state_2_0 vdo_record_block_map(const struct block_map *map) 2916 { 2917 return (struct block_map_state_2_0) { 2918 .flat_page_origin = VDO_BLOCK_MAP_FLAT_PAGE_ORIGIN, 2919 /* This is the flat page count, which has turned out to always be 0. */ 2920 .flat_page_count = 0, 2921 .root_origin = map->root_origin, 2922 .root_count = map->root_count, 2923 }; 2924 } 2925 2926 /* The block map needs to know the journals' sequence number to initialize the eras. */ 2927 void vdo_initialize_block_map_from_journal(struct block_map *map, 2928 struct recovery_journal *journal) 2929 { 2930 zone_count_t z = 0; 2931 2932 map->current_era_point = vdo_get_recovery_journal_current_sequence_number(journal); 2933 map->pending_era_point = map->current_era_point; 2934 2935 for (z = 0; z < map->zone_count; z++) { 2936 struct dirty_lists *dirty_lists = map->zones[z].dirty_lists; 2937 2938 VDO_ASSERT_LOG_ONLY(dirty_lists->next_period == 0, "current period not set"); 2939 dirty_lists->oldest_period = map->current_era_point; 2940 dirty_lists->next_period = map->current_era_point + 1; 2941 dirty_lists->offset = map->current_era_point % dirty_lists->maximum_age; 2942 } 2943 } 2944 2945 /* Compute the logical zone for the LBN of a data vio. 
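 *
 * The LBN's leaf page number selects the root tree (page_number % root_count), and the root
 * in turn selects the zone (root_index % zone_count); the page index and root index are
 * cached in the data_vio's tree_lock for the rest of the lookup. For example, with 16 roots
 * and 3 logical zones, an LBN on leaf page 20 uses root 4 and therefore zone 1.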
*/ 2946 zone_count_t vdo_compute_logical_zone(struct data_vio *data_vio) 2947 { 2948 struct block_map *map = vdo_from_data_vio(data_vio)->block_map; 2949 struct tree_lock *tree_lock = &data_vio->tree_lock; 2950 page_number_t page_number = data_vio->logical.lbn / VDO_BLOCK_MAP_ENTRIES_PER_PAGE; 2951 2952 tree_lock->tree_slots[0].page_index = page_number; 2953 tree_lock->root_index = page_number % map->root_count; 2954 return (tree_lock->root_index % map->zone_count); 2955 } 2956 2957 void vdo_advance_block_map_era(struct block_map *map, 2958 sequence_number_t recovery_block_number) 2959 { 2960 if (map == NULL) 2961 return; 2962 2963 map->pending_era_point = recovery_block_number; 2964 vdo_schedule_default_action(map->action_manager); 2965 } 2966 2967 /* Implements vdo_admin_initiator_fn */ 2968 static void initiate_drain(struct admin_state *state) 2969 { 2970 struct block_map_zone *zone = container_of(state, struct block_map_zone, state); 2971 2972 VDO_ASSERT_LOG_ONLY((zone->active_lookups == 0), 2973 "%s() called with no active lookups", __func__); 2974 2975 if (!vdo_is_state_suspending(state)) { 2976 while (zone->dirty_lists->oldest_period < zone->dirty_lists->next_period) 2977 expire_oldest_list(zone->dirty_lists); 2978 write_expired_elements(zone); 2979 } 2980 2981 check_for_drain_complete(zone); 2982 } 2983 2984 /* Implements vdo_zone_action_fn. */ 2985 static void drain_zone(void *context, zone_count_t zone_number, 2986 struct vdo_completion *parent) 2987 { 2988 struct block_map *map = context; 2989 struct block_map_zone *zone = &map->zones[zone_number]; 2990 2991 vdo_start_draining(&zone->state, 2992 vdo_get_current_manager_operation(map->action_manager), 2993 parent, initiate_drain); 2994 } 2995 2996 void vdo_drain_block_map(struct block_map *map, const struct admin_state_code *operation, 2997 struct vdo_completion *parent) 2998 { 2999 vdo_schedule_operation(map->action_manager, operation, NULL, drain_zone, NULL, 3000 parent); 3001 } 3002 3003 /* Implements vdo_zone_action_fn. */ 3004 static void resume_block_map_zone(void *context, zone_count_t zone_number, 3005 struct vdo_completion *parent) 3006 { 3007 struct block_map *map = context; 3008 struct block_map_zone *zone = &map->zones[zone_number]; 3009 3010 vdo_fail_completion(parent, vdo_resume_if_quiescent(&zone->state)); 3011 } 3012 3013 void vdo_resume_block_map(struct block_map *map, struct vdo_completion *parent) 3014 { 3015 vdo_schedule_operation(map->action_manager, VDO_ADMIN_STATE_RESUMING, 3016 NULL, resume_block_map_zone, NULL, parent); 3017 } 3018 3019 /* Allocate an expanded collection of trees, for a future growth. */ 3020 int vdo_prepare_to_grow_block_map(struct block_map *map, 3021 block_count_t new_logical_blocks) 3022 { 3023 if (map->next_entry_count == new_logical_blocks) 3024 return VDO_SUCCESS; 3025 3026 if (map->next_entry_count > 0) 3027 vdo_abandon_block_map_growth(map); 3028 3029 if (new_logical_blocks < map->entry_count) { 3030 map->next_entry_count = map->entry_count; 3031 return VDO_SUCCESS; 3032 } 3033 3034 return make_forest(map, new_logical_blocks); 3035 } 3036 3037 /* Implements vdo_action_preamble_fn */ 3038 static void grow_forest(void *context, struct vdo_completion *completion) 3039 { 3040 replace_forest(context); 3041 vdo_finish_completion(completion); 3042 } 3043 3044 /* Requires vdo_prepare_to_grow_block_map() to have been previously called. 
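 *
 * The growth runs as a suspended operation whose preamble, grow_forest(), swaps in the
 * forest prepared by vdo_prepare_to_grow_block_map() via replace_forest().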
*/ 3045 void vdo_grow_block_map(struct block_map *map, struct vdo_completion *parent) 3046 { 3047 vdo_schedule_operation(map->action_manager, 3048 VDO_ADMIN_STATE_SUSPENDED_OPERATION, 3049 grow_forest, NULL, NULL, parent); 3050 } 3051 3052 void vdo_abandon_block_map_growth(struct block_map *map) 3053 { 3054 struct forest *forest = vdo_forget(map->next_forest); 3055 3056 if (forest != NULL) 3057 deforest(forest, forest->segments - 1); 3058 3059 map->next_entry_count = 0; 3060 } 3061 3062 /* Release the page completion and then continue the requester. */ 3063 static inline void finish_processing_page(struct vdo_completion *completion, int result) 3064 { 3065 struct vdo_completion *parent = completion->parent; 3066 3067 vdo_release_page_completion(completion); 3068 vdo_continue_completion(parent, result); 3069 } 3070 3071 static void handle_page_error(struct vdo_completion *completion) 3072 { 3073 finish_processing_page(completion, completion->result); 3074 } 3075 3076 /* Fetch the mapping page for a block map update, and call the provided handler when fetched. */ 3077 static void fetch_mapping_page(struct data_vio *data_vio, bool modifiable, 3078 vdo_action_fn action) 3079 { 3080 struct block_map_zone *zone = data_vio->logical.zone->block_map_zone; 3081 3082 if (vdo_is_state_draining(&zone->state)) { 3083 continue_data_vio_with_error(data_vio, VDO_SHUTTING_DOWN); 3084 return; 3085 } 3086 3087 vdo_get_page(&data_vio->page_completion, zone, 3088 data_vio->tree_lock.tree_slots[0].block_map_slot.pbn, 3089 modifiable, &data_vio->vio.completion, 3090 action, handle_page_error, false); 3091 } 3092 3093 /** 3094 * clear_mapped_location() - Clear a data_vio's mapped block location, setting it to be unmapped. 3095 * 3096 * This indicates the block map entry for the logical block is either unmapped or corrupted. 3097 */ 3098 static void clear_mapped_location(struct data_vio *data_vio) 3099 { 3100 data_vio->mapped = (struct zoned_pbn) { 3101 .state = VDO_MAPPING_STATE_UNMAPPED, 3102 }; 3103 } 3104 3105 /** 3106 * set_mapped_location() - Decode and validate a block map entry, and set the mapped location of a 3107 * data_vio. 3108 * 3109 * Return: VDO_SUCCESS or VDO_BAD_MAPPING if the map entry is invalid or an error code for any 3110 * other failure 3111 */ 3112 static int __must_check set_mapped_location(struct data_vio *data_vio, 3113 const struct block_map_entry *entry) 3114 { 3115 /* Unpack the PBN for logging purposes even if the entry is invalid. */ 3116 struct data_location mapped = vdo_unpack_block_map_entry(entry); 3117 3118 if (vdo_is_valid_location(&mapped)) { 3119 int result; 3120 3121 result = vdo_get_physical_zone(vdo_from_data_vio(data_vio), 3122 mapped.pbn, &data_vio->mapped.zone); 3123 if (result == VDO_SUCCESS) { 3124 data_vio->mapped.pbn = mapped.pbn; 3125 data_vio->mapped.state = mapped.state; 3126 return VDO_SUCCESS; 3127 } 3128 3129 /* 3130 * Return all errors not specifically known to be errors from validating the 3131 * location. 3132 */ 3133 if ((result != VDO_OUT_OF_RANGE) && (result != VDO_BAD_MAPPING)) 3134 return result; 3135 } 3136 3137 /* 3138 * Log the corruption even if we wind up ignoring it for write VIOs, converting all cases 3139 * to VDO_BAD_MAPPING. 3140 */ 3141 vdo_log_error_strerror(VDO_BAD_MAPPING, 3142 "PBN %llu with state %u read from the block map was invalid", 3143 (unsigned long long) mapped.pbn, mapped.state); 3144 3145 /* 3146 * A read VIO has no option but to report the bad mapping--reading zeros would be hiding 3147 * known data loss. 
3148 */ 3149 if (!data_vio->write) 3150 return VDO_BAD_MAPPING; 3151 3152 /* 3153 * A write VIO only reads this mapping to decref the old block. Treat this as an unmapped 3154 * entry rather than fail the write. 3155 */ 3156 clear_mapped_location(data_vio); 3157 return VDO_SUCCESS; 3158 } 3159 3160 /* This callback is registered in vdo_get_mapped_block(). */ 3161 static void get_mapping_from_fetched_page(struct vdo_completion *completion) 3162 { 3163 int result; 3164 struct vdo_page_completion *vpc = as_vdo_page_completion(completion); 3165 const struct block_map_page *page; 3166 const struct block_map_entry *entry; 3167 struct data_vio *data_vio = as_data_vio(completion->parent); 3168 struct block_map_tree_slot *tree_slot; 3169 3170 if (completion->result != VDO_SUCCESS) { 3171 finish_processing_page(completion, completion->result); 3172 return; 3173 } 3174 3175 result = validate_completed_page(vpc, false); 3176 if (result != VDO_SUCCESS) { 3177 finish_processing_page(completion, result); 3178 return; 3179 } 3180 3181 page = (const struct block_map_page *) get_page_buffer(vpc->info); 3182 tree_slot = &data_vio->tree_lock.tree_slots[0]; 3183 entry = &page->entries[tree_slot->block_map_slot.slot]; 3184 3185 result = set_mapped_location(data_vio, entry); 3186 finish_processing_page(completion, result); 3187 } 3188 3189 void vdo_update_block_map_page(struct block_map_page *page, struct data_vio *data_vio, 3190 physical_block_number_t pbn, 3191 enum block_mapping_state mapping_state, 3192 sequence_number_t *recovery_lock) 3193 { 3194 struct block_map_zone *zone = data_vio->logical.zone->block_map_zone; 3195 struct block_map *block_map = zone->block_map; 3196 struct recovery_journal *journal = block_map->journal; 3197 sequence_number_t old_locked, new_locked; 3198 struct tree_lock *tree_lock = &data_vio->tree_lock; 3199 3200 /* Encode the new mapping. */ 3201 page->entries[tree_lock->tree_slots[tree_lock->height].block_map_slot.slot] = 3202 vdo_pack_block_map_entry(pbn, mapping_state); 3203 3204 /* Adjust references on the recovery journal blocks. */ 3205 old_locked = *recovery_lock; 3206 new_locked = data_vio->recovery_sequence_number; 3207 3208 if ((old_locked == 0) || (old_locked > new_locked)) { 3209 vdo_acquire_recovery_journal_block_reference(journal, new_locked, 3210 VDO_ZONE_TYPE_LOGICAL, 3211 zone->zone_number); 3212 3213 if (old_locked > 0) { 3214 vdo_release_recovery_journal_block_reference(journal, old_locked, 3215 VDO_ZONE_TYPE_LOGICAL, 3216 zone->zone_number); 3217 } 3218 3219 *recovery_lock = new_locked; 3220 } 3221 3222 /* 3223 * FIXME: explain this more 3224 * Release the transferred lock from the data_vio. 
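 * (That is, the data_vio's reference on recovery journal block new_locked: the tree or cache
 * page now holds a recovery lock no later than new_locked, which keeps that journal block
 * from being reaped before the page is written, so the data_vio's own entry lock is no
 * longer needed.)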
3225 */ 3226 vdo_release_journal_entry_lock(journal, new_locked); 3227 data_vio->recovery_sequence_number = 0; 3228 } 3229 3230 static void put_mapping_in_fetched_page(struct vdo_completion *completion) 3231 { 3232 struct data_vio *data_vio = as_data_vio(completion->parent); 3233 sequence_number_t old_lock; 3234 struct vdo_page_completion *vpc; 3235 struct page_info *info; 3236 int result; 3237 3238 if (completion->result != VDO_SUCCESS) { 3239 finish_processing_page(completion, completion->result); 3240 return; 3241 } 3242 3243 vpc = as_vdo_page_completion(completion); 3244 result = validate_completed_page(vpc, true); 3245 if (result != VDO_SUCCESS) { 3246 finish_processing_page(completion, result); 3247 return; 3248 } 3249 3250 info = vpc->info; 3251 old_lock = info->recovery_lock; 3252 vdo_update_block_map_page((struct block_map_page *) get_page_buffer(info), 3253 data_vio, data_vio->new_mapped.pbn, 3254 data_vio->new_mapped.state, &info->recovery_lock); 3255 set_info_state(info, PS_DIRTY); 3256 add_to_dirty_lists(info->cache->zone, &info->state_entry, 3257 VDO_CACHE_PAGE, old_lock, info->recovery_lock); 3258 finish_processing_page(completion, VDO_SUCCESS); 3259 } 3260 3261 /* Read a stored block mapping into a data_vio. */ 3262 void vdo_get_mapped_block(struct data_vio *data_vio) 3263 { 3264 if (data_vio->tree_lock.tree_slots[0].block_map_slot.pbn == VDO_ZERO_BLOCK) { 3265 /* 3266 * We know that the block map page for this LBN has not been allocated, so the 3267 * block must be unmapped. 3268 */ 3269 clear_mapped_location(data_vio); 3270 continue_data_vio(data_vio); 3271 return; 3272 } 3273 3274 fetch_mapping_page(data_vio, false, get_mapping_from_fetched_page); 3275 } 3276 3277 /* Update a stored block mapping to reflect a data_vio's new mapping. 
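 *
 * The leaf page is fetched for modification; put_mapping_in_fetched_page() then writes the
 * new entry, marks the cache page dirty, and queues it on the dirty lists under its updated
 * recovery lock.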
*/ 3278 void vdo_put_mapped_block(struct data_vio *data_vio) 3279 { 3280 fetch_mapping_page(data_vio, true, put_mapping_in_fetched_page); 3281 } 3282 3283 struct block_map_statistics vdo_get_block_map_statistics(struct block_map *map) 3284 { 3285 zone_count_t zone = 0; 3286 struct block_map_statistics totals; 3287 3288 memset(&totals, 0, sizeof(struct block_map_statistics)); 3289 for (zone = 0; zone < map->zone_count; zone++) { 3290 const struct block_map_statistics *stats = 3291 &(map->zones[zone].page_cache.stats); 3292 3293 totals.dirty_pages += READ_ONCE(stats->dirty_pages); 3294 totals.clean_pages += READ_ONCE(stats->clean_pages); 3295 totals.free_pages += READ_ONCE(stats->free_pages); 3296 totals.failed_pages += READ_ONCE(stats->failed_pages); 3297 totals.incoming_pages += READ_ONCE(stats->incoming_pages); 3298 totals.outgoing_pages += READ_ONCE(stats->outgoing_pages); 3299 totals.cache_pressure += READ_ONCE(stats->cache_pressure); 3300 totals.read_count += READ_ONCE(stats->read_count); 3301 totals.write_count += READ_ONCE(stats->write_count); 3302 totals.failed_reads += READ_ONCE(stats->failed_reads); 3303 totals.failed_writes += READ_ONCE(stats->failed_writes); 3304 totals.reclaimed += READ_ONCE(stats->reclaimed); 3305 totals.read_outgoing += READ_ONCE(stats->read_outgoing); 3306 totals.found_in_cache += READ_ONCE(stats->found_in_cache); 3307 totals.discard_required += READ_ONCE(stats->discard_required); 3308 totals.wait_for_page += READ_ONCE(stats->wait_for_page); 3309 totals.fetch_required += READ_ONCE(stats->fetch_required); 3310 totals.pages_loaded += READ_ONCE(stats->pages_loaded); 3311 totals.pages_saved += READ_ONCE(stats->pages_saved); 3312 totals.flush_count += READ_ONCE(stats->flush_count); 3313 } 3314 3315 return totals; 3316 } 3317