1 // SPDX-License-Identifier: GPL-2.0-only 2 /* 3 * Copyright 2023 Red Hat 4 */ 5 6 #include "slab-depot.h" 7 8 #include <linux/atomic.h> 9 #include <linux/bio.h> 10 #include <linux/err.h> 11 #include <linux/log2.h> 12 #include <linux/min_heap.h> 13 #include <linux/minmax.h> 14 15 #include "logger.h" 16 #include "memory-alloc.h" 17 #include "numeric.h" 18 #include "permassert.h" 19 #include "string-utils.h" 20 21 #include "action-manager.h" 22 #include "admin-state.h" 23 #include "completion.h" 24 #include "constants.h" 25 #include "data-vio.h" 26 #include "encodings.h" 27 #include "io-submitter.h" 28 #include "physical-zone.h" 29 #include "priority-table.h" 30 #include "recovery-journal.h" 31 #include "repair.h" 32 #include "status-codes.h" 33 #include "types.h" 34 #include "vdo.h" 35 #include "vio.h" 36 #include "wait-queue.h" 37 38 static const u64 BYTES_PER_WORD = sizeof(u64); 39 static const bool NORMAL_OPERATION = true; 40 41 /** 42 * get_lock() - Get the lock object for a slab journal block by sequence number. 43 * @journal: vdo_slab journal to retrieve from. 44 * @sequence_number: Sequence number of the block. 45 * 46 * Return: The lock object for the given sequence number. 47 */ 48 static inline struct journal_lock * __must_check get_lock(struct slab_journal *journal, 49 sequence_number_t sequence_number) 50 { 51 return &journal->locks[sequence_number % journal->size]; 52 } 53 54 static bool is_slab_open(struct vdo_slab *slab) 55 { 56 return (!vdo_is_state_quiescing(&slab->state) && 57 !vdo_is_state_quiescent(&slab->state)); 58 } 59 60 /** 61 * must_make_entries_to_flush() - Check whether there are entry waiters which should delay a flush. 62 * @journal: The journal to check. 63 * 64 * Return: true if there are no entry waiters, or if the slab is unrecovered. 65 */ 66 static inline bool __must_check must_make_entries_to_flush(struct slab_journal *journal) 67 { 68 return ((journal->slab->status != VDO_SLAB_REBUILDING) && 69 vdo_waitq_has_waiters(&journal->entry_waiters)); 70 } 71 72 /** 73 * is_reaping() - Check whether a reap is currently in progress. 74 * @journal: The journal which may be reaping. 75 * 76 * Return: true if the journal is reaping. 77 */ 78 static inline bool __must_check is_reaping(struct slab_journal *journal) 79 { 80 return (journal->head != journal->unreapable); 81 } 82 83 /** 84 * initialize_tail_block() - Initialize tail block as a new block. 85 * @journal: The journal whose tail block is being initialized. 86 */ 87 static void initialize_tail_block(struct slab_journal *journal) 88 { 89 struct slab_journal_block_header *header = &journal->tail_header; 90 91 header->sequence_number = journal->tail; 92 header->entry_count = 0; 93 header->has_block_map_increments = false; 94 } 95 96 /** 97 * initialize_journal_state() - Set all journal fields appropriately to start journaling. 98 * @journal: The journal to be reset, based on its tail sequence number. 99 */ 100 static void initialize_journal_state(struct slab_journal *journal) 101 { 102 journal->unreapable = journal->head; 103 journal->reap_lock = get_lock(journal, journal->unreapable); 104 journal->next_commit = journal->tail; 105 journal->summarized = journal->last_summarized = journal->tail; 106 initialize_tail_block(journal); 107 } 108 109 /** 110 * block_is_full() - Check whether a journal block is full. 111 * @journal: The slab journal for the block. 112 * 113 * Return: true if the tail block is full. 
114 */ 115 static bool __must_check block_is_full(struct slab_journal *journal) 116 { 117 journal_entry_count_t count = journal->tail_header.entry_count; 118 119 return (journal->tail_header.has_block_map_increments ? 120 (journal->full_entries_per_block == count) : 121 (journal->entries_per_block == count)); 122 } 123 124 static void add_entries(struct slab_journal *journal); 125 static void update_tail_block_location(struct slab_journal *journal); 126 static void release_journal_locks(struct vdo_waiter *waiter, void *context); 127 128 /** 129 * is_slab_journal_blank() - Check whether a slab's journal is blank. 130 * 131 * A slab journal is blank if it has never had any entries recorded in it. 132 * 133 * Return: true if the slab's journal has never been modified. 134 */ 135 static bool is_slab_journal_blank(const struct vdo_slab *slab) 136 { 137 return ((slab->journal.tail == 1) && 138 (slab->journal.tail_header.entry_count == 0)); 139 } 140 141 /** 142 * mark_slab_journal_dirty() - Put a slab journal on the dirty list of its allocator in the correct 143 * order. 144 * @journal: The journal to be marked dirty. 145 * @lock: The recovery journal lock held by the slab journal. 146 */ 147 static void mark_slab_journal_dirty(struct slab_journal *journal, sequence_number_t lock) 148 { 149 struct slab_journal *dirty_journal; 150 struct list_head *dirty_list = &journal->slab->allocator->dirty_slab_journals; 151 152 VDO_ASSERT_LOG_ONLY(journal->recovery_lock == 0, "slab journal was clean"); 153 154 journal->recovery_lock = lock; 155 list_for_each_entry_reverse(dirty_journal, dirty_list, dirty_entry) { 156 if (dirty_journal->recovery_lock <= journal->recovery_lock) 157 break; 158 } 159 160 list_move_tail(&journal->dirty_entry, dirty_journal->dirty_entry.next); 161 } 162 163 static void mark_slab_journal_clean(struct slab_journal *journal) 164 { 165 journal->recovery_lock = 0; 166 list_del_init(&journal->dirty_entry); 167 } 168 169 static void check_if_slab_drained(struct vdo_slab *slab) 170 { 171 bool read_only; 172 struct slab_journal *journal = &slab->journal; 173 const struct admin_state_code *code; 174 175 if (!vdo_is_state_draining(&slab->state) || 176 must_make_entries_to_flush(journal) || 177 is_reaping(journal) || 178 journal->waiting_to_commit || 179 !list_empty(&journal->uncommitted_blocks) || 180 journal->updating_slab_summary || 181 (slab->active_count > 0)) 182 return; 183 184 /* When not suspending or recovering, the slab must be clean. */ 185 code = vdo_get_admin_state_code(&slab->state); 186 read_only = vdo_is_read_only(slab->allocator->depot->vdo); 187 if (!read_only && 188 vdo_waitq_has_waiters(&slab->dirty_blocks) && 189 (code != VDO_ADMIN_STATE_SUSPENDING) && 190 (code != VDO_ADMIN_STATE_RECOVERING)) 191 return; 192 193 vdo_finish_draining_with_result(&slab->state, 194 (read_only ? VDO_READ_ONLY : VDO_SUCCESS)); 195 } 196 197 /* FULLNESS HINT COMPUTATION */ 198 199 /** 200 * compute_fullness_hint() - Translate a slab's free block count into a 'fullness hint' that can be 201 * stored in a slab_summary_entry's 7 bits that are dedicated to its free 202 * count. 203 * @depot: The depot whose summary being updated. 204 * @free_blocks: The number of free blocks. 205 * 206 * Note: the number of free blocks must be strictly less than 2^23 blocks, even though 207 * theoretically slabs could contain precisely 2^23 blocks; there is an assumption that at least 208 * one block is used by metadata. This assumption is necessary; otherwise, the fullness hint might 209 * overflow. 
The fullness hint formula is roughly (fullness >> 16) & 0x7f, but (2^23 >> 16) & 0x7f 210 * is 0, which would make it impossible to distinguish completely full from completely empty. 211 * 212 * Return: A fullness hint, which can be stored in 7 bits. 213 */ 214 static u8 __must_check compute_fullness_hint(struct slab_depot *depot, 215 block_count_t free_blocks) 216 { 217 block_count_t hint; 218 219 VDO_ASSERT_LOG_ONLY((free_blocks < (1 << 23)), "free blocks must be less than 2^23"); 220 221 if (free_blocks == 0) 222 return 0; 223 224 hint = free_blocks >> depot->hint_shift; 225 return ((hint == 0) ? 1 : hint); 226 } 227 228 /** 229 * check_summary_drain_complete() - Check whether an allocators summary has finished draining. 230 */ 231 static void check_summary_drain_complete(struct block_allocator *allocator) 232 { 233 if (!vdo_is_state_draining(&allocator->summary_state) || 234 (allocator->summary_write_count > 0)) 235 return; 236 237 vdo_finish_operation(&allocator->summary_state, 238 (vdo_is_read_only(allocator->depot->vdo) ? 239 VDO_READ_ONLY : VDO_SUCCESS)); 240 } 241 242 /** 243 * notify_summary_waiters() - Wake all the waiters in a given queue. 244 * @allocator: The block allocator summary which owns the queue. 245 * @queue: The queue to notify. 246 */ 247 static void notify_summary_waiters(struct block_allocator *allocator, 248 struct vdo_wait_queue *queue) 249 { 250 int result = (vdo_is_read_only(allocator->depot->vdo) ? 251 VDO_READ_ONLY : VDO_SUCCESS); 252 253 vdo_waitq_notify_all_waiters(queue, NULL, &result); 254 } 255 256 static void launch_write(struct slab_summary_block *summary_block); 257 258 /** 259 * finish_updating_slab_summary_block() - Finish processing a block which attempted to write, 260 * whether or not the attempt succeeded. 261 * @block: The block. 262 */ 263 static void finish_updating_slab_summary_block(struct slab_summary_block *block) 264 { 265 notify_summary_waiters(block->allocator, &block->current_update_waiters); 266 block->writing = false; 267 block->allocator->summary_write_count--; 268 if (vdo_waitq_has_waiters(&block->next_update_waiters)) 269 launch_write(block); 270 else 271 check_summary_drain_complete(block->allocator); 272 } 273 274 /** 275 * finish_update() - This is the callback for a successful summary block write. 276 * @completion: The write vio. 277 */ 278 static void finish_update(struct vdo_completion *completion) 279 { 280 struct slab_summary_block *block = 281 container_of(as_vio(completion), struct slab_summary_block, vio); 282 283 atomic64_inc(&block->allocator->depot->summary_statistics.blocks_written); 284 finish_updating_slab_summary_block(block); 285 } 286 287 /** 288 * handle_write_error() - Handle an error writing a slab summary block. 289 * @completion: The write VIO. 290 */ 291 static void handle_write_error(struct vdo_completion *completion) 292 { 293 struct slab_summary_block *block = 294 container_of(as_vio(completion), struct slab_summary_block, vio); 295 296 vio_record_metadata_io_error(as_vio(completion)); 297 vdo_enter_read_only_mode(completion->vdo, completion->result); 298 finish_updating_slab_summary_block(block); 299 } 300 301 static void write_slab_summary_endio(struct bio *bio) 302 { 303 struct vio *vio = bio->bi_private; 304 struct slab_summary_block *block = 305 container_of(vio, struct slab_summary_block, vio); 306 307 continue_vio_after_io(vio, finish_update, block->allocator->thread_id); 308 } 309 310 /** 311 * launch_write() - Write a slab summary block unless it is currently out for writing. 
312 * @block: The block that needs to be committed. 313 */ 314 static void launch_write(struct slab_summary_block *block) 315 { 316 struct block_allocator *allocator = block->allocator; 317 struct slab_depot *depot = allocator->depot; 318 physical_block_number_t pbn; 319 320 if (block->writing) 321 return; 322 323 allocator->summary_write_count++; 324 vdo_waitq_transfer_all_waiters(&block->next_update_waiters, 325 &block->current_update_waiters); 326 block->writing = true; 327 328 if (vdo_is_read_only(depot->vdo)) { 329 finish_updating_slab_summary_block(block); 330 return; 331 } 332 333 memcpy(block->outgoing_entries, block->entries, VDO_BLOCK_SIZE); 334 335 /* 336 * Flush before writing to ensure that the slab journal tail blocks and reference updates 337 * covered by this summary update are stable. Otherwise, a subsequent recovery could 338 * encounter a slab summary update that refers to a slab journal tail block that has not 339 * actually been written. In such cases, the slab journal referenced will be treated as 340 * empty, causing any data within the slab which predates the existing recovery journal 341 * entries to be lost. 342 */ 343 pbn = (depot->summary_origin + 344 (VDO_SLAB_SUMMARY_BLOCKS_PER_ZONE * allocator->zone_number) + 345 block->index); 346 vdo_submit_metadata_vio(&block->vio, pbn, write_slab_summary_endio, 347 handle_write_error, REQ_OP_WRITE | REQ_PREFLUSH); 348 } 349 350 /** 351 * update_slab_summary_entry() - Update the entry for a slab. 352 * @slab: The slab whose entry is to be updated 353 * @waiter: The waiter that is updating the summary. 354 * @tail_block_offset: The offset of the slab journal's tail block. 355 * @load_ref_counts: Whether the reference counts must be loaded from disk on the vdo load. 356 * @is_clean: Whether the slab is clean. 357 * @free_blocks: The number of free blocks. 358 */ 359 static void update_slab_summary_entry(struct vdo_slab *slab, struct vdo_waiter *waiter, 360 tail_block_offset_t tail_block_offset, 361 bool load_ref_counts, bool is_clean, 362 block_count_t free_blocks) 363 { 364 u8 index = slab->slab_number / VDO_SLAB_SUMMARY_ENTRIES_PER_BLOCK; 365 struct block_allocator *allocator = slab->allocator; 366 struct slab_summary_block *block = &allocator->summary_blocks[index]; 367 int result; 368 struct slab_summary_entry *entry; 369 370 if (vdo_is_read_only(block->vio.completion.vdo)) { 371 result = VDO_READ_ONLY; 372 waiter->callback(waiter, &result); 373 return; 374 } 375 376 if (vdo_is_state_draining(&allocator->summary_state) || 377 vdo_is_state_quiescent(&allocator->summary_state)) { 378 result = VDO_INVALID_ADMIN_STATE; 379 waiter->callback(waiter, &result); 380 return; 381 } 382 383 entry = &allocator->summary_entries[slab->slab_number]; 384 *entry = (struct slab_summary_entry) { 385 .tail_block_offset = tail_block_offset, 386 .load_ref_counts = (entry->load_ref_counts || load_ref_counts), 387 .is_dirty = !is_clean, 388 .fullness_hint = compute_fullness_hint(allocator->depot, free_blocks), 389 }; 390 vdo_waitq_enqueue_waiter(&block->next_update_waiters, waiter); 391 launch_write(block); 392 } 393 394 /** 395 * finish_reaping() - Actually advance the head of the journal now that any necessary flushes are 396 * complete. 397 * @journal: The journal to be reaped. 
398 */ 399 static void finish_reaping(struct slab_journal *journal) 400 { 401 journal->head = journal->unreapable; 402 add_entries(journal); 403 check_if_slab_drained(journal->slab); 404 } 405 406 static void reap_slab_journal(struct slab_journal *journal); 407 408 /** 409 * complete_reaping() - Finish reaping now that we have flushed the lower layer and then try 410 * reaping again in case we deferred reaping due to an outstanding vio. 411 * @completion: The flush vio. 412 */ 413 static void complete_reaping(struct vdo_completion *completion) 414 { 415 struct slab_journal *journal = completion->parent; 416 417 return_vio_to_pool(vio_as_pooled_vio(as_vio(completion))); 418 finish_reaping(journal); 419 reap_slab_journal(journal); 420 } 421 422 /** 423 * handle_flush_error() - Handle an error flushing the lower layer. 424 * @completion: The flush vio. 425 */ 426 static void handle_flush_error(struct vdo_completion *completion) 427 { 428 vio_record_metadata_io_error(as_vio(completion)); 429 vdo_enter_read_only_mode(completion->vdo, completion->result); 430 complete_reaping(completion); 431 } 432 433 static void flush_endio(struct bio *bio) 434 { 435 struct vio *vio = bio->bi_private; 436 struct slab_journal *journal = vio->completion.parent; 437 438 continue_vio_after_io(vio, complete_reaping, 439 journal->slab->allocator->thread_id); 440 } 441 442 /** 443 * flush_for_reaping() - A waiter callback for getting a vio with which to flush the lower layer 444 * prior to reaping. 445 * @waiter: The journal as a flush waiter. 446 * @context: The newly acquired flush vio. 447 */ 448 static void flush_for_reaping(struct vdo_waiter *waiter, void *context) 449 { 450 struct slab_journal *journal = 451 container_of(waiter, struct slab_journal, flush_waiter); 452 struct pooled_vio *pooled = context; 453 struct vio *vio = &pooled->vio; 454 455 vio->completion.parent = journal; 456 vdo_submit_flush_vio(vio, flush_endio, handle_flush_error); 457 } 458 459 /** 460 * reap_slab_journal() - Conduct a reap on a slab journal to reclaim unreferenced blocks. 461 * @journal: The slab journal. 462 */ 463 static void reap_slab_journal(struct slab_journal *journal) 464 { 465 bool reaped = false; 466 467 if (is_reaping(journal)) { 468 /* We already have a reap in progress so wait for it to finish. */ 469 return; 470 } 471 472 if ((journal->slab->status != VDO_SLAB_REBUILT) || 473 !vdo_is_state_normal(&journal->slab->state) || 474 vdo_is_read_only(journal->slab->allocator->depot->vdo)) { 475 /* 476 * We must not reap in the first two cases, and there's no point in read-only mode. 477 */ 478 return; 479 } 480 481 /* 482 * Start reclaiming blocks only when the journal head has no references. Then stop when a 483 * block is referenced or reap reaches the most recently written block, referenced by the 484 * slab summary, which has the sequence number just before the tail. 485 */ 486 while ((journal->unreapable < journal->tail) && (journal->reap_lock->count == 0)) { 487 reaped = true; 488 journal->unreapable++; 489 journal->reap_lock++; 490 if (journal->reap_lock == &journal->locks[journal->size]) 491 journal->reap_lock = &journal->locks[0]; 492 } 493 494 if (!reaped) 495 return; 496 497 /* 498 * It is never safe to reap a slab journal block without first issuing a flush, regardless 499 * of whether a user flush has been received or not. In the absence of the flush, the 500 * reference block write which released the locks allowing the slab journal to reap may not 501 * be persisted. 
Although slab summary writes will eventually issue flushes, multiple slab 502 * journal block writes can be issued while previous slab summary updates have not yet been 503 * made. Even though those slab journal block writes will be ignored if the slab summary 504 * update is not persisted, they may still overwrite the to-be-reaped slab journal block 505 * resulting in a loss of reference count updates. 506 */ 507 journal->flush_waiter.callback = flush_for_reaping; 508 acquire_vio_from_pool(journal->slab->allocator->vio_pool, 509 &journal->flush_waiter); 510 } 511 512 /** 513 * adjust_slab_journal_block_reference() - Adjust the reference count for a slab journal block. 514 * @journal: The slab journal. 515 * @sequence_number: The journal sequence number of the referenced block. 516 * @adjustment: Amount to adjust the reference counter. 517 * 518 * Note that when the adjustment is negative, the slab journal will be reaped. 519 */ 520 static void adjust_slab_journal_block_reference(struct slab_journal *journal, 521 sequence_number_t sequence_number, 522 int adjustment) 523 { 524 struct journal_lock *lock; 525 526 if (sequence_number == 0) 527 return; 528 529 if (journal->slab->status == VDO_SLAB_REPLAYING) { 530 /* Locks should not be used during offline replay. */ 531 return; 532 } 533 534 VDO_ASSERT_LOG_ONLY((adjustment != 0), "adjustment must be non-zero"); 535 lock = get_lock(journal, sequence_number); 536 if (adjustment < 0) { 537 VDO_ASSERT_LOG_ONLY((-adjustment <= lock->count), 538 "adjustment %d of lock count %u for slab journal block %llu must not underflow", 539 adjustment, lock->count, 540 (unsigned long long) sequence_number); 541 } 542 543 lock->count += adjustment; 544 if (lock->count == 0) 545 reap_slab_journal(journal); 546 } 547 548 /** 549 * release_journal_locks() - Callback invoked after a slab summary update completes. 550 * @waiter: The slab summary waiter that has just been notified. 551 * @context: The result code of the update. 552 * 553 * Registered in the constructor on behalf of update_tail_block_location(). 554 * 555 * Implements waiter_callback_fn. 556 */ 557 static void release_journal_locks(struct vdo_waiter *waiter, void *context) 558 { 559 sequence_number_t first, i; 560 struct slab_journal *journal = 561 container_of(waiter, struct slab_journal, slab_summary_waiter); 562 int result = *((int *) context); 563 564 if (result != VDO_SUCCESS) { 565 if (result != VDO_READ_ONLY) { 566 /* 567 * Don't bother logging what might be lots of errors if we are already in 568 * read-only mode. 569 */ 570 vdo_log_error_strerror(result, "failed slab summary update %llu", 571 (unsigned long long) journal->summarized); 572 } 573 574 journal->updating_slab_summary = false; 575 vdo_enter_read_only_mode(journal->slab->allocator->depot->vdo, result); 576 check_if_slab_drained(journal->slab); 577 return; 578 } 579 580 if (journal->partial_write_in_progress && (journal->summarized == journal->tail)) { 581 journal->partial_write_in_progress = false; 582 add_entries(journal); 583 } 584 585 first = journal->last_summarized; 586 journal->last_summarized = journal->summarized; 587 for (i = journal->summarized - 1; i >= first; i--) { 588 /* 589 * Release the lock the summarized block held on the recovery journal. (During 590 * replay, recovery_start will always be 0.) 
591 */ 592 if (journal->recovery_journal != NULL) { 593 zone_count_t zone_number = journal->slab->allocator->zone_number; 594 struct journal_lock *lock = get_lock(journal, i); 595 596 vdo_release_recovery_journal_block_reference(journal->recovery_journal, 597 lock->recovery_start, 598 VDO_ZONE_TYPE_PHYSICAL, 599 zone_number); 600 } 601 602 /* 603 * Release our own lock against reaping for blocks that are committed. (This 604 * function will not change locks during replay.) 605 */ 606 adjust_slab_journal_block_reference(journal, i, -1); 607 } 608 609 journal->updating_slab_summary = false; 610 611 reap_slab_journal(journal); 612 613 /* Check if the slab summary needs to be updated again. */ 614 update_tail_block_location(journal); 615 } 616 617 /** 618 * update_tail_block_location() - Update the tail block location in the slab summary, if necessary. 619 * @journal: The slab journal that is updating its tail block location. 620 */ 621 static void update_tail_block_location(struct slab_journal *journal) 622 { 623 block_count_t free_block_count; 624 struct vdo_slab *slab = journal->slab; 625 626 if (journal->updating_slab_summary || 627 vdo_is_read_only(journal->slab->allocator->depot->vdo) || 628 (journal->last_summarized >= journal->next_commit)) { 629 check_if_slab_drained(slab); 630 return; 631 } 632 633 if (slab->status != VDO_SLAB_REBUILT) { 634 u8 hint = slab->allocator->summary_entries[slab->slab_number].fullness_hint; 635 636 free_block_count = ((block_count_t) hint) << slab->allocator->depot->hint_shift; 637 } else { 638 free_block_count = slab->free_blocks; 639 } 640 641 journal->summarized = journal->next_commit; 642 journal->updating_slab_summary = true; 643 644 /* 645 * Update slab summary as dirty. 646 * vdo_slab journal can only reap past sequence number 1 when all the ref counts for this 647 * slab have been written to the layer. Therefore, indicate that the ref counts must be 648 * loaded when the journal head has reaped past sequence number 1. 649 */ 650 update_slab_summary_entry(slab, &journal->slab_summary_waiter, 651 journal->summarized % journal->size, 652 (journal->head > 1), false, free_block_count); 653 } 654 655 /** 656 * reopen_slab_journal() - Reopen a slab's journal by emptying it and then adding pending entries. 657 */ 658 static void reopen_slab_journal(struct vdo_slab *slab) 659 { 660 struct slab_journal *journal = &slab->journal; 661 sequence_number_t block; 662 663 VDO_ASSERT_LOG_ONLY(journal->tail_header.entry_count == 0, 664 "vdo_slab journal's active block empty before reopening"); 665 journal->head = journal->tail; 666 initialize_journal_state(journal); 667 668 /* Ensure no locks are spuriously held on an empty journal. */ 669 for (block = 1; block <= journal->size; block++) { 670 VDO_ASSERT_LOG_ONLY((get_lock(journal, block)->count == 0), 671 "Scrubbed journal's block %llu is not locked", 672 (unsigned long long) block); 673 } 674 675 add_entries(journal); 676 } 677 678 static sequence_number_t get_committing_sequence_number(const struct pooled_vio *vio) 679 { 680 const struct packed_slab_journal_block *block = 681 (const struct packed_slab_journal_block *) vio->vio.data; 682 683 return __le64_to_cpu(block->header.sequence_number); 684 } 685 686 /** 687 * complete_write() - Handle post-commit processing. 688 * @completion: The write vio as a completion. 689 * 690 * This is the callback registered by write_slab_journal_block(). 
691 */ 692 static void complete_write(struct vdo_completion *completion) 693 { 694 int result = completion->result; 695 struct pooled_vio *pooled = vio_as_pooled_vio(as_vio(completion)); 696 struct slab_journal *journal = completion->parent; 697 sequence_number_t committed = get_committing_sequence_number(pooled); 698 699 list_del_init(&pooled->list_entry); 700 return_vio_to_pool(pooled); 701 702 if (result != VDO_SUCCESS) { 703 vio_record_metadata_io_error(as_vio(completion)); 704 vdo_log_error_strerror(result, "cannot write slab journal block %llu", 705 (unsigned long long) committed); 706 vdo_enter_read_only_mode(journal->slab->allocator->depot->vdo, result); 707 check_if_slab_drained(journal->slab); 708 return; 709 } 710 711 WRITE_ONCE(journal->events->blocks_written, journal->events->blocks_written + 1); 712 713 if (list_empty(&journal->uncommitted_blocks)) { 714 /* If no blocks are outstanding, then the commit point is at the tail. */ 715 journal->next_commit = journal->tail; 716 } else { 717 /* The commit point is always the beginning of the oldest incomplete block. */ 718 pooled = container_of(journal->uncommitted_blocks.next, 719 struct pooled_vio, list_entry); 720 journal->next_commit = get_committing_sequence_number(pooled); 721 } 722 723 update_tail_block_location(journal); 724 } 725 726 static void write_slab_journal_endio(struct bio *bio) 727 { 728 struct vio *vio = bio->bi_private; 729 struct slab_journal *journal = vio->completion.parent; 730 731 continue_vio_after_io(vio, complete_write, journal->slab->allocator->thread_id); 732 } 733 734 /** 735 * write_slab_journal_block() - Write a slab journal block. 736 * @waiter: The vio pool waiter which was just notified. 737 * @context: The vio pool entry for the write. 738 * 739 * Callback from acquire_vio_from_pool() registered in commit_tail(). 740 */ 741 static void write_slab_journal_block(struct vdo_waiter *waiter, void *context) 742 { 743 struct pooled_vio *pooled = context; 744 struct vio *vio = &pooled->vio; 745 struct slab_journal *journal = 746 container_of(waiter, struct slab_journal, resource_waiter); 747 struct slab_journal_block_header *header = &journal->tail_header; 748 int unused_entries = journal->entries_per_block - header->entry_count; 749 physical_block_number_t block_number; 750 const struct admin_state_code *operation; 751 752 header->head = journal->head; 753 list_add_tail(&pooled->list_entry, &journal->uncommitted_blocks); 754 vdo_pack_slab_journal_block_header(header, &journal->block->header); 755 756 /* Copy the tail block into the vio. */ 757 memcpy(pooled->vio.data, journal->block, VDO_BLOCK_SIZE); 758 759 VDO_ASSERT_LOG_ONLY(unused_entries >= 0, "vdo_slab journal block is not overfull"); 760 if (unused_entries > 0) { 761 /* 762 * Release the per-entry locks for any unused entries in the block we are about to 763 * write. 764 */ 765 adjust_slab_journal_block_reference(journal, header->sequence_number, 766 -unused_entries); 767 journal->partial_write_in_progress = !block_is_full(journal); 768 } 769 770 block_number = journal->slab->journal_origin + 771 (header->sequence_number % journal->size); 772 vio->completion.parent = journal; 773 774 /* 775 * This block won't be read in recovery until the slab summary is updated to refer to it. 776 * The slab summary update does a flush which is sufficient to protect us from corruption 777 * due to out of order slab journal, reference block, or block map writes. 
778 */ 779 vdo_submit_metadata_vio(vdo_forget(vio), block_number, write_slab_journal_endio, 780 complete_write, REQ_OP_WRITE); 781 782 /* Since the write is submitted, the tail block structure can be reused. */ 783 journal->tail++; 784 initialize_tail_block(journal); 785 journal->waiting_to_commit = false; 786 787 operation = vdo_get_admin_state_code(&journal->slab->state); 788 if (operation == VDO_ADMIN_STATE_WAITING_FOR_RECOVERY) { 789 vdo_finish_operation(&journal->slab->state, 790 (vdo_is_read_only(journal->slab->allocator->depot->vdo) ? 791 VDO_READ_ONLY : VDO_SUCCESS)); 792 return; 793 } 794 795 add_entries(journal); 796 } 797 798 /** 799 * commit_tail() - Commit the tail block of the slab journal. 800 * @journal: The journal whose tail block should be committed. 801 */ 802 static void commit_tail(struct slab_journal *journal) 803 { 804 if ((journal->tail_header.entry_count == 0) && must_make_entries_to_flush(journal)) { 805 /* 806 * There are no entries at the moment, but there are some waiters, so defer 807 * initiating the flush until those entries are ready to write. 808 */ 809 return; 810 } 811 812 if (vdo_is_read_only(journal->slab->allocator->depot->vdo) || 813 journal->waiting_to_commit || 814 (journal->tail_header.entry_count == 0)) { 815 /* 816 * There is nothing to do since the tail block is empty, or writing, or the journal 817 * is in read-only mode. 818 */ 819 return; 820 } 821 822 /* 823 * Since we are about to commit the tail block, this journal no longer needs to be on the 824 * list of journals which the recovery journal might ask to commit. 825 */ 826 mark_slab_journal_clean(journal); 827 828 journal->waiting_to_commit = true; 829 830 journal->resource_waiter.callback = write_slab_journal_block; 831 acquire_vio_from_pool(journal->slab->allocator->vio_pool, 832 &journal->resource_waiter); 833 } 834 835 /** 836 * encode_slab_journal_entry() - Encode a slab journal entry. 837 * @tail_header: The unpacked header for the block. 838 * @payload: The journal block payload to hold the entry. 839 * @sbn: The slab block number of the entry to encode. 840 * @operation: The type of the entry. 841 * @increment: True if this is an increment. 842 * 843 * Exposed for unit tests. 844 */ 845 static void encode_slab_journal_entry(struct slab_journal_block_header *tail_header, 846 slab_journal_payload *payload, 847 slab_block_number sbn, 848 enum journal_operation operation, 849 bool increment) 850 { 851 journal_entry_count_t entry_number = tail_header->entry_count++; 852 853 if (operation == VDO_JOURNAL_BLOCK_MAP_REMAPPING) { 854 if (!tail_header->has_block_map_increments) { 855 memset(payload->full_entries.entry_types, 0, 856 VDO_SLAB_JOURNAL_ENTRY_TYPES_SIZE); 857 tail_header->has_block_map_increments = true; 858 } 859 860 payload->full_entries.entry_types[entry_number / 8] |= 861 ((u8)1 << (entry_number % 8)); 862 } 863 864 vdo_pack_slab_journal_entry(&payload->entries[entry_number], sbn, increment); 865 } 866 867 /** 868 * expand_journal_point() - Convert a recovery journal journal_point which refers to both an 869 * increment and a decrement to a single point which refers to one or the 870 * other. 871 * @recovery_point: The journal point to convert. 872 * @increment: Whether the current entry is an increment. 873 * 874 * Return: The expanded journal point 875 * 876 * Because each data_vio has but a single recovery journal point, but may need to make both 877 * increment and decrement entries in the same slab journal. 
In order to distinguish the two 878 * entries, the entry count of the expanded journal point is twice the actual recovery journal 879 * entry count for increments, and one more than that for decrements. 880 */ 881 static struct journal_point expand_journal_point(struct journal_point recovery_point, 882 bool increment) 883 { 884 recovery_point.entry_count *= 2; 885 if (!increment) 886 recovery_point.entry_count++; 887 888 return recovery_point; 889 } 890 891 /** 892 * add_entry() - Actually add an entry to the slab journal, potentially firing off a write if a 893 * block becomes full. 894 * @journal: The slab journal to append to. 895 * @pbn: The pbn being adjusted. 896 * @operation: The type of entry to make. 897 * @increment: True if this is an increment. 898 * @recovery_point: The expanded recovery point. 899 * 900 * This function is synchronous. 901 */ 902 static void add_entry(struct slab_journal *journal, physical_block_number_t pbn, 903 enum journal_operation operation, bool increment, 904 struct journal_point recovery_point) 905 { 906 struct packed_slab_journal_block *block = journal->block; 907 int result; 908 909 result = VDO_ASSERT(vdo_before_journal_point(&journal->tail_header.recovery_point, 910 &recovery_point), 911 "recovery journal point is monotonically increasing, recovery point: %llu.%u, block recovery point: %llu.%u", 912 (unsigned long long) recovery_point.sequence_number, 913 recovery_point.entry_count, 914 (unsigned long long) journal->tail_header.recovery_point.sequence_number, 915 journal->tail_header.recovery_point.entry_count); 916 if (result != VDO_SUCCESS) { 917 vdo_enter_read_only_mode(journal->slab->allocator->depot->vdo, result); 918 return; 919 } 920 921 if (operation == VDO_JOURNAL_BLOCK_MAP_REMAPPING) { 922 result = VDO_ASSERT((journal->tail_header.entry_count < 923 journal->full_entries_per_block), 924 "block has room for full entries"); 925 if (result != VDO_SUCCESS) { 926 vdo_enter_read_only_mode(journal->slab->allocator->depot->vdo, 927 result); 928 return; 929 } 930 } 931 932 encode_slab_journal_entry(&journal->tail_header, &block->payload, 933 pbn - journal->slab->start, operation, increment); 934 journal->tail_header.recovery_point = recovery_point; 935 if (block_is_full(journal)) 936 commit_tail(journal); 937 } 938 939 static inline block_count_t journal_length(const struct slab_journal *journal) 940 { 941 return journal->tail - journal->head; 942 } 943 944 /** 945 * vdo_attempt_replay_into_slab() - Replay a recovery journal entry into a slab's journal. 946 * @slab: The slab to play into. 947 * @pbn: The PBN for the entry. 948 * @operation: The type of entry to add. 949 * @increment: True if this entry is an increment. 950 * @recovery_point: The recovery journal point corresponding to this entry. 951 * @parent: The completion to notify when there is space to add the entry if the entry could not be 952 * added immediately. 953 * 954 * Return: true if the entry was added immediately. 955 */ 956 bool vdo_attempt_replay_into_slab(struct vdo_slab *slab, physical_block_number_t pbn, 957 enum journal_operation operation, bool increment, 958 struct journal_point *recovery_point, 959 struct vdo_completion *parent) 960 { 961 struct slab_journal *journal = &slab->journal; 962 struct slab_journal_block_header *header = &journal->tail_header; 963 struct journal_point expanded = expand_journal_point(*recovery_point, increment); 964 965 /* Only accept entries after the current recovery point. 
*/ 966 if (!vdo_before_journal_point(&journal->tail_header.recovery_point, &expanded)) 967 return true; 968 969 if ((header->entry_count >= journal->full_entries_per_block) && 970 (header->has_block_map_increments || (operation == VDO_JOURNAL_BLOCK_MAP_REMAPPING))) { 971 /* 972 * The tail block does not have room for the entry we are attempting to add so 973 * commit the tail block now. 974 */ 975 commit_tail(journal); 976 } 977 978 if (journal->waiting_to_commit) { 979 vdo_start_operation_with_waiter(&journal->slab->state, 980 VDO_ADMIN_STATE_WAITING_FOR_RECOVERY, 981 parent, NULL); 982 return false; 983 } 984 985 if (journal_length(journal) >= journal->size) { 986 /* 987 * We must have reaped the current head before the crash, since the blocked 988 * threshold keeps us from having more entries than fit in a slab journal; hence we 989 * can just advance the head (and unreapable block), as needed. 990 */ 991 journal->head++; 992 journal->unreapable++; 993 } 994 995 if (journal->slab->status == VDO_SLAB_REBUILT) 996 journal->slab->status = VDO_SLAB_REPLAYING; 997 998 add_entry(journal, pbn, operation, increment, expanded); 999 return true; 1000 } 1001 1002 /** 1003 * requires_reaping() - Check whether the journal must be reaped before adding new entries. 1004 * @journal: The journal to check. 1005 * 1006 * Return: true if the journal must be reaped. 1007 */ 1008 static bool requires_reaping(const struct slab_journal *journal) 1009 { 1010 return (journal_length(journal) >= journal->blocking_threshold); 1011 } 1012 1013 /** finish_summary_update() - A waiter callback that resets the writing state of a slab. */ 1014 static void finish_summary_update(struct vdo_waiter *waiter, void *context) 1015 { 1016 struct vdo_slab *slab = container_of(waiter, struct vdo_slab, summary_waiter); 1017 int result = *((int *) context); 1018 1019 slab->active_count--; 1020 1021 if ((result != VDO_SUCCESS) && (result != VDO_READ_ONLY)) { 1022 vdo_log_error_strerror(result, "failed to update slab summary"); 1023 vdo_enter_read_only_mode(slab->allocator->depot->vdo, result); 1024 } 1025 1026 check_if_slab_drained(slab); 1027 } 1028 1029 static void write_reference_block(struct vdo_waiter *waiter, void *context); 1030 1031 /** 1032 * launch_reference_block_write() - Launch the write of a dirty reference block by first acquiring 1033 * a VIO for it from the pool. 1034 * @waiter: The waiter of the block which is starting to write. 1035 * @context: The parent slab of the block. 1036 * 1037 * This can be asynchronous since the writer will have to wait if all VIOs in the pool are 1038 * currently in use. 1039 */ 1040 static void launch_reference_block_write(struct vdo_waiter *waiter, void *context) 1041 { 1042 struct vdo_slab *slab = context; 1043 1044 if (vdo_is_read_only(slab->allocator->depot->vdo)) 1045 return; 1046 1047 slab->active_count++; 1048 container_of(waiter, struct reference_block, waiter)->is_writing = true; 1049 waiter->callback = write_reference_block; 1050 acquire_vio_from_pool(slab->allocator->vio_pool, waiter); 1051 } 1052 1053 static void save_dirty_reference_blocks(struct vdo_slab *slab) 1054 { 1055 vdo_waitq_notify_all_waiters(&slab->dirty_blocks, 1056 launch_reference_block_write, slab); 1057 check_if_slab_drained(slab); 1058 } 1059 1060 /** 1061 * finish_reference_block_write() - After a reference block has written, clean it, release its 1062 * locks, and return its VIO to the pool. 1063 * @completion: The VIO that just finished writing. 
1064 */ 1065 static void finish_reference_block_write(struct vdo_completion *completion) 1066 { 1067 struct vio *vio = as_vio(completion); 1068 struct pooled_vio *pooled = vio_as_pooled_vio(vio); 1069 struct reference_block *block = completion->parent; 1070 struct vdo_slab *slab = block->slab; 1071 tail_block_offset_t offset; 1072 1073 slab->active_count--; 1074 1075 /* Release the slab journal lock. */ 1076 adjust_slab_journal_block_reference(&slab->journal, 1077 block->slab_journal_lock_to_release, -1); 1078 return_vio_to_pool(pooled); 1079 1080 /* 1081 * We can't clear the is_writing flag earlier as releasing the slab journal lock may cause 1082 * us to be dirtied again, but we don't want to double enqueue. 1083 */ 1084 block->is_writing = false; 1085 1086 if (vdo_is_read_only(completion->vdo)) { 1087 check_if_slab_drained(slab); 1088 return; 1089 } 1090 1091 /* Re-queue the block if it was re-dirtied while it was writing. */ 1092 if (block->is_dirty) { 1093 vdo_waitq_enqueue_waiter(&block->slab->dirty_blocks, &block->waiter); 1094 if (vdo_is_state_draining(&slab->state)) { 1095 /* We must be saving, and this block will otherwise not be relaunched. */ 1096 save_dirty_reference_blocks(slab); 1097 } 1098 1099 return; 1100 } 1101 1102 /* 1103 * Mark the slab as clean in the slab summary if there are no dirty or writing blocks 1104 * and no summary update in progress. 1105 */ 1106 if ((slab->active_count > 0) || vdo_waitq_has_waiters(&slab->dirty_blocks)) { 1107 check_if_slab_drained(slab); 1108 return; 1109 } 1110 1111 offset = slab->allocator->summary_entries[slab->slab_number].tail_block_offset; 1112 slab->active_count++; 1113 slab->summary_waiter.callback = finish_summary_update; 1114 update_slab_summary_entry(slab, &slab->summary_waiter, offset, 1115 true, true, slab->free_blocks); 1116 } 1117 1118 /** 1119 * get_reference_counters_for_block() - Find the reference counters for a given block. 1120 * @block: The reference_block in question. 1121 * 1122 * Return: A pointer to the reference counters for this block. 1123 */ 1124 static vdo_refcount_t * __must_check get_reference_counters_for_block(struct reference_block *block) 1125 { 1126 size_t block_index = block - block->slab->reference_blocks; 1127 1128 return &block->slab->counters[block_index * COUNTS_PER_BLOCK]; 1129 } 1130 1131 /** 1132 * pack_reference_block() - Copy data from a reference block to a buffer ready to be written out. 1133 * @block: The block to copy. 1134 * @buffer: The char buffer to fill with the packed block. 
1135 */ 1136 static void pack_reference_block(struct reference_block *block, void *buffer) 1137 { 1138 struct packed_reference_block *packed = buffer; 1139 vdo_refcount_t *counters = get_reference_counters_for_block(block); 1140 sector_count_t i; 1141 struct packed_journal_point commit_point; 1142 1143 vdo_pack_journal_point(&block->slab->slab_journal_point, &commit_point); 1144 1145 for (i = 0; i < VDO_SECTORS_PER_BLOCK; i++) { 1146 packed->sectors[i].commit_point = commit_point; 1147 memcpy(packed->sectors[i].counts, counters + (i * COUNTS_PER_SECTOR), 1148 (sizeof(vdo_refcount_t) * COUNTS_PER_SECTOR)); 1149 } 1150 } 1151 1152 static void write_reference_block_endio(struct bio *bio) 1153 { 1154 struct vio *vio = bio->bi_private; 1155 struct reference_block *block = vio->completion.parent; 1156 thread_id_t thread_id = block->slab->allocator->thread_id; 1157 1158 continue_vio_after_io(vio, finish_reference_block_write, thread_id); 1159 } 1160 1161 /** 1162 * handle_io_error() - Handle an I/O error reading or writing a reference count block. 1163 * @completion: The VIO doing the I/O as a completion. 1164 */ 1165 static void handle_io_error(struct vdo_completion *completion) 1166 { 1167 int result = completion->result; 1168 struct vio *vio = as_vio(completion); 1169 struct vdo_slab *slab = ((struct reference_block *) completion->parent)->slab; 1170 1171 vio_record_metadata_io_error(vio); 1172 return_vio_to_pool(vio_as_pooled_vio(vio)); 1173 slab->active_count -= vio->io_size / VDO_BLOCK_SIZE; 1174 vdo_enter_read_only_mode(slab->allocator->depot->vdo, result); 1175 check_if_slab_drained(slab); 1176 } 1177 1178 /** 1179 * write_reference_block() - After a dirty block waiter has gotten a VIO from the VIO pool, copy 1180 * its counters and associated data into the VIO, and launch the write. 1181 * @waiter: The waiter of the dirty block. 1182 * @context: The VIO returned by the pool. 1183 */ 1184 static void write_reference_block(struct vdo_waiter *waiter, void *context) 1185 { 1186 size_t block_offset; 1187 physical_block_number_t pbn; 1188 struct pooled_vio *pooled = context; 1189 struct vdo_completion *completion = &pooled->vio.completion; 1190 struct reference_block *block = container_of(waiter, struct reference_block, 1191 waiter); 1192 1193 pack_reference_block(block, pooled->vio.data); 1194 block_offset = (block - block->slab->reference_blocks); 1195 pbn = (block->slab->ref_counts_origin + block_offset); 1196 block->slab_journal_lock_to_release = block->slab_journal_lock; 1197 completion->parent = block; 1198 1199 /* 1200 * Mark the block as clean, since we won't be committing any updates that happen after this 1201 * moment. As long as VIO order is preserved, two VIOs updating this block at once will not 1202 * cause complications. 1203 */ 1204 block->is_dirty = false; 1205 1206 /* 1207 * Flush before writing to ensure that the recovery journal and slab journal entries which 1208 * cover this reference update are stable. This prevents data corruption that can be caused 1209 * by out of order writes. 
1210 */ 1211 WRITE_ONCE(block->slab->allocator->ref_counts_statistics.blocks_written, 1212 block->slab->allocator->ref_counts_statistics.blocks_written + 1); 1213 1214 completion->callback_thread_id = ((struct block_allocator *) pooled->context)->thread_id; 1215 vdo_submit_metadata_vio(&pooled->vio, pbn, write_reference_block_endio, 1216 handle_io_error, REQ_OP_WRITE | REQ_PREFLUSH); 1217 } 1218 1219 static void reclaim_journal_space(struct slab_journal *journal) 1220 { 1221 block_count_t length = journal_length(journal); 1222 struct vdo_slab *slab = journal->slab; 1223 block_count_t write_count = vdo_waitq_num_waiters(&slab->dirty_blocks); 1224 block_count_t written; 1225 1226 if ((length < journal->flushing_threshold) || (write_count == 0)) 1227 return; 1228 1229 /* The slab journal is over the first threshold, schedule some reference block writes. */ 1230 WRITE_ONCE(journal->events->flush_count, journal->events->flush_count + 1); 1231 if (length < journal->flushing_deadline) { 1232 /* Schedule more writes the closer to the deadline we get. */ 1233 write_count /= journal->flushing_deadline - length + 1; 1234 write_count = max_t(block_count_t, write_count, 1); 1235 } 1236 1237 for (written = 0; written < write_count; written++) { 1238 vdo_waitq_notify_next_waiter(&slab->dirty_blocks, 1239 launch_reference_block_write, slab); 1240 } 1241 } 1242 1243 /** 1244 * reference_count_to_status() - Convert a reference count to a reference status. 1245 * @count: The count to convert. 1246 * 1247 * Return: The appropriate reference status. 1248 */ 1249 static enum reference_status __must_check reference_count_to_status(vdo_refcount_t count) 1250 { 1251 if (count == EMPTY_REFERENCE_COUNT) 1252 return RS_FREE; 1253 else if (count == 1) 1254 return RS_SINGLE; 1255 else if (count == PROVISIONAL_REFERENCE_COUNT) 1256 return RS_PROVISIONAL; 1257 else 1258 return RS_SHARED; 1259 } 1260 1261 /** 1262 * dirty_block() - Mark a reference count block as dirty, potentially adding it to the dirty queue 1263 * if it wasn't already dirty. 1264 * @block: The reference block to mark as dirty. 1265 */ 1266 static void dirty_block(struct reference_block *block) 1267 { 1268 if (block->is_dirty) 1269 return; 1270 1271 block->is_dirty = true; 1272 if (!block->is_writing) 1273 vdo_waitq_enqueue_waiter(&block->slab->dirty_blocks, &block->waiter); 1274 } 1275 1276 /** 1277 * get_reference_block() - Get the reference block that covers the given block index. 1278 */ 1279 static struct reference_block * __must_check get_reference_block(struct vdo_slab *slab, 1280 slab_block_number index) 1281 { 1282 return &slab->reference_blocks[index / COUNTS_PER_BLOCK]; 1283 } 1284 1285 /** 1286 * slab_block_number_from_pbn() - Determine the index within the slab of a particular physical 1287 * block number. 1288 * @slab: The slab. 1289 * @pbn: The physical block number. 1290 * @slab_block_number_ptr: A pointer to the slab block number. 1291 * 1292 * Return: VDO_SUCCESS or an error code. 
1293 */ 1294 static int __must_check slab_block_number_from_pbn(struct vdo_slab *slab, 1295 physical_block_number_t pbn, 1296 slab_block_number *slab_block_number_ptr) 1297 { 1298 u64 slab_block_number; 1299 1300 if (pbn < slab->start) 1301 return VDO_OUT_OF_RANGE; 1302 1303 slab_block_number = pbn - slab->start; 1304 if (slab_block_number >= slab->allocator->depot->slab_config.data_blocks) 1305 return VDO_OUT_OF_RANGE; 1306 1307 *slab_block_number_ptr = slab_block_number; 1308 return VDO_SUCCESS; 1309 } 1310 1311 /** 1312 * get_reference_counter() - Get the reference counter that covers the given physical block number. 1313 * @slab: The slab to query. 1314 * @pbn: The physical block number. 1315 * @counter_ptr: A pointer to the reference counter. 1316 */ 1317 static int __must_check get_reference_counter(struct vdo_slab *slab, 1318 physical_block_number_t pbn, 1319 vdo_refcount_t **counter_ptr) 1320 { 1321 slab_block_number index; 1322 int result = slab_block_number_from_pbn(slab, pbn, &index); 1323 1324 if (result != VDO_SUCCESS) 1325 return result; 1326 1327 *counter_ptr = &slab->counters[index]; 1328 1329 return VDO_SUCCESS; 1330 } 1331 1332 static unsigned int calculate_slab_priority(struct vdo_slab *slab) 1333 { 1334 block_count_t free_blocks = slab->free_blocks; 1335 unsigned int unopened_slab_priority = slab->allocator->unopened_slab_priority; 1336 unsigned int priority; 1337 1338 /* 1339 * Wholly full slabs must be the only ones with lowest priority, 0. 1340 * 1341 * Slabs that have never been opened (empty, newly initialized, and never been written to) 1342 * have lower priority than previously opened slabs that have a significant number of free 1343 * blocks. This ranking causes VDO to avoid writing physical blocks for the first time 1344 * unless there are very few free blocks that have been previously written to. 1345 * 1346 * Since VDO doesn't discard blocks currently, reusing previously written blocks makes VDO 1347 * a better client of any underlying storage that is thinly-provisioned (though discarding 1348 * would be better). 1349 * 1350 * For all other slabs, the priority is derived from the logarithm of the number of free 1351 * blocks. Slabs with the same order of magnitude of free blocks have the same priority. 1352 * With 2^23 blocks, the priority will range from 1 to 25. The reserved 1353 * unopened_slab_priority divides the range and is skipped by the logarithmic mapping. 1354 */ 1355 1356 if (free_blocks == 0) 1357 return 0; 1358 1359 if (is_slab_journal_blank(slab)) 1360 return unopened_slab_priority; 1361 1362 priority = (1 + ilog2(free_blocks)); 1363 return ((priority < unopened_slab_priority) ? priority : priority + 1); 1364 } 1365 1366 /* 1367 * Slabs are essentially prioritized by an approximation of the number of free blocks in the slab 1368 * so slabs with lots of free blocks will be opened for allocation before slabs that have few free 1369 * blocks. 1370 */ 1371 static void prioritize_slab(struct vdo_slab *slab) 1372 { 1373 VDO_ASSERT_LOG_ONLY(list_empty(&slab->allocq_entry), 1374 "a slab must not already be on a list when prioritizing"); 1375 slab->priority = calculate_slab_priority(slab); 1376 vdo_priority_table_enqueue(slab->allocator->prioritized_slabs, 1377 slab->priority, &slab->allocq_entry); 1378 } 1379 1380 /** 1381 * adjust_free_block_count() - Adjust the free block count and (if needed) reprioritize the slab. 1382 * @incremented: true if the free block count went up. 
1383 */ 1384 static void adjust_free_block_count(struct vdo_slab *slab, bool incremented) 1385 { 1386 struct block_allocator *allocator = slab->allocator; 1387 1388 WRITE_ONCE(allocator->allocated_blocks, 1389 allocator->allocated_blocks + (incremented ? -1 : 1)); 1390 1391 /* The open slab doesn't need to be reprioritized until it is closed. */ 1392 if (slab == allocator->open_slab) 1393 return; 1394 1395 /* Don't bother adjusting the priority table if unneeded. */ 1396 if (slab->priority == calculate_slab_priority(slab)) 1397 return; 1398 1399 /* 1400 * Reprioritize the slab to reflect the new free block count by removing it from the table 1401 * and re-enqueuing it with the new priority. 1402 */ 1403 vdo_priority_table_remove(allocator->prioritized_slabs, &slab->allocq_entry); 1404 prioritize_slab(slab); 1405 } 1406 1407 /** 1408 * increment_for_data() - Increment the reference count for a data block. 1409 * @slab: The slab which owns the block. 1410 * @block: The reference block which contains the block being updated. 1411 * @block_number: The block to update. 1412 * @old_status: The reference status of the data block before this increment. 1413 * @lock: The pbn_lock associated with this increment (may be NULL). 1414 * @counter_ptr: A pointer to the count for the data block (in, out). 1415 * @adjust_block_count: Whether to update the allocator's free block count. 1416 * 1417 * Return: VDO_SUCCESS or an error. 1418 */ 1419 static int increment_for_data(struct vdo_slab *slab, struct reference_block *block, 1420 slab_block_number block_number, 1421 enum reference_status old_status, 1422 struct pbn_lock *lock, vdo_refcount_t *counter_ptr, 1423 bool adjust_block_count) 1424 { 1425 switch (old_status) { 1426 case RS_FREE: 1427 *counter_ptr = 1; 1428 block->allocated_count++; 1429 slab->free_blocks--; 1430 if (adjust_block_count) 1431 adjust_free_block_count(slab, false); 1432 1433 break; 1434 1435 case RS_PROVISIONAL: 1436 *counter_ptr = 1; 1437 break; 1438 1439 default: 1440 /* Single or shared */ 1441 if (*counter_ptr >= MAXIMUM_REFERENCE_COUNT) { 1442 return vdo_log_error_strerror(VDO_REF_COUNT_INVALID, 1443 "Incrementing a block already having 254 references (slab %u, offset %u)", 1444 slab->slab_number, block_number); 1445 } 1446 (*counter_ptr)++; 1447 } 1448 1449 if (lock != NULL) 1450 vdo_unassign_pbn_lock_provisional_reference(lock); 1451 return VDO_SUCCESS; 1452 } 1453 1454 /** 1455 * decrement_for_data() - Decrement the reference count for a data block. 1456 * @slab: The slab which owns the block. 1457 * @block: The reference block which contains the block being updated. 1458 * @block_number: The block to update. 1459 * @old_status: The reference status of the data block before this decrement. 1460 * @updater: The reference updater doing this operation in case we need to look up the pbn lock. 1461 * @counter_ptr: A pointer to the count for the data block (in, out). 1462 * @adjust_block_count: Whether to update the allocator's free block count. 1463 * 1464 * Return: VDO_SUCCESS or an error. 
1465 */ 1466 static int decrement_for_data(struct vdo_slab *slab, struct reference_block *block, 1467 slab_block_number block_number, 1468 enum reference_status old_status, 1469 struct reference_updater *updater, 1470 vdo_refcount_t *counter_ptr, bool adjust_block_count) 1471 { 1472 switch (old_status) { 1473 case RS_FREE: 1474 return vdo_log_error_strerror(VDO_REF_COUNT_INVALID, 1475 "Decrementing free block at offset %u in slab %u", 1476 block_number, slab->slab_number); 1477 1478 case RS_PROVISIONAL: 1479 case RS_SINGLE: 1480 if (updater->zpbn.zone != NULL) { 1481 struct pbn_lock *lock = vdo_get_physical_zone_pbn_lock(updater->zpbn.zone, 1482 updater->zpbn.pbn); 1483 1484 if (lock != NULL) { 1485 /* 1486 * There is a read lock on this block, so the block must not become 1487 * unreferenced. 1488 */ 1489 *counter_ptr = PROVISIONAL_REFERENCE_COUNT; 1490 vdo_assign_pbn_lock_provisional_reference(lock); 1491 break; 1492 } 1493 } 1494 1495 *counter_ptr = EMPTY_REFERENCE_COUNT; 1496 block->allocated_count--; 1497 slab->free_blocks++; 1498 if (adjust_block_count) 1499 adjust_free_block_count(slab, true); 1500 1501 break; 1502 1503 default: 1504 /* Shared */ 1505 (*counter_ptr)--; 1506 } 1507 1508 return VDO_SUCCESS; 1509 } 1510 1511 /** 1512 * increment_for_block_map() - Increment the reference count for a block map page. 1513 * @slab: The slab which owns the block. 1514 * @block: The reference block which contains the block being updated. 1515 * @block_number: The block to update. 1516 * @old_status: The reference status of the block before this increment. 1517 * @lock: The pbn_lock associated with this increment (may be NULL). 1518 * @normal_operation: Whether we are in normal operation vs. recovery or rebuild. 1519 * @counter_ptr: A pointer to the count for the block (in, out). 1520 * @adjust_block_count: Whether to update the allocator's free block count. 1521 * 1522 * All block map increments should be from provisional to MAXIMUM_REFERENCE_COUNT. Since block map 1523 * blocks never dedupe they should never be adjusted from any other state. The adjustment always 1524 * results in MAXIMUM_REFERENCE_COUNT as this value is used to prevent dedupe against block map 1525 * blocks. 1526 * 1527 * Return: VDO_SUCCESS or an error. 
1528 */ 1529 static int increment_for_block_map(struct vdo_slab *slab, struct reference_block *block, 1530 slab_block_number block_number, 1531 enum reference_status old_status, 1532 struct pbn_lock *lock, bool normal_operation, 1533 vdo_refcount_t *counter_ptr, bool adjust_block_count) 1534 { 1535 switch (old_status) { 1536 case RS_FREE: 1537 if (normal_operation) { 1538 return vdo_log_error_strerror(VDO_REF_COUNT_INVALID, 1539 "Incrementing unallocated block map block (slab %u, offset %u)", 1540 slab->slab_number, block_number); 1541 } 1542 1543 *counter_ptr = MAXIMUM_REFERENCE_COUNT; 1544 block->allocated_count++; 1545 slab->free_blocks--; 1546 if (adjust_block_count) 1547 adjust_free_block_count(slab, false); 1548 1549 return VDO_SUCCESS; 1550 1551 case RS_PROVISIONAL: 1552 if (!normal_operation) 1553 return vdo_log_error_strerror(VDO_REF_COUNT_INVALID, 1554 "Block map block had provisional reference during replay (slab %u, offset %u)", 1555 slab->slab_number, block_number); 1556 1557 *counter_ptr = MAXIMUM_REFERENCE_COUNT; 1558 if (lock != NULL) 1559 vdo_unassign_pbn_lock_provisional_reference(lock); 1560 return VDO_SUCCESS; 1561 1562 default: 1563 return vdo_log_error_strerror(VDO_REF_COUNT_INVALID, 1564 "Incrementing a block map block which is already referenced %u times (slab %u, offset %u)", 1565 *counter_ptr, slab->slab_number, 1566 block_number); 1567 } 1568 } 1569 1570 static bool __must_check is_valid_journal_point(const struct journal_point *point) 1571 { 1572 return ((point != NULL) && (point->sequence_number > 0)); 1573 } 1574 1575 /** 1576 * update_reference_count() - Update the reference count of a block. 1577 * @slab: The slab which owns the block. 1578 * @block: The reference block which contains the block being updated. 1579 * @block_number: The block to update. 1580 * @slab_journal_point: The slab journal point at which this update is journaled. 1581 * @updater: The reference updater. 1582 * @normal_operation: Whether we are in normal operation vs. recovery or rebuild. 1583 * @adjust_block_count: Whether to update the slab's free block count. 1584 * @provisional_decrement_ptr: A pointer which will be set to true if this update was a decrement 1585 * of a provisional reference. 1586 * 1587 * Return: VDO_SUCCESS or an error. 
1588 */ 1589 static int update_reference_count(struct vdo_slab *slab, struct reference_block *block, 1590 slab_block_number block_number, 1591 const struct journal_point *slab_journal_point, 1592 struct reference_updater *updater, 1593 bool normal_operation, bool adjust_block_count, 1594 bool *provisional_decrement_ptr) 1595 { 1596 vdo_refcount_t *counter_ptr = &slab->counters[block_number]; 1597 enum reference_status old_status = reference_count_to_status(*counter_ptr); 1598 int result; 1599 1600 if (!updater->increment) { 1601 result = decrement_for_data(slab, block, block_number, old_status, 1602 updater, counter_ptr, adjust_block_count); 1603 if ((result == VDO_SUCCESS) && (old_status == RS_PROVISIONAL)) { 1604 if (provisional_decrement_ptr != NULL) 1605 *provisional_decrement_ptr = true; 1606 return VDO_SUCCESS; 1607 } 1608 } else if (updater->operation == VDO_JOURNAL_DATA_REMAPPING) { 1609 result = increment_for_data(slab, block, block_number, old_status, 1610 updater->lock, counter_ptr, adjust_block_count); 1611 } else { 1612 result = increment_for_block_map(slab, block, block_number, old_status, 1613 updater->lock, normal_operation, 1614 counter_ptr, adjust_block_count); 1615 } 1616 1617 if (result != VDO_SUCCESS) 1618 return result; 1619 1620 if (is_valid_journal_point(slab_journal_point)) 1621 slab->slab_journal_point = *slab_journal_point; 1622 1623 return VDO_SUCCESS; 1624 } 1625 1626 static int __must_check adjust_reference_count(struct vdo_slab *slab, 1627 struct reference_updater *updater, 1628 const struct journal_point *slab_journal_point) 1629 { 1630 slab_block_number block_number; 1631 int result; 1632 struct reference_block *block; 1633 bool provisional_decrement = false; 1634 1635 if (!is_slab_open(slab)) 1636 return VDO_INVALID_ADMIN_STATE; 1637 1638 result = slab_block_number_from_pbn(slab, updater->zpbn.pbn, &block_number); 1639 if (result != VDO_SUCCESS) 1640 return result; 1641 1642 block = get_reference_block(slab, block_number); 1643 result = update_reference_count(slab, block, block_number, slab_journal_point, 1644 updater, NORMAL_OPERATION, true, 1645 &provisional_decrement); 1646 if ((result != VDO_SUCCESS) || provisional_decrement) 1647 return result; 1648 1649 if (block->is_dirty && (block->slab_journal_lock > 0)) { 1650 sequence_number_t entry_lock = slab_journal_point->sequence_number; 1651 /* 1652 * This block is already dirty and a slab journal entry has been made for it since 1653 * the last time it was clean. We must release the per-entry slab journal lock for 1654 * the entry associated with the update we are now doing. 1655 */ 1656 result = VDO_ASSERT(is_valid_journal_point(slab_journal_point), 1657 "Reference count adjustments need slab journal points."); 1658 if (result != VDO_SUCCESS) 1659 return result; 1660 1661 adjust_slab_journal_block_reference(&slab->journal, entry_lock, -1); 1662 return VDO_SUCCESS; 1663 } 1664 1665 /* 1666 * This may be the first time we are applying an update for which there is a slab journal 1667 * entry to this block since the block was cleaned. Therefore, we convert the per-entry 1668 * slab journal lock to an uncommitted reference block lock, if there is a per-entry lock. 1669 */ 1670 if (is_valid_journal_point(slab_journal_point)) 1671 block->slab_journal_lock = slab_journal_point->sequence_number; 1672 else 1673 block->slab_journal_lock = 0; 1674 1675 dirty_block(block); 1676 return VDO_SUCCESS; 1677 } 1678 1679 /** 1680 * add_entry_from_waiter() - Add an entry to the slab journal. 
1681 * @waiter: The vio which should make an entry now. 1682 * @context: The slab journal to make an entry in. 1683 * 1684 * This callback is invoked by add_entries() once it has determined that we are ready to make 1685 * another entry in the slab journal. Implements waiter_callback_fn. 1686 */ 1687 static void add_entry_from_waiter(struct vdo_waiter *waiter, void *context) 1688 { 1689 int result; 1690 struct reference_updater *updater = 1691 container_of(waiter, struct reference_updater, waiter); 1692 struct data_vio *data_vio = data_vio_from_reference_updater(updater); 1693 struct slab_journal *journal = context; 1694 struct slab_journal_block_header *header = &journal->tail_header; 1695 struct journal_point slab_journal_point = { 1696 .sequence_number = header->sequence_number, 1697 .entry_count = header->entry_count, 1698 }; 1699 sequence_number_t recovery_block = data_vio->recovery_journal_point.sequence_number; 1700 1701 if (header->entry_count == 0) { 1702 /* 1703 * This is the first entry in the current tail block, so get a lock on the recovery 1704 * journal which we will hold until this tail block is committed. 1705 */ 1706 get_lock(journal, header->sequence_number)->recovery_start = recovery_block; 1707 if (journal->recovery_journal != NULL) { 1708 zone_count_t zone_number = journal->slab->allocator->zone_number; 1709 1710 vdo_acquire_recovery_journal_block_reference(journal->recovery_journal, 1711 recovery_block, 1712 VDO_ZONE_TYPE_PHYSICAL, 1713 zone_number); 1714 } 1715 1716 mark_slab_journal_dirty(journal, recovery_block); 1717 reclaim_journal_space(journal); 1718 } 1719 1720 add_entry(journal, updater->zpbn.pbn, updater->operation, updater->increment, 1721 expand_journal_point(data_vio->recovery_journal_point, 1722 updater->increment)); 1723 1724 if (journal->slab->status != VDO_SLAB_REBUILT) { 1725 /* 1726 * If the slab is unrecovered, scrubbing will take care of the count since the 1727 * update is now recorded in the journal. 1728 */ 1729 adjust_slab_journal_block_reference(journal, 1730 slab_journal_point.sequence_number, -1); 1731 result = VDO_SUCCESS; 1732 } else { 1733 /* Now that an entry has been made in the slab journal, update the counter. */ 1734 result = adjust_reference_count(journal->slab, updater, 1735 &slab_journal_point); 1736 } 1737 1738 if (updater->increment) 1739 continue_data_vio_with_error(data_vio, result); 1740 else 1741 vdo_continue_completion(&data_vio->decrement_completion, result); 1742 } 1743 1744 /** 1745 * is_next_entry_a_block_map_increment() - Check whether the next entry to be made is a block map 1746 * increment. 1747 * @journal: The journal. 1748 * 1749 * Return: true if the first entry waiter's operation is a block map increment. 1750 */ 1751 static inline bool is_next_entry_a_block_map_increment(struct slab_journal *journal) 1752 { 1753 struct vdo_waiter *waiter = vdo_waitq_get_first_waiter(&journal->entry_waiters); 1754 struct reference_updater *updater = 1755 container_of(waiter, struct reference_updater, waiter); 1756 1757 return (updater->operation == VDO_JOURNAL_BLOCK_MAP_REMAPPING); 1758 } 1759 1760 /** 1761 * add_entries() - Add as many entries as possible from the queue of vios waiting to make entries. 1762 * @journal: The journal to which entries may be added. 1763 * 1764 * By processing the queue in order, we ensure that slab journal entries are made in the same order 1765 * as recovery journal entries for the same increment or decrement. 
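* Entries are not added while the slab is rebuilding, while a partial write is outstanding, or
* while the tail block is waiting to commit; if the journal is over its blocking threshold, the
* dirty reference blocks are saved and the waiters stay queued until journal space is reclaimed.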
1766 */ 1767 static void add_entries(struct slab_journal *journal) 1768 { 1769 if (journal->adding_entries) { 1770 /* Protect against re-entrancy. */ 1771 return; 1772 } 1773 1774 journal->adding_entries = true; 1775 while (vdo_waitq_has_waiters(&journal->entry_waiters)) { 1776 struct slab_journal_block_header *header = &journal->tail_header; 1777 1778 if (journal->partial_write_in_progress || 1779 (journal->slab->status == VDO_SLAB_REBUILDING)) { 1780 /* 1781 * Don't add entries while rebuilding or while a partial write is 1782 * outstanding, as it could result in reference count corruption. 1783 */ 1784 break; 1785 } 1786 1787 if (journal->waiting_to_commit) { 1788 /* 1789 * If we are waiting for resources to write the tail block, and the tail 1790 * block is full, we can't make another entry. 1791 */ 1792 WRITE_ONCE(journal->events->tail_busy_count, 1793 journal->events->tail_busy_count + 1); 1794 break; 1795 } else if (is_next_entry_a_block_map_increment(journal) && 1796 (header->entry_count >= journal->full_entries_per_block)) { 1797 /* 1798 * The tail block does not have room for a block map increment, so commit 1799 * it now. 1800 */ 1801 commit_tail(journal); 1802 if (journal->waiting_to_commit) { 1803 WRITE_ONCE(journal->events->tail_busy_count, 1804 journal->events->tail_busy_count + 1); 1805 break; 1806 } 1807 } 1808 1809 /* If the slab is over the blocking threshold, make the vio wait. */ 1810 if (requires_reaping(journal)) { 1811 WRITE_ONCE(journal->events->blocked_count, 1812 journal->events->blocked_count + 1); 1813 save_dirty_reference_blocks(journal->slab); 1814 break; 1815 } 1816 1817 if (header->entry_count == 0) { 1818 struct journal_lock *lock = 1819 get_lock(journal, header->sequence_number); 1820 1821 /* 1822 * Check if the on disk slab journal is full. Because of the blocking and 1823 * scrubbing thresholds, this should never happen. 1824 */ 1825 if (lock->count > 0) { 1826 VDO_ASSERT_LOG_ONLY((journal->head + journal->size) == journal->tail, 1827 "New block has locks, but journal is not full"); 1828 1829 /* 1830 * The blocking threshold must let the journal fill up if the new 1831 * block has locks; if the blocking threshold is smaller than the 1832 * journal size, the new block cannot possibly have locks already. 1833 */ 1834 VDO_ASSERT_LOG_ONLY((journal->blocking_threshold >= journal->size), 1835 "New block can have locks already iff blocking threshold is at the end of the journal"); 1836 1837 WRITE_ONCE(journal->events->disk_full_count, 1838 journal->events->disk_full_count + 1); 1839 save_dirty_reference_blocks(journal->slab); 1840 break; 1841 } 1842 1843 /* 1844 * Don't allow the new block to be reaped until all of the reference count 1845 * blocks are written and the journal block has been fully committed as 1846 * well. 1847 */ 1848 lock->count = journal->entries_per_block + 1; 1849 1850 if (header->sequence_number == 1) { 1851 struct vdo_slab *slab = journal->slab; 1852 block_count_t i; 1853 1854 /* 1855 * This is the first entry in this slab journal, ever. Dirty all of 1856 * the reference count blocks. Each will acquire a lock on the tail 1857 * block so that the journal won't be reaped until the reference 1858 * counts are initialized. The lock acquisition must be done by the 1859 * ref_counts since here we don't know how many reference blocks 1860 * the ref_counts has. 
*/ 1862 for (i = 0; i < slab->reference_block_count; i++) { 1863 slab->reference_blocks[i].slab_journal_lock = 1; 1864 dirty_block(&slab->reference_blocks[i]); 1865 } 1866 1867 adjust_slab_journal_block_reference(journal, 1, 1868 slab->reference_block_count); 1869 } 1870 } 1871 1872 vdo_waitq_notify_next_waiter(&journal->entry_waiters, 1873 add_entry_from_waiter, journal); 1874 } 1875 1876 journal->adding_entries = false; 1877 1878 /* If there are no waiters, and we are flushing or saving, commit the tail block. */ 1879 if (vdo_is_state_draining(&journal->slab->state) && 1880 !vdo_is_state_suspending(&journal->slab->state) && 1881 !vdo_waitq_has_waiters(&journal->entry_waiters)) 1882 commit_tail(journal); 1883 } 1884 1885 /** 1886 * reset_search_cursor() - Reset the free block search back to the first reference counter in the 1887 * first reference block of a slab. 1888 */ 1889 static void reset_search_cursor(struct vdo_slab *slab) 1890 { 1891 struct search_cursor *cursor = &slab->search_cursor; 1892 1893 cursor->block = cursor->first_block; 1894 cursor->index = 0; 1895 /* Unit tests have slabs with only one reference block (and it's a runt). */ 1896 cursor->end_index = min_t(u32, COUNTS_PER_BLOCK, slab->block_count); 1897 } 1898 1899 /** 1900 * advance_search_cursor() - Advance the search cursor to the start of the next reference block in 1901 * a slab. 1902 * 1903 * Wraps around to the first reference block if the current block is the last reference block. 1904 * 1905 * Return: true unless the cursor was at the last reference block. 1906 */ 1907 static bool advance_search_cursor(struct vdo_slab *slab) 1908 { 1909 struct search_cursor *cursor = &slab->search_cursor; 1910 1911 /* 1912 * If we just finished searching the last reference block, then wrap back around to the 1913 * start of the array. 1914 */ 1915 if (cursor->block == cursor->last_block) { 1916 reset_search_cursor(slab); 1917 return false; 1918 } 1919 1920 /* We're not already at the end, so advance the cursor to the next block. */ 1921 cursor->block++; 1922 cursor->index = cursor->end_index; 1923 1924 if (cursor->block == cursor->last_block) { 1925 /* The last reference block will usually be a runt. */ 1926 cursor->end_index = slab->block_count; 1927 } else { 1928 cursor->end_index += COUNTS_PER_BLOCK; 1929 } 1930 1931 return true; 1932 } 1933 1934 /** 1935 * vdo_adjust_reference_count_for_rebuild() - Adjust the reference count of a block during rebuild. 1936 * 1937 * Return: VDO_SUCCESS or an error. 1938 */ 1939 int vdo_adjust_reference_count_for_rebuild(struct slab_depot *depot, 1940 physical_block_number_t pbn, 1941 enum journal_operation operation) 1942 { 1943 int result; 1944 slab_block_number block_number; 1945 struct reference_block *block; 1946 struct vdo_slab *slab = vdo_get_slab(depot, pbn); 1947 struct reference_updater updater = { 1948 .operation = operation, 1949 .increment = true, 1950 }; 1951 1952 result = slab_block_number_from_pbn(slab, pbn, &block_number); 1953 if (result != VDO_SUCCESS) 1954 return result; 1955 1956 block = get_reference_block(slab, block_number); 1957 result = update_reference_count(slab, block, block_number, NULL, 1958 &updater, !NORMAL_OPERATION, false, NULL); 1959 if (result != VDO_SUCCESS) 1960 return result; 1961 1962 dirty_block(block); 1963 return VDO_SUCCESS; 1964 } 1965 1966 /** 1967 * replay_reference_count_change() - Replay the reference count adjustment from a slab journal 1968 * entry into the reference count for a block. 1969 * @slab: The slab.
1970 * @entry_point: The slab journal point for the entry. 1971 * @entry: The slab journal entry being replayed. 1972 * 1973 * The adjustment will be ignored if it was already recorded in the reference count. 1974 * 1975 * Return: VDO_SUCCESS or an error code. 1976 */ 1977 static int replay_reference_count_change(struct vdo_slab *slab, 1978 const struct journal_point *entry_point, 1979 struct slab_journal_entry entry) 1980 { 1981 int result; 1982 struct reference_block *block = get_reference_block(slab, entry.sbn); 1983 sector_count_t sector = (entry.sbn % COUNTS_PER_BLOCK) / COUNTS_PER_SECTOR; 1984 struct reference_updater updater = { 1985 .operation = entry.operation, 1986 .increment = entry.increment, 1987 }; 1988 1989 if (!vdo_before_journal_point(&block->commit_points[sector], entry_point)) { 1990 /* This entry is already reflected in the existing counts, so do nothing. */ 1991 return VDO_SUCCESS; 1992 } 1993 1994 /* This entry is not yet counted in the reference counts. */ 1995 result = update_reference_count(slab, block, entry.sbn, entry_point, 1996 &updater, !NORMAL_OPERATION, false, NULL); 1997 if (result != VDO_SUCCESS) 1998 return result; 1999 2000 dirty_block(block); 2001 return VDO_SUCCESS; 2002 } 2003 2004 /** 2005 * find_zero_byte_in_word() - Find the array index of the first zero byte in a word-sized range of 2006 * reference counters. 2007 * @word_ptr: A pointer to the eight counter bytes to check. 2008 * @start_index: The array index corresponding to word_ptr[0]. 2009 * @fail_index: The array index to return if no zero byte is found. 2010 * 2011 * The search does no bounds checking; the function relies on the array being sufficiently padded. 2012 * 2013 * Return: The array index of the first zero byte in the word, or the value passed as fail_index if 2014 * no zero byte was found. 2015 */ 2016 static inline slab_block_number find_zero_byte_in_word(const u8 *word_ptr, 2017 slab_block_number start_index, 2018 slab_block_number fail_index) 2019 { 2020 u64 word = get_unaligned_le64(word_ptr); 2021 2022 /* This looks like a loop, but GCC will unroll the eight iterations for us. */ 2023 unsigned int offset; 2024 2025 for (offset = 0; offset < BYTES_PER_WORD; offset++) { 2026 /* Assumes little-endian byte order, which we have on X86. */ 2027 if ((word & 0xFF) == 0) 2028 return (start_index + offset); 2029 word >>= 8; 2030 } 2031 2032 return fail_index; 2033 } 2034 2035 /** 2036 * find_free_block() - Find the first block with a reference count of zero in the specified 2037 * range of reference counter indexes. 2038 * @slab: The slab counters to scan. 2039 * @index_ptr: A pointer to hold the array index of the free block. 2040 * 2041 * Exposed for unit testing. 2042 * 2043 * Return: true if a free block was found in the specified range. 2044 */ 2045 static bool find_free_block(const struct vdo_slab *slab, slab_block_number *index_ptr) 2046 { 2047 slab_block_number zero_index; 2048 slab_block_number next_index = slab->search_cursor.index; 2049 slab_block_number end_index = slab->search_cursor.end_index; 2050 u8 *next_counter = &slab->counters[next_index]; 2051 u8 *end_counter = &slab->counters[end_index]; 2052 2053 /* 2054 * Search every byte of the first unaligned word. (Array is padded so reading past end is 2055 * safe.)
2056 */ 2057 zero_index = find_zero_byte_in_word(next_counter, next_index, end_index); 2058 if (zero_index < end_index) { 2059 *index_ptr = zero_index; 2060 return true; 2061 } 2062 2063 /* 2064 * On architectures where unaligned word access is expensive, this would be a good place to 2065 * advance to an alignment boundary. 2066 */ 2067 next_index += BYTES_PER_WORD; 2068 next_counter += BYTES_PER_WORD; 2069 2070 /* 2071 * Now we're word-aligned; check a word at a time until we find a word containing a zero. 2072 * (Array is padded so reading past end is safe.) 2073 */ 2074 while (next_counter < end_counter) { 2075 /* 2076 * The following code is currently an exact copy of the code preceding the loop, 2077 * but if you try to merge them by using a do loop, it runs slower because a jump 2078 * instruction gets added at the start of the iteration. 2079 */ 2080 zero_index = find_zero_byte_in_word(next_counter, next_index, end_index); 2081 if (zero_index < end_index) { 2082 *index_ptr = zero_index; 2083 return true; 2084 } 2085 2086 next_index += BYTES_PER_WORD; 2087 next_counter += BYTES_PER_WORD; 2088 } 2089 2090 return false; 2091 } 2092 2093 /** 2094 * search_current_reference_block() - Search the reference block currently saved in the search 2095 * cursor for a reference count of zero, starting at the saved 2096 * counter index. 2097 * @slab: The slab to search. 2098 * @free_index_ptr: A pointer to receive the array index of the zero reference count. 2099 * 2100 * Return: true if an unreferenced counter was found. 2101 */ 2102 static bool search_current_reference_block(const struct vdo_slab *slab, 2103 slab_block_number *free_index_ptr) 2104 { 2105 /* Don't bother searching if the current block is known to be full. */ 2106 return ((slab->search_cursor.block->allocated_count < COUNTS_PER_BLOCK) && 2107 find_free_block(slab, free_index_ptr)); 2108 } 2109 2110 /** 2111 * search_reference_blocks() - Search each reference block for a reference count of zero. 2112 * @slab: The slab to search. 2113 * @free_index_ptr: A pointer to receive the array index of the zero reference count. 2114 * 2115 * Searches each reference block for a reference count of zero, starting at the reference block and 2116 * counter index saved in the search cursor and searching up to the end of the last reference 2117 * block. The search does not wrap. 2118 * 2119 * Return: true if an unreferenced counter was found. 2120 */ 2121 static bool search_reference_blocks(struct vdo_slab *slab, 2122 slab_block_number *free_index_ptr) 2123 { 2124 /* Start searching at the saved search position in the current block. */ 2125 if (search_current_reference_block(slab, free_index_ptr)) 2126 return true; 2127 2128 /* Search each reference block up to the end of the slab. */ 2129 while (advance_search_cursor(slab)) { 2130 if (search_current_reference_block(slab, free_index_ptr)) 2131 return true; 2132 } 2133 2134 return false; 2135 } 2136 2137 /** 2138 * make_provisional_reference() - Do the bookkeeping for making a provisional reference. 2139 */ 2140 static void make_provisional_reference(struct vdo_slab *slab, 2141 slab_block_number block_number) 2142 { 2143 struct reference_block *block = get_reference_block(slab, block_number); 2144 2145 /* 2146 * Make the initial transition from an unreferenced block to a 2147 * provisionally allocated block. 2148 */ 2149 slab->counters[block_number] = PROVISIONAL_REFERENCE_COUNT; 2150 2151 /* Account for the allocation.
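* Only the slab-local counts are updated here; callers adjust the allocator-wide free block
* count separately via adjust_free_block_count().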
*/ 2152 block->allocated_count++; 2153 slab->free_blocks--; 2154 } 2155 2156 /** 2157 * dirty_all_reference_blocks() - Mark all reference count blocks in a slab as dirty. 2158 */ 2159 static void dirty_all_reference_blocks(struct vdo_slab *slab) 2160 { 2161 block_count_t i; 2162 2163 for (i = 0; i < slab->reference_block_count; i++) 2164 dirty_block(&slab->reference_blocks[i]); 2165 } 2166 2167 static inline bool journal_points_equal(struct journal_point first, 2168 struct journal_point second) 2169 { 2170 return ((first.sequence_number == second.sequence_number) && 2171 (first.entry_count == second.entry_count)); 2172 } 2173 2174 /** 2175 * match_bytes() - Check an 8-byte word for bytes matching the value specified 2176 * @input: A word to examine the bytes of 2177 * @match: The byte value sought 2178 * 2179 * Return: 1 in each byte when the corresponding input byte matched, 0 otherwise 2180 */ 2181 static inline u64 match_bytes(u64 input, u8 match) 2182 { 2183 u64 temp = input ^ (match * 0x0101010101010101ULL); 2184 /* top bit of each byte is set iff top bit of temp byte is clear; rest are 0 */ 2185 u64 test_top_bits = ~temp & 0x8080808080808080ULL; 2186 /* top bit of each byte is set iff low 7 bits of temp byte are clear; rest are useless */ 2187 u64 test_low_bits = 0x8080808080808080ULL - (temp & 0x7f7f7f7f7f7f7f7fULL); 2188 /* return 1 when both tests indicate temp byte is 0 */ 2189 return (test_top_bits & test_low_bits) >> 7; 2190 } 2191 2192 /** 2193 * count_valid_references() - Process a newly loaded refcount array 2194 * @counters: the array of counters from a metadata block 2195 * 2196 * Scan an 8-byte-aligned array of counters, fixing up any "provisional" values that weren't 2197 * cleaned up at shutdown, changing them internally to "empty". 2198 * 2199 * Return: the number of blocks that are referenced (counters not "empty") 2200 */ 2201 static unsigned int count_valid_references(vdo_refcount_t *counters) 2202 { 2203 u64 *words = (u64 *)counters; 2204 /* It's easier to count occurrences of a specific byte than its absences. */ 2205 unsigned int empty_count = 0; 2206 /* For speed, we process 8 bytes at once. */ 2207 unsigned int words_left = COUNTS_PER_BLOCK / sizeof(u64); 2208 2209 /* 2210 * Sanity check assumptions used for optimizing this code: Counters are bytes. The counter 2211 * array is a multiple of the word size. 2212 */ 2213 BUILD_BUG_ON(sizeof(vdo_refcount_t) != 1); 2214 BUILD_BUG_ON((COUNTS_PER_BLOCK % sizeof(u64)) != 0); 2215 2216 while (words_left > 0) { 2217 /* 2218 * This is used effectively as 8 byte-size counters. Byte 0 counts how many words 2219 * had the target value found in byte 0, etc. We just have to avoid overflow. 2220 */ 2221 u64 split_count = 0; 2222 /* 2223 * The counter "% 255" trick used below to fold split_count into empty_count 2224 * imposes a limit of 254 bytes examined each iteration of the outer loop. We 2225 * process a word at a time, so that limit gets rounded down to 31 u64 words. 2226 */ 2227 const unsigned int max_words_per_iteration = 254 / sizeof(u64); 2228 unsigned int iter_words_left = min_t(unsigned int, words_left, 2229 max_words_per_iteration); 2230 2231 words_left -= iter_words_left; 2232 2233 while (iter_words_left--) { 2234 u64 word = *words; 2235 u64 temp; 2236 2237 /* First, if we have any provisional refcount values, clear them.
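* (A provisional value found in a newly loaded block is an allocation which was never
* confirmed before the last shutdown, so the block is treated as free.)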
*/ 2238 temp = match_bytes(word, PROVISIONAL_REFERENCE_COUNT); 2239 if (temp) { 2240 /* 2241 * 'temp' has 0x01 bytes where 'word' has PROVISIONAL; this xor 2242 * will alter just those bytes, changing PROVISIONAL to EMPTY. 2243 */ 2244 word ^= temp * (PROVISIONAL_REFERENCE_COUNT ^ EMPTY_REFERENCE_COUNT); 2245 *words = word; 2246 } 2247 2248 /* Now count the EMPTY_REFERENCE_COUNT bytes, updating the 8 counters. */ 2249 split_count += match_bytes(word, EMPTY_REFERENCE_COUNT); 2250 words++; 2251 } 2252 empty_count += split_count % 255; 2253 } 2254 2255 return COUNTS_PER_BLOCK - empty_count; 2256 } 2257 2258 /** 2259 * unpack_reference_block() - Unpack reference count blocks into the internal memory structure. 2260 * @packed: The written reference block to be unpacked. 2261 * @block: The internal reference block to be loaded. 2262 */ 2263 static void unpack_reference_block(struct packed_reference_block *packed, 2264 struct reference_block *block) 2265 { 2266 sector_count_t i; 2267 struct vdo_slab *slab = block->slab; 2268 vdo_refcount_t *counters = get_reference_counters_for_block(block); 2269 2270 for (i = 0; i < VDO_SECTORS_PER_BLOCK; i++) { 2271 struct packed_reference_sector *sector = &packed->sectors[i]; 2272 2273 vdo_unpack_journal_point(&sector->commit_point, &block->commit_points[i]); 2274 memcpy(counters + (i * COUNTS_PER_SECTOR), sector->counts, 2275 (sizeof(vdo_refcount_t) * COUNTS_PER_SECTOR)); 2276 /* The slab_journal_point must be the latest point found in any sector. */ 2277 if (vdo_before_journal_point(&slab->slab_journal_point, 2278 &block->commit_points[i])) 2279 slab->slab_journal_point = block->commit_points[i]; 2280 2281 if ((i > 0) && 2282 !journal_points_equal(block->commit_points[0], 2283 block->commit_points[i])) { 2284 size_t block_index = block - block->slab->reference_blocks; 2285 2286 vdo_log_warning("Torn write detected in sector %u of reference block %zu of slab %u", 2287 i, block_index, block->slab->slab_number); 2288 } 2289 } 2290 2291 block->allocated_count = count_valid_references(counters); 2292 } 2293 2294 /** 2295 * finish_reference_block_load() - After a reference block has been read, unpack it. 2296 * @completion: The VIO that just finished reading. 2297 */ 2298 static void finish_reference_block_load(struct vdo_completion *completion) 2299 { 2300 struct vio *vio = as_vio(completion); 2301 struct pooled_vio *pooled = vio_as_pooled_vio(vio); 2302 struct reference_block *block = completion->parent; 2303 struct vdo_slab *slab = block->slab; 2304 unsigned int block_count = vio->io_size / VDO_BLOCK_SIZE; 2305 unsigned int i; 2306 char *data = vio->data; 2307 2308 for (i = 0; i < block_count; i++, block++, data += VDO_BLOCK_SIZE) { 2309 struct packed_reference_block *packed = (struct packed_reference_block *) data; 2310 2311 unpack_reference_block(packed, block); 2312 slab->free_blocks -= block->allocated_count; 2313 } 2314 return_vio_to_pool(pooled); 2315 slab->active_count -= block_count; 2316 2317 check_if_slab_drained(slab); 2318 } 2319 2320 static void load_reference_block_endio(struct bio *bio) 2321 { 2322 struct vio *vio = bio->bi_private; 2323 struct reference_block *block = vio->completion.parent; 2324 2325 continue_vio_after_io(vio, finish_reference_block_load, 2326 block->slab->allocator->thread_id); 2327 } 2328 2329 /** 2330 * load_reference_block_group() - After a block waiter has gotten a VIO from the VIO pool, load 2331 * a set of blocks. 2332 * @waiter: The waiter of the first block to load. 2333 * @context: The VIO returned by the pool.
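*
* The number of blocks read in a single I/O is the smaller of the vio's block_count and the
* number of reference blocks remaining in the slab starting from the waiter's block.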
2334 */ 2335 static void load_reference_block_group(struct vdo_waiter *waiter, void *context) 2336 { 2337 struct pooled_vio *pooled = context; 2338 struct vio *vio = &pooled->vio; 2339 struct reference_block *block = 2340 container_of(waiter, struct reference_block, waiter); 2341 u32 block_offset = block - block->slab->reference_blocks; 2342 u32 max_block_count = block->slab->reference_block_count - block_offset; 2343 u32 block_count = min_t(int, vio->block_count, max_block_count); 2344 2345 vio->completion.parent = block; 2346 vdo_submit_metadata_vio_with_size(vio, block->slab->ref_counts_origin + block_offset, 2347 load_reference_block_endio, handle_io_error, 2348 REQ_OP_READ, block_count * VDO_BLOCK_SIZE); 2349 } 2350 2351 /** 2352 * load_reference_blocks() - Load a slab's reference blocks from the underlying storage into a 2353 * pre-allocated reference counter. 2354 */ 2355 static void load_reference_blocks(struct vdo_slab *slab) 2356 { 2357 block_count_t i; 2358 u64 blocks_per_vio = slab->allocator->refcount_blocks_per_big_vio; 2359 struct vio_pool *pool = slab->allocator->refcount_big_vio_pool; 2360 2361 if (!pool) { 2362 pool = slab->allocator->vio_pool; 2363 blocks_per_vio = 1; 2364 } 2365 2366 slab->free_blocks = slab->block_count; 2367 slab->active_count = slab->reference_block_count; 2368 for (i = 0; i < slab->reference_block_count; i += blocks_per_vio) { 2369 struct vdo_waiter *waiter = &slab->reference_blocks[i].waiter; 2370 2371 waiter->callback = load_reference_block_group; 2372 acquire_vio_from_pool(pool, waiter); 2373 } 2374 } 2375 2376 /** 2377 * drain_slab() - Drain all reference count I/O. 2378 * 2379 * Depending upon the type of drain being performed (as recorded in the ref_count's vdo_slab), the 2380 * reference blocks may be loaded from disk or dirty reference blocks may be written out. 2381 */ 2382 static void drain_slab(struct vdo_slab *slab) 2383 { 2384 bool save; 2385 bool load; 2386 const struct admin_state_code *state = vdo_get_admin_state_code(&slab->state); 2387 2388 if (state == VDO_ADMIN_STATE_SUSPENDING) 2389 return; 2390 2391 if ((state != VDO_ADMIN_STATE_REBUILDING) && 2392 (state != VDO_ADMIN_STATE_SAVE_FOR_SCRUBBING)) 2393 commit_tail(&slab->journal); 2394 2395 if ((state == VDO_ADMIN_STATE_RECOVERING) || (slab->counters == NULL)) 2396 return; 2397 2398 save = false; 2399 load = slab->allocator->summary_entries[slab->slab_number].load_ref_counts; 2400 if (state == VDO_ADMIN_STATE_SCRUBBING) { 2401 if (load) { 2402 load_reference_blocks(slab); 2403 return; 2404 } 2405 } else if (state == VDO_ADMIN_STATE_SAVE_FOR_SCRUBBING) { 2406 if (!load) { 2407 /* These reference counts were never written, so mark them all dirty. */ 2408 dirty_all_reference_blocks(slab); 2409 } 2410 save = true; 2411 } else if (state == VDO_ADMIN_STATE_REBUILDING) { 2412 /* 2413 * Write out the counters if the slab has written them before, or it has any 2414 * non-zero reference counts, or there are any slab journal blocks. 
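* A slab which was never used at all (its reference counts were never written, every data
* block is free, and its journal is blank) is skipped so that rebuild does not write out
* untouched metadata.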
2415 */ 2416 block_count_t data_blocks = slab->allocator->depot->slab_config.data_blocks; 2417 2418 if (load || (slab->free_blocks != data_blocks) || 2419 !is_slab_journal_blank(slab)) { 2420 dirty_all_reference_blocks(slab); 2421 save = true; 2422 } 2423 } else if (state == VDO_ADMIN_STATE_SAVING) { 2424 save = (slab->status == VDO_SLAB_REBUILT); 2425 } else { 2426 vdo_finish_draining_with_result(&slab->state, VDO_SUCCESS); 2427 return; 2428 } 2429 2430 if (save) 2431 save_dirty_reference_blocks(slab); 2432 } 2433 2434 static int allocate_slab_counters(struct vdo_slab *slab) 2435 { 2436 int result; 2437 size_t index, bytes; 2438 2439 result = VDO_ASSERT(slab->reference_blocks == NULL, 2440 "vdo_slab %u doesn't allocate refcounts twice", 2441 slab->slab_number); 2442 if (result != VDO_SUCCESS) 2443 return result; 2444 2445 result = vdo_allocate(slab->reference_block_count, struct reference_block, 2446 __func__, &slab->reference_blocks); 2447 if (result != VDO_SUCCESS) 2448 return result; 2449 2450 /* 2451 * Allocate such that the runt slab has a full-length memory array, plus a little padding 2452 * so we can word-search even at the very end. 2453 */ 2454 bytes = (slab->reference_block_count * COUNTS_PER_BLOCK) + (2 * BYTES_PER_WORD); 2455 result = vdo_allocate(bytes, vdo_refcount_t, "ref counts array", 2456 &slab->counters); 2457 if (result != VDO_SUCCESS) { 2458 vdo_free(vdo_forget(slab->reference_blocks)); 2459 return result; 2460 } 2461 2462 slab->search_cursor.first_block = slab->reference_blocks; 2463 slab->search_cursor.last_block = &slab->reference_blocks[slab->reference_block_count - 1]; 2464 reset_search_cursor(slab); 2465 2466 for (index = 0; index < slab->reference_block_count; index++) { 2467 slab->reference_blocks[index] = (struct reference_block) { 2468 .slab = slab, 2469 }; 2470 } 2471 2472 return VDO_SUCCESS; 2473 } 2474 2475 static int allocate_counters_if_clean(struct vdo_slab *slab) 2476 { 2477 if (vdo_is_state_clean_load(&slab->state)) 2478 return allocate_slab_counters(slab); 2479 2480 return VDO_SUCCESS; 2481 } 2482 2483 static void finish_loading_journal(struct vdo_completion *completion) 2484 { 2485 struct vio *vio = as_vio(completion); 2486 struct slab_journal *journal = completion->parent; 2487 struct vdo_slab *slab = journal->slab; 2488 struct packed_slab_journal_block *block = (struct packed_slab_journal_block *) vio->data; 2489 struct slab_journal_block_header header; 2490 2491 vdo_unpack_slab_journal_block_header(&block->header, &header); 2492 2493 /* FIXME: should it be an error if the following conditional fails? */ 2494 if ((header.metadata_type == VDO_METADATA_SLAB_JOURNAL) && 2495 (header.nonce == slab->allocator->nonce)) { 2496 journal->tail = header.sequence_number + 1; 2497 2498 /* 2499 * If the slab is clean, this implies the slab journal is empty, so advance the 2500 * head appropriately. 2501 */ 2502 journal->head = (slab->allocator->summary_entries[slab->slab_number].is_dirty ? 
2503 header.head : journal->tail); 2504 journal->tail_header = header; 2505 initialize_journal_state(journal); 2506 } 2507 2508 return_vio_to_pool(vio_as_pooled_vio(vio)); 2509 vdo_finish_loading_with_result(&slab->state, allocate_counters_if_clean(slab)); 2510 } 2511 2512 static void read_slab_journal_tail_endio(struct bio *bio) 2513 { 2514 struct vio *vio = bio->bi_private; 2515 struct slab_journal *journal = vio->completion.parent; 2516 2517 continue_vio_after_io(vio, finish_loading_journal, 2518 journal->slab->allocator->thread_id); 2519 } 2520 2521 static void handle_load_error(struct vdo_completion *completion) 2522 { 2523 int result = completion->result; 2524 struct slab_journal *journal = completion->parent; 2525 struct vio *vio = as_vio(completion); 2526 2527 vio_record_metadata_io_error(vio); 2528 return_vio_to_pool(vio_as_pooled_vio(vio)); 2529 vdo_finish_loading_with_result(&journal->slab->state, result); 2530 } 2531 2532 /** 2533 * read_slab_journal_tail() - Read the slab journal tail block by using a vio acquired from the vio 2534 * pool. 2535 * @waiter: The vio pool waiter which has just been notified. 2536 * @context: The vio pool entry given to the waiter. 2537 * 2538 * This is the success callback from acquire_vio_from_pool() when loading a slab journal. 2539 */ 2540 static void read_slab_journal_tail(struct vdo_waiter *waiter, void *context) 2541 { 2542 struct slab_journal *journal = 2543 container_of(waiter, struct slab_journal, resource_waiter); 2544 struct vdo_slab *slab = journal->slab; 2545 struct pooled_vio *pooled = context; 2546 struct vio *vio = &pooled->vio; 2547 tail_block_offset_t last_commit_point = 2548 slab->allocator->summary_entries[slab->slab_number].tail_block_offset; 2549 2550 /* 2551 * Slab summary keeps the commit point offset, so the tail block is the block before that. 2552 * Calculation supports small journals in unit tests. 2553 */ 2554 tail_block_offset_t tail_block = ((last_commit_point == 0) ? 2555 (tail_block_offset_t)(journal->size - 1) : 2556 (last_commit_point - 1)); 2557 2558 vio->completion.parent = journal; 2559 vio->completion.callback_thread_id = slab->allocator->thread_id; 2560 vdo_submit_metadata_vio(vio, slab->journal_origin + tail_block, 2561 read_slab_journal_tail_endio, handle_load_error, 2562 REQ_OP_READ); 2563 } 2564 2565 /** 2566 * load_slab_journal() - Load a slab's journal by reading the journal's tail. 2567 */ 2568 static void load_slab_journal(struct vdo_slab *slab) 2569 { 2570 struct slab_journal *journal = &slab->journal; 2571 tail_block_offset_t last_commit_point; 2572 2573 last_commit_point = slab->allocator->summary_entries[slab->slab_number].tail_block_offset; 2574 if ((last_commit_point == 0) && 2575 !slab->allocator->summary_entries[slab->slab_number].load_ref_counts) { 2576 /* 2577 * This slab claims that it has a tail block at (journal->size - 1), but a head of 2578 * 1. This is impossible, due to the scrubbing threshold, on a real system, so 2579 * don't bother reading the (bogus) data off disk. 
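* (The load_ref_counts check restricts this shortcut to slabs which have never been used at
* all; a slab which has written its reference counts must still have its journal tail read.)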
2580 */ 2581 VDO_ASSERT_LOG_ONLY(((journal->size < 16) || 2582 (journal->scrubbing_threshold < (journal->size - 1))), 2583 "Scrubbing threshold protects against reads of unwritten slab journal blocks"); 2584 vdo_finish_loading_with_result(&slab->state, 2585 allocate_counters_if_clean(slab)); 2586 return; 2587 } 2588 2589 journal->resource_waiter.callback = read_slab_journal_tail; 2590 acquire_vio_from_pool(slab->allocator->vio_pool, &journal->resource_waiter); 2591 } 2592 2593 static void register_slab_for_scrubbing(struct vdo_slab *slab, bool high_priority) 2594 { 2595 struct slab_scrubber *scrubber = &slab->allocator->scrubber; 2596 2597 VDO_ASSERT_LOG_ONLY((slab->status != VDO_SLAB_REBUILT), 2598 "slab to be scrubbed is unrecovered"); 2599 2600 if (slab->status != VDO_SLAB_REQUIRES_SCRUBBING) 2601 return; 2602 2603 list_del_init(&slab->allocq_entry); 2604 if (!slab->was_queued_for_scrubbing) { 2605 WRITE_ONCE(scrubber->slab_count, scrubber->slab_count + 1); 2606 slab->was_queued_for_scrubbing = true; 2607 } 2608 2609 if (high_priority) { 2610 slab->status = VDO_SLAB_REQUIRES_HIGH_PRIORITY_SCRUBBING; 2611 list_add_tail(&slab->allocq_entry, &scrubber->high_priority_slabs); 2612 return; 2613 } 2614 2615 list_add_tail(&slab->allocq_entry, &scrubber->slabs); 2616 } 2617 2618 /* Queue a slab for allocation or scrubbing. */ 2619 static void queue_slab(struct vdo_slab *slab) 2620 { 2621 struct block_allocator *allocator = slab->allocator; 2622 block_count_t free_blocks; 2623 int result; 2624 2625 VDO_ASSERT_LOG_ONLY(list_empty(&slab->allocq_entry), 2626 "a requeued slab must not already be on a list"); 2627 2628 if (vdo_is_read_only(allocator->depot->vdo)) 2629 return; 2630 2631 free_blocks = slab->free_blocks; 2632 result = VDO_ASSERT((free_blocks <= allocator->depot->slab_config.data_blocks), 2633 "rebuilt slab %u must have a valid free block count (has %llu, expected maximum %llu)", 2634 slab->slab_number, (unsigned long long) free_blocks, 2635 (unsigned long long) allocator->depot->slab_config.data_blocks); 2636 if (result != VDO_SUCCESS) { 2637 vdo_enter_read_only_mode(allocator->depot->vdo, result); 2638 return; 2639 } 2640 2641 if (slab->status != VDO_SLAB_REBUILT) { 2642 register_slab_for_scrubbing(slab, false); 2643 return; 2644 } 2645 2646 if (!vdo_is_state_resuming(&slab->state)) { 2647 /* 2648 * If the slab is resuming, we've already accounted for it here, so don't do it 2649 * again. 2650 * FIXME: under what situation would the slab be resuming here? 2651 */ 2652 WRITE_ONCE(allocator->allocated_blocks, 2653 allocator->allocated_blocks - free_blocks); 2654 if (!is_slab_journal_blank(slab)) { 2655 WRITE_ONCE(allocator->statistics.slabs_opened, 2656 allocator->statistics.slabs_opened + 1); 2657 } 2658 } 2659 2660 if (allocator->depot->vdo->suspend_type == VDO_ADMIN_STATE_SAVING) 2661 reopen_slab_journal(slab); 2662 2663 prioritize_slab(slab); 2664 } 2665 2666 /** 2667 * initiate_slab_action() - Initiate a slab action. 2668 * 2669 * Implements vdo_admin_initiator_fn. 
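*
* Draining drains the slab's journal and reference count I/O (a scrub drain first marks the
* slab as rebuilding), loading reads the slab journal tail, resuming requeues the slab, and
* any other operation is rejected as an invalid admin state.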
2670 */ 2671 static void initiate_slab_action(struct admin_state *state) 2672 { 2673 struct vdo_slab *slab = container_of(state, struct vdo_slab, state); 2674 2675 if (vdo_is_state_draining(state)) { 2676 const struct admin_state_code *operation = vdo_get_admin_state_code(state); 2677 2678 if (operation == VDO_ADMIN_STATE_SCRUBBING) 2679 slab->status = VDO_SLAB_REBUILDING; 2680 2681 drain_slab(slab); 2682 check_if_slab_drained(slab); 2683 return; 2684 } 2685 2686 if (vdo_is_state_loading(state)) { 2687 load_slab_journal(slab); 2688 return; 2689 } 2690 2691 if (vdo_is_state_resuming(state)) { 2692 queue_slab(slab); 2693 vdo_finish_resuming(state); 2694 return; 2695 } 2696 2697 vdo_finish_operation(state, VDO_INVALID_ADMIN_STATE); 2698 } 2699 2700 /** 2701 * get_next_slab() - Get the next slab to scrub. 2702 * @scrubber: The slab scrubber. 2703 * 2704 * Return: The next slab to scrub or NULL if there are none. 2705 */ 2706 static struct vdo_slab *get_next_slab(struct slab_scrubber *scrubber) 2707 { 2708 struct vdo_slab *slab; 2709 2710 slab = list_first_entry_or_null(&scrubber->high_priority_slabs, 2711 struct vdo_slab, allocq_entry); 2712 if (slab != NULL) 2713 return slab; 2714 2715 return list_first_entry_or_null(&scrubber->slabs, struct vdo_slab, 2716 allocq_entry); 2717 } 2718 2719 /** 2720 * has_slabs_to_scrub() - Check whether a scrubber has slabs to scrub. 2721 * @scrubber: The scrubber to check. 2722 * 2723 * Return: true if the scrubber has slabs to scrub. 2724 */ 2725 static inline bool __must_check has_slabs_to_scrub(struct slab_scrubber *scrubber) 2726 { 2727 return (get_next_slab(scrubber) != NULL); 2728 } 2729 2730 /** 2731 * uninitialize_scrubber_vio() - Clean up the slab_scrubber's vio. 2732 * @scrubber: The scrubber. 2733 */ 2734 static void uninitialize_scrubber_vio(struct slab_scrubber *scrubber) 2735 { 2736 vdo_free(vdo_forget(scrubber->vio.data)); 2737 free_vio_components(&scrubber->vio); 2738 } 2739 2740 /** 2741 * finish_scrubbing() - Stop scrubbing, either because there are no more slabs to scrub or because 2742 * there's been an error. 2743 * @scrubber: The scrubber. 2744 */ 2745 static void finish_scrubbing(struct slab_scrubber *scrubber, int result) 2746 { 2747 bool notify = vdo_waitq_has_waiters(&scrubber->waiters); 2748 bool done = !has_slabs_to_scrub(scrubber); 2749 struct block_allocator *allocator = 2750 container_of(scrubber, struct block_allocator, scrubber); 2751 2752 if (done) 2753 uninitialize_scrubber_vio(scrubber); 2754 2755 if (scrubber->high_priority_only) { 2756 scrubber->high_priority_only = false; 2757 vdo_fail_completion(vdo_forget(scrubber->vio.completion.parent), result); 2758 } else if (done && (atomic_add_return(-1, &allocator->depot->zones_to_scrub) == 0)) { 2759 /* All of our slabs were scrubbed, and we're the last allocator to finish. */ 2760 enum vdo_state prior_state = 2761 atomic_cmpxchg(&allocator->depot->vdo->state, VDO_RECOVERING, 2762 VDO_DIRTY); 2763 2764 /* 2765 * To be safe, even if the CAS failed, ensure anything that follows is ordered with 2766 * respect to whatever state change did happen. 2767 */ 2768 smp_mb__after_atomic(); 2769 2770 /* 2771 * We must check the VDO state here and not the depot's read_only_notifier since 2772 * the compare-swap-above could have failed due to a read-only entry which our own 2773 * thread does not yet know about. 
2774 */ 2775 if (prior_state == VDO_DIRTY) 2776 vdo_log_info("VDO commencing normal operation"); 2777 else if (prior_state == VDO_RECOVERING) 2778 vdo_log_info("Exiting recovery mode"); 2779 free_vio_pool(vdo_forget(allocator->refcount_big_vio_pool)); 2780 } 2781 2782 /* 2783 * Note that the scrubber has stopped, and inform anyone who might be waiting for that to 2784 * happen. 2785 */ 2786 if (!vdo_finish_draining(&scrubber->admin_state)) 2787 WRITE_ONCE(scrubber->admin_state.current_state, 2788 VDO_ADMIN_STATE_SUSPENDED); 2789 2790 /* 2791 * We can't notify waiters until after we've finished draining or they'll just requeue. 2792 * Fortunately if there were waiters, we can't have been freed yet. 2793 */ 2794 if (notify) 2795 vdo_waitq_notify_all_waiters(&scrubber->waiters, NULL, NULL); 2796 } 2797 2798 static void scrub_next_slab(struct slab_scrubber *scrubber); 2799 2800 /** 2801 * slab_scrubbed() - Notify the scrubber that a slab has been scrubbed. 2802 * @completion: The slab rebuild completion. 2803 * 2804 * This callback is registered in apply_journal_entries(). 2805 */ 2806 static void slab_scrubbed(struct vdo_completion *completion) 2807 { 2808 struct slab_scrubber *scrubber = 2809 container_of(as_vio(completion), struct slab_scrubber, vio); 2810 struct vdo_slab *slab = scrubber->slab; 2811 2812 slab->status = VDO_SLAB_REBUILT; 2813 queue_slab(slab); 2814 reopen_slab_journal(slab); 2815 WRITE_ONCE(scrubber->slab_count, scrubber->slab_count - 1); 2816 scrub_next_slab(scrubber); 2817 } 2818 2819 /** 2820 * abort_scrubbing() - Abort scrubbing due to an error. 2821 * @scrubber: The slab scrubber. 2822 * @result: The error. 2823 */ 2824 static void abort_scrubbing(struct slab_scrubber *scrubber, int result) 2825 { 2826 vdo_enter_read_only_mode(scrubber->vio.completion.vdo, result); 2827 finish_scrubbing(scrubber, result); 2828 } 2829 2830 /** 2831 * handle_scrubber_error() - Handle errors while rebuilding a slab. 2832 * @completion: The slab rebuild completion. 2833 */ 2834 static void handle_scrubber_error(struct vdo_completion *completion) 2835 { 2836 struct vio *vio = as_vio(completion); 2837 2838 vio_record_metadata_io_error(vio); 2839 abort_scrubbing(container_of(vio, struct slab_scrubber, vio), 2840 completion->result); 2841 } 2842 2843 /** 2844 * apply_block_entries() - Apply all the entries in a block to the reference counts. 2845 * @block: A block with entries to apply. 2846 * @entry_count: The number of entries to apply. 2847 * @block_number: The sequence number of the block. 2848 * @slab: The slab to apply the entries to. 2849 * 2850 * Return: VDO_SUCCESS or an error code. 2851 */ 2852 static int apply_block_entries(struct packed_slab_journal_block *block, 2853 journal_entry_count_t entry_count, 2854 sequence_number_t block_number, struct vdo_slab *slab) 2855 { 2856 struct journal_point entry_point = { 2857 .sequence_number = block_number, 2858 .entry_count = 0, 2859 }; 2860 int result; 2861 slab_block_number max_sbn = slab->end - slab->start; 2862 2863 while (entry_point.entry_count < entry_count) { 2864 struct slab_journal_entry entry = 2865 vdo_decode_slab_journal_entry(block, entry_point.entry_count); 2866 2867 if (entry.sbn > max_sbn) { 2868 /* This entry is out of bounds. 
*/ 2869 return vdo_log_error_strerror(VDO_CORRUPT_JOURNAL, 2870 "vdo_slab journal entry (%llu, %u) had invalid offset %u in slab (size %u blocks)", 2871 (unsigned long long) block_number, 2872 entry_point.entry_count, 2873 entry.sbn, max_sbn); 2874 } 2875 2876 result = replay_reference_count_change(slab, &entry_point, entry); 2877 if (result != VDO_SUCCESS) { 2878 vdo_log_error_strerror(result, 2879 "vdo_slab journal entry (%llu, %u) (%s of offset %u) could not be applied in slab %u", 2880 (unsigned long long) block_number, 2881 entry_point.entry_count, 2882 vdo_get_journal_operation_name(entry.operation), 2883 entry.sbn, slab->slab_number); 2884 return result; 2885 } 2886 entry_point.entry_count++; 2887 } 2888 2889 return VDO_SUCCESS; 2890 } 2891 2892 /** 2893 * apply_journal_entries() - Find the relevant vio of the slab journal and apply all valid entries. 2894 * @completion: The metadata read vio completion. 2895 * 2896 * This is a callback registered in start_scrubbing(). 2897 */ 2898 static void apply_journal_entries(struct vdo_completion *completion) 2899 { 2900 int result; 2901 struct slab_scrubber *scrubber = 2902 container_of(as_vio(completion), struct slab_scrubber, vio); 2903 struct vdo_slab *slab = scrubber->slab; 2904 struct slab_journal *journal = &slab->journal; 2905 2906 /* Find the boundaries of the useful part of the journal. */ 2907 sequence_number_t tail = journal->tail; 2908 tail_block_offset_t end_index = (tail - 1) % journal->size; 2909 char *end_data = scrubber->vio.data + (end_index * VDO_BLOCK_SIZE); 2910 struct packed_slab_journal_block *end_block = 2911 (struct packed_slab_journal_block *) end_data; 2912 2913 sequence_number_t head = __le64_to_cpu(end_block->header.head); 2914 tail_block_offset_t head_index = head % journal->size; 2915 block_count_t index = head_index; 2916 2917 struct journal_point ref_counts_point = slab->slab_journal_point; 2918 struct journal_point last_entry_applied = ref_counts_point; 2919 sequence_number_t sequence; 2920 2921 for (sequence = head; sequence < tail; sequence++) { 2922 char *block_data = scrubber->vio.data + (index * VDO_BLOCK_SIZE); 2923 struct packed_slab_journal_block *block = 2924 (struct packed_slab_journal_block *) block_data; 2925 struct slab_journal_block_header header; 2926 2927 vdo_unpack_slab_journal_block_header(&block->header, &header); 2928 2929 if ((header.nonce != slab->allocator->nonce) || 2930 (header.metadata_type != VDO_METADATA_SLAB_JOURNAL) || 2931 (header.sequence_number != sequence) || 2932 (header.entry_count > journal->entries_per_block) || 2933 (header.has_block_map_increments && 2934 (header.entry_count > journal->full_entries_per_block))) { 2935 /* The block is not what we expect it to be. */ 2936 vdo_log_error("vdo_slab journal block for slab %u was invalid", 2937 slab->slab_number); 2938 abort_scrubbing(scrubber, VDO_CORRUPT_JOURNAL); 2939 return; 2940 } 2941 2942 result = apply_block_entries(block, header.entry_count, sequence, slab); 2943 if (result != VDO_SUCCESS) { 2944 abort_scrubbing(scrubber, result); 2945 return; 2946 } 2947 2948 last_entry_applied.sequence_number = sequence; 2949 last_entry_applied.entry_count = header.entry_count - 1; 2950 index++; 2951 if (index == journal->size) 2952 index = 0; 2953 } 2954 2955 /* 2956 * At the end of rebuild, the reference counters should be accurate to the end of the 2957 * journal we just applied. 
2958 */ 2959 result = VDO_ASSERT(!vdo_before_journal_point(&last_entry_applied, 2960 &ref_counts_point), 2961 "Refcounts are not more accurate than the slab journal"); 2962 if (result != VDO_SUCCESS) { 2963 abort_scrubbing(scrubber, result); 2964 return; 2965 } 2966 2967 /* Save out the rebuilt reference blocks. */ 2968 vdo_prepare_completion(completion, slab_scrubbed, handle_scrubber_error, 2969 slab->allocator->thread_id, completion->parent); 2970 vdo_start_operation_with_waiter(&slab->state, 2971 VDO_ADMIN_STATE_SAVE_FOR_SCRUBBING, 2972 completion, initiate_slab_action); 2973 } 2974 2975 static void read_slab_journal_endio(struct bio *bio) 2976 { 2977 struct vio *vio = bio->bi_private; 2978 struct slab_scrubber *scrubber = container_of(vio, struct slab_scrubber, vio); 2979 2980 continue_vio_after_io(bio->bi_private, apply_journal_entries, 2981 scrubber->slab->allocator->thread_id); 2982 } 2983 2984 /** 2985 * start_scrubbing() - Read the current slab's journal from disk now that it has been flushed. 2986 * @completion: The scrubber's vio completion. 2987 * 2988 * This callback is registered in scrub_next_slab(). 2989 */ 2990 static void start_scrubbing(struct vdo_completion *completion) 2991 { 2992 struct slab_scrubber *scrubber = 2993 container_of(as_vio(completion), struct slab_scrubber, vio); 2994 struct vdo_slab *slab = scrubber->slab; 2995 2996 if (!slab->allocator->summary_entries[slab->slab_number].is_dirty) { 2997 slab_scrubbed(completion); 2998 return; 2999 } 3000 3001 vdo_submit_metadata_vio(&scrubber->vio, slab->journal_origin, 3002 read_slab_journal_endio, handle_scrubber_error, 3003 REQ_OP_READ); 3004 } 3005 3006 /** 3007 * scrub_next_slab() - Scrub the next slab if there is one. 3008 * @scrubber: The scrubber. 3009 */ 3010 static void scrub_next_slab(struct slab_scrubber *scrubber) 3011 { 3012 struct vdo_completion *completion = &scrubber->vio.completion; 3013 struct vdo_slab *slab; 3014 3015 /* 3016 * Note: this notify call is always safe only because scrubbing can only be started when 3017 * the VDO is quiescent. 3018 */ 3019 vdo_waitq_notify_all_waiters(&scrubber->waiters, NULL, NULL); 3020 3021 if (vdo_is_read_only(completion->vdo)) { 3022 finish_scrubbing(scrubber, VDO_READ_ONLY); 3023 return; 3024 } 3025 3026 slab = get_next_slab(scrubber); 3027 if ((slab == NULL) || 3028 (scrubber->high_priority_only && list_empty(&scrubber->high_priority_slabs))) { 3029 finish_scrubbing(scrubber, VDO_SUCCESS); 3030 return; 3031 } 3032 3033 if (vdo_finish_draining(&scrubber->admin_state)) 3034 return; 3035 3036 list_del_init(&slab->allocq_entry); 3037 scrubber->slab = slab; 3038 vdo_prepare_completion(completion, start_scrubbing, handle_scrubber_error, 3039 slab->allocator->thread_id, completion->parent); 3040 vdo_start_operation_with_waiter(&slab->state, VDO_ADMIN_STATE_SCRUBBING, 3041 completion, initiate_slab_action); 3042 } 3043 3044 /** 3045 * scrub_slabs() - Scrub all of an allocator's slabs that are eligible for scrubbing. 3046 * @allocator: The block_allocator to scrub. 3047 * @parent: The completion to notify when scrubbing is done, implies high_priority, may be NULL. 
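*
* When a parent is supplied, scrubbing stops once the high-priority list is empty and the
* parent is notified with the result.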
3048 */ 3049 static void scrub_slabs(struct block_allocator *allocator, struct vdo_completion *parent) 3050 { 3051 struct slab_scrubber *scrubber = &allocator->scrubber; 3052 3053 scrubber->vio.completion.parent = parent; 3054 scrubber->high_priority_only = (parent != NULL); 3055 if (!has_slabs_to_scrub(scrubber)) { 3056 finish_scrubbing(scrubber, VDO_SUCCESS); 3057 return; 3058 } 3059 3060 if (scrubber->high_priority_only && 3061 vdo_is_priority_table_empty(allocator->prioritized_slabs) && 3062 list_empty(&scrubber->high_priority_slabs)) 3063 register_slab_for_scrubbing(get_next_slab(scrubber), true); 3064 3065 vdo_resume_if_quiescent(&scrubber->admin_state); 3066 scrub_next_slab(scrubber); 3067 } 3068 3069 static inline void assert_on_allocator_thread(thread_id_t thread_id, 3070 const char *function_name) 3071 { 3072 VDO_ASSERT_LOG_ONLY((vdo_get_callback_thread_id() == thread_id), 3073 "%s called on correct thread", function_name); 3074 } 3075 3076 static void register_slab_with_allocator(struct block_allocator *allocator, 3077 struct vdo_slab *slab) 3078 { 3079 allocator->slab_count++; 3080 allocator->last_slab = slab->slab_number; 3081 } 3082 3083 /** 3084 * get_depot_slab_iterator() - Return a slab_iterator over the slabs in a slab_depot. 3085 * @depot: The depot over which to iterate. 3086 * @start: The number of the slab to start iterating from. 3087 * @end: The number of the last slab which may be returned. 3088 * @stride: The difference in slab number between successive slabs. 3089 * 3090 * Iteration always occurs from higher to lower numbered slabs. 3091 * 3092 * Return: An initialized iterator structure. 3093 */ 3094 static struct slab_iterator get_depot_slab_iterator(struct slab_depot *depot, 3095 slab_count_t start, slab_count_t end, 3096 slab_count_t stride) 3097 { 3098 struct vdo_slab **slabs = depot->slabs; 3099 3100 return (struct slab_iterator) { 3101 .slabs = slabs, 3102 .next = (((slabs == NULL) || (start < end)) ? NULL : slabs[start]), 3103 .end = end, 3104 .stride = stride, 3105 }; 3106 } 3107 3108 static struct slab_iterator get_slab_iterator(const struct block_allocator *allocator) 3109 { 3110 return get_depot_slab_iterator(allocator->depot, allocator->last_slab, 3111 allocator->zone_number, 3112 allocator->depot->zone_count); 3113 } 3114 3115 /** 3116 * next_slab() - Get the next slab from a slab_iterator and advance the iterator 3117 * @iterator: The slab_iterator. 3118 * 3119 * Return: The next slab or NULL if the iterator is exhausted. 3120 */ 3121 static struct vdo_slab *next_slab(struct slab_iterator *iterator) 3122 { 3123 struct vdo_slab *slab = iterator->next; 3124 3125 if ((slab == NULL) || (slab->slab_number < iterator->end + iterator->stride)) 3126 iterator->next = NULL; 3127 else 3128 iterator->next = iterator->slabs[slab->slab_number - iterator->stride]; 3129 3130 return slab; 3131 } 3132 3133 /** 3134 * abort_waiter() - Abort vios waiting to make journal entries when read-only. 3135 * 3136 * This callback is invoked on all vios waiting to make slab journal entries after the VDO has gone 3137 * into read-only mode. Implements waiter_callback_fn. 
3138 */ 3139 static void abort_waiter(struct vdo_waiter *waiter, void *context __always_unused) 3140 { 3141 struct reference_updater *updater = 3142 container_of(waiter, struct reference_updater, waiter); 3143 struct data_vio *data_vio = data_vio_from_reference_updater(updater); 3144 3145 if (updater->increment) { 3146 continue_data_vio_with_error(data_vio, VDO_READ_ONLY); 3147 return; 3148 } 3149 3150 vdo_continue_completion(&data_vio->decrement_completion, VDO_READ_ONLY); 3151 } 3152 3153 /* Implements vdo_read_only_notification_fn. */ 3154 static void notify_block_allocator_of_read_only_mode(void *listener, 3155 struct vdo_completion *parent) 3156 { 3157 struct block_allocator *allocator = listener; 3158 struct slab_iterator iterator; 3159 3160 assert_on_allocator_thread(allocator->thread_id, __func__); 3161 iterator = get_slab_iterator(allocator); 3162 while (iterator.next != NULL) { 3163 struct vdo_slab *slab = next_slab(&iterator); 3164 3165 vdo_waitq_notify_all_waiters(&slab->journal.entry_waiters, 3166 abort_waiter, &slab->journal); 3167 check_if_slab_drained(slab); 3168 } 3169 3170 vdo_finish_completion(parent); 3171 } 3172 3173 /** 3174 * vdo_acquire_provisional_reference() - Acquire a provisional reference on behalf of a PBN lock if 3175 * the block it locks is unreferenced. 3176 * @slab: The slab which contains the block. 3177 * @pbn: The physical block to reference. 3178 * @lock: The lock. 3179 * 3180 * Return: VDO_SUCCESS or an error. 3181 */ 3182 int vdo_acquire_provisional_reference(struct vdo_slab *slab, physical_block_number_t pbn, 3183 struct pbn_lock *lock) 3184 { 3185 slab_block_number block_number; 3186 int result; 3187 3188 if (vdo_pbn_lock_has_provisional_reference(lock)) 3189 return VDO_SUCCESS; 3190 3191 if (!is_slab_open(slab)) 3192 return VDO_INVALID_ADMIN_STATE; 3193 3194 result = slab_block_number_from_pbn(slab, pbn, &block_number); 3195 if (result != VDO_SUCCESS) 3196 return result; 3197 3198 if (slab->counters[block_number] == EMPTY_REFERENCE_COUNT) { 3199 make_provisional_reference(slab, block_number); 3200 if (lock != NULL) 3201 vdo_assign_pbn_lock_provisional_reference(lock); 3202 } 3203 3204 if (vdo_pbn_lock_has_provisional_reference(lock)) 3205 adjust_free_block_count(slab, false); 3206 3207 return VDO_SUCCESS; 3208 } 3209 3210 static int __must_check allocate_slab_block(struct vdo_slab *slab, 3211 physical_block_number_t *block_number_ptr) 3212 { 3213 slab_block_number free_index; 3214 3215 if (!is_slab_open(slab)) 3216 return VDO_INVALID_ADMIN_STATE; 3217 3218 if (!search_reference_blocks(slab, &free_index)) 3219 return VDO_NO_SPACE; 3220 3221 VDO_ASSERT_LOG_ONLY((slab->counters[free_index] == EMPTY_REFERENCE_COUNT), 3222 "free block must have ref count of zero"); 3223 make_provisional_reference(slab, free_index); 3224 adjust_free_block_count(slab, false); 3225 3226 /* 3227 * Update the search hint so the next search will start at the array index just past the 3228 * free block we just found. 3229 */ 3230 slab->search_cursor.index = (free_index + 1); 3231 3232 *block_number_ptr = slab->start + free_index; 3233 return VDO_SUCCESS; 3234 } 3235 3236 /** 3237 * open_slab() - Prepare a slab to be allocated from. 3238 * @slab: The slab. 
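*
* A slab with a blank journal dirties all of its reference blocks so that its counters will be
* written out; otherwise the slab is simply counted as reopened.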
3239 */ 3240 static void open_slab(struct vdo_slab *slab) 3241 { 3242 reset_search_cursor(slab); 3243 if (is_slab_journal_blank(slab)) { 3244 WRITE_ONCE(slab->allocator->statistics.slabs_opened, 3245 slab->allocator->statistics.slabs_opened + 1); 3246 dirty_all_reference_blocks(slab); 3247 } else { 3248 WRITE_ONCE(slab->allocator->statistics.slabs_reopened, 3249 slab->allocator->statistics.slabs_reopened + 1); 3250 } 3251 3252 slab->allocator->open_slab = slab; 3253 } 3254 3255 3256 /* 3257 * The block allocated will have a provisional reference and the reference must be either confirmed 3258 * with a subsequent increment or vacated with a subsequent decrement via 3259 * vdo_release_block_reference(). 3260 */ 3261 int vdo_allocate_block(struct block_allocator *allocator, 3262 physical_block_number_t *block_number_ptr) 3263 { 3264 int result; 3265 3266 if (allocator->open_slab != NULL) { 3267 /* Try to allocate the next block in the currently open slab. */ 3268 result = allocate_slab_block(allocator->open_slab, block_number_ptr); 3269 if ((result == VDO_SUCCESS) || (result != VDO_NO_SPACE)) 3270 return result; 3271 3272 /* Put the exhausted open slab back into the priority table. */ 3273 prioritize_slab(allocator->open_slab); 3274 } 3275 3276 /* Remove the highest priority slab from the priority table and make it the open slab. */ 3277 open_slab(list_entry(vdo_priority_table_dequeue(allocator->prioritized_slabs), 3278 struct vdo_slab, allocq_entry)); 3279 3280 /* 3281 * Try allocating again. If we're out of space immediately after opening a slab, then every 3282 * slab must be fully allocated. 3283 */ 3284 return allocate_slab_block(allocator->open_slab, block_number_ptr); 3285 } 3286 3287 /** 3288 * vdo_enqueue_clean_slab_waiter() - Wait for a clean slab. 3289 * @allocator: The block_allocator on which to wait. 3290 * @waiter: The waiter. 3291 * 3292 * Return: VDO_SUCCESS if the waiter was queued, VDO_NO_SPACE if there are no slabs to scrub, and 3293 * some other error otherwise. 3294 */ 3295 int vdo_enqueue_clean_slab_waiter(struct block_allocator *allocator, 3296 struct vdo_waiter *waiter) 3297 { 3298 if (vdo_is_read_only(allocator->depot->vdo)) 3299 return VDO_READ_ONLY; 3300 3301 if (vdo_is_state_quiescent(&allocator->scrubber.admin_state)) 3302 return VDO_NO_SPACE; 3303 3304 vdo_waitq_enqueue_waiter(&allocator->scrubber.waiters, waiter); 3305 return VDO_SUCCESS; 3306 } 3307 3308 /** 3309 * vdo_modify_reference_count() - Modify the reference count of a block by first making a slab 3310 * journal entry and then updating the reference counter. 3311 * @completion: The data_vio completion for which to add the entry. 3312 * @updater: Which of the data_vio's reference updaters is being submitted. 3313 */ 3314 void vdo_modify_reference_count(struct vdo_completion *completion, 3315 struct reference_updater *updater) 3316 { 3317 struct vdo_slab *slab = vdo_get_slab(completion->vdo->depot, updater->zpbn.pbn); 3318 3319 if (!is_slab_open(slab)) { 3320 vdo_continue_completion(completion, VDO_INVALID_ADMIN_STATE); 3321 return; 3322 } 3323 3324 if (vdo_is_read_only(completion->vdo)) { 3325 vdo_continue_completion(completion, VDO_READ_ONLY); 3326 return; 3327 } 3328 3329 vdo_waitq_enqueue_waiter(&slab->journal.entry_waiters, &updater->waiter); 3330 if ((slab->status != VDO_SLAB_REBUILT) && requires_reaping(&slab->journal)) 3331 register_slab_for_scrubbing(slab, true); 3332 3333 add_entries(&slab->journal); 3334 } 3335 3336 /* Release an unused provisional reference. 
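 *
 * A sketch of the intended pairing with vdo_allocate_block() (illustrative only; the
 * can_use_block condition is hypothetical):
 *
 *	result = vdo_allocate_block(allocator, &pbn);
 *	if ((result == VDO_SUCCESS) && !can_use_block)
 *		result = vdo_release_block_reference(allocator, pbn);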
 */
int vdo_release_block_reference(struct block_allocator *allocator,
				physical_block_number_t pbn)
{
	struct reference_updater updater;

	if (pbn == VDO_ZERO_BLOCK)
		return VDO_SUCCESS;

	updater = (struct reference_updater) {
		.operation = VDO_JOURNAL_DATA_REMAPPING,
		.increment = false,
		.zpbn = {
			.pbn = pbn,
		},
	};

	return adjust_reference_count(vdo_get_slab(allocator->depot, pbn),
				      &updater, NULL);
}

/*
 * This is a min_heap callback function that orders slab_status structures using the 'is_clean'
 * field as the primary key and the 'emptiness' field as the secondary key.
 *
 * Slabs need to be pushed onto the lists in the same order they are to be popped off. Popping
 * should always get the most empty first, so pushing should be from most empty to least empty.
 * Thus, the ordering is reversed from the usual sense since min_heap returns smaller elements
 * before larger ones.
 */
static bool slab_status_is_less_than(const void *item1, const void *item2,
				     void __always_unused *args)
{
	const struct slab_status *info1 = item1;
	const struct slab_status *info2 = item2;

	if (info1->is_clean != info2->is_clean)
		return info1->is_clean;
	if (info1->emptiness != info2->emptiness)
		return info1->emptiness > info2->emptiness;
	return info1->slab_number < info2->slab_number;
}

static const struct min_heap_callbacks slab_status_min_heap = {
	.less = slab_status_is_less_than,
	.swp = NULL,
};

/* Inform the slab actor that an action has finished on some slab; used by apply_to_slabs(). */
static void slab_action_callback(struct vdo_completion *completion)
{
	struct block_allocator *allocator = vdo_as_block_allocator(completion);
	struct slab_actor *actor = &allocator->slab_actor;

	if (--actor->slab_action_count == 0) {
		actor->callback(completion);
		return;
	}

	vdo_reset_completion(completion);
}

/* Preserve the error from part of an action and continue. */
static void handle_operation_error(struct vdo_completion *completion)
{
	struct block_allocator *allocator = vdo_as_block_allocator(completion);

	if (allocator->state.waiter != NULL)
		vdo_set_completion_result(allocator->state.waiter, completion->result);
	completion->callback(completion);
}

/* Perform an action on each of an allocator's slabs in parallel. */
static void apply_to_slabs(struct block_allocator *allocator, vdo_action_fn callback)
{
	struct slab_iterator iterator;

	vdo_prepare_completion(&allocator->completion, slab_action_callback,
			       handle_operation_error, allocator->thread_id, NULL);
	allocator->completion.requeue = false;

	/*
	 * Since we are going to dequeue all of the slabs, the open slab will become invalid, so
	 * clear it.
	 */
	allocator->open_slab = NULL;

	/* Ensure that we don't finish before we're done starting.
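	 * The actor's slab_action_count is primed with an extra count here; each slab started
	 * in the loop below adds one, and the slab_action_callback() call at the end of this
	 * function releases the priming count, so the callback cannot fire until every slab
	 * has begun its operation.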
*/ 3424 allocator->slab_actor = (struct slab_actor) { 3425 .slab_action_count = 1, 3426 .callback = callback, 3427 }; 3428 3429 iterator = get_slab_iterator(allocator); 3430 while (iterator.next != NULL) { 3431 const struct admin_state_code *operation = 3432 vdo_get_admin_state_code(&allocator->state); 3433 struct vdo_slab *slab = next_slab(&iterator); 3434 3435 list_del_init(&slab->allocq_entry); 3436 allocator->slab_actor.slab_action_count++; 3437 vdo_start_operation_with_waiter(&slab->state, operation, 3438 &allocator->completion, 3439 initiate_slab_action); 3440 } 3441 3442 slab_action_callback(&allocator->completion); 3443 } 3444 3445 static void finish_loading_allocator(struct vdo_completion *completion) 3446 { 3447 struct block_allocator *allocator = vdo_as_block_allocator(completion); 3448 const struct admin_state_code *operation = 3449 vdo_get_admin_state_code(&allocator->state); 3450 3451 if (allocator->eraser != NULL) 3452 dm_kcopyd_client_destroy(vdo_forget(allocator->eraser)); 3453 3454 if (operation == VDO_ADMIN_STATE_LOADING_FOR_RECOVERY) { 3455 void *context = 3456 vdo_get_current_action_context(allocator->depot->action_manager); 3457 3458 vdo_replay_into_slab_journals(allocator, context); 3459 return; 3460 } 3461 3462 vdo_finish_loading(&allocator->state); 3463 } 3464 3465 static void erase_next_slab_journal(struct block_allocator *allocator); 3466 3467 static void copy_callback(int read_err, unsigned long write_err, void *context) 3468 { 3469 struct block_allocator *allocator = context; 3470 int result = (((read_err == 0) && (write_err == 0)) ? VDO_SUCCESS : -EIO); 3471 3472 if (result != VDO_SUCCESS) { 3473 vdo_fail_completion(&allocator->completion, result); 3474 return; 3475 } 3476 3477 erase_next_slab_journal(allocator); 3478 } 3479 3480 /* erase_next_slab_journal() - Erase the next slab journal. */ 3481 static void erase_next_slab_journal(struct block_allocator *allocator) 3482 { 3483 struct vdo_slab *slab; 3484 physical_block_number_t pbn; 3485 struct dm_io_region regions[1]; 3486 struct slab_depot *depot = allocator->depot; 3487 block_count_t blocks = depot->slab_config.slab_journal_blocks; 3488 3489 if (allocator->slabs_to_erase.next == NULL) { 3490 vdo_finish_completion(&allocator->completion); 3491 return; 3492 } 3493 3494 slab = next_slab(&allocator->slabs_to_erase); 3495 pbn = slab->journal_origin - depot->vdo->geometry.bio_offset; 3496 regions[0] = (struct dm_io_region) { 3497 .bdev = vdo_get_backing_device(depot->vdo), 3498 .sector = pbn * VDO_SECTORS_PER_BLOCK, 3499 .count = blocks * VDO_SECTORS_PER_BLOCK, 3500 }; 3501 dm_kcopyd_zero(allocator->eraser, 1, regions, 0, copy_callback, allocator); 3502 } 3503 3504 /* Implements vdo_admin_initiator_fn. */ 3505 static void initiate_load(struct admin_state *state) 3506 { 3507 struct block_allocator *allocator = 3508 container_of(state, struct block_allocator, state); 3509 const struct admin_state_code *operation = vdo_get_admin_state_code(state); 3510 3511 if (operation == VDO_ADMIN_STATE_LOADING_FOR_REBUILD) { 3512 /* 3513 * Must requeue because the kcopyd client cannot be freed in the same stack frame 3514 * as the kcopyd callback, lest it deadlock. 
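		 * The completion is therefore prepared for requeue below rather than being
		 * run in place.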
3515 */ 3516 vdo_prepare_completion_for_requeue(&allocator->completion, 3517 finish_loading_allocator, 3518 handle_operation_error, 3519 allocator->thread_id, NULL); 3520 allocator->eraser = dm_kcopyd_client_create(NULL); 3521 if (IS_ERR(allocator->eraser)) { 3522 vdo_fail_completion(&allocator->completion, 3523 PTR_ERR(allocator->eraser)); 3524 allocator->eraser = NULL; 3525 return; 3526 } 3527 allocator->slabs_to_erase = get_slab_iterator(allocator); 3528 3529 erase_next_slab_journal(allocator); 3530 return; 3531 } 3532 3533 apply_to_slabs(allocator, finish_loading_allocator); 3534 } 3535 3536 /** 3537 * vdo_notify_slab_journals_are_recovered() - Inform a block allocator that its slab journals have 3538 * been recovered from the recovery journal. 3539 * @completion The allocator completion 3540 */ 3541 void vdo_notify_slab_journals_are_recovered(struct vdo_completion *completion) 3542 { 3543 struct block_allocator *allocator = vdo_as_block_allocator(completion); 3544 3545 vdo_finish_loading_with_result(&allocator->state, completion->result); 3546 } 3547 3548 static int get_slab_statuses(struct block_allocator *allocator, 3549 struct slab_status **statuses_ptr) 3550 { 3551 int result; 3552 struct slab_status *statuses; 3553 struct slab_iterator iterator = get_slab_iterator(allocator); 3554 3555 result = vdo_allocate(allocator->slab_count, struct slab_status, __func__, 3556 &statuses); 3557 if (result != VDO_SUCCESS) 3558 return result; 3559 3560 *statuses_ptr = statuses; 3561 3562 while (iterator.next != NULL) { 3563 slab_count_t slab_number = next_slab(&iterator)->slab_number; 3564 3565 *statuses++ = (struct slab_status) { 3566 .slab_number = slab_number, 3567 .is_clean = !allocator->summary_entries[slab_number].is_dirty, 3568 .emptiness = allocator->summary_entries[slab_number].fullness_hint, 3569 }; 3570 } 3571 3572 return VDO_SUCCESS; 3573 } 3574 3575 /* Prepare slabs for allocation or scrubbing. */ 3576 static int __must_check vdo_prepare_slabs_for_allocation(struct block_allocator *allocator) 3577 { 3578 struct slab_status current_slab_status; 3579 DEFINE_MIN_HEAP(struct slab_status, heap) heap; 3580 int result; 3581 struct slab_status *slab_statuses; 3582 struct slab_depot *depot = allocator->depot; 3583 3584 WRITE_ONCE(allocator->allocated_blocks, 3585 allocator->slab_count * depot->slab_config.data_blocks); 3586 result = get_slab_statuses(allocator, &slab_statuses); 3587 if (result != VDO_SUCCESS) 3588 return result; 3589 3590 /* Sort the slabs by cleanliness, then by emptiness hint. 
*/ 3591 heap = (struct heap) { 3592 .data = slab_statuses, 3593 .nr = allocator->slab_count, 3594 .size = allocator->slab_count, 3595 }; 3596 min_heapify_all(&heap, &slab_status_min_heap, NULL); 3597 3598 while (heap.nr > 0) { 3599 bool high_priority; 3600 struct vdo_slab *slab; 3601 struct slab_journal *journal; 3602 3603 current_slab_status = slab_statuses[0]; 3604 min_heap_pop(&heap, &slab_status_min_heap, NULL); 3605 slab = depot->slabs[current_slab_status.slab_number]; 3606 3607 if ((depot->load_type == VDO_SLAB_DEPOT_REBUILD_LOAD) || 3608 (!allocator->summary_entries[slab->slab_number].load_ref_counts && 3609 current_slab_status.is_clean)) { 3610 queue_slab(slab); 3611 continue; 3612 } 3613 3614 slab->status = VDO_SLAB_REQUIRES_SCRUBBING; 3615 journal = &slab->journal; 3616 high_priority = ((current_slab_status.is_clean && 3617 (depot->load_type == VDO_SLAB_DEPOT_NORMAL_LOAD)) || 3618 (journal_length(journal) >= journal->scrubbing_threshold)); 3619 register_slab_for_scrubbing(slab, high_priority); 3620 } 3621 3622 vdo_free(slab_statuses); 3623 return VDO_SUCCESS; 3624 } 3625 3626 static const char *status_to_string(enum slab_rebuild_status status) 3627 { 3628 switch (status) { 3629 case VDO_SLAB_REBUILT: 3630 return "REBUILT"; 3631 case VDO_SLAB_REQUIRES_SCRUBBING: 3632 return "SCRUBBING"; 3633 case VDO_SLAB_REQUIRES_HIGH_PRIORITY_SCRUBBING: 3634 return "PRIORITY_SCRUBBING"; 3635 case VDO_SLAB_REBUILDING: 3636 return "REBUILDING"; 3637 case VDO_SLAB_REPLAYING: 3638 return "REPLAYING"; 3639 default: 3640 return "UNKNOWN"; 3641 } 3642 } 3643 3644 void vdo_dump_block_allocator(const struct block_allocator *allocator) 3645 { 3646 unsigned int pause_counter = 0; 3647 struct slab_iterator iterator = get_slab_iterator(allocator); 3648 const struct slab_scrubber *scrubber = &allocator->scrubber; 3649 3650 vdo_log_info("block_allocator zone %u", allocator->zone_number); 3651 while (iterator.next != NULL) { 3652 struct vdo_slab *slab = next_slab(&iterator); 3653 struct slab_journal *journal = &slab->journal; 3654 3655 if (slab->reference_blocks != NULL) { 3656 /* Terse because there are a lot of slabs to dump and syslog is lossy. */ 3657 vdo_log_info("slab %u: P%u, %llu free", slab->slab_number, 3658 slab->priority, 3659 (unsigned long long) slab->free_blocks); 3660 } else { 3661 vdo_log_info("slab %u: status %s", slab->slab_number, 3662 status_to_string(slab->status)); 3663 } 3664 3665 vdo_log_info(" slab journal: entry_waiters=%zu waiting_to_commit=%s updating_slab_summary=%s head=%llu unreapable=%llu tail=%llu next_commit=%llu summarized=%llu last_summarized=%llu recovery_lock=%llu dirty=%s", 3666 vdo_waitq_num_waiters(&journal->entry_waiters), 3667 vdo_bool_to_string(journal->waiting_to_commit), 3668 vdo_bool_to_string(journal->updating_slab_summary), 3669 (unsigned long long) journal->head, 3670 (unsigned long long) journal->unreapable, 3671 (unsigned long long) journal->tail, 3672 (unsigned long long) journal->next_commit, 3673 (unsigned long long) journal->summarized, 3674 (unsigned long long) journal->last_summarized, 3675 (unsigned long long) journal->recovery_lock, 3676 vdo_bool_to_string(journal->recovery_lock != 0)); 3677 /* 3678 * Given the frequency with which the locks are just a tiny bit off, it might be 3679 * worth dumping all the locks, but that might be too much logging. 3680 */ 3681 3682 if (slab->counters != NULL) { 3683 /* Terse because there are a lot of slabs to dump and syslog is lossy. 
*/ 3684 vdo_log_info(" slab: free=%u/%u blocks=%u dirty=%zu active=%zu journal@(%llu,%u)", 3685 slab->free_blocks, slab->block_count, 3686 slab->reference_block_count, 3687 vdo_waitq_num_waiters(&slab->dirty_blocks), 3688 slab->active_count, 3689 (unsigned long long) slab->slab_journal_point.sequence_number, 3690 slab->slab_journal_point.entry_count); 3691 } else { 3692 vdo_log_info(" no counters"); 3693 } 3694 3695 /* 3696 * Wait for a while after each batch of 32 slabs dumped, an arbitrary number, 3697 * allowing the kernel log a chance to be flushed instead of being overrun. 3698 */ 3699 if (pause_counter++ == 31) { 3700 pause_counter = 0; 3701 vdo_pause_for_logger(); 3702 } 3703 } 3704 3705 vdo_log_info("slab_scrubber slab_count %u waiters %zu %s%s", 3706 READ_ONCE(scrubber->slab_count), 3707 vdo_waitq_num_waiters(&scrubber->waiters), 3708 vdo_get_admin_state_code(&scrubber->admin_state)->name, 3709 scrubber->high_priority_only ? ", high_priority_only " : ""); 3710 } 3711 3712 static void free_slab(struct vdo_slab *slab) 3713 { 3714 if (slab == NULL) 3715 return; 3716 3717 list_del(&slab->allocq_entry); 3718 vdo_free(vdo_forget(slab->journal.block)); 3719 vdo_free(vdo_forget(slab->journal.locks)); 3720 vdo_free(vdo_forget(slab->counters)); 3721 vdo_free(vdo_forget(slab->reference_blocks)); 3722 vdo_free(slab); 3723 } 3724 3725 static int initialize_slab_journal(struct vdo_slab *slab) 3726 { 3727 struct slab_journal *journal = &slab->journal; 3728 const struct slab_config *slab_config = &slab->allocator->depot->slab_config; 3729 int result; 3730 3731 result = vdo_allocate(slab_config->slab_journal_blocks, struct journal_lock, 3732 __func__, &journal->locks); 3733 if (result != VDO_SUCCESS) 3734 return result; 3735 3736 result = vdo_allocate(VDO_BLOCK_SIZE, char, "struct packed_slab_journal_block", 3737 (char **) &journal->block); 3738 if (result != VDO_SUCCESS) 3739 return result; 3740 3741 journal->slab = slab; 3742 journal->size = slab_config->slab_journal_blocks; 3743 journal->flushing_threshold = slab_config->slab_journal_flushing_threshold; 3744 journal->blocking_threshold = slab_config->slab_journal_blocking_threshold; 3745 journal->scrubbing_threshold = slab_config->slab_journal_scrubbing_threshold; 3746 journal->entries_per_block = VDO_SLAB_JOURNAL_ENTRIES_PER_BLOCK; 3747 journal->full_entries_per_block = VDO_SLAB_JOURNAL_FULL_ENTRIES_PER_BLOCK; 3748 journal->events = &slab->allocator->slab_journal_statistics; 3749 journal->recovery_journal = slab->allocator->depot->vdo->recovery_journal; 3750 journal->tail = 1; 3751 journal->head = 1; 3752 3753 journal->flushing_deadline = journal->flushing_threshold; 3754 /* 3755 * Set there to be some time between the deadline and the blocking threshold, so that 3756 * hopefully all are done before blocking. 3757 */ 3758 if ((journal->blocking_threshold - journal->flushing_threshold) > 5) 3759 journal->flushing_deadline = journal->blocking_threshold - 5; 3760 3761 journal->slab_summary_waiter.callback = release_journal_locks; 3762 3763 INIT_LIST_HEAD(&journal->dirty_entry); 3764 INIT_LIST_HEAD(&journal->uncommitted_blocks); 3765 3766 journal->tail_header.nonce = slab->allocator->nonce; 3767 journal->tail_header.metadata_type = VDO_METADATA_SLAB_JOURNAL; 3768 initialize_journal_state(journal); 3769 return VDO_SUCCESS; 3770 } 3771 3772 /** 3773 * make_slab() - Construct a new, empty slab. 3774 * @slab_origin: The physical block number within the block allocator partition of the first block 3775 * in the slab. 
3776 * @allocator: The block allocator to which the slab belongs. 3777 * @slab_number: The slab number of the slab. 3778 * @is_new: true if this slab is being allocated as part of a resize. 3779 * @slab_ptr: A pointer to receive the new slab. 3780 * 3781 * Return: VDO_SUCCESS or an error code. 3782 */ 3783 static int __must_check make_slab(physical_block_number_t slab_origin, 3784 struct block_allocator *allocator, 3785 slab_count_t slab_number, bool is_new, 3786 struct vdo_slab **slab_ptr) 3787 { 3788 const struct slab_config *slab_config = &allocator->depot->slab_config; 3789 struct vdo_slab *slab; 3790 int result; 3791 3792 result = vdo_allocate(1, struct vdo_slab, __func__, &slab); 3793 if (result != VDO_SUCCESS) 3794 return result; 3795 3796 *slab = (struct vdo_slab) { 3797 .allocator = allocator, 3798 .start = slab_origin, 3799 .end = slab_origin + slab_config->slab_blocks, 3800 .slab_number = slab_number, 3801 .ref_counts_origin = slab_origin + slab_config->data_blocks, 3802 .journal_origin = 3803 vdo_get_slab_journal_start_block(slab_config, slab_origin), 3804 .block_count = slab_config->data_blocks, 3805 .free_blocks = slab_config->data_blocks, 3806 .reference_block_count = 3807 vdo_get_saved_reference_count_size(slab_config->data_blocks), 3808 }; 3809 INIT_LIST_HEAD(&slab->allocq_entry); 3810 3811 result = initialize_slab_journal(slab); 3812 if (result != VDO_SUCCESS) { 3813 free_slab(slab); 3814 return result; 3815 } 3816 3817 if (is_new) { 3818 vdo_set_admin_state_code(&slab->state, VDO_ADMIN_STATE_NEW); 3819 result = allocate_slab_counters(slab); 3820 if (result != VDO_SUCCESS) { 3821 free_slab(slab); 3822 return result; 3823 } 3824 } else { 3825 vdo_set_admin_state_code(&slab->state, VDO_ADMIN_STATE_NORMAL_OPERATION); 3826 } 3827 3828 *slab_ptr = slab; 3829 return VDO_SUCCESS; 3830 } 3831 3832 /** 3833 * allocate_slabs() - Allocate a new slab pointer array. 3834 * @depot: The depot. 3835 * @slab_count: The number of slabs the depot should have in the new array. 3836 * 3837 * Any existing slab pointers will be copied into the new array, and slabs will be allocated as 3838 * needed. The newly allocated slabs will not be distributed for use by the block allocators. 3839 * 3840 * Return: VDO_SUCCESS or an error code. 
3841 */ 3842 static int allocate_slabs(struct slab_depot *depot, slab_count_t slab_count) 3843 { 3844 block_count_t slab_size; 3845 bool resizing = false; 3846 physical_block_number_t slab_origin; 3847 int result; 3848 3849 result = vdo_allocate(slab_count, struct vdo_slab *, 3850 "slab pointer array", &depot->new_slabs); 3851 if (result != VDO_SUCCESS) 3852 return result; 3853 3854 if (depot->slabs != NULL) { 3855 memcpy(depot->new_slabs, depot->slabs, 3856 depot->slab_count * sizeof(struct vdo_slab *)); 3857 resizing = true; 3858 } 3859 3860 slab_size = depot->slab_config.slab_blocks; 3861 slab_origin = depot->first_block + (depot->slab_count * slab_size); 3862 3863 for (depot->new_slab_count = depot->slab_count; 3864 depot->new_slab_count < slab_count; 3865 depot->new_slab_count++, slab_origin += slab_size) { 3866 struct block_allocator *allocator = 3867 &depot->allocators[depot->new_slab_count % depot->zone_count]; 3868 struct vdo_slab **slab_ptr = &depot->new_slabs[depot->new_slab_count]; 3869 3870 result = make_slab(slab_origin, allocator, depot->new_slab_count, 3871 resizing, slab_ptr); 3872 if (result != VDO_SUCCESS) 3873 return result; 3874 } 3875 3876 return VDO_SUCCESS; 3877 } 3878 3879 /** 3880 * vdo_abandon_new_slabs() - Abandon any new slabs in this depot, freeing them as needed. 3881 * @depot: The depot. 3882 */ 3883 void vdo_abandon_new_slabs(struct slab_depot *depot) 3884 { 3885 slab_count_t i; 3886 3887 if (depot->new_slabs == NULL) 3888 return; 3889 3890 for (i = depot->slab_count; i < depot->new_slab_count; i++) 3891 free_slab(vdo_forget(depot->new_slabs[i])); 3892 depot->new_slab_count = 0; 3893 depot->new_size = 0; 3894 vdo_free(vdo_forget(depot->new_slabs)); 3895 } 3896 3897 /** 3898 * get_allocator_thread_id() - Get the ID of the thread on which a given allocator operates. 3899 * 3900 * Implements vdo_zone_thread_getter_fn. 3901 */ 3902 static thread_id_t get_allocator_thread_id(void *context, zone_count_t zone_number) 3903 { 3904 return ((struct slab_depot *) context)->allocators[zone_number].thread_id; 3905 } 3906 3907 /** 3908 * release_recovery_journal_lock() - Request the slab journal to release the recovery journal lock 3909 * it may hold on a specified recovery journal block. 3910 * @journal: The slab journal. 3911 * @recovery_lock: The sequence number of the recovery journal block whose locks should be 3912 * released. 3913 * 3914 * Return: true if the journal does hold a lock on the specified block (which it will release). 3915 */ 3916 static bool __must_check release_recovery_journal_lock(struct slab_journal *journal, 3917 sequence_number_t recovery_lock) 3918 { 3919 if (recovery_lock > journal->recovery_lock) { 3920 VDO_ASSERT_LOG_ONLY((recovery_lock < journal->recovery_lock), 3921 "slab journal recovery lock is not older than the recovery journal head"); 3922 return false; 3923 } 3924 3925 if ((recovery_lock < journal->recovery_lock) || 3926 vdo_is_read_only(journal->slab->allocator->depot->vdo)) 3927 return false; 3928 3929 /* All locks are held by the block which is in progress; write it. */ 3930 commit_tail(journal); 3931 return true; 3932 } 3933 3934 /* 3935 * Request a commit of all dirty tail blocks which are locking the recovery journal block the depot 3936 * is seeking to release. 3937 * 3938 * Implements vdo_zone_action_fn. 
3939 */ 3940 static void release_tail_block_locks(void *context, zone_count_t zone_number, 3941 struct vdo_completion *parent) 3942 { 3943 struct slab_journal *journal, *tmp; 3944 struct slab_depot *depot = context; 3945 struct list_head *list = &depot->allocators[zone_number].dirty_slab_journals; 3946 3947 list_for_each_entry_safe(journal, tmp, list, dirty_entry) { 3948 if (!release_recovery_journal_lock(journal, 3949 depot->active_release_request)) 3950 break; 3951 } 3952 3953 vdo_finish_completion(parent); 3954 } 3955 3956 /** 3957 * prepare_for_tail_block_commit() - Prepare to commit oldest tail blocks. 3958 * 3959 * Implements vdo_action_preamble_fn. 3960 */ 3961 static void prepare_for_tail_block_commit(void *context, struct vdo_completion *parent) 3962 { 3963 struct slab_depot *depot = context; 3964 3965 depot->active_release_request = depot->new_release_request; 3966 vdo_finish_completion(parent); 3967 } 3968 3969 /** 3970 * schedule_tail_block_commit() - Schedule a tail block commit if necessary. 3971 * 3972 * This method should not be called directly. Rather, call vdo_schedule_default_action() on the 3973 * depot's action manager. 3974 * 3975 * Implements vdo_action_scheduler_fn. 3976 */ 3977 static bool schedule_tail_block_commit(void *context) 3978 { 3979 struct slab_depot *depot = context; 3980 3981 if (depot->new_release_request == depot->active_release_request) 3982 return false; 3983 3984 return vdo_schedule_action(depot->action_manager, 3985 prepare_for_tail_block_commit, 3986 release_tail_block_locks, 3987 NULL, NULL); 3988 } 3989 3990 /** 3991 * initialize_slab_scrubber() - Initialize an allocator's slab scrubber. 3992 * @allocator: The allocator being initialized 3993 * 3994 * Return: VDO_SUCCESS or an error. 3995 */ 3996 static int initialize_slab_scrubber(struct block_allocator *allocator) 3997 { 3998 struct slab_scrubber *scrubber = &allocator->scrubber; 3999 block_count_t slab_journal_size = 4000 allocator->depot->slab_config.slab_journal_blocks; 4001 char *journal_data; 4002 int result; 4003 4004 result = vdo_allocate(VDO_BLOCK_SIZE * slab_journal_size, 4005 char, __func__, &journal_data); 4006 if (result != VDO_SUCCESS) 4007 return result; 4008 4009 result = allocate_vio_components(allocator->completion.vdo, 4010 VIO_TYPE_SLAB_JOURNAL, 4011 VIO_PRIORITY_METADATA, 4012 allocator, slab_journal_size, 4013 journal_data, &scrubber->vio); 4014 if (result != VDO_SUCCESS) { 4015 vdo_free(journal_data); 4016 return result; 4017 } 4018 4019 INIT_LIST_HEAD(&scrubber->high_priority_slabs); 4020 INIT_LIST_HEAD(&scrubber->slabs); 4021 vdo_set_admin_state_code(&scrubber->admin_state, VDO_ADMIN_STATE_SUSPENDED); 4022 return VDO_SUCCESS; 4023 } 4024 4025 /** 4026 * initialize_slab_summary_block() - Initialize a slab_summary_block. 4027 * @allocator: The allocator which owns the block. 4028 * @index: The index of this block in its zone's summary. 4029 * 4030 * Return: VDO_SUCCESS or an error. 
4031 */ 4032 static int __must_check initialize_slab_summary_block(struct block_allocator *allocator, 4033 block_count_t index) 4034 { 4035 struct slab_summary_block *block = &allocator->summary_blocks[index]; 4036 int result; 4037 4038 result = vdo_allocate(VDO_BLOCK_SIZE, char, __func__, &block->outgoing_entries); 4039 if (result != VDO_SUCCESS) 4040 return result; 4041 4042 result = allocate_vio_components(allocator->depot->vdo, VIO_TYPE_SLAB_SUMMARY, 4043 VIO_PRIORITY_METADATA, NULL, 1, 4044 block->outgoing_entries, &block->vio); 4045 if (result != VDO_SUCCESS) 4046 return result; 4047 4048 block->allocator = allocator; 4049 block->entries = &allocator->summary_entries[VDO_SLAB_SUMMARY_ENTRIES_PER_BLOCK * index]; 4050 block->index = index; 4051 return VDO_SUCCESS; 4052 } 4053 4054 static int __must_check initialize_block_allocator(struct slab_depot *depot, 4055 zone_count_t zone) 4056 { 4057 int result; 4058 block_count_t i; 4059 struct block_allocator *allocator = &depot->allocators[zone]; 4060 struct vdo *vdo = depot->vdo; 4061 block_count_t max_free_blocks = depot->slab_config.data_blocks; 4062 unsigned int max_priority = (2 + ilog2(max_free_blocks)); 4063 u32 reference_block_count, refcount_reads_needed, refcount_blocks_per_vio; 4064 4065 *allocator = (struct block_allocator) { 4066 .depot = depot, 4067 .zone_number = zone, 4068 .thread_id = vdo->thread_config.physical_threads[zone], 4069 .nonce = vdo->states.vdo.nonce, 4070 }; 4071 4072 INIT_LIST_HEAD(&allocator->dirty_slab_journals); 4073 vdo_set_admin_state_code(&allocator->state, VDO_ADMIN_STATE_NORMAL_OPERATION); 4074 result = vdo_register_read_only_listener(vdo, allocator, 4075 notify_block_allocator_of_read_only_mode, 4076 allocator->thread_id); 4077 if (result != VDO_SUCCESS) 4078 return result; 4079 4080 vdo_initialize_completion(&allocator->completion, vdo, VDO_BLOCK_ALLOCATOR_COMPLETION); 4081 result = make_vio_pool(vdo, BLOCK_ALLOCATOR_VIO_POOL_SIZE, 1, allocator->thread_id, 4082 VIO_TYPE_SLAB_JOURNAL, VIO_PRIORITY_METADATA, 4083 allocator, &allocator->vio_pool); 4084 if (result != VDO_SUCCESS) 4085 return result; 4086 4087 /* Initialize the refcount-reading vio pool. */ 4088 reference_block_count = vdo_get_saved_reference_count_size(depot->slab_config.slab_blocks); 4089 refcount_reads_needed = DIV_ROUND_UP(reference_block_count, MAX_BLOCKS_PER_VIO); 4090 refcount_blocks_per_vio = DIV_ROUND_UP(reference_block_count, refcount_reads_needed); 4091 allocator->refcount_blocks_per_big_vio = refcount_blocks_per_vio; 4092 result = make_vio_pool(vdo, BLOCK_ALLOCATOR_REFCOUNT_VIO_POOL_SIZE, 4093 allocator->refcount_blocks_per_big_vio, allocator->thread_id, 4094 VIO_TYPE_SLAB_JOURNAL, VIO_PRIORITY_METADATA, 4095 NULL, &allocator->refcount_big_vio_pool); 4096 if (result != VDO_SUCCESS) 4097 return result; 4098 4099 result = initialize_slab_scrubber(allocator); 4100 if (result != VDO_SUCCESS) 4101 return result; 4102 4103 result = vdo_make_priority_table(max_priority, &allocator->prioritized_slabs); 4104 if (result != VDO_SUCCESS) 4105 return result; 4106 4107 result = vdo_allocate(VDO_SLAB_SUMMARY_BLOCKS_PER_ZONE, 4108 struct slab_summary_block, __func__, 4109 &allocator->summary_blocks); 4110 if (result != VDO_SUCCESS) 4111 return result; 4112 4113 vdo_set_admin_state_code(&allocator->summary_state, 4114 VDO_ADMIN_STATE_NORMAL_OPERATION); 4115 allocator->summary_entries = depot->summary_entries + (MAX_VDO_SLABS * zone); 4116 4117 /* Initialize each summary block. 
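	 * Each allocator has VDO_SLAB_SUMMARY_BLOCKS_PER_ZONE of them, and each block covers
	 * VDO_SLAB_SUMMARY_ENTRIES_PER_BLOCK entries of this zone's slice of summary_entries.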
 */
	for (i = 0; i < VDO_SLAB_SUMMARY_BLOCKS_PER_ZONE; i++) {
		result = initialize_slab_summary_block(allocator, i);
		if (result != VDO_SUCCESS)
			return result;
	}

	/*
	 * Performing well atop thin provisioned storage requires either that VDO discards freed
	 * blocks, or that the block allocator try to use slabs that already have allocated blocks
	 * in preference to slabs that have never been opened. For reasons we have not been able to
	 * fully understand, some SSD machines have been very sensitive (50% reduction in test
	 * throughput) to very slight differences in the timing and locality of block allocation.
	 * Assigning a low priority to unopened slabs (max_priority/2, say) would be ideal for the
	 * story, but anything less than a very high threshold (max_priority - 1) hurts on these
	 * machines.
	 *
	 * This sets the free block threshold for preferring to open an unopened slab to the binary
	 * floor of 3/4ths the total number of data blocks in a slab, which will generally evaluate
	 * to about half the slab size.
	 */
	allocator->unopened_slab_priority = (1 + ilog2((max_free_blocks * 3) / 4));

	return VDO_SUCCESS;
}

static int allocate_components(struct slab_depot *depot,
			       struct partition *summary_partition)
{
	int result;
	zone_count_t zone;
	slab_count_t slab_count;
	u8 hint;
	u32 i;
	const struct thread_config *thread_config = &depot->vdo->thread_config;

	result = vdo_make_action_manager(depot->zone_count, get_allocator_thread_id,
					 thread_config->journal_thread, depot,
					 schedule_tail_block_commit,
					 depot->vdo, &depot->action_manager);
	if (result != VDO_SUCCESS)
		return result;

	depot->origin = depot->first_block;

	/* block size must be a multiple of entry size */
	BUILD_BUG_ON((VDO_BLOCK_SIZE % sizeof(struct slab_summary_entry)) != 0);

	depot->summary_origin = summary_partition->offset;
	depot->hint_shift = vdo_get_slab_summary_hint_shift(depot->slab_size_shift);
	result = vdo_allocate(MAXIMUM_VDO_SLAB_SUMMARY_ENTRIES,
			      struct slab_summary_entry, __func__,
			      &depot->summary_entries);
	if (result != VDO_SUCCESS)
		return result;

	/* Initialize all the entries. */
	hint = compute_fullness_hint(depot, depot->slab_config.data_blocks);
	for (i = 0; i < MAXIMUM_VDO_SLAB_SUMMARY_ENTRIES; i++) {
		/*
		 * This default tail block offset must be reflected in
		 * slabJournal.c::read_slab_journal_tail().
		 */
		depot->summary_entries[i] = (struct slab_summary_entry) {
			.tail_block_offset = 0,
			.fullness_hint = hint,
			.load_ref_counts = false,
			.is_dirty = false,
		};
	}

	slab_count = vdo_compute_slab_count(depot->first_block, depot->last_block,
					    depot->slab_size_shift);
	if (thread_config->physical_zone_count > slab_count) {
		return vdo_log_error_strerror(VDO_BAD_CONFIGURATION,
					      "%u physical zones exceeds slab count %u",
					      thread_config->physical_zone_count,
					      slab_count);
	}

	/* Initialize the block allocators. */
	for (zone = 0; zone < depot->zone_count; zone++) {
		result = initialize_block_allocator(depot, zone);
		if (result != VDO_SUCCESS)
			return result;
	}

	/* Allocate slabs. */
	result = allocate_slabs(depot, slab_count);
	if (result != VDO_SUCCESS)
		return result;

	/* Use the new slabs.
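	 * At this point depot->slab_count still holds the old count (zero when the depot is
	 * first created), so only the slabs just built by allocate_slabs() are registered and
	 * counted here.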
*/ 4211 for (i = depot->slab_count; i < depot->new_slab_count; i++) { 4212 struct vdo_slab *slab = depot->new_slabs[i]; 4213 4214 register_slab_with_allocator(slab->allocator, slab); 4215 WRITE_ONCE(depot->slab_count, depot->slab_count + 1); 4216 } 4217 4218 depot->slabs = depot->new_slabs; 4219 depot->new_slabs = NULL; 4220 depot->new_slab_count = 0; 4221 4222 return VDO_SUCCESS; 4223 } 4224 4225 /** 4226 * vdo_decode_slab_depot() - Make a slab depot and configure it with the state read from the super 4227 * block. 4228 * @state: The slab depot state from the super block. 4229 * @vdo: The VDO which will own the depot. 4230 * @summary_partition: The partition which holds the slab summary. 4231 * @depot_ptr: A pointer to hold the depot. 4232 * 4233 * Return: A success or error code. 4234 */ 4235 int vdo_decode_slab_depot(struct slab_depot_state_2_0 state, struct vdo *vdo, 4236 struct partition *summary_partition, 4237 struct slab_depot **depot_ptr) 4238 { 4239 unsigned int slab_size_shift; 4240 struct slab_depot *depot; 4241 int result; 4242 4243 /* 4244 * Calculate the bit shift for efficiently mapping block numbers to slabs. Using a shift 4245 * requires that the slab size be a power of two. 4246 */ 4247 block_count_t slab_size = state.slab_config.slab_blocks; 4248 4249 if (!is_power_of_2(slab_size)) { 4250 return vdo_log_error_strerror(UDS_INVALID_ARGUMENT, 4251 "slab size must be a power of two"); 4252 } 4253 slab_size_shift = ilog2(slab_size); 4254 4255 result = vdo_allocate_extended(struct slab_depot, 4256 vdo->thread_config.physical_zone_count, 4257 struct block_allocator, __func__, &depot); 4258 if (result != VDO_SUCCESS) 4259 return result; 4260 4261 depot->vdo = vdo; 4262 depot->old_zone_count = state.zone_count; 4263 depot->zone_count = vdo->thread_config.physical_zone_count; 4264 depot->slab_config = state.slab_config; 4265 depot->first_block = state.first_block; 4266 depot->last_block = state.last_block; 4267 depot->slab_size_shift = slab_size_shift; 4268 4269 result = allocate_components(depot, summary_partition); 4270 if (result != VDO_SUCCESS) { 4271 vdo_free_slab_depot(depot); 4272 return result; 4273 } 4274 4275 *depot_ptr = depot; 4276 return VDO_SUCCESS; 4277 } 4278 4279 static void uninitialize_allocator_summary(struct block_allocator *allocator) 4280 { 4281 block_count_t i; 4282 4283 if (allocator->summary_blocks == NULL) 4284 return; 4285 4286 for (i = 0; i < VDO_SLAB_SUMMARY_BLOCKS_PER_ZONE; i++) { 4287 free_vio_components(&allocator->summary_blocks[i].vio); 4288 vdo_free(vdo_forget(allocator->summary_blocks[i].outgoing_entries)); 4289 } 4290 4291 vdo_free(vdo_forget(allocator->summary_blocks)); 4292 } 4293 4294 /** 4295 * vdo_free_slab_depot() - Destroy a slab depot. 4296 * @depot: The depot to destroy. 
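 *
 * A NULL depot is ignored.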
4297 */ 4298 void vdo_free_slab_depot(struct slab_depot *depot) 4299 { 4300 zone_count_t zone = 0; 4301 4302 if (depot == NULL) 4303 return; 4304 4305 vdo_abandon_new_slabs(depot); 4306 4307 for (zone = 0; zone < depot->zone_count; zone++) { 4308 struct block_allocator *allocator = &depot->allocators[zone]; 4309 4310 if (allocator->eraser != NULL) 4311 dm_kcopyd_client_destroy(vdo_forget(allocator->eraser)); 4312 4313 uninitialize_allocator_summary(allocator); 4314 uninitialize_scrubber_vio(&allocator->scrubber); 4315 free_vio_pool(vdo_forget(allocator->vio_pool)); 4316 free_vio_pool(vdo_forget(allocator->refcount_big_vio_pool)); 4317 vdo_free_priority_table(vdo_forget(allocator->prioritized_slabs)); 4318 } 4319 4320 if (depot->slabs != NULL) { 4321 slab_count_t i; 4322 4323 for (i = 0; i < depot->slab_count; i++) 4324 free_slab(vdo_forget(depot->slabs[i])); 4325 } 4326 4327 vdo_free(vdo_forget(depot->slabs)); 4328 vdo_free(vdo_forget(depot->action_manager)); 4329 vdo_free(vdo_forget(depot->summary_entries)); 4330 vdo_free(depot); 4331 } 4332 4333 /** 4334 * vdo_record_slab_depot() - Record the state of a slab depot for encoding into the super block. 4335 * @depot: The depot to encode. 4336 * 4337 * Return: The depot state. 4338 */ 4339 struct slab_depot_state_2_0 vdo_record_slab_depot(const struct slab_depot *depot) 4340 { 4341 /* 4342 * If this depot is currently using 0 zones, it must have been synchronously loaded by a 4343 * tool and is now being saved. We did not load and combine the slab summary, so we still 4344 * need to do that next time we load with the old zone count rather than 0. 4345 */ 4346 struct slab_depot_state_2_0 state; 4347 zone_count_t zones_to_record = depot->zone_count; 4348 4349 if (depot->zone_count == 0) 4350 zones_to_record = depot->old_zone_count; 4351 4352 state = (struct slab_depot_state_2_0) { 4353 .slab_config = depot->slab_config, 4354 .first_block = depot->first_block, 4355 .last_block = depot->last_block, 4356 .zone_count = zones_to_record, 4357 }; 4358 4359 return state; 4360 } 4361 4362 /** 4363 * vdo_allocate_reference_counters() - Allocate the reference counters for all slabs in the depot. 4364 * 4365 * Context: This method may be called only before entering normal operation from the load thread. 4366 * 4367 * Return: VDO_SUCCESS or an error. 4368 */ 4369 int vdo_allocate_reference_counters(struct slab_depot *depot) 4370 { 4371 struct slab_iterator iterator = 4372 get_depot_slab_iterator(depot, depot->slab_count - 1, 0, 1); 4373 4374 while (iterator.next != NULL) { 4375 int result = allocate_slab_counters(next_slab(&iterator)); 4376 4377 if (result != VDO_SUCCESS) 4378 return result; 4379 } 4380 4381 return VDO_SUCCESS; 4382 } 4383 4384 /** 4385 * get_slab_number() - Get the number of the slab that contains a specified block. 4386 * @depot: The slab depot. 4387 * @pbn: The physical block number. 4388 * @slab_number_ptr: A pointer to hold the slab number. 4389 * 4390 * Return: VDO_SUCCESS or an error. 
4391 */ 4392 static int __must_check get_slab_number(const struct slab_depot *depot, 4393 physical_block_number_t pbn, 4394 slab_count_t *slab_number_ptr) 4395 { 4396 slab_count_t slab_number; 4397 4398 if (pbn < depot->first_block) 4399 return VDO_OUT_OF_RANGE; 4400 4401 slab_number = (pbn - depot->first_block) >> depot->slab_size_shift; 4402 if (slab_number >= depot->slab_count) 4403 return VDO_OUT_OF_RANGE; 4404 4405 *slab_number_ptr = slab_number; 4406 return VDO_SUCCESS; 4407 } 4408 4409 /** 4410 * vdo_get_slab() - Get the slab object for the slab that contains a specified block. 4411 * @depot: The slab depot. 4412 * @pbn: The physical block number. 4413 * 4414 * Will put the VDO in read-only mode if the PBN is not a valid data block nor the zero block. 4415 * 4416 * Return: The slab containing the block, or NULL if the block number is the zero block or 4417 * otherwise out of range. 4418 */ 4419 struct vdo_slab *vdo_get_slab(const struct slab_depot *depot, 4420 physical_block_number_t pbn) 4421 { 4422 slab_count_t slab_number; 4423 int result; 4424 4425 if (pbn == VDO_ZERO_BLOCK) 4426 return NULL; 4427 4428 result = get_slab_number(depot, pbn, &slab_number); 4429 if (result != VDO_SUCCESS) { 4430 vdo_enter_read_only_mode(depot->vdo, result); 4431 return NULL; 4432 } 4433 4434 return depot->slabs[slab_number]; 4435 } 4436 4437 /** 4438 * vdo_get_increment_limit() - Determine how many new references a block can acquire. 4439 * @depot: The slab depot. 4440 * @pbn: The physical block number that is being queried. 4441 * 4442 * Context: This method must be called from the physical zone thread of the PBN. 4443 * 4444 * Return: The number of available references. 4445 */ 4446 u8 vdo_get_increment_limit(struct slab_depot *depot, physical_block_number_t pbn) 4447 { 4448 struct vdo_slab *slab = vdo_get_slab(depot, pbn); 4449 vdo_refcount_t *counter_ptr = NULL; 4450 int result; 4451 4452 if ((slab == NULL) || (slab->status != VDO_SLAB_REBUILT)) 4453 return 0; 4454 4455 result = get_reference_counter(slab, pbn, &counter_ptr); 4456 if (result != VDO_SUCCESS) 4457 return 0; 4458 4459 if (*counter_ptr == PROVISIONAL_REFERENCE_COUNT) 4460 return (MAXIMUM_REFERENCE_COUNT - 1); 4461 4462 return (MAXIMUM_REFERENCE_COUNT - *counter_ptr); 4463 } 4464 4465 /** 4466 * vdo_is_physical_data_block() - Determine whether the given PBN refers to a data block. 4467 * @depot: The depot. 4468 * @pbn: The physical block number to ask about. 4469 * 4470 * Return: True if the PBN corresponds to a data block. 4471 */ 4472 bool vdo_is_physical_data_block(const struct slab_depot *depot, 4473 physical_block_number_t pbn) 4474 { 4475 slab_count_t slab_number; 4476 slab_block_number sbn; 4477 4478 return ((pbn == VDO_ZERO_BLOCK) || 4479 ((get_slab_number(depot, pbn, &slab_number) == VDO_SUCCESS) && 4480 (slab_block_number_from_pbn(depot->slabs[slab_number], pbn, &sbn) == 4481 VDO_SUCCESS))); 4482 } 4483 4484 /** 4485 * vdo_get_slab_depot_allocated_blocks() - Get the total number of data blocks allocated across all 4486 * the slabs in the depot. 4487 * @depot: The slab depot. 4488 * 4489 * This is the total number of blocks with a non-zero reference count. 4490 * 4491 * Context: This may be called from any thread. 4492 * 4493 * Return: The total number of blocks with a non-zero reference count. 
 */
block_count_t vdo_get_slab_depot_allocated_blocks(const struct slab_depot *depot)
{
	block_count_t total = 0;
	zone_count_t zone;

	for (zone = 0; zone < depot->zone_count; zone++) {
		/* The allocators are responsible for thread safety. */
		total += READ_ONCE(depot->allocators[zone].allocated_blocks);
	}

	return total;
}

/**
 * vdo_get_slab_depot_data_blocks() - Get the total number of data blocks in all the slabs in the
 *                                    depot.
 * @depot: The slab depot.
 *
 * Context: This may be called from any thread.
 *
 * Return: The total number of data blocks in all slabs.
 */
block_count_t vdo_get_slab_depot_data_blocks(const struct slab_depot *depot)
{
	return (READ_ONCE(depot->slab_count) * depot->slab_config.data_blocks);
}

/**
 * finish_combining_zones() - Clean up after saving out the combined slab summary.
 * @completion: The vio which was used to write the summary data.
 */
static void finish_combining_zones(struct vdo_completion *completion)
{
	int result = completion->result;
	struct vdo_completion *parent = completion->parent;

	free_vio(as_vio(vdo_forget(completion)));
	vdo_fail_completion(parent, result);
}

static void handle_combining_error(struct vdo_completion *completion)
{
	vio_record_metadata_io_error(as_vio(completion));
	finish_combining_zones(completion);
}

static void write_summary_endio(struct bio *bio)
{
	struct vio *vio = bio->bi_private;
	struct vdo *vdo = vio->completion.vdo;

	continue_vio_after_io(vio, finish_combining_zones,
			      vdo->thread_config.admin_thread);
}

/**
 * combine_summaries() - Treating the current entries buffer as the on-disk value of all zones,
 *                       update every zone to the correct values for every slab.
 * @depot: The depot whose summary entries should be combined.
 */
static void combine_summaries(struct slab_depot *depot)
{
	/*
	 * Combine all the old summary data into the portion of the buffer corresponding to the
	 * first zone.
	 */
	zone_count_t zone = 0;
	struct slab_summary_entry *entries = depot->summary_entries;

	if (depot->old_zone_count > 1) {
		slab_count_t entry_number;

		for (entry_number = 0; entry_number < MAX_VDO_SLABS; entry_number++) {
			if (zone != 0) {
				memcpy(entries + entry_number,
				       entries + (zone * MAX_VDO_SLABS) + entry_number,
				       sizeof(struct slab_summary_entry));
			}

			zone++;
			if (zone == depot->old_zone_count)
				zone = 0;
		}
	}

	/* Copy the combined data to each zone's region of the buffer. */
	for (zone = 1; zone < MAX_VDO_PHYSICAL_ZONES; zone++) {
		memcpy(entries + (zone * MAX_VDO_SLABS), entries,
		       MAX_VDO_SLABS * sizeof(struct slab_summary_entry));
	}
}

/**
 * finish_loading_summary() - Finish loading slab summary data.
 * @completion: The vio which was used to read the summary data.
 *
 * Combines the slab summary data from all the previously written zones and copies the combined
 * summary to each partition's data region. Then writes the combined summary back out to disk. This
 * callback is registered in load_summary_endio().
4594 */ 4595 static void finish_loading_summary(struct vdo_completion *completion) 4596 { 4597 struct slab_depot *depot = completion->vdo->depot; 4598 4599 /* Combine the summary from each zone so each zone is correct for all slabs. */ 4600 combine_summaries(depot); 4601 4602 /* Write the combined summary back out. */ 4603 vdo_submit_metadata_vio(as_vio(completion), depot->summary_origin, 4604 write_summary_endio, handle_combining_error, 4605 REQ_OP_WRITE); 4606 } 4607 4608 static void load_summary_endio(struct bio *bio) 4609 { 4610 struct vio *vio = bio->bi_private; 4611 struct vdo *vdo = vio->completion.vdo; 4612 4613 continue_vio_after_io(vio, finish_loading_summary, 4614 vdo->thread_config.admin_thread); 4615 } 4616 4617 /** 4618 * load_slab_summary() - The preamble of a load operation. 4619 * 4620 * Implements vdo_action_preamble_fn. 4621 */ 4622 static void load_slab_summary(void *context, struct vdo_completion *parent) 4623 { 4624 int result; 4625 struct vio *vio; 4626 struct slab_depot *depot = context; 4627 const struct admin_state_code *operation = 4628 vdo_get_current_manager_operation(depot->action_manager); 4629 4630 result = create_multi_block_metadata_vio(depot->vdo, VIO_TYPE_SLAB_SUMMARY, 4631 VIO_PRIORITY_METADATA, parent, 4632 VDO_SLAB_SUMMARY_BLOCKS, 4633 (char *) depot->summary_entries, &vio); 4634 if (result != VDO_SUCCESS) { 4635 vdo_fail_completion(parent, result); 4636 return; 4637 } 4638 4639 if ((operation == VDO_ADMIN_STATE_FORMATTING) || 4640 (operation == VDO_ADMIN_STATE_LOADING_FOR_REBUILD)) { 4641 finish_loading_summary(&vio->completion); 4642 return; 4643 } 4644 4645 vdo_submit_metadata_vio(vio, depot->summary_origin, load_summary_endio, 4646 handle_combining_error, REQ_OP_READ); 4647 } 4648 4649 /* Implements vdo_zone_action_fn. */ 4650 static void load_allocator(void *context, zone_count_t zone_number, 4651 struct vdo_completion *parent) 4652 { 4653 struct slab_depot *depot = context; 4654 4655 vdo_start_loading(&depot->allocators[zone_number].state, 4656 vdo_get_current_manager_operation(depot->action_manager), 4657 parent, initiate_load); 4658 } 4659 4660 /** 4661 * vdo_load_slab_depot() - Asynchronously load any slab depot state that isn't included in the 4662 * super_block component. 4663 * @depot: The depot to load. 4664 * @operation: The type of load to perform. 4665 * @parent: The completion to notify when the load is complete. 4666 * @context: Additional context for the load operation; may be NULL. 4667 * 4668 * This method may be called only before entering normal operation from the load thread. 4669 */ 4670 void vdo_load_slab_depot(struct slab_depot *depot, 4671 const struct admin_state_code *operation, 4672 struct vdo_completion *parent, void *context) 4673 { 4674 if (!vdo_assert_load_operation(operation, parent)) 4675 return; 4676 4677 vdo_schedule_operation_with_context(depot->action_manager, operation, 4678 load_slab_summary, load_allocator, 4679 NULL, context, parent); 4680 } 4681 4682 /* Implements vdo_zone_action_fn. 
*/ 4683 static void prepare_to_allocate(void *context, zone_count_t zone_number, 4684 struct vdo_completion *parent) 4685 { 4686 struct slab_depot *depot = context; 4687 struct block_allocator *allocator = &depot->allocators[zone_number]; 4688 int result; 4689 4690 result = vdo_prepare_slabs_for_allocation(allocator); 4691 if (result != VDO_SUCCESS) { 4692 vdo_fail_completion(parent, result); 4693 return; 4694 } 4695 4696 scrub_slabs(allocator, parent); 4697 } 4698 4699 /** 4700 * vdo_prepare_slab_depot_to_allocate() - Prepare the slab depot to come online and start 4701 * allocating blocks. 4702 * @depot: The depot to prepare. 4703 * @load_type: The load type. 4704 * @parent: The completion to notify when the operation is complete. 4705 * 4706 * This method may be called only before entering normal operation from the load thread. It must be 4707 * called before allocation may proceed. 4708 */ 4709 void vdo_prepare_slab_depot_to_allocate(struct slab_depot *depot, 4710 enum slab_depot_load_type load_type, 4711 struct vdo_completion *parent) 4712 { 4713 depot->load_type = load_type; 4714 atomic_set(&depot->zones_to_scrub, depot->zone_count); 4715 vdo_schedule_action(depot->action_manager, NULL, 4716 prepare_to_allocate, NULL, parent); 4717 } 4718 4719 /** 4720 * vdo_update_slab_depot_size() - Update the slab depot to reflect its new size in memory. 4721 * @depot: The depot to update. 4722 * 4723 * This size is saved to disk as part of the super block. 4724 */ 4725 void vdo_update_slab_depot_size(struct slab_depot *depot) 4726 { 4727 depot->last_block = depot->new_last_block; 4728 } 4729 4730 /** 4731 * vdo_prepare_to_grow_slab_depot() - Allocate new memory needed for a resize of a slab depot to 4732 * the given size. 4733 * @depot: The depot to prepare to resize. 4734 * @partition: The new depot partition 4735 * 4736 * Return: VDO_SUCCESS or an error. 4737 */ 4738 int vdo_prepare_to_grow_slab_depot(struct slab_depot *depot, 4739 const struct partition *partition) 4740 { 4741 struct slab_depot_state_2_0 new_state; 4742 int result; 4743 slab_count_t new_slab_count; 4744 4745 if ((partition->count >> depot->slab_size_shift) <= depot->slab_count) 4746 return VDO_INCREMENT_TOO_SMALL; 4747 4748 /* Generate the depot configuration for the new block count. */ 4749 VDO_ASSERT_LOG_ONLY(depot->first_block == partition->offset, 4750 "New slab depot partition doesn't change origin"); 4751 result = vdo_configure_slab_depot(partition, depot->slab_config, 4752 depot->zone_count, &new_state); 4753 if (result != VDO_SUCCESS) 4754 return result; 4755 4756 new_slab_count = vdo_compute_slab_count(depot->first_block, 4757 new_state.last_block, 4758 depot->slab_size_shift); 4759 if (new_slab_count <= depot->slab_count) 4760 return vdo_log_error_strerror(VDO_INCREMENT_TOO_SMALL, 4761 "Depot can only grow"); 4762 if (new_slab_count == depot->new_slab_count) { 4763 /* Check it out, we've already got all the new slabs allocated! */ 4764 return VDO_SUCCESS; 4765 } 4766 4767 vdo_abandon_new_slabs(depot); 4768 result = allocate_slabs(depot, new_slab_count); 4769 if (result != VDO_SUCCESS) { 4770 vdo_abandon_new_slabs(depot); 4771 return result; 4772 } 4773 4774 depot->new_size = partition->count; 4775 depot->old_last_block = depot->last_block; 4776 depot->new_last_block = new_state.last_block; 4777 4778 return VDO_SUCCESS; 4779 } 4780 4781 /** 4782 * finish_registration() - Finish registering new slabs now that all of the allocators have 4783 * received their new slabs. 4784 * 4785 * Implements vdo_action_conclusion_fn. 
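 *
 * Frees the old slab array, installs the new one, and publishes the new slab count with
 * WRITE_ONCE() so that readers such as vdo_get_slab_depot_data_blocks(), which use
 * READ_ONCE(), see a consistent value.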
4786 */ 4787 static int finish_registration(void *context) 4788 { 4789 struct slab_depot *depot = context; 4790 4791 WRITE_ONCE(depot->slab_count, depot->new_slab_count); 4792 vdo_free(depot->slabs); 4793 depot->slabs = depot->new_slabs; 4794 depot->new_slabs = NULL; 4795 depot->new_slab_count = 0; 4796 return VDO_SUCCESS; 4797 } 4798 4799 /* Implements vdo_zone_action_fn. */ 4800 static void register_new_slabs(void *context, zone_count_t zone_number, 4801 struct vdo_completion *parent) 4802 { 4803 struct slab_depot *depot = context; 4804 struct block_allocator *allocator = &depot->allocators[zone_number]; 4805 slab_count_t i; 4806 4807 for (i = depot->slab_count; i < depot->new_slab_count; i++) { 4808 struct vdo_slab *slab = depot->new_slabs[i]; 4809 4810 if (slab->allocator == allocator) 4811 register_slab_with_allocator(allocator, slab); 4812 } 4813 4814 vdo_finish_completion(parent); 4815 } 4816 4817 /** 4818 * vdo_use_new_slabs() - Use the new slabs allocated for resize. 4819 * @depot: The depot. 4820 * @parent: The object to notify when complete. 4821 */ 4822 void vdo_use_new_slabs(struct slab_depot *depot, struct vdo_completion *parent) 4823 { 4824 VDO_ASSERT_LOG_ONLY(depot->new_slabs != NULL, "Must have new slabs to use"); 4825 vdo_schedule_operation(depot->action_manager, 4826 VDO_ADMIN_STATE_SUSPENDED_OPERATION, 4827 NULL, register_new_slabs, 4828 finish_registration, parent); 4829 } 4830 4831 /** 4832 * stop_scrubbing() - Tell the scrubber to stop scrubbing after it finishes the slab it is 4833 * currently working on. 4834 * @allocator: The block allocator owning the scrubber to stop. 4835 */ 4836 static void stop_scrubbing(struct block_allocator *allocator) 4837 { 4838 struct slab_scrubber *scrubber = &allocator->scrubber; 4839 4840 if (vdo_is_state_quiescent(&scrubber->admin_state)) { 4841 vdo_finish_completion(&allocator->completion); 4842 } else { 4843 vdo_start_draining(&scrubber->admin_state, 4844 VDO_ADMIN_STATE_SUSPENDING, 4845 &allocator->completion, NULL); 4846 } 4847 } 4848 4849 /* Implements vdo_admin_initiator_fn. */ 4850 static void initiate_summary_drain(struct admin_state *state) 4851 { 4852 check_summary_drain_complete(container_of(state, struct block_allocator, 4853 summary_state)); 4854 } 4855 4856 static void do_drain_step(struct vdo_completion *completion) 4857 { 4858 struct block_allocator *allocator = vdo_as_block_allocator(completion); 4859 4860 vdo_prepare_completion_for_requeue(&allocator->completion, do_drain_step, 4861 handle_operation_error, allocator->thread_id, 4862 NULL); 4863 switch (++allocator->drain_step) { 4864 case VDO_DRAIN_ALLOCATOR_STEP_SCRUBBER: 4865 stop_scrubbing(allocator); 4866 return; 4867 4868 case VDO_DRAIN_ALLOCATOR_STEP_SLABS: 4869 apply_to_slabs(allocator, do_drain_step); 4870 return; 4871 4872 case VDO_DRAIN_ALLOCATOR_STEP_SUMMARY: 4873 vdo_start_draining(&allocator->summary_state, 4874 vdo_get_admin_state_code(&allocator->state), 4875 completion, initiate_summary_drain); 4876 return; 4877 4878 case VDO_DRAIN_ALLOCATOR_STEP_FINISHED: 4879 VDO_ASSERT_LOG_ONLY(!is_vio_pool_busy(allocator->vio_pool), 4880 "vio pool not busy"); 4881 vdo_finish_draining_with_result(&allocator->state, completion->result); 4882 return; 4883 4884 default: 4885 vdo_finish_draining_with_result(&allocator->state, UDS_BAD_STATE); 4886 } 4887 } 4888 4889 /* Implements vdo_admin_initiator_fn. 
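 *
 * Starts the drain sequence at VDO_DRAIN_ALLOCATOR_START; do_drain_step() then advances
 * through the scrubber, slab, and summary steps before finishing the drain.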
/* Implements vdo_admin_initiator_fn. */
static void initiate_drain(struct admin_state *state)
{
	struct block_allocator *allocator =
		container_of(state, struct block_allocator, state);

	allocator->drain_step = VDO_DRAIN_ALLOCATOR_START;
	do_drain_step(&allocator->completion);
}

/*
 * Drain all allocator I/O. Depending upon the type of drain, some or all dirty metadata may be
 * written to disk. The type of drain will be determined from the state of the allocator's depot.
 *
 * Implements vdo_zone_action_fn.
 */
static void drain_allocator(void *context, zone_count_t zone_number,
			    struct vdo_completion *parent)
{
	struct slab_depot *depot = context;

	vdo_start_draining(&depot->allocators[zone_number].state,
			   vdo_get_current_manager_operation(depot->action_manager),
			   parent, initiate_drain);
}

/**
 * vdo_drain_slab_depot() - Drain all slab depot I/O.
 * @depot: The depot to drain.
 * @operation: The drain operation (flush, rebuild, suspend, or save).
 * @parent: The completion to finish when the drain is complete.
 *
 * If saving or flushing, all dirty depot metadata will be written out. If saving or suspending,
 * the depot will be left in a suspended state.
 */
void vdo_drain_slab_depot(struct slab_depot *depot,
			  const struct admin_state_code *operation,
			  struct vdo_completion *parent)
{
	vdo_schedule_operation(depot->action_manager, operation,
			       NULL, drain_allocator, NULL, parent);
}

/**
 * resume_scrubbing() - Tell the scrubber to resume scrubbing if it has been stopped.
 * @allocator: The allocator being resumed.
 */
static void resume_scrubbing(struct block_allocator *allocator)
{
	int result;
	struct slab_scrubber *scrubber = &allocator->scrubber;

	if (!has_slabs_to_scrub(scrubber)) {
		vdo_finish_completion(&allocator->completion);
		return;
	}

	result = vdo_resume_if_quiescent(&scrubber->admin_state);
	if (result != VDO_SUCCESS) {
		vdo_fail_completion(&allocator->completion, result);
		return;
	}

	scrub_next_slab(scrubber);
	vdo_finish_completion(&allocator->completion);
}

static void do_resume_step(struct vdo_completion *completion)
{
	struct block_allocator *allocator = vdo_as_block_allocator(completion);

	vdo_prepare_completion_for_requeue(&allocator->completion, do_resume_step,
					   handle_operation_error,
					   allocator->thread_id, NULL);
	switch (--allocator->drain_step) {
	case VDO_DRAIN_ALLOCATOR_STEP_SUMMARY:
		vdo_fail_completion(completion,
				    vdo_resume_if_quiescent(&allocator->summary_state));
		return;

	case VDO_DRAIN_ALLOCATOR_STEP_SLABS:
		apply_to_slabs(allocator, do_resume_step);
		return;

	case VDO_DRAIN_ALLOCATOR_STEP_SCRUBBER:
		resume_scrubbing(allocator);
		return;

	case VDO_DRAIN_ALLOCATOR_START:
		vdo_finish_resuming_with_result(&allocator->state, completion->result);
		return;

	default:
		vdo_finish_resuming_with_result(&allocator->state, UDS_BAD_STATE);
	}
}

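/*
 * Illustrative sketch (not part of the driver): a hypothetical admin step that
 * drains the depot as part of a suspend. The example_* name and the choice of
 * operation code are assumptions for illustration; real callers pass the
 * operation appropriate to the admin action actually in progress.
 */
#if 0
static void example_suspend_depot(struct slab_depot *depot,
				  struct vdo_completion *parent)
{
	/* Writes out dirty metadata and leaves the depot suspended. */
	vdo_drain_slab_depot(depot, VDO_ADMIN_STATE_SUSPENDING, parent);
}
#endif
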
/* Implements vdo_admin_initiator_fn. */
static void initiate_resume(struct admin_state *state)
{
	struct block_allocator *allocator =
		container_of(state, struct block_allocator, state);

	allocator->drain_step = VDO_DRAIN_ALLOCATOR_STEP_FINISHED;
	do_resume_step(&allocator->completion);
}

/* Implements vdo_zone_action_fn. */
static void resume_allocator(void *context, zone_count_t zone_number,
			     struct vdo_completion *parent)
{
	struct slab_depot *depot = context;

	vdo_start_resuming(&depot->allocators[zone_number].state,
			   vdo_get_current_manager_operation(depot->action_manager),
			   parent, initiate_resume);
}

/**
 * vdo_resume_slab_depot() - Resume a suspended slab depot.
 * @depot: The depot to resume.
 * @parent: The completion to finish when the depot has resumed.
 */
void vdo_resume_slab_depot(struct slab_depot *depot, struct vdo_completion *parent)
{
	if (vdo_is_read_only(depot->vdo)) {
		vdo_continue_completion(parent, VDO_READ_ONLY);
		return;
	}

	vdo_schedule_operation(depot->action_manager, VDO_ADMIN_STATE_RESUMING,
			       NULL, resume_allocator, NULL, parent);
}

/**
 * vdo_commit_oldest_slab_journal_tail_blocks() - Commit all dirty tail blocks which are locking a
 *                                                given recovery journal block.
 * @depot: The depot.
 * @recovery_block_number: The sequence number of the recovery journal block whose locks should be
 *                         released.
 *
 * Context: This method must be called from the journal zone thread.
 */
void vdo_commit_oldest_slab_journal_tail_blocks(struct slab_depot *depot,
						sequence_number_t recovery_block_number)
{
	if (depot == NULL)
		return;

	depot->new_release_request = recovery_block_number;
	vdo_schedule_default_action(depot->action_manager);
}

/* Implements vdo_zone_action_fn. */
static void scrub_all_unrecovered_slabs(void *context, zone_count_t zone_number,
					struct vdo_completion *parent)
{
	struct slab_depot *depot = context;

	scrub_slabs(&depot->allocators[zone_number], NULL);
	vdo_launch_completion(parent);
}

/**
 * vdo_scrub_all_unrecovered_slabs() - Scrub all unrecovered slabs.
 * @depot: The depot to scrub.
 * @parent: The object to notify when scrubbing has been launched for all zones.
 */
void vdo_scrub_all_unrecovered_slabs(struct slab_depot *depot,
				     struct vdo_completion *parent)
{
	vdo_schedule_action(depot->action_manager, NULL,
			    scrub_all_unrecovered_slabs,
			    NULL, parent);
}

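/*
 * Illustrative sketch (not part of the driver): pairing an earlier drain with
 * a later resume. The example_* wrapper is hypothetical; it only shows that
 * vdo_resume_slab_depot() already short-circuits with VDO_READ_ONLY when the
 * vdo has entered read-only mode, so a caller needs no separate check.
 */
#if 0
static void example_resume_depot(struct slab_depot *depot,
				 struct vdo_completion *parent)
{
	/* Finishes 'parent' once every allocator zone has resumed. */
	vdo_resume_slab_depot(depot, parent);
}
#endif
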
/**
 * get_block_allocator_statistics() - Get the total of the statistics from all the block allocators
 *                                    in the depot.
 * @depot: The slab depot.
 *
 * Return: The statistics from all block allocators in the depot.
 */
static struct block_allocator_statistics __must_check
get_block_allocator_statistics(const struct slab_depot *depot)
{
	struct block_allocator_statistics totals;
	zone_count_t zone;

	memset(&totals, 0, sizeof(totals));

	for (zone = 0; zone < depot->zone_count; zone++) {
		const struct block_allocator *allocator = &depot->allocators[zone];
		const struct block_allocator_statistics *stats = &allocator->statistics;

		totals.slab_count += allocator->slab_count;
		totals.slabs_opened += READ_ONCE(stats->slabs_opened);
		totals.slabs_reopened += READ_ONCE(stats->slabs_reopened);
	}

	return totals;
}

/**
 * get_ref_counts_statistics() - Get the cumulative ref_counts statistics for the depot.
 * @depot: The slab depot.
 *
 * Return: The cumulative statistics for all ref_counts in the depot.
 */
static struct ref_counts_statistics __must_check
get_ref_counts_statistics(const struct slab_depot *depot)
{
	struct ref_counts_statistics totals;
	zone_count_t zone;

	memset(&totals, 0, sizeof(totals));

	for (zone = 0; zone < depot->zone_count; zone++) {
		totals.blocks_written +=
			READ_ONCE(depot->allocators[zone].ref_counts_statistics.blocks_written);
	}

	return totals;
}

/**
 * get_slab_journal_statistics() - Get the aggregated slab journal statistics for the depot.
 * @depot: The slab depot.
 *
 * Return: The aggregated statistics for all slab journals in the depot.
 */
static struct slab_journal_statistics __must_check
get_slab_journal_statistics(const struct slab_depot *depot)
{
	struct slab_journal_statistics totals;
	zone_count_t zone;

	memset(&totals, 0, sizeof(totals));

	for (zone = 0; zone < depot->zone_count; zone++) {
		const struct slab_journal_statistics *stats =
			&depot->allocators[zone].slab_journal_statistics;

		totals.disk_full_count += READ_ONCE(stats->disk_full_count);
		totals.flush_count += READ_ONCE(stats->flush_count);
		totals.blocked_count += READ_ONCE(stats->blocked_count);
		totals.blocks_written += READ_ONCE(stats->blocks_written);
		totals.tail_busy_count += READ_ONCE(stats->tail_busy_count);
	}

	return totals;
}

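/*
 * Illustrative sketch (not part of the driver): the aggregation helpers above
 * all follow the same pattern, so a new per-allocator counter would be summed
 * the same way. 'example_counter' is a hypothetical field used only to show
 * the READ_ONCE()-per-zone accumulation; it does not exist in the real
 * statistics structures.
 */
#if 0
static u64 example_sum_counter(const struct slab_depot *depot)
{
	u64 total = 0;
	zone_count_t zone;

	for (zone = 0; zone < depot->zone_count; zone++)
		total += READ_ONCE(depot->allocators[zone].statistics.example_counter);

	return total;
}
#endif
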
/**
 * vdo_get_slab_depot_statistics() - Get all the vdo_statistics fields that are properties of the
 *                                   slab depot.
 * @depot: The slab depot.
 * @stats: The vdo statistics structure to partially fill.
 */
void vdo_get_slab_depot_statistics(const struct slab_depot *depot,
				   struct vdo_statistics *stats)
{
	slab_count_t slab_count = READ_ONCE(depot->slab_count);
	slab_count_t unrecovered = 0;
	zone_count_t zone;

	for (zone = 0; zone < depot->zone_count; zone++) {
		/* The allocators are responsible for thread safety. */
		unrecovered += READ_ONCE(depot->allocators[zone].scrubber.slab_count);
	}

	stats->recovery_percentage = (slab_count - unrecovered) * 100 / slab_count;
	stats->allocator = get_block_allocator_statistics(depot);
	stats->ref_counts = get_ref_counts_statistics(depot);
	stats->slab_journal = get_slab_journal_statistics(depot);
	stats->slab_summary = (struct slab_summary_statistics) {
		.blocks_written = atomic64_read(&depot->summary_statistics.blocks_written),
	};
}

/**
 * vdo_dump_slab_depot() - Dump the slab depot, in a thread-unsafe fashion.
 * @depot: The slab depot.
 */
void vdo_dump_slab_depot(const struct slab_depot *depot)
{
	vdo_log_info("vdo slab depot");
	vdo_log_info("  zone_count=%u old_zone_count=%u slabCount=%u active_release_request=%llu new_release_request=%llu",
		     (unsigned int) depot->zone_count,
		     (unsigned int) depot->old_zone_count, READ_ONCE(depot->slab_count),
		     (unsigned long long) depot->active_release_request,
		     (unsigned long long) depot->new_release_request);
}
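
/*
 * Illustrative note (not part of the driver): recovery_percentage in
 * vdo_get_slab_depot_statistics() is integer arithmetic. For example, with
 * 100 slabs of which 5 are still awaiting scrubbing, (100 - 5) * 100 / 100
 * yields 95. A sketch of reading the value, with a hypothetical example_*
 * wrapper:
 */
#if 0
static void example_log_recovery(const struct slab_depot *depot)
{
	struct vdo_statistics stats;

	memset(&stats, 0, sizeof(stats));
	vdo_get_slab_depot_statistics(depot, &stats);
	vdo_log_info("slabs recovered: %llu%%",
		     (unsigned long long) stats.recovery_percentage);
}
#endif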