1 // SPDX-License-Identifier: GPL-2.0-only 2 /* 3 * Copyright 2023 Red Hat 4 */ 5 6 #include "slab-depot.h" 7 8 #include <linux/atomic.h> 9 #include <linux/bio.h> 10 #include <linux/err.h> 11 #include <linux/log2.h> 12 #include <linux/min_heap.h> 13 #include <linux/minmax.h> 14 15 #include "logger.h" 16 #include "memory-alloc.h" 17 #include "numeric.h" 18 #include "permassert.h" 19 #include "string-utils.h" 20 21 #include "action-manager.h" 22 #include "admin-state.h" 23 #include "completion.h" 24 #include "constants.h" 25 #include "data-vio.h" 26 #include "encodings.h" 27 #include "io-submitter.h" 28 #include "physical-zone.h" 29 #include "priority-table.h" 30 #include "recovery-journal.h" 31 #include "repair.h" 32 #include "status-codes.h" 33 #include "types.h" 34 #include "vdo.h" 35 #include "vio.h" 36 #include "wait-queue.h" 37 38 static const u64 BYTES_PER_WORD = sizeof(u64); 39 static const bool NORMAL_OPERATION = true; 40 41 /** 42 * get_lock() - Get the lock object for a slab journal block by sequence number. 43 * @journal: vdo_slab journal to retrieve from. 44 * @sequence_number: Sequence number of the block. 45 * 46 * Return: The lock object for the given sequence number. 47 */ 48 static inline struct journal_lock * __must_check get_lock(struct slab_journal *journal, 49 sequence_number_t sequence_number) 50 { 51 return &journal->locks[sequence_number % journal->size]; 52 } 53 54 static bool is_slab_open(struct vdo_slab *slab) 55 { 56 return (!vdo_is_state_quiescing(&slab->state) && 57 !vdo_is_state_quiescent(&slab->state)); 58 } 59 60 /** 61 * must_make_entries_to_flush() - Check whether there are entry waiters which should delay a flush. 62 * @journal: The journal to check. 63 * 64 * Return: true if there are no entry waiters, or if the slab is unrecovered. 
65 */ 66 static inline bool __must_check must_make_entries_to_flush(struct slab_journal *journal) 67 { 68 return ((journal->slab->status != VDO_SLAB_REBUILDING) && 69 vdo_waitq_has_waiters(&journal->entry_waiters)); 70 } 71 72 /** 73 * is_reaping() - Check whether a reap is currently in progress. 74 * @journal: The journal which may be reaping. 75 * 76 * Return: true if the journal is reaping. 77 */ 78 static inline bool __must_check is_reaping(struct slab_journal *journal) 79 { 80 return (journal->head != journal->unreapable); 81 } 82 83 /** 84 * initialize_tail_block() - Initialize tail block as a new block. 85 * @journal: The journal whose tail block is being initialized. 86 */ 87 static void initialize_tail_block(struct slab_journal *journal) 88 { 89 struct slab_journal_block_header *header = &journal->tail_header; 90 91 header->sequence_number = journal->tail; 92 header->entry_count = 0; 93 header->has_block_map_increments = false; 94 } 95 96 /** 97 * initialize_journal_state() - Set all journal fields appropriately to start journaling. 98 * @journal: The journal to be reset, based on its tail sequence number. 99 */ 100 static void initialize_journal_state(struct slab_journal *journal) 101 { 102 journal->unreapable = journal->head; 103 journal->reap_lock = get_lock(journal, journal->unreapable); 104 journal->next_commit = journal->tail; 105 journal->summarized = journal->last_summarized = journal->tail; 106 initialize_tail_block(journal); 107 } 108 109 /** 110 * block_is_full() - Check whether a journal block is full. 111 * @journal: The slab journal for the block. 112 * 113 * Return: true if the tail block is full. 114 */ 115 static bool __must_check block_is_full(struct slab_journal *journal) 116 { 117 journal_entry_count_t count = journal->tail_header.entry_count; 118 119 return (journal->tail_header.has_block_map_increments ? 
120 (journal->full_entries_per_block == count) : 121 (journal->entries_per_block == count)); 122 } 123 124 static void add_entries(struct slab_journal *journal); 125 static void update_tail_block_location(struct slab_journal *journal); 126 static void release_journal_locks(struct vdo_waiter *waiter, void *context); 127 128 /** 129 * is_slab_journal_blank() - Check whether a slab's journal is blank. 130 * 131 * A slab journal is blank if it has never had any entries recorded in it. 132 * 133 * Return: true if the slab's journal has never been modified. 134 */ 135 static bool is_slab_journal_blank(const struct vdo_slab *slab) 136 { 137 return ((slab->journal.tail == 1) && 138 (slab->journal.tail_header.entry_count == 0)); 139 } 140 141 /** 142 * mark_slab_journal_dirty() - Put a slab journal on the dirty ring of its allocator in the correct 143 * order. 144 * @journal: The journal to be marked dirty. 145 * @lock: The recovery journal lock held by the slab journal. 146 */ 147 static void mark_slab_journal_dirty(struct slab_journal *journal, sequence_number_t lock) 148 { 149 struct slab_journal *dirty_journal; 150 struct list_head *dirty_list = &journal->slab->allocator->dirty_slab_journals; 151 152 VDO_ASSERT_LOG_ONLY(journal->recovery_lock == 0, "slab journal was clean"); 153 154 journal->recovery_lock = lock; 155 list_for_each_entry_reverse(dirty_journal, dirty_list, dirty_entry) { 156 if (dirty_journal->recovery_lock <= journal->recovery_lock) 157 break; 158 } 159 160 list_move_tail(&journal->dirty_entry, dirty_journal->dirty_entry.next); 161 } 162 163 static void mark_slab_journal_clean(struct slab_journal *journal) 164 { 165 journal->recovery_lock = 0; 166 list_del_init(&journal->dirty_entry); 167 } 168 169 static void check_if_slab_drained(struct vdo_slab *slab) 170 { 171 bool read_only; 172 struct slab_journal *journal = &slab->journal; 173 const struct admin_state_code *code; 174 175 if (!vdo_is_state_draining(&slab->state) || 176 
must_make_entries_to_flush(journal) || 177 is_reaping(journal) || 178 journal->waiting_to_commit || 179 !list_empty(&journal->uncommitted_blocks) || 180 journal->updating_slab_summary || 181 (slab->active_count > 0)) 182 return; 183 184 /* When not suspending or recovering, the slab must be clean. */ 185 code = vdo_get_admin_state_code(&slab->state); 186 read_only = vdo_is_read_only(slab->allocator->depot->vdo); 187 if (!read_only && 188 vdo_waitq_has_waiters(&slab->dirty_blocks) && 189 (code != VDO_ADMIN_STATE_SUSPENDING) && 190 (code != VDO_ADMIN_STATE_RECOVERING)) 191 return; 192 193 vdo_finish_draining_with_result(&slab->state, 194 (read_only ? VDO_READ_ONLY : VDO_SUCCESS)); 195 } 196 197 /* FULLNESS HINT COMPUTATION */ 198 199 /** 200 * compute_fullness_hint() - Translate a slab's free block count into a 'fullness hint' that can be 201 * stored in a slab_summary_entry's 7 bits that are dedicated to its free 202 * count. 203 * @depot: The depot whose summary being updated. 204 * @free_blocks: The number of free blocks. 205 * 206 * Note: the number of free blocks must be strictly less than 2^23 blocks, even though 207 * theoretically slabs could contain precisely 2^23 blocks; there is an assumption that at least 208 * one block is used by metadata. This assumption is necessary; otherwise, the fullness hint might 209 * overflow. The fullness hint formula is roughly (fullness >> 16) & 0x7f, but (2^23 >> 16) & 0x7f 210 * is 0, which would make it impossible to distinguish completely full from completely empty. 211 * 212 * Return: A fullness hint, which can be stored in 7 bits. 213 */ 214 static u8 __must_check compute_fullness_hint(struct slab_depot *depot, 215 block_count_t free_blocks) 216 { 217 block_count_t hint; 218 219 VDO_ASSERT_LOG_ONLY((free_blocks < (1 << 23)), "free blocks must be less than 2^23"); 220 221 if (free_blocks == 0) 222 return 0; 223 224 hint = free_blocks >> depot->hint_shift; 225 return ((hint == 0) ? 
1 : hint); 226 } 227 228 /** 229 * check_summary_drain_complete() - Check whether an allocators summary has finished draining. 230 */ 231 static void check_summary_drain_complete(struct block_allocator *allocator) 232 { 233 if (!vdo_is_state_draining(&allocator->summary_state) || 234 (allocator->summary_write_count > 0)) 235 return; 236 237 vdo_finish_operation(&allocator->summary_state, 238 (vdo_is_read_only(allocator->depot->vdo) ? 239 VDO_READ_ONLY : VDO_SUCCESS)); 240 } 241 242 /** 243 * notify_summary_waiters() - Wake all the waiters in a given queue. 244 * @allocator: The block allocator summary which owns the queue. 245 * @queue: The queue to notify. 246 */ 247 static void notify_summary_waiters(struct block_allocator *allocator, 248 struct vdo_wait_queue *queue) 249 { 250 int result = (vdo_is_read_only(allocator->depot->vdo) ? 251 VDO_READ_ONLY : VDO_SUCCESS); 252 253 vdo_waitq_notify_all_waiters(queue, NULL, &result); 254 } 255 256 static void launch_write(struct slab_summary_block *summary_block); 257 258 /** 259 * finish_updating_slab_summary_block() - Finish processing a block which attempted to write, 260 * whether or not the attempt succeeded. 261 * @block: The block. 262 */ 263 static void finish_updating_slab_summary_block(struct slab_summary_block *block) 264 { 265 notify_summary_waiters(block->allocator, &block->current_update_waiters); 266 block->writing = false; 267 block->allocator->summary_write_count--; 268 if (vdo_waitq_has_waiters(&block->next_update_waiters)) 269 launch_write(block); 270 else 271 check_summary_drain_complete(block->allocator); 272 } 273 274 /** 275 * finish_update() - This is the callback for a successful summary block write. 276 * @completion: The write vio. 
277 */ 278 static void finish_update(struct vdo_completion *completion) 279 { 280 struct slab_summary_block *block = 281 container_of(as_vio(completion), struct slab_summary_block, vio); 282 283 atomic64_inc(&block->allocator->depot->summary_statistics.blocks_written); 284 finish_updating_slab_summary_block(block); 285 } 286 287 /** 288 * handle_write_error() - Handle an error writing a slab summary block. 289 * @completion: The write VIO. 290 */ 291 static void handle_write_error(struct vdo_completion *completion) 292 { 293 struct slab_summary_block *block = 294 container_of(as_vio(completion), struct slab_summary_block, vio); 295 296 vio_record_metadata_io_error(as_vio(completion)); 297 vdo_enter_read_only_mode(completion->vdo, completion->result); 298 finish_updating_slab_summary_block(block); 299 } 300 301 static void write_slab_summary_endio(struct bio *bio) 302 { 303 struct vio *vio = bio->bi_private; 304 struct slab_summary_block *block = 305 container_of(vio, struct slab_summary_block, vio); 306 307 continue_vio_after_io(vio, finish_update, block->allocator->thread_id); 308 } 309 310 /** 311 * launch_write() - Write a slab summary block unless it is currently out for writing. 312 * @block: The block that needs to be committed. 
313 */ 314 static void launch_write(struct slab_summary_block *block) 315 { 316 struct block_allocator *allocator = block->allocator; 317 struct slab_depot *depot = allocator->depot; 318 physical_block_number_t pbn; 319 320 if (block->writing) 321 return; 322 323 allocator->summary_write_count++; 324 vdo_waitq_transfer_all_waiters(&block->next_update_waiters, 325 &block->current_update_waiters); 326 block->writing = true; 327 328 if (vdo_is_read_only(depot->vdo)) { 329 finish_updating_slab_summary_block(block); 330 return; 331 } 332 333 memcpy(block->outgoing_entries, block->entries, VDO_BLOCK_SIZE); 334 335 /* 336 * Flush before writing to ensure that the slab journal tail blocks and reference updates 337 * covered by this summary update are stable. Otherwise, a subsequent recovery could 338 * encounter a slab summary update that refers to a slab journal tail block that has not 339 * actually been written. In such cases, the slab journal referenced will be treated as 340 * empty, causing any data within the slab which predates the existing recovery journal 341 * entries to be lost. 342 */ 343 pbn = (depot->summary_origin + 344 (VDO_SLAB_SUMMARY_BLOCKS_PER_ZONE * allocator->zone_number) + 345 block->index); 346 vdo_submit_metadata_vio(&block->vio, pbn, write_slab_summary_endio, 347 handle_write_error, REQ_OP_WRITE | REQ_PREFLUSH); 348 } 349 350 /** 351 * update_slab_summary_entry() - Update the entry for a slab. 352 * @slab: The slab whose entry is to be updated 353 * @waiter: The waiter that is updating the summary. 354 * @tail_block_offset: The offset of the slab journal's tail block. 355 * @load_ref_counts: Whether the reference counts must be loaded from disk on the vdo load. 356 * @is_clean: Whether the slab is clean. 357 * @free_blocks: The number of free blocks. 
358 */ 359 static void update_slab_summary_entry(struct vdo_slab *slab, struct vdo_waiter *waiter, 360 tail_block_offset_t tail_block_offset, 361 bool load_ref_counts, bool is_clean, 362 block_count_t free_blocks) 363 { 364 u8 index = slab->slab_number / VDO_SLAB_SUMMARY_ENTRIES_PER_BLOCK; 365 struct block_allocator *allocator = slab->allocator; 366 struct slab_summary_block *block = &allocator->summary_blocks[index]; 367 int result; 368 struct slab_summary_entry *entry; 369 370 if (vdo_is_read_only(block->vio.completion.vdo)) { 371 result = VDO_READ_ONLY; 372 waiter->callback(waiter, &result); 373 return; 374 } 375 376 if (vdo_is_state_draining(&allocator->summary_state) || 377 vdo_is_state_quiescent(&allocator->summary_state)) { 378 result = VDO_INVALID_ADMIN_STATE; 379 waiter->callback(waiter, &result); 380 return; 381 } 382 383 entry = &allocator->summary_entries[slab->slab_number]; 384 *entry = (struct slab_summary_entry) { 385 .tail_block_offset = tail_block_offset, 386 .load_ref_counts = (entry->load_ref_counts || load_ref_counts), 387 .is_dirty = !is_clean, 388 .fullness_hint = compute_fullness_hint(allocator->depot, free_blocks), 389 }; 390 vdo_waitq_enqueue_waiter(&block->next_update_waiters, waiter); 391 launch_write(block); 392 } 393 394 /** 395 * finish_reaping() - Actually advance the head of the journal now that any necessary flushes are 396 * complete. 397 * @journal: The journal to be reaped. 398 */ 399 static void finish_reaping(struct slab_journal *journal) 400 { 401 journal->head = journal->unreapable; 402 add_entries(journal); 403 check_if_slab_drained(journal->slab); 404 } 405 406 static void reap_slab_journal(struct slab_journal *journal); 407 408 /** 409 * complete_reaping() - Finish reaping now that we have flushed the lower layer and then try 410 * reaping again in case we deferred reaping due to an outstanding vio. 411 * @completion: The flush vio. 
412 */ 413 static void complete_reaping(struct vdo_completion *completion) 414 { 415 struct slab_journal *journal = completion->parent; 416 417 return_vio_to_pool(journal->slab->allocator->vio_pool, 418 vio_as_pooled_vio(as_vio(vdo_forget(completion)))); 419 finish_reaping(journal); 420 reap_slab_journal(journal); 421 } 422 423 /** 424 * handle_flush_error() - Handle an error flushing the lower layer. 425 * @completion: The flush vio. 426 */ 427 static void handle_flush_error(struct vdo_completion *completion) 428 { 429 vio_record_metadata_io_error(as_vio(completion)); 430 vdo_enter_read_only_mode(completion->vdo, completion->result); 431 complete_reaping(completion); 432 } 433 434 static void flush_endio(struct bio *bio) 435 { 436 struct vio *vio = bio->bi_private; 437 struct slab_journal *journal = vio->completion.parent; 438 439 continue_vio_after_io(vio, complete_reaping, 440 journal->slab->allocator->thread_id); 441 } 442 443 /** 444 * flush_for_reaping() - A waiter callback for getting a vio with which to flush the lower layer 445 * prior to reaping. 446 * @waiter: The journal as a flush waiter. 447 * @context: The newly acquired flush vio. 448 */ 449 static void flush_for_reaping(struct vdo_waiter *waiter, void *context) 450 { 451 struct slab_journal *journal = 452 container_of(waiter, struct slab_journal, flush_waiter); 453 struct pooled_vio *pooled = context; 454 struct vio *vio = &pooled->vio; 455 456 vio->completion.parent = journal; 457 vdo_submit_flush_vio(vio, flush_endio, handle_flush_error); 458 } 459 460 /** 461 * reap_slab_journal() - Conduct a reap on a slab journal to reclaim unreferenced blocks. 462 * @journal: The slab journal. 463 */ 464 static void reap_slab_journal(struct slab_journal *journal) 465 { 466 bool reaped = false; 467 468 if (is_reaping(journal)) { 469 /* We already have a reap in progress so wait for it to finish. 
*/ 470 return; 471 } 472 473 if ((journal->slab->status != VDO_SLAB_REBUILT) || 474 !vdo_is_state_normal(&journal->slab->state) || 475 vdo_is_read_only(journal->slab->allocator->depot->vdo)) { 476 /* 477 * We must not reap in the first two cases, and there's no point in read-only mode. 478 */ 479 return; 480 } 481 482 /* 483 * Start reclaiming blocks only when the journal head has no references. Then stop when a 484 * block is referenced or reap reaches the most recently written block, referenced by the 485 * slab summary, which has the sequence number just before the tail. 486 */ 487 while ((journal->unreapable < journal->tail) && (journal->reap_lock->count == 0)) { 488 reaped = true; 489 journal->unreapable++; 490 journal->reap_lock++; 491 if (journal->reap_lock == &journal->locks[journal->size]) 492 journal->reap_lock = &journal->locks[0]; 493 } 494 495 if (!reaped) 496 return; 497 498 /* 499 * It is never safe to reap a slab journal block without first issuing a flush, regardless 500 * of whether a user flush has been received or not. In the absence of the flush, the 501 * reference block write which released the locks allowing the slab journal to reap may not 502 * be persisted. Although slab summary writes will eventually issue flushes, multiple slab 503 * journal block writes can be issued while previous slab summary updates have not yet been 504 * made. Even though those slab journal block writes will be ignored if the slab summary 505 * update is not persisted, they may still overwrite the to-be-reaped slab journal block 506 * resulting in a loss of reference count updates. 507 */ 508 journal->flush_waiter.callback = flush_for_reaping; 509 acquire_vio_from_pool(journal->slab->allocator->vio_pool, 510 &journal->flush_waiter); 511 } 512 513 /** 514 * adjust_slab_journal_block_reference() - Adjust the reference count for a slab journal block. 515 * @journal: The slab journal. 516 * @sequence_number: The journal sequence number of the referenced block. 
517 * @adjustment: Amount to adjust the reference counter. 518 * 519 * Note that when the adjustment is negative, the slab journal will be reaped. 520 */ 521 static void adjust_slab_journal_block_reference(struct slab_journal *journal, 522 sequence_number_t sequence_number, 523 int adjustment) 524 { 525 struct journal_lock *lock; 526 527 if (sequence_number == 0) 528 return; 529 530 if (journal->slab->status == VDO_SLAB_REPLAYING) { 531 /* Locks should not be used during offline replay. */ 532 return; 533 } 534 535 VDO_ASSERT_LOG_ONLY((adjustment != 0), "adjustment must be non-zero"); 536 lock = get_lock(journal, sequence_number); 537 if (adjustment < 0) { 538 VDO_ASSERT_LOG_ONLY((-adjustment <= lock->count), 539 "adjustment %d of lock count %u for slab journal block %llu must not underflow", 540 adjustment, lock->count, 541 (unsigned long long) sequence_number); 542 } 543 544 lock->count += adjustment; 545 if (lock->count == 0) 546 reap_slab_journal(journal); 547 } 548 549 /** 550 * release_journal_locks() - Callback invoked after a slab summary update completes. 551 * @waiter: The slab summary waiter that has just been notified. 552 * @context: The result code of the update. 553 * 554 * Registered in the constructor on behalf of update_tail_block_location(). 555 * 556 * Implements waiter_callback_fn. 557 */ 558 static void release_journal_locks(struct vdo_waiter *waiter, void *context) 559 { 560 sequence_number_t first, i; 561 struct slab_journal *journal = 562 container_of(waiter, struct slab_journal, slab_summary_waiter); 563 int result = *((int *) context); 564 565 if (result != VDO_SUCCESS) { 566 if (result != VDO_READ_ONLY) { 567 /* 568 * Don't bother logging what might be lots of errors if we are already in 569 * read-only mode. 
570 */ 571 vdo_log_error_strerror(result, "failed slab summary update %llu", 572 (unsigned long long) journal->summarized); 573 } 574 575 journal->updating_slab_summary = false; 576 vdo_enter_read_only_mode(journal->slab->allocator->depot->vdo, result); 577 check_if_slab_drained(journal->slab); 578 return; 579 } 580 581 if (journal->partial_write_in_progress && (journal->summarized == journal->tail)) { 582 journal->partial_write_in_progress = false; 583 add_entries(journal); 584 } 585 586 first = journal->last_summarized; 587 journal->last_summarized = journal->summarized; 588 for (i = journal->summarized - 1; i >= first; i--) { 589 /* 590 * Release the lock the summarized block held on the recovery journal. (During 591 * replay, recovery_start will always be 0.) 592 */ 593 if (journal->recovery_journal != NULL) { 594 zone_count_t zone_number = journal->slab->allocator->zone_number; 595 struct journal_lock *lock = get_lock(journal, i); 596 597 vdo_release_recovery_journal_block_reference(journal->recovery_journal, 598 lock->recovery_start, 599 VDO_ZONE_TYPE_PHYSICAL, 600 zone_number); 601 } 602 603 /* 604 * Release our own lock against reaping for blocks that are committed. (This 605 * function will not change locks during replay.) 606 */ 607 adjust_slab_journal_block_reference(journal, i, -1); 608 } 609 610 journal->updating_slab_summary = false; 611 612 reap_slab_journal(journal); 613 614 /* Check if the slab summary needs to be updated again. */ 615 update_tail_block_location(journal); 616 } 617 618 /** 619 * update_tail_block_location() - Update the tail block location in the slab summary, if necessary. 620 * @journal: The slab journal that is updating its tail block location. 
621 */ 622 static void update_tail_block_location(struct slab_journal *journal) 623 { 624 block_count_t free_block_count; 625 struct vdo_slab *slab = journal->slab; 626 627 if (journal->updating_slab_summary || 628 vdo_is_read_only(journal->slab->allocator->depot->vdo) || 629 (journal->last_summarized >= journal->next_commit)) { 630 check_if_slab_drained(slab); 631 return; 632 } 633 634 if (slab->status != VDO_SLAB_REBUILT) { 635 u8 hint = slab->allocator->summary_entries[slab->slab_number].fullness_hint; 636 637 free_block_count = ((block_count_t) hint) << slab->allocator->depot->hint_shift; 638 } else { 639 free_block_count = slab->free_blocks; 640 } 641 642 journal->summarized = journal->next_commit; 643 journal->updating_slab_summary = true; 644 645 /* 646 * Update slab summary as dirty. 647 * vdo_slab journal can only reap past sequence number 1 when all the ref counts for this 648 * slab have been written to the layer. Therefore, indicate that the ref counts must be 649 * loaded when the journal head has reaped past sequence number 1. 650 */ 651 update_slab_summary_entry(slab, &journal->slab_summary_waiter, 652 journal->summarized % journal->size, 653 (journal->head > 1), false, free_block_count); 654 } 655 656 /** 657 * reopen_slab_journal() - Reopen a slab's journal by emptying it and then adding pending entries. 658 */ 659 static void reopen_slab_journal(struct vdo_slab *slab) 660 { 661 struct slab_journal *journal = &slab->journal; 662 sequence_number_t block; 663 664 VDO_ASSERT_LOG_ONLY(journal->tail_header.entry_count == 0, 665 "vdo_slab journal's active block empty before reopening"); 666 journal->head = journal->tail; 667 initialize_journal_state(journal); 668 669 /* Ensure no locks are spuriously held on an empty journal. 
*/ 670 for (block = 1; block <= journal->size; block++) { 671 VDO_ASSERT_LOG_ONLY((get_lock(journal, block)->count == 0), 672 "Scrubbed journal's block %llu is not locked", 673 (unsigned long long) block); 674 } 675 676 add_entries(journal); 677 } 678 679 static sequence_number_t get_committing_sequence_number(const struct pooled_vio *vio) 680 { 681 const struct packed_slab_journal_block *block = 682 (const struct packed_slab_journal_block *) vio->vio.data; 683 684 return __le64_to_cpu(block->header.sequence_number); 685 } 686 687 /** 688 * complete_write() - Handle post-commit processing. 689 * @completion: The write vio as a completion. 690 * 691 * This is the callback registered by write_slab_journal_block(). 692 */ 693 static void complete_write(struct vdo_completion *completion) 694 { 695 int result = completion->result; 696 struct pooled_vio *pooled = vio_as_pooled_vio(as_vio(completion)); 697 struct slab_journal *journal = completion->parent; 698 sequence_number_t committed = get_committing_sequence_number(pooled); 699 700 list_del_init(&pooled->list_entry); 701 return_vio_to_pool(journal->slab->allocator->vio_pool, vdo_forget(pooled)); 702 703 if (result != VDO_SUCCESS) { 704 vio_record_metadata_io_error(as_vio(completion)); 705 vdo_log_error_strerror(result, "cannot write slab journal block %llu", 706 (unsigned long long) committed); 707 vdo_enter_read_only_mode(journal->slab->allocator->depot->vdo, result); 708 check_if_slab_drained(journal->slab); 709 return; 710 } 711 712 WRITE_ONCE(journal->events->blocks_written, journal->events->blocks_written + 1); 713 714 if (list_empty(&journal->uncommitted_blocks)) { 715 /* If no blocks are outstanding, then the commit point is at the tail. */ 716 journal->next_commit = journal->tail; 717 } else { 718 /* The commit point is always the beginning of the oldest incomplete block. 
*/ 719 pooled = container_of(journal->uncommitted_blocks.next, 720 struct pooled_vio, list_entry); 721 journal->next_commit = get_committing_sequence_number(pooled); 722 } 723 724 update_tail_block_location(journal); 725 } 726 727 static void write_slab_journal_endio(struct bio *bio) 728 { 729 struct vio *vio = bio->bi_private; 730 struct slab_journal *journal = vio->completion.parent; 731 732 continue_vio_after_io(vio, complete_write, journal->slab->allocator->thread_id); 733 } 734 735 /** 736 * write_slab_journal_block() - Write a slab journal block. 737 * @waiter: The vio pool waiter which was just notified. 738 * @context: The vio pool entry for the write. 739 * 740 * Callback from acquire_vio_from_pool() registered in commit_tail(). 741 */ 742 static void write_slab_journal_block(struct vdo_waiter *waiter, void *context) 743 { 744 struct pooled_vio *pooled = context; 745 struct vio *vio = &pooled->vio; 746 struct slab_journal *journal = 747 container_of(waiter, struct slab_journal, resource_waiter); 748 struct slab_journal_block_header *header = &journal->tail_header; 749 int unused_entries = journal->entries_per_block - header->entry_count; 750 physical_block_number_t block_number; 751 const struct admin_state_code *operation; 752 753 header->head = journal->head; 754 list_add_tail(&pooled->list_entry, &journal->uncommitted_blocks); 755 vdo_pack_slab_journal_block_header(header, &journal->block->header); 756 757 /* Copy the tail block into the vio. */ 758 memcpy(pooled->vio.data, journal->block, VDO_BLOCK_SIZE); 759 760 VDO_ASSERT_LOG_ONLY(unused_entries >= 0, "vdo_slab journal block is not overfull"); 761 if (unused_entries > 0) { 762 /* 763 * Release the per-entry locks for any unused entries in the block we are about to 764 * write. 
765 */ 766 adjust_slab_journal_block_reference(journal, header->sequence_number, 767 -unused_entries); 768 journal->partial_write_in_progress = !block_is_full(journal); 769 } 770 771 block_number = journal->slab->journal_origin + 772 (header->sequence_number % journal->size); 773 vio->completion.parent = journal; 774 775 /* 776 * This block won't be read in recovery until the slab summary is updated to refer to it. 777 * The slab summary update does a flush which is sufficient to protect us from corruption 778 * due to out of order slab journal, reference block, or block map writes. 779 */ 780 vdo_submit_metadata_vio(vdo_forget(vio), block_number, write_slab_journal_endio, 781 complete_write, REQ_OP_WRITE); 782 783 /* Since the write is submitted, the tail block structure can be reused. */ 784 journal->tail++; 785 initialize_tail_block(journal); 786 journal->waiting_to_commit = false; 787 788 operation = vdo_get_admin_state_code(&journal->slab->state); 789 if (operation == VDO_ADMIN_STATE_WAITING_FOR_RECOVERY) { 790 vdo_finish_operation(&journal->slab->state, 791 (vdo_is_read_only(journal->slab->allocator->depot->vdo) ? 792 VDO_READ_ONLY : VDO_SUCCESS)); 793 return; 794 } 795 796 add_entries(journal); 797 } 798 799 /** 800 * commit_tail() - Commit the tail block of the slab journal. 801 * @journal: The journal whose tail block should be committed. 802 */ 803 static void commit_tail(struct slab_journal *journal) 804 { 805 if ((journal->tail_header.entry_count == 0) && must_make_entries_to_flush(journal)) { 806 /* 807 * There are no entries at the moment, but there are some waiters, so defer 808 * initiating the flush until those entries are ready to write. 809 */ 810 return; 811 } 812 813 if (vdo_is_read_only(journal->slab->allocator->depot->vdo) || 814 journal->waiting_to_commit || 815 (journal->tail_header.entry_count == 0)) { 816 /* 817 * There is nothing to do since the tail block is empty, or writing, or the journal 818 * is in read-only mode. 
819 */ 820 return; 821 } 822 823 /* 824 * Since we are about to commit the tail block, this journal no longer needs to be on the 825 * ring of journals which the recovery journal might ask to commit. 826 */ 827 mark_slab_journal_clean(journal); 828 829 journal->waiting_to_commit = true; 830 831 journal->resource_waiter.callback = write_slab_journal_block; 832 acquire_vio_from_pool(journal->slab->allocator->vio_pool, 833 &journal->resource_waiter); 834 } 835 836 /** 837 * encode_slab_journal_entry() - Encode a slab journal entry. 838 * @tail_header: The unpacked header for the block. 839 * @payload: The journal block payload to hold the entry. 840 * @sbn: The slab block number of the entry to encode. 841 * @operation: The type of the entry. 842 * @increment: True if this is an increment. 843 * 844 * Exposed for unit tests. 845 */ 846 static void encode_slab_journal_entry(struct slab_journal_block_header *tail_header, 847 slab_journal_payload *payload, 848 slab_block_number sbn, 849 enum journal_operation operation, 850 bool increment) 851 { 852 journal_entry_count_t entry_number = tail_header->entry_count++; 853 854 if (operation == VDO_JOURNAL_BLOCK_MAP_REMAPPING) { 855 if (!tail_header->has_block_map_increments) { 856 memset(payload->full_entries.entry_types, 0, 857 VDO_SLAB_JOURNAL_ENTRY_TYPES_SIZE); 858 tail_header->has_block_map_increments = true; 859 } 860 861 payload->full_entries.entry_types[entry_number / 8] |= 862 ((u8)1 << (entry_number % 8)); 863 } 864 865 vdo_pack_slab_journal_entry(&payload->entries[entry_number], sbn, increment); 866 } 867 868 /** 869 * expand_journal_point() - Convert a recovery journal journal_point which refers to both an 870 * increment and a decrement to a single point which refers to one or the 871 * other. 872 * @recovery_point: The journal point to convert. 873 * @increment: Whether the current entry is an increment. 
874 * 875 * Return: The expanded journal point 876 * 877 * Because each data_vio has but a single recovery journal point, but may need to make both 878 * increment and decrement entries in the same slab journal. In order to distinguish the two 879 * entries, the entry count of the expanded journal point is twice the actual recovery journal 880 * entry count for increments, and one more than that for decrements. 881 */ 882 static struct journal_point expand_journal_point(struct journal_point recovery_point, 883 bool increment) 884 { 885 recovery_point.entry_count *= 2; 886 if (!increment) 887 recovery_point.entry_count++; 888 889 return recovery_point; 890 } 891 892 /** 893 * add_entry() - Actually add an entry to the slab journal, potentially firing off a write if a 894 * block becomes full. 895 * @journal: The slab journal to append to. 896 * @pbn: The pbn being adjusted. 897 * @operation: The type of entry to make. 898 * @increment: True if this is an increment. 899 * @recovery_point: The expanded recovery point. 900 * 901 * This function is synchronous. 
902 */ 903 static void add_entry(struct slab_journal *journal, physical_block_number_t pbn, 904 enum journal_operation operation, bool increment, 905 struct journal_point recovery_point) 906 { 907 struct packed_slab_journal_block *block = journal->block; 908 int result; 909 910 result = VDO_ASSERT(vdo_before_journal_point(&journal->tail_header.recovery_point, 911 &recovery_point), 912 "recovery journal point is monotonically increasing, recovery point: %llu.%u, block recovery point: %llu.%u", 913 (unsigned long long) recovery_point.sequence_number, 914 recovery_point.entry_count, 915 (unsigned long long) journal->tail_header.recovery_point.sequence_number, 916 journal->tail_header.recovery_point.entry_count); 917 if (result != VDO_SUCCESS) { 918 vdo_enter_read_only_mode(journal->slab->allocator->depot->vdo, result); 919 return; 920 } 921 922 if (operation == VDO_JOURNAL_BLOCK_MAP_REMAPPING) { 923 result = VDO_ASSERT((journal->tail_header.entry_count < 924 journal->full_entries_per_block), 925 "block has room for full entries"); 926 if (result != VDO_SUCCESS) { 927 vdo_enter_read_only_mode(journal->slab->allocator->depot->vdo, 928 result); 929 return; 930 } 931 } 932 933 encode_slab_journal_entry(&journal->tail_header, &block->payload, 934 pbn - journal->slab->start, operation, increment); 935 journal->tail_header.recovery_point = recovery_point; 936 if (block_is_full(journal)) 937 commit_tail(journal); 938 } 939 940 static inline block_count_t journal_length(const struct slab_journal *journal) 941 { 942 return journal->tail - journal->head; 943 } 944 945 /** 946 * vdo_attempt_replay_into_slab() - Replay a recovery journal entry into a slab's journal. 947 * @slab: The slab to play into. 948 * @pbn: The PBN for the entry. 949 * @operation: The type of entry to add. 950 * @increment: True if this entry is an increment. 951 * @recovery_point: The recovery journal point corresponding to this entry. 
952 * @parent: The completion to notify when there is space to add the entry if the entry could not be 953 * added immediately. 954 * 955 * Return: true if the entry was added immediately. 956 */ 957 bool vdo_attempt_replay_into_slab(struct vdo_slab *slab, physical_block_number_t pbn, 958 enum journal_operation operation, bool increment, 959 struct journal_point *recovery_point, 960 struct vdo_completion *parent) 961 { 962 struct slab_journal *journal = &slab->journal; 963 struct slab_journal_block_header *header = &journal->tail_header; 964 struct journal_point expanded = expand_journal_point(*recovery_point, increment); 965 966 /* Only accept entries after the current recovery point. */ 967 if (!vdo_before_journal_point(&journal->tail_header.recovery_point, &expanded)) 968 return true; 969 970 if ((header->entry_count >= journal->full_entries_per_block) && 971 (header->has_block_map_increments || (operation == VDO_JOURNAL_BLOCK_MAP_REMAPPING))) { 972 /* 973 * The tail block does not have room for the entry we are attempting to add so 974 * commit the tail block now. 975 */ 976 commit_tail(journal); 977 } 978 979 if (journal->waiting_to_commit) { 980 vdo_start_operation_with_waiter(&journal->slab->state, 981 VDO_ADMIN_STATE_WAITING_FOR_RECOVERY, 982 parent, NULL); 983 return false; 984 } 985 986 if (journal_length(journal) >= journal->size) { 987 /* 988 * We must have reaped the current head before the crash, since the blocked 989 * threshold keeps us from having more entries than fit in a slab journal; hence we 990 * can just advance the head (and unreapable block), as needed. 991 */ 992 journal->head++; 993 journal->unreapable++; 994 } 995 996 if (journal->slab->status == VDO_SLAB_REBUILT) 997 journal->slab->status = VDO_SLAB_REPLAYING; 998 999 add_entry(journal, pbn, operation, increment, expanded); 1000 return true; 1001 } 1002 1003 /** 1004 * requires_reaping() - Check whether the journal must be reaped before adding new entries. 
1005 * @journal: The journal to check. 1006 * 1007 * Return: true if the journal must be reaped. 1008 */ 1009 static bool requires_reaping(const struct slab_journal *journal) 1010 { 1011 return (journal_length(journal) >= journal->blocking_threshold); 1012 } 1013 1014 /** finish_summary_update() - A waiter callback that resets the writing state of a slab. */ 1015 static void finish_summary_update(struct vdo_waiter *waiter, void *context) 1016 { 1017 struct vdo_slab *slab = container_of(waiter, struct vdo_slab, summary_waiter); 1018 int result = *((int *) context); 1019 1020 slab->active_count--; 1021 1022 if ((result != VDO_SUCCESS) && (result != VDO_READ_ONLY)) { 1023 vdo_log_error_strerror(result, "failed to update slab summary"); 1024 vdo_enter_read_only_mode(slab->allocator->depot->vdo, result); 1025 } 1026 1027 check_if_slab_drained(slab); 1028 } 1029 1030 static void write_reference_block(struct vdo_waiter *waiter, void *context); 1031 1032 /** 1033 * launch_reference_block_write() - Launch the write of a dirty reference block by first acquiring 1034 * a VIO for it from the pool. 1035 * @waiter: The waiter of the block which is starting to write. 1036 * @context: The parent slab of the block. 1037 * 1038 * This can be asynchronous since the writer will have to wait if all VIOs in the pool are 1039 * currently in use. 
1040 */ 1041 static void launch_reference_block_write(struct vdo_waiter *waiter, void *context) 1042 { 1043 struct vdo_slab *slab = context; 1044 1045 if (vdo_is_read_only(slab->allocator->depot->vdo)) 1046 return; 1047 1048 slab->active_count++; 1049 container_of(waiter, struct reference_block, waiter)->is_writing = true; 1050 waiter->callback = write_reference_block; 1051 acquire_vio_from_pool(slab->allocator->vio_pool, waiter); 1052 } 1053 1054 static void save_dirty_reference_blocks(struct vdo_slab *slab) 1055 { 1056 vdo_waitq_notify_all_waiters(&slab->dirty_blocks, 1057 launch_reference_block_write, slab); 1058 check_if_slab_drained(slab); 1059 } 1060 1061 /** 1062 * finish_reference_block_write() - After a reference block has written, clean it, release its 1063 * locks, and return its VIO to the pool. 1064 * @completion: The VIO that just finished writing. 1065 */ 1066 static void finish_reference_block_write(struct vdo_completion *completion) 1067 { 1068 struct vio *vio = as_vio(completion); 1069 struct pooled_vio *pooled = vio_as_pooled_vio(vio); 1070 struct reference_block *block = completion->parent; 1071 struct vdo_slab *slab = block->slab; 1072 tail_block_offset_t offset; 1073 1074 slab->active_count--; 1075 1076 /* Release the slab journal lock. */ 1077 adjust_slab_journal_block_reference(&slab->journal, 1078 block->slab_journal_lock_to_release, -1); 1079 return_vio_to_pool(slab->allocator->vio_pool, pooled); 1080 1081 /* 1082 * We can't clear the is_writing flag earlier as releasing the slab journal lock may cause 1083 * us to be dirtied again, but we don't want to double enqueue. 1084 */ 1085 block->is_writing = false; 1086 1087 if (vdo_is_read_only(completion->vdo)) { 1088 check_if_slab_drained(slab); 1089 return; 1090 } 1091 1092 /* Re-queue the block if it was re-dirtied while it was writing. 
*/ 1093 if (block->is_dirty) { 1094 vdo_waitq_enqueue_waiter(&block->slab->dirty_blocks, &block->waiter); 1095 if (vdo_is_state_draining(&slab->state)) { 1096 /* We must be saving, and this block will otherwise not be relaunched. */ 1097 save_dirty_reference_blocks(slab); 1098 } 1099 1100 return; 1101 } 1102 1103 /* 1104 * Mark the slab as clean in the slab summary if there are no dirty or writing blocks 1105 * and no summary update in progress. 1106 */ 1107 if ((slab->active_count > 0) || vdo_waitq_has_waiters(&slab->dirty_blocks)) { 1108 check_if_slab_drained(slab); 1109 return; 1110 } 1111 1112 offset = slab->allocator->summary_entries[slab->slab_number].tail_block_offset; 1113 slab->active_count++; 1114 slab->summary_waiter.callback = finish_summary_update; 1115 update_slab_summary_entry(slab, &slab->summary_waiter, offset, 1116 true, true, slab->free_blocks); 1117 } 1118 1119 /** 1120 * get_reference_counters_for_block() - Find the reference counters for a given block. 1121 * @block: The reference_block in question. 1122 * 1123 * Return: A pointer to the reference counters for this block. 1124 */ 1125 static vdo_refcount_t * __must_check get_reference_counters_for_block(struct reference_block *block) 1126 { 1127 size_t block_index = block - block->slab->reference_blocks; 1128 1129 return &block->slab->counters[block_index * COUNTS_PER_BLOCK]; 1130 } 1131 1132 /** 1133 * pack_reference_block() - Copy data from a reference block to a buffer ready to be written out. 1134 * @block: The block to copy. 1135 * @buffer: The char buffer to fill with the packed block. 
1136 */ 1137 static void pack_reference_block(struct reference_block *block, void *buffer) 1138 { 1139 struct packed_reference_block *packed = buffer; 1140 vdo_refcount_t *counters = get_reference_counters_for_block(block); 1141 sector_count_t i; 1142 struct packed_journal_point commit_point; 1143 1144 vdo_pack_journal_point(&block->slab->slab_journal_point, &commit_point); 1145 1146 for (i = 0; i < VDO_SECTORS_PER_BLOCK; i++) { 1147 packed->sectors[i].commit_point = commit_point; 1148 memcpy(packed->sectors[i].counts, counters + (i * COUNTS_PER_SECTOR), 1149 (sizeof(vdo_refcount_t) * COUNTS_PER_SECTOR)); 1150 } 1151 } 1152 1153 static void write_reference_block_endio(struct bio *bio) 1154 { 1155 struct vio *vio = bio->bi_private; 1156 struct reference_block *block = vio->completion.parent; 1157 thread_id_t thread_id = block->slab->allocator->thread_id; 1158 1159 continue_vio_after_io(vio, finish_reference_block_write, thread_id); 1160 } 1161 1162 /** 1163 * handle_io_error() - Handle an I/O error reading or writing a reference count block. 1164 * @completion: The VIO doing the I/O as a completion. 1165 */ 1166 static void handle_io_error(struct vdo_completion *completion) 1167 { 1168 int result = completion->result; 1169 struct vio *vio = as_vio(completion); 1170 struct vdo_slab *slab = ((struct reference_block *) completion->parent)->slab; 1171 1172 vio_record_metadata_io_error(vio); 1173 return_vio_to_pool(slab->allocator->vio_pool, vio_as_pooled_vio(vio)); 1174 slab->active_count--; 1175 vdo_enter_read_only_mode(slab->allocator->depot->vdo, result); 1176 check_if_slab_drained(slab); 1177 } 1178 1179 /** 1180 * write_reference_block() - After a dirty block waiter has gotten a VIO from the VIO pool, copy 1181 * its counters and associated data into the VIO, and launch the write. 1182 * @waiter: The waiter of the dirty block. 1183 * @context: The VIO returned by the pool. 
1184 */ 1185 static void write_reference_block(struct vdo_waiter *waiter, void *context) 1186 { 1187 size_t block_offset; 1188 physical_block_number_t pbn; 1189 struct pooled_vio *pooled = context; 1190 struct vdo_completion *completion = &pooled->vio.completion; 1191 struct reference_block *block = container_of(waiter, struct reference_block, 1192 waiter); 1193 1194 pack_reference_block(block, pooled->vio.data); 1195 block_offset = (block - block->slab->reference_blocks); 1196 pbn = (block->slab->ref_counts_origin + block_offset); 1197 block->slab_journal_lock_to_release = block->slab_journal_lock; 1198 completion->parent = block; 1199 1200 /* 1201 * Mark the block as clean, since we won't be committing any updates that happen after this 1202 * moment. As long as VIO order is preserved, two VIOs updating this block at once will not 1203 * cause complications. 1204 */ 1205 block->is_dirty = false; 1206 1207 /* 1208 * Flush before writing to ensure that the recovery journal and slab journal entries which 1209 * cover this reference update are stable. This prevents data corruption that can be caused 1210 * by out of order writes. 
1211 */ 1212 WRITE_ONCE(block->slab->allocator->ref_counts_statistics.blocks_written, 1213 block->slab->allocator->ref_counts_statistics.blocks_written + 1); 1214 1215 completion->callback_thread_id = ((struct block_allocator *) pooled->context)->thread_id; 1216 vdo_submit_metadata_vio(&pooled->vio, pbn, write_reference_block_endio, 1217 handle_io_error, REQ_OP_WRITE | REQ_PREFLUSH); 1218 } 1219 1220 static void reclaim_journal_space(struct slab_journal *journal) 1221 { 1222 block_count_t length = journal_length(journal); 1223 struct vdo_slab *slab = journal->slab; 1224 block_count_t write_count = vdo_waitq_num_waiters(&slab->dirty_blocks); 1225 block_count_t written; 1226 1227 if ((length < journal->flushing_threshold) || (write_count == 0)) 1228 return; 1229 1230 /* The slab journal is over the first threshold, schedule some reference block writes. */ 1231 WRITE_ONCE(journal->events->flush_count, journal->events->flush_count + 1); 1232 if (length < journal->flushing_deadline) { 1233 /* Schedule more writes the closer to the deadline we get. */ 1234 write_count /= journal->flushing_deadline - length + 1; 1235 write_count = max_t(block_count_t, write_count, 1); 1236 } 1237 1238 for (written = 0; written < write_count; written++) { 1239 vdo_waitq_notify_next_waiter(&slab->dirty_blocks, 1240 launch_reference_block_write, slab); 1241 } 1242 } 1243 1244 /** 1245 * reference_count_to_status() - Convert a reference count to a reference status. 1246 * @count: The count to convert. 1247 * 1248 * Return: The appropriate reference status. 
1249 */ 1250 static enum reference_status __must_check reference_count_to_status(vdo_refcount_t count) 1251 { 1252 if (count == EMPTY_REFERENCE_COUNT) 1253 return RS_FREE; 1254 else if (count == 1) 1255 return RS_SINGLE; 1256 else if (count == PROVISIONAL_REFERENCE_COUNT) 1257 return RS_PROVISIONAL; 1258 else 1259 return RS_SHARED; 1260 } 1261 1262 /** 1263 * dirty_block() - Mark a reference count block as dirty, potentially adding it to the dirty queue 1264 * if it wasn't already dirty. 1265 * @block: The reference block to mark as dirty. 1266 */ 1267 static void dirty_block(struct reference_block *block) 1268 { 1269 if (block->is_dirty) 1270 return; 1271 1272 block->is_dirty = true; 1273 if (!block->is_writing) 1274 vdo_waitq_enqueue_waiter(&block->slab->dirty_blocks, &block->waiter); 1275 } 1276 1277 /** 1278 * get_reference_block() - Get the reference block that covers the given block index. 1279 */ 1280 static struct reference_block * __must_check get_reference_block(struct vdo_slab *slab, 1281 slab_block_number index) 1282 { 1283 return &slab->reference_blocks[index / COUNTS_PER_BLOCK]; 1284 } 1285 1286 /** 1287 * slab_block_number_from_pbn() - Determine the index within the slab of a particular physical 1288 * block number. 1289 * @slab: The slab. 1290 * @physical_block_number: The physical block number. 1291 * @slab_block_number_ptr: A pointer to the slab block number. 1292 * 1293 * Return: VDO_SUCCESS or an error code. 
1294 */ 1295 static int __must_check slab_block_number_from_pbn(struct vdo_slab *slab, 1296 physical_block_number_t pbn, 1297 slab_block_number *slab_block_number_ptr) 1298 { 1299 u64 slab_block_number; 1300 1301 if (pbn < slab->start) 1302 return VDO_OUT_OF_RANGE; 1303 1304 slab_block_number = pbn - slab->start; 1305 if (slab_block_number >= slab->allocator->depot->slab_config.data_blocks) 1306 return VDO_OUT_OF_RANGE; 1307 1308 *slab_block_number_ptr = slab_block_number; 1309 return VDO_SUCCESS; 1310 } 1311 1312 /** 1313 * get_reference_counter() - Get the reference counter that covers the given physical block number. 1314 * @slab: The slab to query. 1315 * @pbn: The physical block number. 1316 * @counter_ptr: A pointer to the reference counter. 1317 */ 1318 static int __must_check get_reference_counter(struct vdo_slab *slab, 1319 physical_block_number_t pbn, 1320 vdo_refcount_t **counter_ptr) 1321 { 1322 slab_block_number index; 1323 int result = slab_block_number_from_pbn(slab, pbn, &index); 1324 1325 if (result != VDO_SUCCESS) 1326 return result; 1327 1328 *counter_ptr = &slab->counters[index]; 1329 1330 return VDO_SUCCESS; 1331 } 1332 1333 static unsigned int calculate_slab_priority(struct vdo_slab *slab) 1334 { 1335 block_count_t free_blocks = slab->free_blocks; 1336 unsigned int unopened_slab_priority = slab->allocator->unopened_slab_priority; 1337 unsigned int priority; 1338 1339 /* 1340 * Wholly full slabs must be the only ones with lowest priority, 0. 1341 * 1342 * Slabs that have never been opened (empty, newly initialized, and never been written to) 1343 * have lower priority than previously opened slabs that have a significant number of free 1344 * blocks. This ranking causes VDO to avoid writing physical blocks for the first time 1345 * unless there are very few free blocks that have been previously written to. 
1346 * 1347 * Since VDO doesn't discard blocks currently, reusing previously written blocks makes VDO 1348 * a better client of any underlying storage that is thinly-provisioned (though discarding 1349 * would be better). 1350 * 1351 * For all other slabs, the priority is derived from the logarithm of the number of free 1352 * blocks. Slabs with the same order of magnitude of free blocks have the same priority. 1353 * With 2^23 blocks, the priority will range from 1 to 25. The reserved 1354 * unopened_slab_priority divides the range and is skipped by the logarithmic mapping. 1355 */ 1356 1357 if (free_blocks == 0) 1358 return 0; 1359 1360 if (is_slab_journal_blank(slab)) 1361 return unopened_slab_priority; 1362 1363 priority = (1 + ilog2(free_blocks)); 1364 return ((priority < unopened_slab_priority) ? priority : priority + 1); 1365 } 1366 1367 /* 1368 * Slabs are essentially prioritized by an approximation of the number of free blocks in the slab 1369 * so slabs with lots of free blocks will be opened for allocation before slabs that have few free 1370 * blocks. 1371 */ 1372 static void prioritize_slab(struct vdo_slab *slab) 1373 { 1374 VDO_ASSERT_LOG_ONLY(list_empty(&slab->allocq_entry), 1375 "a slab must not already be on a ring when prioritizing"); 1376 slab->priority = calculate_slab_priority(slab); 1377 vdo_priority_table_enqueue(slab->allocator->prioritized_slabs, 1378 slab->priority, &slab->allocq_entry); 1379 } 1380 1381 /** 1382 * adjust_free_block_count() - Adjust the free block count and (if needed) reprioritize the slab. 1383 * @incremented: true if the free block count went up. 1384 */ 1385 static void adjust_free_block_count(struct vdo_slab *slab, bool incremented) 1386 { 1387 struct block_allocator *allocator = slab->allocator; 1388 1389 WRITE_ONCE(allocator->allocated_blocks, 1390 allocator->allocated_blocks + (incremented ? -1 : 1)); 1391 1392 /* The open slab doesn't need to be reprioritized until it is closed. 
*/ 1393 if (slab == allocator->open_slab) 1394 return; 1395 1396 /* Don't bother adjusting the priority table if unneeded. */ 1397 if (slab->priority == calculate_slab_priority(slab)) 1398 return; 1399 1400 /* 1401 * Reprioritize the slab to reflect the new free block count by removing it from the table 1402 * and re-enqueuing it with the new priority. 1403 */ 1404 vdo_priority_table_remove(allocator->prioritized_slabs, &slab->allocq_entry); 1405 prioritize_slab(slab); 1406 } 1407 1408 /** 1409 * increment_for_data() - Increment the reference count for a data block. 1410 * @slab: The slab which owns the block. 1411 * @block: The reference block which contains the block being updated. 1412 * @block_number: The block to update. 1413 * @old_status: The reference status of the data block before this increment. 1414 * @lock: The pbn_lock associated with this increment (may be NULL). 1415 * @counter_ptr: A pointer to the count for the data block (in, out). 1416 * @adjust_block_count: Whether to update the allocator's free block count. 1417 * 1418 * Return: VDO_SUCCESS or an error. 
1419 */ 1420 static int increment_for_data(struct vdo_slab *slab, struct reference_block *block, 1421 slab_block_number block_number, 1422 enum reference_status old_status, 1423 struct pbn_lock *lock, vdo_refcount_t *counter_ptr, 1424 bool adjust_block_count) 1425 { 1426 switch (old_status) { 1427 case RS_FREE: 1428 *counter_ptr = 1; 1429 block->allocated_count++; 1430 slab->free_blocks--; 1431 if (adjust_block_count) 1432 adjust_free_block_count(slab, false); 1433 1434 break; 1435 1436 case RS_PROVISIONAL: 1437 *counter_ptr = 1; 1438 break; 1439 1440 default: 1441 /* Single or shared */ 1442 if (*counter_ptr >= MAXIMUM_REFERENCE_COUNT) { 1443 return vdo_log_error_strerror(VDO_REF_COUNT_INVALID, 1444 "Incrementing a block already having 254 references (slab %u, offset %u)", 1445 slab->slab_number, block_number); 1446 } 1447 (*counter_ptr)++; 1448 } 1449 1450 if (lock != NULL) 1451 vdo_unassign_pbn_lock_provisional_reference(lock); 1452 return VDO_SUCCESS; 1453 } 1454 1455 /** 1456 * decrement_for_data() - Decrement the reference count for a data block. 1457 * @slab: The slab which owns the block. 1458 * @block: The reference block which contains the block being updated. 1459 * @block_number: The block to update. 1460 * @old_status: The reference status of the data block before this decrement. 1461 * @updater: The reference updater doing this operation in case we need to look up the pbn lock. 1462 * @lock: The pbn_lock associated with the block being decremented (may be NULL). 1463 * @counter_ptr: A pointer to the count for the data block (in, out). 1464 * @adjust_block_count: Whether to update the allocator's free block count. 1465 * 1466 * Return: VDO_SUCCESS or an error. 
1467 */ 1468 static int decrement_for_data(struct vdo_slab *slab, struct reference_block *block, 1469 slab_block_number block_number, 1470 enum reference_status old_status, 1471 struct reference_updater *updater, 1472 vdo_refcount_t *counter_ptr, bool adjust_block_count) 1473 { 1474 switch (old_status) { 1475 case RS_FREE: 1476 return vdo_log_error_strerror(VDO_REF_COUNT_INVALID, 1477 "Decrementing free block at offset %u in slab %u", 1478 block_number, slab->slab_number); 1479 1480 case RS_PROVISIONAL: 1481 case RS_SINGLE: 1482 if (updater->zpbn.zone != NULL) { 1483 struct pbn_lock *lock = vdo_get_physical_zone_pbn_lock(updater->zpbn.zone, 1484 updater->zpbn.pbn); 1485 1486 if (lock != NULL) { 1487 /* 1488 * There is a read lock on this block, so the block must not become 1489 * unreferenced. 1490 */ 1491 *counter_ptr = PROVISIONAL_REFERENCE_COUNT; 1492 vdo_assign_pbn_lock_provisional_reference(lock); 1493 break; 1494 } 1495 } 1496 1497 *counter_ptr = EMPTY_REFERENCE_COUNT; 1498 block->allocated_count--; 1499 slab->free_blocks++; 1500 if (adjust_block_count) 1501 adjust_free_block_count(slab, true); 1502 1503 break; 1504 1505 default: 1506 /* Shared */ 1507 (*counter_ptr)--; 1508 } 1509 1510 return VDO_SUCCESS; 1511 } 1512 1513 /** 1514 * increment_for_block_map() - Increment the reference count for a block map page. 1515 * @slab: The slab which owns the block. 1516 * @block: The reference block which contains the block being updated. 1517 * @block_number: The block to update. 1518 * @old_status: The reference status of the block before this increment. 1519 * @lock: The pbn_lock associated with this increment (may be NULL). 1520 * @normal_operation: Whether we are in normal operation vs. recovery or rebuild. 1521 * @counter_ptr: A pointer to the count for the block (in, out). 1522 * @adjust_block_count: Whether to update the allocator's free block count. 1523 * 1524 * All block map increments should be from provisional to MAXIMUM_REFERENCE_COUNT. 
Since block map 1525 * blocks never dedupe they should never be adjusted from any other state. The adjustment always 1526 * results in MAXIMUM_REFERENCE_COUNT as this value is used to prevent dedupe against block map 1527 * blocks. 1528 * 1529 * Return: VDO_SUCCESS or an error. 1530 */ 1531 static int increment_for_block_map(struct vdo_slab *slab, struct reference_block *block, 1532 slab_block_number block_number, 1533 enum reference_status old_status, 1534 struct pbn_lock *lock, bool normal_operation, 1535 vdo_refcount_t *counter_ptr, bool adjust_block_count) 1536 { 1537 switch (old_status) { 1538 case RS_FREE: 1539 if (normal_operation) { 1540 return vdo_log_error_strerror(VDO_REF_COUNT_INVALID, 1541 "Incrementing unallocated block map block (slab %u, offset %u)", 1542 slab->slab_number, block_number); 1543 } 1544 1545 *counter_ptr = MAXIMUM_REFERENCE_COUNT; 1546 block->allocated_count++; 1547 slab->free_blocks--; 1548 if (adjust_block_count) 1549 adjust_free_block_count(slab, false); 1550 1551 return VDO_SUCCESS; 1552 1553 case RS_PROVISIONAL: 1554 if (!normal_operation) 1555 return vdo_log_error_strerror(VDO_REF_COUNT_INVALID, 1556 "Block map block had provisional reference during replay (slab %u, offset %u)", 1557 slab->slab_number, block_number); 1558 1559 *counter_ptr = MAXIMUM_REFERENCE_COUNT; 1560 if (lock != NULL) 1561 vdo_unassign_pbn_lock_provisional_reference(lock); 1562 return VDO_SUCCESS; 1563 1564 default: 1565 return vdo_log_error_strerror(VDO_REF_COUNT_INVALID, 1566 "Incrementing a block map block which is already referenced %u times (slab %u, offset %u)", 1567 *counter_ptr, slab->slab_number, 1568 block_number); 1569 } 1570 } 1571 1572 static bool __must_check is_valid_journal_point(const struct journal_point *point) 1573 { 1574 return ((point != NULL) && (point->sequence_number > 0)); 1575 } 1576 1577 /** 1578 * update_reference_count() - Update the reference count of a block. 1579 * @slab: The slab which owns the block. 
1580 * @block: The reference block which contains the block being updated. 1581 * @block_number: The block to update. 1582 * @slab_journal_point: The slab journal point at which this update is journaled. 1583 * @updater: The reference updater. 1584 * @normal_operation: Whether we are in normal operation vs. recovery or rebuild. 1585 * @adjust_block_count: Whether to update the slab's free block count. 1586 * @provisional_decrement_ptr: A pointer which will be set to true if this update was a decrement 1587 * of a provisional reference. 1588 * 1589 * Return: VDO_SUCCESS or an error. 1590 */ 1591 static int update_reference_count(struct vdo_slab *slab, struct reference_block *block, 1592 slab_block_number block_number, 1593 const struct journal_point *slab_journal_point, 1594 struct reference_updater *updater, 1595 bool normal_operation, bool adjust_block_count, 1596 bool *provisional_decrement_ptr) 1597 { 1598 vdo_refcount_t *counter_ptr = &slab->counters[block_number]; 1599 enum reference_status old_status = reference_count_to_status(*counter_ptr); 1600 int result; 1601 1602 if (!updater->increment) { 1603 result = decrement_for_data(slab, block, block_number, old_status, 1604 updater, counter_ptr, adjust_block_count); 1605 if ((result == VDO_SUCCESS) && (old_status == RS_PROVISIONAL)) { 1606 if (provisional_decrement_ptr != NULL) 1607 *provisional_decrement_ptr = true; 1608 return VDO_SUCCESS; 1609 } 1610 } else if (updater->operation == VDO_JOURNAL_DATA_REMAPPING) { 1611 result = increment_for_data(slab, block, block_number, old_status, 1612 updater->lock, counter_ptr, adjust_block_count); 1613 } else { 1614 result = increment_for_block_map(slab, block, block_number, old_status, 1615 updater->lock, normal_operation, 1616 counter_ptr, adjust_block_count); 1617 } 1618 1619 if (result != VDO_SUCCESS) 1620 return result; 1621 1622 if (is_valid_journal_point(slab_journal_point)) 1623 slab->slab_journal_point = *slab_journal_point; 1624 1625 return VDO_SUCCESS; 1626 } 
1627 1628 static int __must_check adjust_reference_count(struct vdo_slab *slab, 1629 struct reference_updater *updater, 1630 const struct journal_point *slab_journal_point) 1631 { 1632 slab_block_number block_number; 1633 int result; 1634 struct reference_block *block; 1635 bool provisional_decrement = false; 1636 1637 if (!is_slab_open(slab)) 1638 return VDO_INVALID_ADMIN_STATE; 1639 1640 result = slab_block_number_from_pbn(slab, updater->zpbn.pbn, &block_number); 1641 if (result != VDO_SUCCESS) 1642 return result; 1643 1644 block = get_reference_block(slab, block_number); 1645 result = update_reference_count(slab, block, block_number, slab_journal_point, 1646 updater, NORMAL_OPERATION, true, 1647 &provisional_decrement); 1648 if ((result != VDO_SUCCESS) || provisional_decrement) 1649 return result; 1650 1651 if (block->is_dirty && (block->slab_journal_lock > 0)) { 1652 sequence_number_t entry_lock = slab_journal_point->sequence_number; 1653 /* 1654 * This block is already dirty and a slab journal entry has been made for it since 1655 * the last time it was clean. We must release the per-entry slab journal lock for 1656 * the entry associated with the update we are now doing. 1657 */ 1658 result = VDO_ASSERT(is_valid_journal_point(slab_journal_point), 1659 "Reference count adjustments need slab journal points."); 1660 if (result != VDO_SUCCESS) 1661 return result; 1662 1663 adjust_slab_journal_block_reference(&slab->journal, entry_lock, -1); 1664 return VDO_SUCCESS; 1665 } 1666 1667 /* 1668 * This may be the first time we are applying an update for which there is a slab journal 1669 * entry to this block since the block was cleaned. Therefore, we convert the per-entry 1670 * slab journal lock to an uncommitted reference block lock, if there is a per-entry lock. 
1671 */ 1672 if (is_valid_journal_point(slab_journal_point)) 1673 block->slab_journal_lock = slab_journal_point->sequence_number; 1674 else 1675 block->slab_journal_lock = 0; 1676 1677 dirty_block(block); 1678 return VDO_SUCCESS; 1679 } 1680 1681 /** 1682 * add_entry_from_waiter() - Add an entry to the slab journal. 1683 * @waiter: The vio which should make an entry now. 1684 * @context: The slab journal to make an entry in. 1685 * 1686 * This callback is invoked by add_entries() once it has determined that we are ready to make 1687 * another entry in the slab journal. Implements waiter_callback_fn. 1688 */ 1689 static void add_entry_from_waiter(struct vdo_waiter *waiter, void *context) 1690 { 1691 int result; 1692 struct reference_updater *updater = 1693 container_of(waiter, struct reference_updater, waiter); 1694 struct data_vio *data_vio = data_vio_from_reference_updater(updater); 1695 struct slab_journal *journal = context; 1696 struct slab_journal_block_header *header = &journal->tail_header; 1697 struct journal_point slab_journal_point = { 1698 .sequence_number = header->sequence_number, 1699 .entry_count = header->entry_count, 1700 }; 1701 sequence_number_t recovery_block = data_vio->recovery_journal_point.sequence_number; 1702 1703 if (header->entry_count == 0) { 1704 /* 1705 * This is the first entry in the current tail block, so get a lock on the recovery 1706 * journal which we will hold until this tail block is committed. 
1707 */ 1708 get_lock(journal, header->sequence_number)->recovery_start = recovery_block; 1709 if (journal->recovery_journal != NULL) { 1710 zone_count_t zone_number = journal->slab->allocator->zone_number; 1711 1712 vdo_acquire_recovery_journal_block_reference(journal->recovery_journal, 1713 recovery_block, 1714 VDO_ZONE_TYPE_PHYSICAL, 1715 zone_number); 1716 } 1717 1718 mark_slab_journal_dirty(journal, recovery_block); 1719 reclaim_journal_space(journal); 1720 } 1721 1722 add_entry(journal, updater->zpbn.pbn, updater->operation, updater->increment, 1723 expand_journal_point(data_vio->recovery_journal_point, 1724 updater->increment)); 1725 1726 if (journal->slab->status != VDO_SLAB_REBUILT) { 1727 /* 1728 * If the slab is unrecovered, scrubbing will take care of the count since the 1729 * update is now recorded in the journal. 1730 */ 1731 adjust_slab_journal_block_reference(journal, 1732 slab_journal_point.sequence_number, -1); 1733 result = VDO_SUCCESS; 1734 } else { 1735 /* Now that an entry has been made in the slab journal, update the counter. */ 1736 result = adjust_reference_count(journal->slab, updater, 1737 &slab_journal_point); 1738 } 1739 1740 if (updater->increment) 1741 continue_data_vio_with_error(data_vio, result); 1742 else 1743 vdo_continue_completion(&data_vio->decrement_completion, result); 1744 } 1745 1746 /** 1747 * is_next_entry_a_block_map_increment() - Check whether the next entry to be made is a block map 1748 * increment. 1749 * @journal: The journal. 1750 * 1751 * Return: true if the first entry waiter's operation is a block map increment. 
1752 */ 1753 static inline bool is_next_entry_a_block_map_increment(struct slab_journal *journal) 1754 { 1755 struct vdo_waiter *waiter = vdo_waitq_get_first_waiter(&journal->entry_waiters); 1756 struct reference_updater *updater = 1757 container_of(waiter, struct reference_updater, waiter); 1758 1759 return (updater->operation == VDO_JOURNAL_BLOCK_MAP_REMAPPING); 1760 } 1761 1762 /** 1763 * add_entries() - Add as many entries as possible from the queue of vios waiting to make entries. 1764 * @journal: The journal to which entries may be added. 1765 * 1766 * By processing the queue in order, we ensure that slab journal entries are made in the same order 1767 * as recovery journal entries for the same increment or decrement. 1768 */ 1769 static void add_entries(struct slab_journal *journal) 1770 { 1771 if (journal->adding_entries) { 1772 /* Protect against re-entrancy. */ 1773 return; 1774 } 1775 1776 journal->adding_entries = true; 1777 while (vdo_waitq_has_waiters(&journal->entry_waiters)) { 1778 struct slab_journal_block_header *header = &journal->tail_header; 1779 1780 if (journal->partial_write_in_progress || 1781 (journal->slab->status == VDO_SLAB_REBUILDING)) { 1782 /* 1783 * Don't add entries while rebuilding or while a partial write is 1784 * outstanding, as it could result in reference count corruption. 1785 */ 1786 break; 1787 } 1788 1789 if (journal->waiting_to_commit) { 1790 /* 1791 * If we are waiting for resources to write the tail block, and the tail 1792 * block is full, we can't make another entry. 1793 */ 1794 WRITE_ONCE(journal->events->tail_busy_count, 1795 journal->events->tail_busy_count + 1); 1796 break; 1797 } else if (is_next_entry_a_block_map_increment(journal) && 1798 (header->entry_count >= journal->full_entries_per_block)) { 1799 /* 1800 * The tail block does not have room for a block map increment, so commit 1801 * it now. 
1802 */ 1803 commit_tail(journal); 1804 if (journal->waiting_to_commit) { 1805 WRITE_ONCE(journal->events->tail_busy_count, 1806 journal->events->tail_busy_count + 1); 1807 break; 1808 } 1809 } 1810 1811 /* If the slab is over the blocking threshold, make the vio wait. */ 1812 if (requires_reaping(journal)) { 1813 WRITE_ONCE(journal->events->blocked_count, 1814 journal->events->blocked_count + 1); 1815 save_dirty_reference_blocks(journal->slab); 1816 break; 1817 } 1818 1819 if (header->entry_count == 0) { 1820 struct journal_lock *lock = 1821 get_lock(journal, header->sequence_number); 1822 1823 /* 1824 * Check if the on disk slab journal is full. Because of the blocking and 1825 * scrubbing thresholds, this should never happen. 1826 */ 1827 if (lock->count > 0) { 1828 VDO_ASSERT_LOG_ONLY((journal->head + journal->size) == journal->tail, 1829 "New block has locks, but journal is not full"); 1830 1831 /* 1832 * The blocking threshold must let the journal fill up if the new 1833 * block has locks; if the blocking threshold is smaller than the 1834 * journal size, the new block cannot possibly have locks already. 1835 */ 1836 VDO_ASSERT_LOG_ONLY((journal->blocking_threshold >= journal->size), 1837 "New block can have locks already iff blocking threshold is at the end of the journal"); 1838 1839 WRITE_ONCE(journal->events->disk_full_count, 1840 journal->events->disk_full_count + 1); 1841 save_dirty_reference_blocks(journal->slab); 1842 break; 1843 } 1844 1845 /* 1846 * Don't allow the new block to be reaped until all of the reference count 1847 * blocks are written and the journal block has been fully committed as 1848 * well. 1849 */ 1850 lock->count = journal->entries_per_block + 1; 1851 1852 if (header->sequence_number == 1) { 1853 struct vdo_slab *slab = journal->slab; 1854 block_count_t i; 1855 1856 /* 1857 * This is the first entry in this slab journal, ever. Dirty all of 1858 * the reference count blocks. 
Each will acquire a lock on the tail 1859 * block so that the journal won't be reaped until the reference 1860 * counts are initialized. The lock acquisition must be done by the 1861 * ref_counts since here we don't know how many reference blocks 1862 * the ref_counts has. 1863 */ 1864 for (i = 0; i < slab->reference_block_count; i++) { 1865 slab->reference_blocks[i].slab_journal_lock = 1; 1866 dirty_block(&slab->reference_blocks[i]); 1867 } 1868 1869 adjust_slab_journal_block_reference(journal, 1, 1870 slab->reference_block_count); 1871 } 1872 } 1873 1874 vdo_waitq_notify_next_waiter(&journal->entry_waiters, 1875 add_entry_from_waiter, journal); 1876 } 1877 1878 journal->adding_entries = false; 1879 1880 /* If there are no waiters, and we are flushing or saving, commit the tail block. */ 1881 if (vdo_is_state_draining(&journal->slab->state) && 1882 !vdo_is_state_suspending(&journal->slab->state) && 1883 !vdo_waitq_has_waiters(&journal->entry_waiters)) 1884 commit_tail(journal); 1885 } 1886 1887 /** 1888 * reset_search_cursor() - Reset the free block search back to the first reference counter in the 1889 * first reference block of a slab. 1890 */ 1891 static void reset_search_cursor(struct vdo_slab *slab) 1892 { 1893 struct search_cursor *cursor = &slab->search_cursor; 1894 1895 cursor->block = cursor->first_block; 1896 cursor->index = 0; 1897 /* Unit tests have slabs with only one reference block (and it's a runt). */ 1898 cursor->end_index = min_t(u32, COUNTS_PER_BLOCK, slab->block_count); 1899 } 1900 1901 /** 1902 * advance_search_cursor() - Advance the search cursor to the start of the next reference block in 1903 * a slab, 1904 * 1905 * Wraps around to the first reference block if the current block is the last reference block. 1906 * 1907 * Return: true unless the cursor was at the last reference block. 
1908 */ 1909 static bool advance_search_cursor(struct vdo_slab *slab) 1910 { 1911 struct search_cursor *cursor = &slab->search_cursor; 1912 1913 /* 1914 * If we just finished searching the last reference block, then wrap back around to the 1915 * start of the array. 1916 */ 1917 if (cursor->block == cursor->last_block) { 1918 reset_search_cursor(slab); 1919 return false; 1920 } 1921 1922 /* We're not already at the end, so advance to cursor to the next block. */ 1923 cursor->block++; 1924 cursor->index = cursor->end_index; 1925 1926 if (cursor->block == cursor->last_block) { 1927 /* The last reference block will usually be a runt. */ 1928 cursor->end_index = slab->block_count; 1929 } else { 1930 cursor->end_index += COUNTS_PER_BLOCK; 1931 } 1932 1933 return true; 1934 } 1935 1936 /** 1937 * vdo_adjust_reference_count_for_rebuild() - Adjust the reference count of a block during rebuild. 1938 * 1939 * Return: VDO_SUCCESS or an error. 1940 */ 1941 int vdo_adjust_reference_count_for_rebuild(struct slab_depot *depot, 1942 physical_block_number_t pbn, 1943 enum journal_operation operation) 1944 { 1945 int result; 1946 slab_block_number block_number; 1947 struct reference_block *block; 1948 struct vdo_slab *slab = vdo_get_slab(depot, pbn); 1949 struct reference_updater updater = { 1950 .operation = operation, 1951 .increment = true, 1952 }; 1953 1954 result = slab_block_number_from_pbn(slab, pbn, &block_number); 1955 if (result != VDO_SUCCESS) 1956 return result; 1957 1958 block = get_reference_block(slab, block_number); 1959 result = update_reference_count(slab, block, block_number, NULL, 1960 &updater, !NORMAL_OPERATION, false, NULL); 1961 if (result != VDO_SUCCESS) 1962 return result; 1963 1964 dirty_block(block); 1965 return VDO_SUCCESS; 1966 } 1967 1968 /** 1969 * replay_reference_count_change() - Replay the reference count adjustment from a slab journal 1970 * entry into the reference count for a block. 1971 * @slab: The slab. 
1972 * @entry_point: The slab journal point for the entry. 1973 * @entry: The slab journal entry being replayed. 1974 * 1975 * The adjustment will be ignored if it was already recorded in the reference count. 1976 * 1977 * Return: VDO_SUCCESS or an error code. 1978 */ 1979 static int replay_reference_count_change(struct vdo_slab *slab, 1980 const struct journal_point *entry_point, 1981 struct slab_journal_entry entry) 1982 { 1983 int result; 1984 struct reference_block *block = get_reference_block(slab, entry.sbn); 1985 sector_count_t sector = (entry.sbn % COUNTS_PER_BLOCK) / COUNTS_PER_SECTOR; 1986 struct reference_updater updater = { 1987 .operation = entry.operation, 1988 .increment = entry.increment, 1989 }; 1990 1991 if (!vdo_before_journal_point(&block->commit_points[sector], entry_point)) { 1992 /* This entry is already reflected in the existing counts, so do nothing. */ 1993 return VDO_SUCCESS; 1994 } 1995 1996 /* This entry is not yet counted in the reference counts. */ 1997 result = update_reference_count(slab, block, entry.sbn, entry_point, 1998 &updater, !NORMAL_OPERATION, false, NULL); 1999 if (result != VDO_SUCCESS) 2000 return result; 2001 2002 dirty_block(block); 2003 return VDO_SUCCESS; 2004 } 2005 2006 /** 2007 * find_zero_byte_in_word() - Find the array index of the first zero byte in word-sized range of 2008 * reference counters. 2009 * @word_ptr: A pointer to the eight counter bytes to check. 2010 * @start_index: The array index corresponding to word_ptr[0]. 2011 * @fail_index: The array index to return if no zero byte is found. 2012 * 2013 * The search does no bounds checking; the function relies on the array being sufficiently padded. 2014 * 2015 * Return: The array index of the first zero byte in the word, or the value passed as fail_index if 2016 * no zero byte was found. 
2017 */ 2018 static inline slab_block_number find_zero_byte_in_word(const u8 *word_ptr, 2019 slab_block_number start_index, 2020 slab_block_number fail_index) 2021 { 2022 u64 word = get_unaligned_le64(word_ptr); 2023 2024 /* This looks like a loop, but GCC will unroll the eight iterations for us. */ 2025 unsigned int offset; 2026 2027 for (offset = 0; offset < BYTES_PER_WORD; offset++) { 2028 /* Assumes little-endian byte order, which we have on X86. */ 2029 if ((word & 0xFF) == 0) 2030 return (start_index + offset); 2031 word >>= 8; 2032 } 2033 2034 return fail_index; 2035 } 2036 2037 /** 2038 * find_free_block() - Find the first block with a reference count of zero in the specified 2039 * range of reference counter indexes. 2040 * @slab: The slab counters to scan. 2041 * @index_ptr: A pointer to hold the array index of the free block. 2042 * 2043 * Exposed for unit testing. 2044 * 2045 * Return: true if a free block was found in the specified range. 2046 */ 2047 static bool find_free_block(const struct vdo_slab *slab, slab_block_number *index_ptr) 2048 { 2049 slab_block_number zero_index; 2050 slab_block_number next_index = slab->search_cursor.index; 2051 slab_block_number end_index = slab->search_cursor.end_index; 2052 u8 *next_counter = &slab->counters[next_index]; 2053 u8 *end_counter = &slab->counters[end_index]; 2054 2055 /* 2056 * Search every byte of the first unaligned word. (Array is padded so reading past end is 2057 * safe.) 2058 */ 2059 zero_index = find_zero_byte_in_word(next_counter, next_index, end_index); 2060 if (zero_index < end_index) { 2061 *index_ptr = zero_index; 2062 return true; 2063 } 2064 2065 /* 2066 * On architectures where unaligned word access is expensive, this would be a good place to 2067 * advance to an alignment boundary. 2068 */ 2069 next_index += BYTES_PER_WORD; 2070 next_counter += BYTES_PER_WORD; 2071 2072 /* 2073 * Now we're word-aligned; check an word at a time until we find a word containing a zero. 
2074 * (Array is padded so reading past end is safe.) 2075 */ 2076 while (next_counter < end_counter) { 2077 /* 2078 * The following code is currently an exact copy of the code preceding the loop, 2079 * but if you try to merge them by using a do loop, it runs slower because a jump 2080 * instruction gets added at the start of the iteration. 2081 */ 2082 zero_index = find_zero_byte_in_word(next_counter, next_index, end_index); 2083 if (zero_index < end_index) { 2084 *index_ptr = zero_index; 2085 return true; 2086 } 2087 2088 next_index += BYTES_PER_WORD; 2089 next_counter += BYTES_PER_WORD; 2090 } 2091 2092 return false; 2093 } 2094 2095 /** 2096 * search_current_reference_block() - Search the reference block currently saved in the search 2097 * cursor for a reference count of zero, starting at the saved 2098 * counter index. 2099 * @slab: The slab to search. 2100 * @free_index_ptr: A pointer to receive the array index of the zero reference count. 2101 * 2102 * Return: true if an unreferenced counter was found. 2103 */ 2104 static bool search_current_reference_block(const struct vdo_slab *slab, 2105 slab_block_number *free_index_ptr) 2106 { 2107 /* Don't bother searching if the current block is known to be full. */ 2108 return ((slab->search_cursor.block->allocated_count < COUNTS_PER_BLOCK) && 2109 find_free_block(slab, free_index_ptr)); 2110 } 2111 2112 /** 2113 * search_reference_blocks() - Search each reference block for a reference count of zero. 2114 * @slab: The slab to search. 2115 * @free_index_ptr: A pointer to receive the array index of the zero reference count. 2116 * 2117 * Searches each reference block for a reference count of zero, starting at the reference block and 2118 * counter index saved in the search cursor and searching up to the end of the last reference 2119 * block. The search does not wrap. 2120 * 2121 * Return: true if an unreferenced counter was found. 
2122 */ 2123 static bool search_reference_blocks(struct vdo_slab *slab, 2124 slab_block_number *free_index_ptr) 2125 { 2126 /* Start searching at the saved search position in the current block. */ 2127 if (search_current_reference_block(slab, free_index_ptr)) 2128 return true; 2129 2130 /* Search each reference block up to the end of the slab. */ 2131 while (advance_search_cursor(slab)) { 2132 if (search_current_reference_block(slab, free_index_ptr)) 2133 return true; 2134 } 2135 2136 return false; 2137 } 2138 2139 /** 2140 * make_provisional_reference() - Do the bookkeeping for making a provisional reference. 2141 */ 2142 static void make_provisional_reference(struct vdo_slab *slab, 2143 slab_block_number block_number) 2144 { 2145 struct reference_block *block = get_reference_block(slab, block_number); 2146 2147 /* 2148 * Make the initial transition from an unreferenced block to a 2149 * provisionally allocated block. 2150 */ 2151 slab->counters[block_number] = PROVISIONAL_REFERENCE_COUNT; 2152 2153 /* Account for the allocation. */ 2154 block->allocated_count++; 2155 slab->free_blocks--; 2156 } 2157 2158 /** 2159 * dirty_all_reference_blocks() - Mark all reference count blocks in a slab as dirty. 2160 */ 2161 static void dirty_all_reference_blocks(struct vdo_slab *slab) 2162 { 2163 block_count_t i; 2164 2165 for (i = 0; i < slab->reference_block_count; i++) 2166 dirty_block(&slab->reference_blocks[i]); 2167 } 2168 2169 /** 2170 * clear_provisional_references() - Clear the provisional reference counts from a reference block. 2171 * @block: The block to clear. 
2172 */ 2173 static void clear_provisional_references(struct reference_block *block) 2174 { 2175 vdo_refcount_t *counters = get_reference_counters_for_block(block); 2176 block_count_t j; 2177 2178 for (j = 0; j < COUNTS_PER_BLOCK; j++) { 2179 if (counters[j] == PROVISIONAL_REFERENCE_COUNT) { 2180 counters[j] = EMPTY_REFERENCE_COUNT; 2181 block->allocated_count--; 2182 } 2183 } 2184 } 2185 2186 static inline bool journal_points_equal(struct journal_point first, 2187 struct journal_point second) 2188 { 2189 return ((first.sequence_number == second.sequence_number) && 2190 (first.entry_count == second.entry_count)); 2191 } 2192 2193 /** 2194 * unpack_reference_block() - Unpack reference counts blocks into the internal memory structure. 2195 * @packed: The written reference block to be unpacked. 2196 * @block: The internal reference block to be loaded. 2197 */ 2198 static void unpack_reference_block(struct packed_reference_block *packed, 2199 struct reference_block *block) 2200 { 2201 block_count_t index; 2202 sector_count_t i; 2203 struct vdo_slab *slab = block->slab; 2204 vdo_refcount_t *counters = get_reference_counters_for_block(block); 2205 2206 for (i = 0; i < VDO_SECTORS_PER_BLOCK; i++) { 2207 struct packed_reference_sector *sector = &packed->sectors[i]; 2208 2209 vdo_unpack_journal_point(§or->commit_point, &block->commit_points[i]); 2210 memcpy(counters + (i * COUNTS_PER_SECTOR), sector->counts, 2211 (sizeof(vdo_refcount_t) * COUNTS_PER_SECTOR)); 2212 /* The slab_journal_point must be the latest point found in any sector. 
*/ 2213 if (vdo_before_journal_point(&slab->slab_journal_point, 2214 &block->commit_points[i])) 2215 slab->slab_journal_point = block->commit_points[i]; 2216 2217 if ((i > 0) && 2218 !journal_points_equal(block->commit_points[0], 2219 block->commit_points[i])) { 2220 size_t block_index = block - block->slab->reference_blocks; 2221 2222 vdo_log_warning("Torn write detected in sector %u of reference block %zu of slab %u", 2223 i, block_index, block->slab->slab_number); 2224 } 2225 } 2226 2227 block->allocated_count = 0; 2228 for (index = 0; index < COUNTS_PER_BLOCK; index++) { 2229 if (counters[index] != EMPTY_REFERENCE_COUNT) 2230 block->allocated_count++; 2231 } 2232 } 2233 2234 /** 2235 * finish_reference_block_load() - After a reference block has been read, unpack it. 2236 * @completion: The VIO that just finished reading. 2237 */ 2238 static void finish_reference_block_load(struct vdo_completion *completion) 2239 { 2240 struct vio *vio = as_vio(completion); 2241 struct pooled_vio *pooled = vio_as_pooled_vio(vio); 2242 struct reference_block *block = completion->parent; 2243 struct vdo_slab *slab = block->slab; 2244 2245 unpack_reference_block((struct packed_reference_block *) vio->data, block); 2246 return_vio_to_pool(slab->allocator->vio_pool, pooled); 2247 slab->active_count--; 2248 clear_provisional_references(block); 2249 2250 slab->free_blocks -= block->allocated_count; 2251 check_if_slab_drained(slab); 2252 } 2253 2254 static void load_reference_block_endio(struct bio *bio) 2255 { 2256 struct vio *vio = bio->bi_private; 2257 struct reference_block *block = vio->completion.parent; 2258 2259 continue_vio_after_io(vio, finish_reference_block_load, 2260 block->slab->allocator->thread_id); 2261 } 2262 2263 /** 2264 * load_reference_block() - After a block waiter has gotten a VIO from the VIO pool, load the 2265 * block. 2266 * @waiter: The waiter of the block to load. 2267 * @context: The VIO returned by the pool. 
2268 */ 2269 static void load_reference_block(struct vdo_waiter *waiter, void *context) 2270 { 2271 struct pooled_vio *pooled = context; 2272 struct vio *vio = &pooled->vio; 2273 struct reference_block *block = 2274 container_of(waiter, struct reference_block, waiter); 2275 size_t block_offset = (block - block->slab->reference_blocks); 2276 2277 vio->completion.parent = block; 2278 vdo_submit_metadata_vio(vio, block->slab->ref_counts_origin + block_offset, 2279 load_reference_block_endio, handle_io_error, 2280 REQ_OP_READ); 2281 } 2282 2283 /** 2284 * load_reference_blocks() - Load a slab's reference blocks from the underlying storage into a 2285 * pre-allocated reference counter. 2286 */ 2287 static void load_reference_blocks(struct vdo_slab *slab) 2288 { 2289 block_count_t i; 2290 2291 slab->free_blocks = slab->block_count; 2292 slab->active_count = slab->reference_block_count; 2293 for (i = 0; i < slab->reference_block_count; i++) { 2294 struct vdo_waiter *waiter = &slab->reference_blocks[i].waiter; 2295 2296 waiter->callback = load_reference_block; 2297 acquire_vio_from_pool(slab->allocator->vio_pool, waiter); 2298 } 2299 } 2300 2301 /** 2302 * drain_slab() - Drain all reference count I/O. 2303 * 2304 * Depending upon the type of drain being performed (as recorded in the ref_count's vdo_slab), the 2305 * reference blocks may be loaded from disk or dirty reference blocks may be written out. 
2306 */ 2307 static void drain_slab(struct vdo_slab *slab) 2308 { 2309 bool save; 2310 bool load; 2311 const struct admin_state_code *state = vdo_get_admin_state_code(&slab->state); 2312 2313 if (state == VDO_ADMIN_STATE_SUSPENDING) 2314 return; 2315 2316 if ((state != VDO_ADMIN_STATE_REBUILDING) && 2317 (state != VDO_ADMIN_STATE_SAVE_FOR_SCRUBBING)) 2318 commit_tail(&slab->journal); 2319 2320 if ((state == VDO_ADMIN_STATE_RECOVERING) || (slab->counters == NULL)) 2321 return; 2322 2323 save = false; 2324 load = slab->allocator->summary_entries[slab->slab_number].load_ref_counts; 2325 if (state == VDO_ADMIN_STATE_SCRUBBING) { 2326 if (load) { 2327 load_reference_blocks(slab); 2328 return; 2329 } 2330 } else if (state == VDO_ADMIN_STATE_SAVE_FOR_SCRUBBING) { 2331 if (!load) { 2332 /* These reference counts were never written, so mark them all dirty. */ 2333 dirty_all_reference_blocks(slab); 2334 } 2335 save = true; 2336 } else if (state == VDO_ADMIN_STATE_REBUILDING) { 2337 /* 2338 * Write out the counters if the slab has written them before, or it has any 2339 * non-zero reference counts, or there are any slab journal blocks. 
2340 */ 2341 block_count_t data_blocks = slab->allocator->depot->slab_config.data_blocks; 2342 2343 if (load || (slab->free_blocks != data_blocks) || 2344 !is_slab_journal_blank(slab)) { 2345 dirty_all_reference_blocks(slab); 2346 save = true; 2347 } 2348 } else if (state == VDO_ADMIN_STATE_SAVING) { 2349 save = (slab->status == VDO_SLAB_REBUILT); 2350 } else { 2351 vdo_finish_draining_with_result(&slab->state, VDO_SUCCESS); 2352 return; 2353 } 2354 2355 if (save) 2356 save_dirty_reference_blocks(slab); 2357 } 2358 2359 static int allocate_slab_counters(struct vdo_slab *slab) 2360 { 2361 int result; 2362 size_t index, bytes; 2363 2364 result = VDO_ASSERT(slab->reference_blocks == NULL, 2365 "vdo_slab %u doesn't allocate refcounts twice", 2366 slab->slab_number); 2367 if (result != VDO_SUCCESS) 2368 return result; 2369 2370 result = vdo_allocate(slab->reference_block_count, struct reference_block, 2371 __func__, &slab->reference_blocks); 2372 if (result != VDO_SUCCESS) 2373 return result; 2374 2375 /* 2376 * Allocate such that the runt slab has a full-length memory array, plus a little padding 2377 * so we can word-search even at the very end. 
2378 */ 2379 bytes = (slab->reference_block_count * COUNTS_PER_BLOCK) + (2 * BYTES_PER_WORD); 2380 result = vdo_allocate(bytes, vdo_refcount_t, "ref counts array", 2381 &slab->counters); 2382 if (result != VDO_SUCCESS) { 2383 vdo_free(vdo_forget(slab->reference_blocks)); 2384 return result; 2385 } 2386 2387 slab->search_cursor.first_block = slab->reference_blocks; 2388 slab->search_cursor.last_block = &slab->reference_blocks[slab->reference_block_count - 1]; 2389 reset_search_cursor(slab); 2390 2391 for (index = 0; index < slab->reference_block_count; index++) { 2392 slab->reference_blocks[index] = (struct reference_block) { 2393 .slab = slab, 2394 }; 2395 } 2396 2397 return VDO_SUCCESS; 2398 } 2399 2400 static int allocate_counters_if_clean(struct vdo_slab *slab) 2401 { 2402 if (vdo_is_state_clean_load(&slab->state)) 2403 return allocate_slab_counters(slab); 2404 2405 return VDO_SUCCESS; 2406 } 2407 2408 static void finish_loading_journal(struct vdo_completion *completion) 2409 { 2410 struct vio *vio = as_vio(completion); 2411 struct slab_journal *journal = completion->parent; 2412 struct vdo_slab *slab = journal->slab; 2413 struct packed_slab_journal_block *block = (struct packed_slab_journal_block *) vio->data; 2414 struct slab_journal_block_header header; 2415 2416 vdo_unpack_slab_journal_block_header(&block->header, &header); 2417 2418 /* FIXME: should it be an error if the following conditional fails? */ 2419 if ((header.metadata_type == VDO_METADATA_SLAB_JOURNAL) && 2420 (header.nonce == slab->allocator->nonce)) { 2421 journal->tail = header.sequence_number + 1; 2422 2423 /* 2424 * If the slab is clean, this implies the slab journal is empty, so advance the 2425 * head appropriately. 2426 */ 2427 journal->head = (slab->allocator->summary_entries[slab->slab_number].is_dirty ? 
2428 header.head : journal->tail); 2429 journal->tail_header = header; 2430 initialize_journal_state(journal); 2431 } 2432 2433 return_vio_to_pool(slab->allocator->vio_pool, vio_as_pooled_vio(vio)); 2434 vdo_finish_loading_with_result(&slab->state, allocate_counters_if_clean(slab)); 2435 } 2436 2437 static void read_slab_journal_tail_endio(struct bio *bio) 2438 { 2439 struct vio *vio = bio->bi_private; 2440 struct slab_journal *journal = vio->completion.parent; 2441 2442 continue_vio_after_io(vio, finish_loading_journal, 2443 journal->slab->allocator->thread_id); 2444 } 2445 2446 static void handle_load_error(struct vdo_completion *completion) 2447 { 2448 int result = completion->result; 2449 struct slab_journal *journal = completion->parent; 2450 struct vio *vio = as_vio(completion); 2451 2452 vio_record_metadata_io_error(vio); 2453 return_vio_to_pool(journal->slab->allocator->vio_pool, vio_as_pooled_vio(vio)); 2454 vdo_finish_loading_with_result(&journal->slab->state, result); 2455 } 2456 2457 /** 2458 * read_slab_journal_tail() - Read the slab journal tail block by using a vio acquired from the vio 2459 * pool. 2460 * @waiter: The vio pool waiter which has just been notified. 2461 * @context: The vio pool entry given to the waiter. 2462 * 2463 * This is the success callback from acquire_vio_from_pool() when loading a slab journal. 2464 */ 2465 static void read_slab_journal_tail(struct vdo_waiter *waiter, void *context) 2466 { 2467 struct slab_journal *journal = 2468 container_of(waiter, struct slab_journal, resource_waiter); 2469 struct vdo_slab *slab = journal->slab; 2470 struct pooled_vio *pooled = context; 2471 struct vio *vio = &pooled->vio; 2472 tail_block_offset_t last_commit_point = 2473 slab->allocator->summary_entries[slab->slab_number].tail_block_offset; 2474 2475 /* 2476 * Slab summary keeps the commit point offset, so the tail block is the block before that. 2477 * Calculation supports small journals in unit tests. 
2478 */ 2479 tail_block_offset_t tail_block = ((last_commit_point == 0) ? 2480 (tail_block_offset_t)(journal->size - 1) : 2481 (last_commit_point - 1)); 2482 2483 vio->completion.parent = journal; 2484 vio->completion.callback_thread_id = slab->allocator->thread_id; 2485 vdo_submit_metadata_vio(vio, slab->journal_origin + tail_block, 2486 read_slab_journal_tail_endio, handle_load_error, 2487 REQ_OP_READ); 2488 } 2489 2490 /** 2491 * load_slab_journal() - Load a slab's journal by reading the journal's tail. 2492 */ 2493 static void load_slab_journal(struct vdo_slab *slab) 2494 { 2495 struct slab_journal *journal = &slab->journal; 2496 tail_block_offset_t last_commit_point; 2497 2498 last_commit_point = slab->allocator->summary_entries[slab->slab_number].tail_block_offset; 2499 if ((last_commit_point == 0) && 2500 !slab->allocator->summary_entries[slab->slab_number].load_ref_counts) { 2501 /* 2502 * This slab claims that it has a tail block at (journal->size - 1), but a head of 2503 * 1. This is impossible, due to the scrubbing threshold, on a real system, so 2504 * don't bother reading the (bogus) data off disk. 
2505 */ 2506 VDO_ASSERT_LOG_ONLY(((journal->size < 16) || 2507 (journal->scrubbing_threshold < (journal->size - 1))), 2508 "Scrubbing threshold protects against reads of unwritten slab journal blocks"); 2509 vdo_finish_loading_with_result(&slab->state, 2510 allocate_counters_if_clean(slab)); 2511 return; 2512 } 2513 2514 journal->resource_waiter.callback = read_slab_journal_tail; 2515 acquire_vio_from_pool(slab->allocator->vio_pool, &journal->resource_waiter); 2516 } 2517 2518 static void register_slab_for_scrubbing(struct vdo_slab *slab, bool high_priority) 2519 { 2520 struct slab_scrubber *scrubber = &slab->allocator->scrubber; 2521 2522 VDO_ASSERT_LOG_ONLY((slab->status != VDO_SLAB_REBUILT), 2523 "slab to be scrubbed is unrecovered"); 2524 2525 if (slab->status != VDO_SLAB_REQUIRES_SCRUBBING) 2526 return; 2527 2528 list_del_init(&slab->allocq_entry); 2529 if (!slab->was_queued_for_scrubbing) { 2530 WRITE_ONCE(scrubber->slab_count, scrubber->slab_count + 1); 2531 slab->was_queued_for_scrubbing = true; 2532 } 2533 2534 if (high_priority) { 2535 slab->status = VDO_SLAB_REQUIRES_HIGH_PRIORITY_SCRUBBING; 2536 list_add_tail(&slab->allocq_entry, &scrubber->high_priority_slabs); 2537 return; 2538 } 2539 2540 list_add_tail(&slab->allocq_entry, &scrubber->slabs); 2541 } 2542 2543 /* Queue a slab for allocation or scrubbing. 
*/
static void queue_slab(struct vdo_slab *slab)
{
	struct block_allocator *allocator = slab->allocator;
	struct slab_depot *depot = allocator->depot;
	block_count_t free_blocks;
	int result;

	VDO_ASSERT_LOG_ONLY(list_empty(&slab->allocq_entry),
			    "a requeued slab must not already be on a ring");

	if (vdo_is_read_only(depot->vdo))
		return;

	/* A free block count above the slab's data block count means the slab is corrupt. */
	free_blocks = slab->free_blocks;
	result = VDO_ASSERT((free_blocks <= depot->slab_config.data_blocks),
			    "rebuilt slab %u must have a valid free block count (has %llu, expected maximum %llu)",
			    slab->slab_number, (unsigned long long) free_blocks,
			    (unsigned long long) depot->slab_config.data_blocks);
	if (result != VDO_SUCCESS) {
		vdo_enter_read_only_mode(depot->vdo, result);
		return;
	}

	/* A slab which has not been rebuilt goes to the scrubber, not the allocator. */
	if (slab->status != VDO_SLAB_REBUILT) {
		register_slab_for_scrubbing(slab, false);
		return;
	}

	/*
	 * If the slab is resuming, we've already accounted for it here, so don't do it
	 * again.
	 * FIXME: under what situation would the slab be resuming here?
	 */
	if (!vdo_is_state_resuming(&slab->state)) {
		WRITE_ONCE(allocator->allocated_blocks,
			   allocator->allocated_blocks - free_blocks);
		if (!is_slab_journal_blank(slab)) {
			WRITE_ONCE(allocator->statistics.slabs_opened,
				   allocator->statistics.slabs_opened + 1);
		}
	}

	if (depot->vdo->suspend_type == VDO_ADMIN_STATE_SAVING)
		reopen_slab_journal(slab);

	prioritize_slab(slab);
}

/**
 * initiate_slab_action() - Initiate a slab action.
 *
 * Implements vdo_admin_initiator_fn.
2595 */ 2596 static void initiate_slab_action(struct admin_state *state) 2597 { 2598 struct vdo_slab *slab = container_of(state, struct vdo_slab, state); 2599 2600 if (vdo_is_state_draining(state)) { 2601 const struct admin_state_code *operation = vdo_get_admin_state_code(state); 2602 2603 if (operation == VDO_ADMIN_STATE_SCRUBBING) 2604 slab->status = VDO_SLAB_REBUILDING; 2605 2606 drain_slab(slab); 2607 check_if_slab_drained(slab); 2608 return; 2609 } 2610 2611 if (vdo_is_state_loading(state)) { 2612 load_slab_journal(slab); 2613 return; 2614 } 2615 2616 if (vdo_is_state_resuming(state)) { 2617 queue_slab(slab); 2618 vdo_finish_resuming(state); 2619 return; 2620 } 2621 2622 vdo_finish_operation(state, VDO_INVALID_ADMIN_STATE); 2623 } 2624 2625 /** 2626 * get_next_slab() - Get the next slab to scrub. 2627 * @scrubber: The slab scrubber. 2628 * 2629 * Return: The next slab to scrub or NULL if there are none. 2630 */ 2631 static struct vdo_slab *get_next_slab(struct slab_scrubber *scrubber) 2632 { 2633 struct vdo_slab *slab; 2634 2635 slab = list_first_entry_or_null(&scrubber->high_priority_slabs, 2636 struct vdo_slab, allocq_entry); 2637 if (slab != NULL) 2638 return slab; 2639 2640 return list_first_entry_or_null(&scrubber->slabs, struct vdo_slab, 2641 allocq_entry); 2642 } 2643 2644 /** 2645 * has_slabs_to_scrub() - Check whether a scrubber has slabs to scrub. 2646 * @scrubber: The scrubber to check. 2647 * 2648 * Return: true if the scrubber has slabs to scrub. 2649 */ 2650 static inline bool __must_check has_slabs_to_scrub(struct slab_scrubber *scrubber) 2651 { 2652 return (get_next_slab(scrubber) != NULL); 2653 } 2654 2655 /** 2656 * uninitialize_scrubber_vio() - Clean up the slab_scrubber's vio. 2657 * @scrubber: The scrubber. 
2658 */ 2659 static void uninitialize_scrubber_vio(struct slab_scrubber *scrubber) 2660 { 2661 vdo_free(vdo_forget(scrubber->vio.data)); 2662 free_vio_components(&scrubber->vio); 2663 } 2664 2665 /** 2666 * finish_scrubbing() - Stop scrubbing, either because there are no more slabs to scrub or because 2667 * there's been an error. 2668 * @scrubber: The scrubber. 2669 */ 2670 static void finish_scrubbing(struct slab_scrubber *scrubber, int result) 2671 { 2672 bool notify = vdo_waitq_has_waiters(&scrubber->waiters); 2673 bool done = !has_slabs_to_scrub(scrubber); 2674 struct block_allocator *allocator = 2675 container_of(scrubber, struct block_allocator, scrubber); 2676 2677 if (done) 2678 uninitialize_scrubber_vio(scrubber); 2679 2680 if (scrubber->high_priority_only) { 2681 scrubber->high_priority_only = false; 2682 vdo_fail_completion(vdo_forget(scrubber->vio.completion.parent), result); 2683 } else if (done && (atomic_add_return(-1, &allocator->depot->zones_to_scrub) == 0)) { 2684 /* All of our slabs were scrubbed, and we're the last allocator to finish. */ 2685 enum vdo_state prior_state = 2686 atomic_cmpxchg(&allocator->depot->vdo->state, VDO_RECOVERING, 2687 VDO_DIRTY); 2688 2689 /* 2690 * To be safe, even if the CAS failed, ensure anything that follows is ordered with 2691 * respect to whatever state change did happen. 2692 */ 2693 smp_mb__after_atomic(); 2694 2695 /* 2696 * We must check the VDO state here and not the depot's read_only_notifier since 2697 * the compare-swap-above could have failed due to a read-only entry which our own 2698 * thread does not yet know about. 2699 */ 2700 if (prior_state == VDO_DIRTY) 2701 vdo_log_info("VDO commencing normal operation"); 2702 else if (prior_state == VDO_RECOVERING) 2703 vdo_log_info("Exiting recovery mode"); 2704 } 2705 2706 /* 2707 * Note that the scrubber has stopped, and inform anyone who might be waiting for that to 2708 * happen. 
2709 */ 2710 if (!vdo_finish_draining(&scrubber->admin_state)) 2711 WRITE_ONCE(scrubber->admin_state.current_state, 2712 VDO_ADMIN_STATE_SUSPENDED); 2713 2714 /* 2715 * We can't notify waiters until after we've finished draining or they'll just requeue. 2716 * Fortunately if there were waiters, we can't have been freed yet. 2717 */ 2718 if (notify) 2719 vdo_waitq_notify_all_waiters(&scrubber->waiters, NULL, NULL); 2720 } 2721 2722 static void scrub_next_slab(struct slab_scrubber *scrubber); 2723 2724 /** 2725 * slab_scrubbed() - Notify the scrubber that a slab has been scrubbed. 2726 * @completion: The slab rebuild completion. 2727 * 2728 * This callback is registered in apply_journal_entries(). 2729 */ 2730 static void slab_scrubbed(struct vdo_completion *completion) 2731 { 2732 struct slab_scrubber *scrubber = 2733 container_of(as_vio(completion), struct slab_scrubber, vio); 2734 struct vdo_slab *slab = scrubber->slab; 2735 2736 slab->status = VDO_SLAB_REBUILT; 2737 queue_slab(slab); 2738 reopen_slab_journal(slab); 2739 WRITE_ONCE(scrubber->slab_count, scrubber->slab_count - 1); 2740 scrub_next_slab(scrubber); 2741 } 2742 2743 /** 2744 * abort_scrubbing() - Abort scrubbing due to an error. 2745 * @scrubber: The slab scrubber. 2746 * @result: The error. 2747 */ 2748 static void abort_scrubbing(struct slab_scrubber *scrubber, int result) 2749 { 2750 vdo_enter_read_only_mode(scrubber->vio.completion.vdo, result); 2751 finish_scrubbing(scrubber, result); 2752 } 2753 2754 /** 2755 * handle_scrubber_error() - Handle errors while rebuilding a slab. 2756 * @completion: The slab rebuild completion. 2757 */ 2758 static void handle_scrubber_error(struct vdo_completion *completion) 2759 { 2760 struct vio *vio = as_vio(completion); 2761 2762 vio_record_metadata_io_error(vio); 2763 abort_scrubbing(container_of(vio, struct slab_scrubber, vio), 2764 completion->result); 2765 } 2766 2767 /** 2768 * apply_block_entries() - Apply all the entries in a block to the reference counts. 
2769 * @block: A block with entries to apply. 2770 * @entry_count: The number of entries to apply. 2771 * @block_number: The sequence number of the block. 2772 * @slab: The slab to apply the entries to. 2773 * 2774 * Return: VDO_SUCCESS or an error code. 2775 */ 2776 static int apply_block_entries(struct packed_slab_journal_block *block, 2777 journal_entry_count_t entry_count, 2778 sequence_number_t block_number, struct vdo_slab *slab) 2779 { 2780 struct journal_point entry_point = { 2781 .sequence_number = block_number, 2782 .entry_count = 0, 2783 }; 2784 int result; 2785 slab_block_number max_sbn = slab->end - slab->start; 2786 2787 while (entry_point.entry_count < entry_count) { 2788 struct slab_journal_entry entry = 2789 vdo_decode_slab_journal_entry(block, entry_point.entry_count); 2790 2791 if (entry.sbn > max_sbn) { 2792 /* This entry is out of bounds. */ 2793 return vdo_log_error_strerror(VDO_CORRUPT_JOURNAL, 2794 "vdo_slab journal entry (%llu, %u) had invalid offset %u in slab (size %u blocks)", 2795 (unsigned long long) block_number, 2796 entry_point.entry_count, 2797 entry.sbn, max_sbn); 2798 } 2799 2800 result = replay_reference_count_change(slab, &entry_point, entry); 2801 if (result != VDO_SUCCESS) { 2802 vdo_log_error_strerror(result, 2803 "vdo_slab journal entry (%llu, %u) (%s of offset %u) could not be applied in slab %u", 2804 (unsigned long long) block_number, 2805 entry_point.entry_count, 2806 vdo_get_journal_operation_name(entry.operation), 2807 entry.sbn, slab->slab_number); 2808 return result; 2809 } 2810 entry_point.entry_count++; 2811 } 2812 2813 return VDO_SUCCESS; 2814 } 2815 2816 /** 2817 * apply_journal_entries() - Find the relevant vio of the slab journal and apply all valid entries. 2818 * @completion: The metadata read vio completion. 2819 * 2820 * This is a callback registered in start_scrubbing(). 
2821 */ 2822 static void apply_journal_entries(struct vdo_completion *completion) 2823 { 2824 int result; 2825 struct slab_scrubber *scrubber = 2826 container_of(as_vio(completion), struct slab_scrubber, vio); 2827 struct vdo_slab *slab = scrubber->slab; 2828 struct slab_journal *journal = &slab->journal; 2829 2830 /* Find the boundaries of the useful part of the journal. */ 2831 sequence_number_t tail = journal->tail; 2832 tail_block_offset_t end_index = (tail - 1) % journal->size; 2833 char *end_data = scrubber->vio.data + (end_index * VDO_BLOCK_SIZE); 2834 struct packed_slab_journal_block *end_block = 2835 (struct packed_slab_journal_block *) end_data; 2836 2837 sequence_number_t head = __le64_to_cpu(end_block->header.head); 2838 tail_block_offset_t head_index = head % journal->size; 2839 block_count_t index = head_index; 2840 2841 struct journal_point ref_counts_point = slab->slab_journal_point; 2842 struct journal_point last_entry_applied = ref_counts_point; 2843 sequence_number_t sequence; 2844 2845 for (sequence = head; sequence < tail; sequence++) { 2846 char *block_data = scrubber->vio.data + (index * VDO_BLOCK_SIZE); 2847 struct packed_slab_journal_block *block = 2848 (struct packed_slab_journal_block *) block_data; 2849 struct slab_journal_block_header header; 2850 2851 vdo_unpack_slab_journal_block_header(&block->header, &header); 2852 2853 if ((header.nonce != slab->allocator->nonce) || 2854 (header.metadata_type != VDO_METADATA_SLAB_JOURNAL) || 2855 (header.sequence_number != sequence) || 2856 (header.entry_count > journal->entries_per_block) || 2857 (header.has_block_map_increments && 2858 (header.entry_count > journal->full_entries_per_block))) { 2859 /* The block is not what we expect it to be. 
*/ 2860 vdo_log_error("vdo_slab journal block for slab %u was invalid", 2861 slab->slab_number); 2862 abort_scrubbing(scrubber, VDO_CORRUPT_JOURNAL); 2863 return; 2864 } 2865 2866 result = apply_block_entries(block, header.entry_count, sequence, slab); 2867 if (result != VDO_SUCCESS) { 2868 abort_scrubbing(scrubber, result); 2869 return; 2870 } 2871 2872 last_entry_applied.sequence_number = sequence; 2873 last_entry_applied.entry_count = header.entry_count - 1; 2874 index++; 2875 if (index == journal->size) 2876 index = 0; 2877 } 2878 2879 /* 2880 * At the end of rebuild, the reference counters should be accurate to the end of the 2881 * journal we just applied. 2882 */ 2883 result = VDO_ASSERT(!vdo_before_journal_point(&last_entry_applied, 2884 &ref_counts_point), 2885 "Refcounts are not more accurate than the slab journal"); 2886 if (result != VDO_SUCCESS) { 2887 abort_scrubbing(scrubber, result); 2888 return; 2889 } 2890 2891 /* Save out the rebuilt reference blocks. */ 2892 vdo_prepare_completion(completion, slab_scrubbed, handle_scrubber_error, 2893 slab->allocator->thread_id, completion->parent); 2894 vdo_start_operation_with_waiter(&slab->state, 2895 VDO_ADMIN_STATE_SAVE_FOR_SCRUBBING, 2896 completion, initiate_slab_action); 2897 } 2898 2899 static void read_slab_journal_endio(struct bio *bio) 2900 { 2901 struct vio *vio = bio->bi_private; 2902 struct slab_scrubber *scrubber = container_of(vio, struct slab_scrubber, vio); 2903 2904 continue_vio_after_io(bio->bi_private, apply_journal_entries, 2905 scrubber->slab->allocator->thread_id); 2906 } 2907 2908 /** 2909 * start_scrubbing() - Read the current slab's journal from disk now that it has been flushed. 2910 * @completion: The scrubber's vio completion. 2911 * 2912 * This callback is registered in scrub_next_slab(). 
2913 */ 2914 static void start_scrubbing(struct vdo_completion *completion) 2915 { 2916 struct slab_scrubber *scrubber = 2917 container_of(as_vio(completion), struct slab_scrubber, vio); 2918 struct vdo_slab *slab = scrubber->slab; 2919 2920 if (!slab->allocator->summary_entries[slab->slab_number].is_dirty) { 2921 slab_scrubbed(completion); 2922 return; 2923 } 2924 2925 vdo_submit_metadata_vio(&scrubber->vio, slab->journal_origin, 2926 read_slab_journal_endio, handle_scrubber_error, 2927 REQ_OP_READ); 2928 } 2929 2930 /** 2931 * scrub_next_slab() - Scrub the next slab if there is one. 2932 * @scrubber: The scrubber. 2933 */ 2934 static void scrub_next_slab(struct slab_scrubber *scrubber) 2935 { 2936 struct vdo_completion *completion = &scrubber->vio.completion; 2937 struct vdo_slab *slab; 2938 2939 /* 2940 * Note: this notify call is always safe only because scrubbing can only be started when 2941 * the VDO is quiescent. 2942 */ 2943 vdo_waitq_notify_all_waiters(&scrubber->waiters, NULL, NULL); 2944 2945 if (vdo_is_read_only(completion->vdo)) { 2946 finish_scrubbing(scrubber, VDO_READ_ONLY); 2947 return; 2948 } 2949 2950 slab = get_next_slab(scrubber); 2951 if ((slab == NULL) || 2952 (scrubber->high_priority_only && list_empty(&scrubber->high_priority_slabs))) { 2953 finish_scrubbing(scrubber, VDO_SUCCESS); 2954 return; 2955 } 2956 2957 if (vdo_finish_draining(&scrubber->admin_state)) 2958 return; 2959 2960 list_del_init(&slab->allocq_entry); 2961 scrubber->slab = slab; 2962 vdo_prepare_completion(completion, start_scrubbing, handle_scrubber_error, 2963 slab->allocator->thread_id, completion->parent); 2964 vdo_start_operation_with_waiter(&slab->state, VDO_ADMIN_STATE_SCRUBBING, 2965 completion, initiate_slab_action); 2966 } 2967 2968 /** 2969 * scrub_slabs() - Scrub all of an allocator's slabs that are eligible for scrubbing. 2970 * @allocator: The block_allocator to scrub. 
2971 * @parent: The completion to notify when scrubbing is done, implies high_priority, may be NULL. 2972 */ 2973 static void scrub_slabs(struct block_allocator *allocator, struct vdo_completion *parent) 2974 { 2975 struct slab_scrubber *scrubber = &allocator->scrubber; 2976 2977 scrubber->vio.completion.parent = parent; 2978 scrubber->high_priority_only = (parent != NULL); 2979 if (!has_slabs_to_scrub(scrubber)) { 2980 finish_scrubbing(scrubber, VDO_SUCCESS); 2981 return; 2982 } 2983 2984 if (scrubber->high_priority_only && 2985 vdo_is_priority_table_empty(allocator->prioritized_slabs) && 2986 list_empty(&scrubber->high_priority_slabs)) 2987 register_slab_for_scrubbing(get_next_slab(scrubber), true); 2988 2989 vdo_resume_if_quiescent(&scrubber->admin_state); 2990 scrub_next_slab(scrubber); 2991 } 2992 2993 static inline void assert_on_allocator_thread(thread_id_t thread_id, 2994 const char *function_name) 2995 { 2996 VDO_ASSERT_LOG_ONLY((vdo_get_callback_thread_id() == thread_id), 2997 "%s called on correct thread", function_name); 2998 } 2999 3000 static void register_slab_with_allocator(struct block_allocator *allocator, 3001 struct vdo_slab *slab) 3002 { 3003 allocator->slab_count++; 3004 allocator->last_slab = slab->slab_number; 3005 } 3006 3007 /** 3008 * get_depot_slab_iterator() - Return a slab_iterator over the slabs in a slab_depot. 3009 * @depot: The depot over which to iterate. 3010 * @start: The number of the slab to start iterating from. 3011 * @end: The number of the last slab which may be returned. 3012 * @stride: The difference in slab number between successive slabs. 3013 * 3014 * Iteration always occurs from higher to lower numbered slabs. 3015 * 3016 * Return: An initialized iterator structure. 
3017 */ 3018 static struct slab_iterator get_depot_slab_iterator(struct slab_depot *depot, 3019 slab_count_t start, slab_count_t end, 3020 slab_count_t stride) 3021 { 3022 struct vdo_slab **slabs = depot->slabs; 3023 3024 return (struct slab_iterator) { 3025 .slabs = slabs, 3026 .next = (((slabs == NULL) || (start < end)) ? NULL : slabs[start]), 3027 .end = end, 3028 .stride = stride, 3029 }; 3030 } 3031 3032 static struct slab_iterator get_slab_iterator(const struct block_allocator *allocator) 3033 { 3034 return get_depot_slab_iterator(allocator->depot, allocator->last_slab, 3035 allocator->zone_number, 3036 allocator->depot->zone_count); 3037 } 3038 3039 /** 3040 * next_slab() - Get the next slab from a slab_iterator and advance the iterator 3041 * @iterator: The slab_iterator. 3042 * 3043 * Return: The next slab or NULL if the iterator is exhausted. 3044 */ 3045 static struct vdo_slab *next_slab(struct slab_iterator *iterator) 3046 { 3047 struct vdo_slab *slab = iterator->next; 3048 3049 if ((slab == NULL) || (slab->slab_number < iterator->end + iterator->stride)) 3050 iterator->next = NULL; 3051 else 3052 iterator->next = iterator->slabs[slab->slab_number - iterator->stride]; 3053 3054 return slab; 3055 } 3056 3057 /** 3058 * abort_waiter() - Abort vios waiting to make journal entries when read-only. 3059 * 3060 * This callback is invoked on all vios waiting to make slab journal entries after the VDO has gone 3061 * into read-only mode. Implements waiter_callback_fn. 
3062 */ 3063 static void abort_waiter(struct vdo_waiter *waiter, void *context __always_unused) 3064 { 3065 struct reference_updater *updater = 3066 container_of(waiter, struct reference_updater, waiter); 3067 struct data_vio *data_vio = data_vio_from_reference_updater(updater); 3068 3069 if (updater->increment) { 3070 continue_data_vio_with_error(data_vio, VDO_READ_ONLY); 3071 return; 3072 } 3073 3074 vdo_continue_completion(&data_vio->decrement_completion, VDO_READ_ONLY); 3075 } 3076 3077 /* Implements vdo_read_only_notification_fn. */ 3078 static void notify_block_allocator_of_read_only_mode(void *listener, 3079 struct vdo_completion *parent) 3080 { 3081 struct block_allocator *allocator = listener; 3082 struct slab_iterator iterator; 3083 3084 assert_on_allocator_thread(allocator->thread_id, __func__); 3085 iterator = get_slab_iterator(allocator); 3086 while (iterator.next != NULL) { 3087 struct vdo_slab *slab = next_slab(&iterator); 3088 3089 vdo_waitq_notify_all_waiters(&slab->journal.entry_waiters, 3090 abort_waiter, &slab->journal); 3091 check_if_slab_drained(slab); 3092 } 3093 3094 vdo_finish_completion(parent); 3095 } 3096 3097 /** 3098 * vdo_acquire_provisional_reference() - Acquire a provisional reference on behalf of a PBN lock if 3099 * the block it locks is unreferenced. 3100 * @slab: The slab which contains the block. 3101 * @pbn: The physical block to reference. 3102 * @lock: The lock. 3103 * 3104 * Return: VDO_SUCCESS or an error. 
3105 */ 3106 int vdo_acquire_provisional_reference(struct vdo_slab *slab, physical_block_number_t pbn, 3107 struct pbn_lock *lock) 3108 { 3109 slab_block_number block_number; 3110 int result; 3111 3112 if (vdo_pbn_lock_has_provisional_reference(lock)) 3113 return VDO_SUCCESS; 3114 3115 if (!is_slab_open(slab)) 3116 return VDO_INVALID_ADMIN_STATE; 3117 3118 result = slab_block_number_from_pbn(slab, pbn, &block_number); 3119 if (result != VDO_SUCCESS) 3120 return result; 3121 3122 if (slab->counters[block_number] == EMPTY_REFERENCE_COUNT) { 3123 make_provisional_reference(slab, block_number); 3124 if (lock != NULL) 3125 vdo_assign_pbn_lock_provisional_reference(lock); 3126 } 3127 3128 if (vdo_pbn_lock_has_provisional_reference(lock)) 3129 adjust_free_block_count(slab, false); 3130 3131 return VDO_SUCCESS; 3132 } 3133 3134 static int __must_check allocate_slab_block(struct vdo_slab *slab, 3135 physical_block_number_t *block_number_ptr) 3136 { 3137 slab_block_number free_index; 3138 3139 if (!is_slab_open(slab)) 3140 return VDO_INVALID_ADMIN_STATE; 3141 3142 if (!search_reference_blocks(slab, &free_index)) 3143 return VDO_NO_SPACE; 3144 3145 VDO_ASSERT_LOG_ONLY((slab->counters[free_index] == EMPTY_REFERENCE_COUNT), 3146 "free block must have ref count of zero"); 3147 make_provisional_reference(slab, free_index); 3148 adjust_free_block_count(slab, false); 3149 3150 /* 3151 * Update the search hint so the next search will start at the array index just past the 3152 * free block we just found. 3153 */ 3154 slab->search_cursor.index = (free_index + 1); 3155 3156 *block_number_ptr = slab->start + free_index; 3157 return VDO_SUCCESS; 3158 } 3159 3160 /** 3161 * open_slab() - Prepare a slab to be allocated from. 3162 * @slab: The slab. 
3163 */ 3164 static void open_slab(struct vdo_slab *slab) 3165 { 3166 reset_search_cursor(slab); 3167 if (is_slab_journal_blank(slab)) { 3168 WRITE_ONCE(slab->allocator->statistics.slabs_opened, 3169 slab->allocator->statistics.slabs_opened + 1); 3170 dirty_all_reference_blocks(slab); 3171 } else { 3172 WRITE_ONCE(slab->allocator->statistics.slabs_reopened, 3173 slab->allocator->statistics.slabs_reopened + 1); 3174 } 3175 3176 slab->allocator->open_slab = slab; 3177 } 3178 3179 3180 /* 3181 * The block allocated will have a provisional reference and the reference must be either confirmed 3182 * with a subsequent increment or vacated with a subsequent decrement via 3183 * vdo_release_block_reference(). 3184 */ 3185 int vdo_allocate_block(struct block_allocator *allocator, 3186 physical_block_number_t *block_number_ptr) 3187 { 3188 int result; 3189 3190 if (allocator->open_slab != NULL) { 3191 /* Try to allocate the next block in the currently open slab. */ 3192 result = allocate_slab_block(allocator->open_slab, block_number_ptr); 3193 if ((result == VDO_SUCCESS) || (result != VDO_NO_SPACE)) 3194 return result; 3195 3196 /* Put the exhausted open slab back into the priority table. */ 3197 prioritize_slab(allocator->open_slab); 3198 } 3199 3200 /* Remove the highest priority slab from the priority table and make it the open slab. */ 3201 open_slab(list_entry(vdo_priority_table_dequeue(allocator->prioritized_slabs), 3202 struct vdo_slab, allocq_entry)); 3203 3204 /* 3205 * Try allocating again. If we're out of space immediately after opening a slab, then every 3206 * slab must be fully allocated. 3207 */ 3208 return allocate_slab_block(allocator->open_slab, block_number_ptr); 3209 } 3210 3211 /** 3212 * vdo_enqueue_clean_slab_waiter() - Wait for a clean slab. 3213 * @allocator: The block_allocator on which to wait. 3214 * @waiter: The waiter. 
3215 * 3216 * Return: VDO_SUCCESS if the waiter was queued, VDO_NO_SPACE if there are no slabs to scrub, and 3217 * some other error otherwise. 3218 */ 3219 int vdo_enqueue_clean_slab_waiter(struct block_allocator *allocator, 3220 struct vdo_waiter *waiter) 3221 { 3222 if (vdo_is_read_only(allocator->depot->vdo)) 3223 return VDO_READ_ONLY; 3224 3225 if (vdo_is_state_quiescent(&allocator->scrubber.admin_state)) 3226 return VDO_NO_SPACE; 3227 3228 vdo_waitq_enqueue_waiter(&allocator->scrubber.waiters, waiter); 3229 return VDO_SUCCESS; 3230 } 3231 3232 /** 3233 * vdo_modify_reference_count() - Modify the reference count of a block by first making a slab 3234 * journal entry and then updating the reference counter. 3235 * 3236 * @data_vio: The data_vio for which to add the entry. 3237 * @updater: Which of the data_vio's reference updaters is being submitted. 3238 */ 3239 void vdo_modify_reference_count(struct vdo_completion *completion, 3240 struct reference_updater *updater) 3241 { 3242 struct vdo_slab *slab = vdo_get_slab(completion->vdo->depot, updater->zpbn.pbn); 3243 3244 if (!is_slab_open(slab)) { 3245 vdo_continue_completion(completion, VDO_INVALID_ADMIN_STATE); 3246 return; 3247 } 3248 3249 if (vdo_is_read_only(completion->vdo)) { 3250 vdo_continue_completion(completion, VDO_READ_ONLY); 3251 return; 3252 } 3253 3254 vdo_waitq_enqueue_waiter(&slab->journal.entry_waiters, &updater->waiter); 3255 if ((slab->status != VDO_SLAB_REBUILT) && requires_reaping(&slab->journal)) 3256 register_slab_for_scrubbing(slab, true); 3257 3258 add_entries(&slab->journal); 3259 } 3260 3261 /* Release an unused provisional reference. 
*/ 3262 int vdo_release_block_reference(struct block_allocator *allocator, 3263 physical_block_number_t pbn) 3264 { 3265 struct reference_updater updater; 3266 3267 if (pbn == VDO_ZERO_BLOCK) 3268 return VDO_SUCCESS; 3269 3270 updater = (struct reference_updater) { 3271 .operation = VDO_JOURNAL_DATA_REMAPPING, 3272 .increment = false, 3273 .zpbn = { 3274 .pbn = pbn, 3275 }, 3276 }; 3277 3278 return adjust_reference_count(vdo_get_slab(allocator->depot, pbn), 3279 &updater, NULL); 3280 } 3281 3282 /* 3283 * This is a min_heap callback function orders slab_status structures using the 'is_clean' field as 3284 * the primary key and the 'emptiness' field as the secondary key. 3285 * 3286 * Slabs need to be pushed onto the rings in the same order they are to be popped off. Popping 3287 * should always get the most empty first, so pushing should be from most empty to least empty. 3288 * Thus, the ordering is reversed from the usual sense since min_heap returns smaller elements 3289 * before larger ones. 3290 */ 3291 static bool slab_status_is_less_than(const void *item1, const void *item2) 3292 { 3293 const struct slab_status *info1 = item1; 3294 const struct slab_status *info2 = item2; 3295 3296 if (info1->is_clean != info2->is_clean) 3297 return info1->is_clean; 3298 if (info1->emptiness != info2->emptiness) 3299 return info1->emptiness > info2->emptiness; 3300 return info1->slab_number < info2->slab_number; 3301 } 3302 3303 static void swap_slab_statuses(void *item1, void *item2) 3304 { 3305 struct slab_status *info1 = item1; 3306 struct slab_status *info2 = item2; 3307 3308 swap(*info1, *info2); 3309 } 3310 3311 static const struct min_heap_callbacks slab_status_min_heap = { 3312 .elem_size = sizeof(struct slab_status), 3313 .less = slab_status_is_less_than, 3314 .swp = swap_slab_statuses, 3315 }; 3316 3317 /* Inform the slab actor that a action has finished on some slab; used by apply_to_slabs(). 
*/ 3318 static void slab_action_callback(struct vdo_completion *completion) 3319 { 3320 struct block_allocator *allocator = vdo_as_block_allocator(completion); 3321 struct slab_actor *actor = &allocator->slab_actor; 3322 3323 if (--actor->slab_action_count == 0) { 3324 actor->callback(completion); 3325 return; 3326 } 3327 3328 vdo_reset_completion(completion); 3329 } 3330 3331 /* Preserve the error from part of an action and continue. */ 3332 static void handle_operation_error(struct vdo_completion *completion) 3333 { 3334 struct block_allocator *allocator = vdo_as_block_allocator(completion); 3335 3336 if (allocator->state.waiter != NULL) 3337 vdo_set_completion_result(allocator->state.waiter, completion->result); 3338 completion->callback(completion); 3339 } 3340 3341 /* Perform an action on each of an allocator's slabs in parallel. */ 3342 static void apply_to_slabs(struct block_allocator *allocator, vdo_action_fn callback) 3343 { 3344 struct slab_iterator iterator; 3345 3346 vdo_prepare_completion(&allocator->completion, slab_action_callback, 3347 handle_operation_error, allocator->thread_id, NULL); 3348 allocator->completion.requeue = false; 3349 3350 /* 3351 * Since we are going to dequeue all of the slabs, the open slab will become invalid, so 3352 * clear it. 3353 */ 3354 allocator->open_slab = NULL; 3355 3356 /* Ensure that we don't finish before we're done starting. 
*/ 3357 allocator->slab_actor = (struct slab_actor) { 3358 .slab_action_count = 1, 3359 .callback = callback, 3360 }; 3361 3362 iterator = get_slab_iterator(allocator); 3363 while (iterator.next != NULL) { 3364 const struct admin_state_code *operation = 3365 vdo_get_admin_state_code(&allocator->state); 3366 struct vdo_slab *slab = next_slab(&iterator); 3367 3368 list_del_init(&slab->allocq_entry); 3369 allocator->slab_actor.slab_action_count++; 3370 vdo_start_operation_with_waiter(&slab->state, operation, 3371 &allocator->completion, 3372 initiate_slab_action); 3373 } 3374 3375 slab_action_callback(&allocator->completion); 3376 } 3377 3378 static void finish_loading_allocator(struct vdo_completion *completion) 3379 { 3380 struct block_allocator *allocator = vdo_as_block_allocator(completion); 3381 const struct admin_state_code *operation = 3382 vdo_get_admin_state_code(&allocator->state); 3383 3384 if (allocator->eraser != NULL) 3385 dm_kcopyd_client_destroy(vdo_forget(allocator->eraser)); 3386 3387 if (operation == VDO_ADMIN_STATE_LOADING_FOR_RECOVERY) { 3388 void *context = 3389 vdo_get_current_action_context(allocator->depot->action_manager); 3390 3391 vdo_replay_into_slab_journals(allocator, context); 3392 return; 3393 } 3394 3395 vdo_finish_loading(&allocator->state); 3396 } 3397 3398 static void erase_next_slab_journal(struct block_allocator *allocator); 3399 3400 static void copy_callback(int read_err, unsigned long write_err, void *context) 3401 { 3402 struct block_allocator *allocator = context; 3403 int result = (((read_err == 0) && (write_err == 0)) ? VDO_SUCCESS : -EIO); 3404 3405 if (result != VDO_SUCCESS) { 3406 vdo_fail_completion(&allocator->completion, result); 3407 return; 3408 } 3409 3410 erase_next_slab_journal(allocator); 3411 } 3412 3413 /* erase_next_slab_journal() - Erase the next slab journal. 
*/ 3414 static void erase_next_slab_journal(struct block_allocator *allocator) 3415 { 3416 struct vdo_slab *slab; 3417 physical_block_number_t pbn; 3418 struct dm_io_region regions[1]; 3419 struct slab_depot *depot = allocator->depot; 3420 block_count_t blocks = depot->slab_config.slab_journal_blocks; 3421 3422 if (allocator->slabs_to_erase.next == NULL) { 3423 vdo_finish_completion(&allocator->completion); 3424 return; 3425 } 3426 3427 slab = next_slab(&allocator->slabs_to_erase); 3428 pbn = slab->journal_origin - depot->vdo->geometry.bio_offset; 3429 regions[0] = (struct dm_io_region) { 3430 .bdev = vdo_get_backing_device(depot->vdo), 3431 .sector = pbn * VDO_SECTORS_PER_BLOCK, 3432 .count = blocks * VDO_SECTORS_PER_BLOCK, 3433 }; 3434 dm_kcopyd_zero(allocator->eraser, 1, regions, 0, copy_callback, allocator); 3435 } 3436 3437 /* Implements vdo_admin_initiator_fn. */ 3438 static void initiate_load(struct admin_state *state) 3439 { 3440 struct block_allocator *allocator = 3441 container_of(state, struct block_allocator, state); 3442 const struct admin_state_code *operation = vdo_get_admin_state_code(state); 3443 3444 if (operation == VDO_ADMIN_STATE_LOADING_FOR_REBUILD) { 3445 /* 3446 * Must requeue because the kcopyd client cannot be freed in the same stack frame 3447 * as the kcopyd callback, lest it deadlock. 
3448 */ 3449 vdo_prepare_completion_for_requeue(&allocator->completion, 3450 finish_loading_allocator, 3451 handle_operation_error, 3452 allocator->thread_id, NULL); 3453 allocator->eraser = dm_kcopyd_client_create(NULL); 3454 if (IS_ERR(allocator->eraser)) { 3455 vdo_fail_completion(&allocator->completion, 3456 PTR_ERR(allocator->eraser)); 3457 allocator->eraser = NULL; 3458 return; 3459 } 3460 allocator->slabs_to_erase = get_slab_iterator(allocator); 3461 3462 erase_next_slab_journal(allocator); 3463 return; 3464 } 3465 3466 apply_to_slabs(allocator, finish_loading_allocator); 3467 } 3468 3469 /** 3470 * vdo_notify_slab_journals_are_recovered() - Inform a block allocator that its slab journals have 3471 * been recovered from the recovery journal. 3472 * @completion The allocator completion 3473 */ 3474 void vdo_notify_slab_journals_are_recovered(struct vdo_completion *completion) 3475 { 3476 struct block_allocator *allocator = vdo_as_block_allocator(completion); 3477 3478 vdo_finish_loading_with_result(&allocator->state, completion->result); 3479 } 3480 3481 static int get_slab_statuses(struct block_allocator *allocator, 3482 struct slab_status **statuses_ptr) 3483 { 3484 int result; 3485 struct slab_status *statuses; 3486 struct slab_iterator iterator = get_slab_iterator(allocator); 3487 3488 result = vdo_allocate(allocator->slab_count, struct slab_status, __func__, 3489 &statuses); 3490 if (result != VDO_SUCCESS) 3491 return result; 3492 3493 *statuses_ptr = statuses; 3494 3495 while (iterator.next != NULL) { 3496 slab_count_t slab_number = next_slab(&iterator)->slab_number; 3497 3498 *statuses++ = (struct slab_status) { 3499 .slab_number = slab_number, 3500 .is_clean = !allocator->summary_entries[slab_number].is_dirty, 3501 .emptiness = allocator->summary_entries[slab_number].fullness_hint, 3502 }; 3503 } 3504 3505 return VDO_SUCCESS; 3506 } 3507 3508 /* Prepare slabs for allocation or scrubbing. 
*/ 3509 static int __must_check vdo_prepare_slabs_for_allocation(struct block_allocator *allocator) 3510 { 3511 struct slab_status current_slab_status; 3512 struct min_heap heap; 3513 int result; 3514 struct slab_status *slab_statuses; 3515 struct slab_depot *depot = allocator->depot; 3516 3517 WRITE_ONCE(allocator->allocated_blocks, 3518 allocator->slab_count * depot->slab_config.data_blocks); 3519 result = get_slab_statuses(allocator, &slab_statuses); 3520 if (result != VDO_SUCCESS) 3521 return result; 3522 3523 /* Sort the slabs by cleanliness, then by emptiness hint. */ 3524 heap = (struct min_heap) { 3525 .data = slab_statuses, 3526 .nr = allocator->slab_count, 3527 .size = allocator->slab_count, 3528 }; 3529 min_heapify_all(&heap, &slab_status_min_heap); 3530 3531 while (heap.nr > 0) { 3532 bool high_priority; 3533 struct vdo_slab *slab; 3534 struct slab_journal *journal; 3535 3536 current_slab_status = slab_statuses[0]; 3537 min_heap_pop(&heap, &slab_status_min_heap); 3538 slab = depot->slabs[current_slab_status.slab_number]; 3539 3540 if ((depot->load_type == VDO_SLAB_DEPOT_REBUILD_LOAD) || 3541 (!allocator->summary_entries[slab->slab_number].load_ref_counts && 3542 current_slab_status.is_clean)) { 3543 queue_slab(slab); 3544 continue; 3545 } 3546 3547 slab->status = VDO_SLAB_REQUIRES_SCRUBBING; 3548 journal = &slab->journal; 3549 high_priority = ((current_slab_status.is_clean && 3550 (depot->load_type == VDO_SLAB_DEPOT_NORMAL_LOAD)) || 3551 (journal_length(journal) >= journal->scrubbing_threshold)); 3552 register_slab_for_scrubbing(slab, high_priority); 3553 } 3554 3555 vdo_free(slab_statuses); 3556 return VDO_SUCCESS; 3557 } 3558 3559 static const char *status_to_string(enum slab_rebuild_status status) 3560 { 3561 switch (status) { 3562 case VDO_SLAB_REBUILT: 3563 return "REBUILT"; 3564 case VDO_SLAB_REQUIRES_SCRUBBING: 3565 return "SCRUBBING"; 3566 case VDO_SLAB_REQUIRES_HIGH_PRIORITY_SCRUBBING: 3567 return "PRIORITY_SCRUBBING"; 3568 case 
VDO_SLAB_REBUILDING: 3569 return "REBUILDING"; 3570 case VDO_SLAB_REPLAYING: 3571 return "REPLAYING"; 3572 default: 3573 return "UNKNOWN"; 3574 } 3575 } 3576 3577 void vdo_dump_block_allocator(const struct block_allocator *allocator) 3578 { 3579 unsigned int pause_counter = 0; 3580 struct slab_iterator iterator = get_slab_iterator(allocator); 3581 const struct slab_scrubber *scrubber = &allocator->scrubber; 3582 3583 vdo_log_info("block_allocator zone %u", allocator->zone_number); 3584 while (iterator.next != NULL) { 3585 struct vdo_slab *slab = next_slab(&iterator); 3586 struct slab_journal *journal = &slab->journal; 3587 3588 if (slab->reference_blocks != NULL) { 3589 /* Terse because there are a lot of slabs to dump and syslog is lossy. */ 3590 vdo_log_info("slab %u: P%u, %llu free", slab->slab_number, 3591 slab->priority, 3592 (unsigned long long) slab->free_blocks); 3593 } else { 3594 vdo_log_info("slab %u: status %s", slab->slab_number, 3595 status_to_string(slab->status)); 3596 } 3597 3598 vdo_log_info(" slab journal: entry_waiters=%zu waiting_to_commit=%s updating_slab_summary=%s head=%llu unreapable=%llu tail=%llu next_commit=%llu summarized=%llu last_summarized=%llu recovery_lock=%llu dirty=%s", 3599 vdo_waitq_num_waiters(&journal->entry_waiters), 3600 vdo_bool_to_string(journal->waiting_to_commit), 3601 vdo_bool_to_string(journal->updating_slab_summary), 3602 (unsigned long long) journal->head, 3603 (unsigned long long) journal->unreapable, 3604 (unsigned long long) journal->tail, 3605 (unsigned long long) journal->next_commit, 3606 (unsigned long long) journal->summarized, 3607 (unsigned long long) journal->last_summarized, 3608 (unsigned long long) journal->recovery_lock, 3609 vdo_bool_to_string(journal->recovery_lock != 0)); 3610 /* 3611 * Given the frequency with which the locks are just a tiny bit off, it might be 3612 * worth dumping all the locks, but that might be too much logging. 
3613 */ 3614 3615 if (slab->counters != NULL) { 3616 /* Terse because there are a lot of slabs to dump and syslog is lossy. */ 3617 vdo_log_info(" slab: free=%u/%u blocks=%u dirty=%zu active=%zu journal@(%llu,%u)", 3618 slab->free_blocks, slab->block_count, 3619 slab->reference_block_count, 3620 vdo_waitq_num_waiters(&slab->dirty_blocks), 3621 slab->active_count, 3622 (unsigned long long) slab->slab_journal_point.sequence_number, 3623 slab->slab_journal_point.entry_count); 3624 } else { 3625 vdo_log_info(" no counters"); 3626 } 3627 3628 /* 3629 * Wait for a while after each batch of 32 slabs dumped, an arbitrary number, 3630 * allowing the kernel log a chance to be flushed instead of being overrun. 3631 */ 3632 if (pause_counter++ == 31) { 3633 pause_counter = 0; 3634 vdo_pause_for_logger(); 3635 } 3636 } 3637 3638 vdo_log_info("slab_scrubber slab_count %u waiters %zu %s%s", 3639 READ_ONCE(scrubber->slab_count), 3640 vdo_waitq_num_waiters(&scrubber->waiters), 3641 vdo_get_admin_state_code(&scrubber->admin_state)->name, 3642 scrubber->high_priority_only ? 
", high_priority_only " : ""); 3643 } 3644 3645 static void free_slab(struct vdo_slab *slab) 3646 { 3647 if (slab == NULL) 3648 return; 3649 3650 list_del(&slab->allocq_entry); 3651 vdo_free(vdo_forget(slab->journal.block)); 3652 vdo_free(vdo_forget(slab->journal.locks)); 3653 vdo_free(vdo_forget(slab->counters)); 3654 vdo_free(vdo_forget(slab->reference_blocks)); 3655 vdo_free(slab); 3656 } 3657 3658 static int initialize_slab_journal(struct vdo_slab *slab) 3659 { 3660 struct slab_journal *journal = &slab->journal; 3661 const struct slab_config *slab_config = &slab->allocator->depot->slab_config; 3662 int result; 3663 3664 result = vdo_allocate(slab_config->slab_journal_blocks, struct journal_lock, 3665 __func__, &journal->locks); 3666 if (result != VDO_SUCCESS) 3667 return result; 3668 3669 result = vdo_allocate(VDO_BLOCK_SIZE, char, "struct packed_slab_journal_block", 3670 (char **) &journal->block); 3671 if (result != VDO_SUCCESS) 3672 return result; 3673 3674 journal->slab = slab; 3675 journal->size = slab_config->slab_journal_blocks; 3676 journal->flushing_threshold = slab_config->slab_journal_flushing_threshold; 3677 journal->blocking_threshold = slab_config->slab_journal_blocking_threshold; 3678 journal->scrubbing_threshold = slab_config->slab_journal_scrubbing_threshold; 3679 journal->entries_per_block = VDO_SLAB_JOURNAL_ENTRIES_PER_BLOCK; 3680 journal->full_entries_per_block = VDO_SLAB_JOURNAL_FULL_ENTRIES_PER_BLOCK; 3681 journal->events = &slab->allocator->slab_journal_statistics; 3682 journal->recovery_journal = slab->allocator->depot->vdo->recovery_journal; 3683 journal->tail = 1; 3684 journal->head = 1; 3685 3686 journal->flushing_deadline = journal->flushing_threshold; 3687 /* 3688 * Set there to be some time between the deadline and the blocking threshold, so that 3689 * hopefully all are done before blocking. 
3690 */ 3691 if ((journal->blocking_threshold - journal->flushing_threshold) > 5) 3692 journal->flushing_deadline = journal->blocking_threshold - 5; 3693 3694 journal->slab_summary_waiter.callback = release_journal_locks; 3695 3696 INIT_LIST_HEAD(&journal->dirty_entry); 3697 INIT_LIST_HEAD(&journal->uncommitted_blocks); 3698 3699 journal->tail_header.nonce = slab->allocator->nonce; 3700 journal->tail_header.metadata_type = VDO_METADATA_SLAB_JOURNAL; 3701 initialize_journal_state(journal); 3702 return VDO_SUCCESS; 3703 } 3704 3705 /** 3706 * make_slab() - Construct a new, empty slab. 3707 * @slab_origin: The physical block number within the block allocator partition of the first block 3708 * in the slab. 3709 * @allocator: The block allocator to which the slab belongs. 3710 * @slab_number: The slab number of the slab. 3711 * @is_new: true if this slab is being allocated as part of a resize. 3712 * @slab_ptr: A pointer to receive the new slab. 3713 * 3714 * Return: VDO_SUCCESS or an error code. 
3715 */ 3716 static int __must_check make_slab(physical_block_number_t slab_origin, 3717 struct block_allocator *allocator, 3718 slab_count_t slab_number, bool is_new, 3719 struct vdo_slab **slab_ptr) 3720 { 3721 const struct slab_config *slab_config = &allocator->depot->slab_config; 3722 struct vdo_slab *slab; 3723 int result; 3724 3725 result = vdo_allocate(1, struct vdo_slab, __func__, &slab); 3726 if (result != VDO_SUCCESS) 3727 return result; 3728 3729 *slab = (struct vdo_slab) { 3730 .allocator = allocator, 3731 .start = slab_origin, 3732 .end = slab_origin + slab_config->slab_blocks, 3733 .slab_number = slab_number, 3734 .ref_counts_origin = slab_origin + slab_config->data_blocks, 3735 .journal_origin = 3736 vdo_get_slab_journal_start_block(slab_config, slab_origin), 3737 .block_count = slab_config->data_blocks, 3738 .free_blocks = slab_config->data_blocks, 3739 .reference_block_count = 3740 vdo_get_saved_reference_count_size(slab_config->data_blocks), 3741 }; 3742 INIT_LIST_HEAD(&slab->allocq_entry); 3743 3744 result = initialize_slab_journal(slab); 3745 if (result != VDO_SUCCESS) { 3746 free_slab(slab); 3747 return result; 3748 } 3749 3750 if (is_new) { 3751 vdo_set_admin_state_code(&slab->state, VDO_ADMIN_STATE_NEW); 3752 result = allocate_slab_counters(slab); 3753 if (result != VDO_SUCCESS) { 3754 free_slab(slab); 3755 return result; 3756 } 3757 } else { 3758 vdo_set_admin_state_code(&slab->state, VDO_ADMIN_STATE_NORMAL_OPERATION); 3759 } 3760 3761 *slab_ptr = slab; 3762 return VDO_SUCCESS; 3763 } 3764 3765 /** 3766 * allocate_slabs() - Allocate a new slab pointer array. 3767 * @depot: The depot. 3768 * @slab_count: The number of slabs the depot should have in the new array. 3769 * 3770 * Any existing slab pointers will be copied into the new array, and slabs will be allocated as 3771 * needed. The newly allocated slabs will not be distributed for use by the block allocators. 3772 * 3773 * Return: VDO_SUCCESS or an error code. 
3774 */ 3775 static int allocate_slabs(struct slab_depot *depot, slab_count_t slab_count) 3776 { 3777 block_count_t slab_size; 3778 bool resizing = false; 3779 physical_block_number_t slab_origin; 3780 int result; 3781 3782 result = vdo_allocate(slab_count, struct vdo_slab *, 3783 "slab pointer array", &depot->new_slabs); 3784 if (result != VDO_SUCCESS) 3785 return result; 3786 3787 if (depot->slabs != NULL) { 3788 memcpy(depot->new_slabs, depot->slabs, 3789 depot->slab_count * sizeof(struct vdo_slab *)); 3790 resizing = true; 3791 } 3792 3793 slab_size = depot->slab_config.slab_blocks; 3794 slab_origin = depot->first_block + (depot->slab_count * slab_size); 3795 3796 for (depot->new_slab_count = depot->slab_count; 3797 depot->new_slab_count < slab_count; 3798 depot->new_slab_count++, slab_origin += slab_size) { 3799 struct block_allocator *allocator = 3800 &depot->allocators[depot->new_slab_count % depot->zone_count]; 3801 struct vdo_slab **slab_ptr = &depot->new_slabs[depot->new_slab_count]; 3802 3803 result = make_slab(slab_origin, allocator, depot->new_slab_count, 3804 resizing, slab_ptr); 3805 if (result != VDO_SUCCESS) 3806 return result; 3807 } 3808 3809 return VDO_SUCCESS; 3810 } 3811 3812 /** 3813 * vdo_abandon_new_slabs() - Abandon any new slabs in this depot, freeing them as needed. 3814 * @depot: The depot. 3815 */ 3816 void vdo_abandon_new_slabs(struct slab_depot *depot) 3817 { 3818 slab_count_t i; 3819 3820 if (depot->new_slabs == NULL) 3821 return; 3822 3823 for (i = depot->slab_count; i < depot->new_slab_count; i++) 3824 free_slab(vdo_forget(depot->new_slabs[i])); 3825 depot->new_slab_count = 0; 3826 depot->new_size = 0; 3827 vdo_free(vdo_forget(depot->new_slabs)); 3828 } 3829 3830 /** 3831 * get_allocator_thread_id() - Get the ID of the thread on which a given allocator operates. 3832 * 3833 * Implements vdo_zone_thread_getter_fn. 
3834 */ 3835 static thread_id_t get_allocator_thread_id(void *context, zone_count_t zone_number) 3836 { 3837 return ((struct slab_depot *) context)->allocators[zone_number].thread_id; 3838 } 3839 3840 /** 3841 * release_recovery_journal_lock() - Request the slab journal to release the recovery journal lock 3842 * it may hold on a specified recovery journal block. 3843 * @journal: The slab journal. 3844 * @recovery_lock: The sequence number of the recovery journal block whose locks should be 3845 * released. 3846 * 3847 * Return: true if the journal does hold a lock on the specified block (which it will release). 3848 */ 3849 static bool __must_check release_recovery_journal_lock(struct slab_journal *journal, 3850 sequence_number_t recovery_lock) 3851 { 3852 if (recovery_lock > journal->recovery_lock) { 3853 VDO_ASSERT_LOG_ONLY((recovery_lock < journal->recovery_lock), 3854 "slab journal recovery lock is not older than the recovery journal head"); 3855 return false; 3856 } 3857 3858 if ((recovery_lock < journal->recovery_lock) || 3859 vdo_is_read_only(journal->slab->allocator->depot->vdo)) 3860 return false; 3861 3862 /* All locks are held by the block which is in progress; write it. */ 3863 commit_tail(journal); 3864 return true; 3865 } 3866 3867 /* 3868 * Request a commit of all dirty tail blocks which are locking the recovery journal block the depot 3869 * is seeking to release. 3870 * 3871 * Implements vdo_zone_action_fn. 
3872 */ 3873 static void release_tail_block_locks(void *context, zone_count_t zone_number, 3874 struct vdo_completion *parent) 3875 { 3876 struct slab_journal *journal, *tmp; 3877 struct slab_depot *depot = context; 3878 struct list_head *list = &depot->allocators[zone_number].dirty_slab_journals; 3879 3880 list_for_each_entry_safe(journal, tmp, list, dirty_entry) { 3881 if (!release_recovery_journal_lock(journal, 3882 depot->active_release_request)) 3883 break; 3884 } 3885 3886 vdo_finish_completion(parent); 3887 } 3888 3889 /** 3890 * prepare_for_tail_block_commit() - Prepare to commit oldest tail blocks. 3891 * 3892 * Implements vdo_action_preamble_fn. 3893 */ 3894 static void prepare_for_tail_block_commit(void *context, struct vdo_completion *parent) 3895 { 3896 struct slab_depot *depot = context; 3897 3898 depot->active_release_request = depot->new_release_request; 3899 vdo_finish_completion(parent); 3900 } 3901 3902 /** 3903 * schedule_tail_block_commit() - Schedule a tail block commit if necessary. 3904 * 3905 * This method should not be called directly. Rather, call vdo_schedule_default_action() on the 3906 * depot's action manager. 3907 * 3908 * Implements vdo_action_scheduler_fn. 3909 */ 3910 static bool schedule_tail_block_commit(void *context) 3911 { 3912 struct slab_depot *depot = context; 3913 3914 if (depot->new_release_request == depot->active_release_request) 3915 return false; 3916 3917 return vdo_schedule_action(depot->action_manager, 3918 prepare_for_tail_block_commit, 3919 release_tail_block_locks, 3920 NULL, NULL); 3921 } 3922 3923 /** 3924 * initialize_slab_scrubber() - Initialize an allocator's slab scrubber. 3925 * @allocator: The allocator being initialized 3926 * 3927 * Return: VDO_SUCCESS or an error. 
3928 */ 3929 static int initialize_slab_scrubber(struct block_allocator *allocator) 3930 { 3931 struct slab_scrubber *scrubber = &allocator->scrubber; 3932 block_count_t slab_journal_size = 3933 allocator->depot->slab_config.slab_journal_blocks; 3934 char *journal_data; 3935 int result; 3936 3937 result = vdo_allocate(VDO_BLOCK_SIZE * slab_journal_size, 3938 char, __func__, &journal_data); 3939 if (result != VDO_SUCCESS) 3940 return result; 3941 3942 result = allocate_vio_components(allocator->completion.vdo, 3943 VIO_TYPE_SLAB_JOURNAL, 3944 VIO_PRIORITY_METADATA, 3945 allocator, slab_journal_size, 3946 journal_data, &scrubber->vio); 3947 if (result != VDO_SUCCESS) { 3948 vdo_free(journal_data); 3949 return result; 3950 } 3951 3952 INIT_LIST_HEAD(&scrubber->high_priority_slabs); 3953 INIT_LIST_HEAD(&scrubber->slabs); 3954 vdo_set_admin_state_code(&scrubber->admin_state, VDO_ADMIN_STATE_SUSPENDED); 3955 return VDO_SUCCESS; 3956 } 3957 3958 /** 3959 * initialize_slab_summary_block() - Initialize a slab_summary_block. 3960 * @allocator: The allocator which owns the block. 3961 * @index: The index of this block in its zone's summary. 3962 * 3963 * Return: VDO_SUCCESS or an error. 
3964 */ 3965 static int __must_check initialize_slab_summary_block(struct block_allocator *allocator, 3966 block_count_t index) 3967 { 3968 struct slab_summary_block *block = &allocator->summary_blocks[index]; 3969 int result; 3970 3971 result = vdo_allocate(VDO_BLOCK_SIZE, char, __func__, &block->outgoing_entries); 3972 if (result != VDO_SUCCESS) 3973 return result; 3974 3975 result = allocate_vio_components(allocator->depot->vdo, VIO_TYPE_SLAB_SUMMARY, 3976 VIO_PRIORITY_METADATA, NULL, 1, 3977 block->outgoing_entries, &block->vio); 3978 if (result != VDO_SUCCESS) 3979 return result; 3980 3981 block->allocator = allocator; 3982 block->entries = &allocator->summary_entries[VDO_SLAB_SUMMARY_ENTRIES_PER_BLOCK * index]; 3983 block->index = index; 3984 return VDO_SUCCESS; 3985 } 3986 3987 static int __must_check initialize_block_allocator(struct slab_depot *depot, 3988 zone_count_t zone) 3989 { 3990 int result; 3991 block_count_t i; 3992 struct block_allocator *allocator = &depot->allocators[zone]; 3993 struct vdo *vdo = depot->vdo; 3994 block_count_t max_free_blocks = depot->slab_config.data_blocks; 3995 unsigned int max_priority = (2 + ilog2(max_free_blocks)); 3996 3997 *allocator = (struct block_allocator) { 3998 .depot = depot, 3999 .zone_number = zone, 4000 .thread_id = vdo->thread_config.physical_threads[zone], 4001 .nonce = vdo->states.vdo.nonce, 4002 }; 4003 4004 INIT_LIST_HEAD(&allocator->dirty_slab_journals); 4005 vdo_set_admin_state_code(&allocator->state, VDO_ADMIN_STATE_NORMAL_OPERATION); 4006 result = vdo_register_read_only_listener(vdo, allocator, 4007 notify_block_allocator_of_read_only_mode, 4008 allocator->thread_id); 4009 if (result != VDO_SUCCESS) 4010 return result; 4011 4012 vdo_initialize_completion(&allocator->completion, vdo, VDO_BLOCK_ALLOCATOR_COMPLETION); 4013 result = make_vio_pool(vdo, BLOCK_ALLOCATOR_VIO_POOL_SIZE, allocator->thread_id, 4014 VIO_TYPE_SLAB_JOURNAL, VIO_PRIORITY_METADATA, 4015 allocator, &allocator->vio_pool); 4016 if 
(result != VDO_SUCCESS) 4017 return result; 4018 4019 result = initialize_slab_scrubber(allocator); 4020 if (result != VDO_SUCCESS) 4021 return result; 4022 4023 result = vdo_make_priority_table(max_priority, &allocator->prioritized_slabs); 4024 if (result != VDO_SUCCESS) 4025 return result; 4026 4027 result = vdo_allocate(VDO_SLAB_SUMMARY_BLOCKS_PER_ZONE, 4028 struct slab_summary_block, __func__, 4029 &allocator->summary_blocks); 4030 if (result != VDO_SUCCESS) 4031 return result; 4032 4033 vdo_set_admin_state_code(&allocator->summary_state, 4034 VDO_ADMIN_STATE_NORMAL_OPERATION); 4035 allocator->summary_entries = depot->summary_entries + (MAX_VDO_SLABS * zone); 4036 4037 /* Initialize each summary block. */ 4038 for (i = 0; i < VDO_SLAB_SUMMARY_BLOCKS_PER_ZONE; i++) { 4039 result = initialize_slab_summary_block(allocator, i); 4040 if (result != VDO_SUCCESS) 4041 return result; 4042 } 4043 4044 /* 4045 * Performing well atop thin provisioned storage requires either that VDO discards freed 4046 * blocks, or that the block allocator try to use slabs that already have allocated blocks 4047 * in preference to slabs that have never been opened. For reasons we have not been able to 4048 * fully understand, some SSD machines have been have been very sensitive (50% reduction in 4049 * test throughput) to very slight differences in the timing and locality of block 4050 * allocation. Assigning a low priority to unopened slabs (max_priority/2, say) would be 4051 * ideal for the story, but anything less than a very high threshold (max_priority - 1) 4052 * hurts on these machines. 4053 * 4054 * This sets the free block threshold for preferring to open an unopened slab to the binary 4055 * floor of 3/4ths the total number of data blocks in a slab, which will generally evaluate 4056 * to about half the slab size. 
4057 */ 4058 allocator->unopened_slab_priority = (1 + ilog2((max_free_blocks * 3) / 4)); 4059 4060 return VDO_SUCCESS; 4061 } 4062 4063 static int allocate_components(struct slab_depot *depot, 4064 struct partition *summary_partition) 4065 { 4066 int result; 4067 zone_count_t zone; 4068 slab_count_t slab_count; 4069 u8 hint; 4070 u32 i; 4071 const struct thread_config *thread_config = &depot->vdo->thread_config; 4072 4073 result = vdo_make_action_manager(depot->zone_count, get_allocator_thread_id, 4074 thread_config->journal_thread, depot, 4075 schedule_tail_block_commit, 4076 depot->vdo, &depot->action_manager); 4077 if (result != VDO_SUCCESS) 4078 return result; 4079 4080 depot->origin = depot->first_block; 4081 4082 /* block size must be a multiple of entry size */ 4083 BUILD_BUG_ON((VDO_BLOCK_SIZE % sizeof(struct slab_summary_entry)) != 0); 4084 4085 depot->summary_origin = summary_partition->offset; 4086 depot->hint_shift = vdo_get_slab_summary_hint_shift(depot->slab_size_shift); 4087 result = vdo_allocate(MAXIMUM_VDO_SLAB_SUMMARY_ENTRIES, 4088 struct slab_summary_entry, __func__, 4089 &depot->summary_entries); 4090 if (result != VDO_SUCCESS) 4091 return result; 4092 4093 4094 /* Initialize all the entries. */ 4095 hint = compute_fullness_hint(depot, depot->slab_config.data_blocks); 4096 for (i = 0; i < MAXIMUM_VDO_SLAB_SUMMARY_ENTRIES; i++) { 4097 /* 4098 * This default tail block offset must be reflected in 4099 * slabJournal.c::read_slab_journal_tail(). 
4100 */ 4101 depot->summary_entries[i] = (struct slab_summary_entry) { 4102 .tail_block_offset = 0, 4103 .fullness_hint = hint, 4104 .load_ref_counts = false, 4105 .is_dirty = false, 4106 }; 4107 } 4108 4109 slab_count = vdo_compute_slab_count(depot->first_block, depot->last_block, 4110 depot->slab_size_shift); 4111 if (thread_config->physical_zone_count > slab_count) { 4112 return vdo_log_error_strerror(VDO_BAD_CONFIGURATION, 4113 "%u physical zones exceeds slab count %u", 4114 thread_config->physical_zone_count, 4115 slab_count); 4116 } 4117 4118 /* Initialize the block allocators. */ 4119 for (zone = 0; zone < depot->zone_count; zone++) { 4120 result = initialize_block_allocator(depot, zone); 4121 if (result != VDO_SUCCESS) 4122 return result; 4123 } 4124 4125 /* Allocate slabs. */ 4126 result = allocate_slabs(depot, slab_count); 4127 if (result != VDO_SUCCESS) 4128 return result; 4129 4130 /* Use the new slabs. */ 4131 for (i = depot->slab_count; i < depot->new_slab_count; i++) { 4132 struct vdo_slab *slab = depot->new_slabs[i]; 4133 4134 register_slab_with_allocator(slab->allocator, slab); 4135 WRITE_ONCE(depot->slab_count, depot->slab_count + 1); 4136 } 4137 4138 depot->slabs = depot->new_slabs; 4139 depot->new_slabs = NULL; 4140 depot->new_slab_count = 0; 4141 4142 return VDO_SUCCESS; 4143 } 4144 4145 /** 4146 * vdo_decode_slab_depot() - Make a slab depot and configure it with the state read from the super 4147 * block. 4148 * @state: The slab depot state from the super block. 4149 * @vdo: The VDO which will own the depot. 4150 * @summary_partition: The partition which holds the slab summary. 4151 * @depot_ptr: A pointer to hold the depot. 4152 * 4153 * Return: A success or error code. 
4154 */ 4155 int vdo_decode_slab_depot(struct slab_depot_state_2_0 state, struct vdo *vdo, 4156 struct partition *summary_partition, 4157 struct slab_depot **depot_ptr) 4158 { 4159 unsigned int slab_size_shift; 4160 struct slab_depot *depot; 4161 int result; 4162 4163 /* 4164 * Calculate the bit shift for efficiently mapping block numbers to slabs. Using a shift 4165 * requires that the slab size be a power of two. 4166 */ 4167 block_count_t slab_size = state.slab_config.slab_blocks; 4168 4169 if (!is_power_of_2(slab_size)) { 4170 return vdo_log_error_strerror(UDS_INVALID_ARGUMENT, 4171 "slab size must be a power of two"); 4172 } 4173 slab_size_shift = ilog2(slab_size); 4174 4175 result = vdo_allocate_extended(struct slab_depot, 4176 vdo->thread_config.physical_zone_count, 4177 struct block_allocator, __func__, &depot); 4178 if (result != VDO_SUCCESS) 4179 return result; 4180 4181 depot->vdo = vdo; 4182 depot->old_zone_count = state.zone_count; 4183 depot->zone_count = vdo->thread_config.physical_zone_count; 4184 depot->slab_config = state.slab_config; 4185 depot->first_block = state.first_block; 4186 depot->last_block = state.last_block; 4187 depot->slab_size_shift = slab_size_shift; 4188 4189 result = allocate_components(depot, summary_partition); 4190 if (result != VDO_SUCCESS) { 4191 vdo_free_slab_depot(depot); 4192 return result; 4193 } 4194 4195 *depot_ptr = depot; 4196 return VDO_SUCCESS; 4197 } 4198 4199 static void uninitialize_allocator_summary(struct block_allocator *allocator) 4200 { 4201 block_count_t i; 4202 4203 if (allocator->summary_blocks == NULL) 4204 return; 4205 4206 for (i = 0; i < VDO_SLAB_SUMMARY_BLOCKS_PER_ZONE; i++) { 4207 free_vio_components(&allocator->summary_blocks[i].vio); 4208 vdo_free(vdo_forget(allocator->summary_blocks[i].outgoing_entries)); 4209 } 4210 4211 vdo_free(vdo_forget(allocator->summary_blocks)); 4212 } 4213 4214 /** 4215 * vdo_free_slab_depot() - Destroy a slab depot. 4216 * @depot: The depot to destroy. 
4217 */ 4218 void vdo_free_slab_depot(struct slab_depot *depot) 4219 { 4220 zone_count_t zone = 0; 4221 4222 if (depot == NULL) 4223 return; 4224 4225 vdo_abandon_new_slabs(depot); 4226 4227 for (zone = 0; zone < depot->zone_count; zone++) { 4228 struct block_allocator *allocator = &depot->allocators[zone]; 4229 4230 if (allocator->eraser != NULL) 4231 dm_kcopyd_client_destroy(vdo_forget(allocator->eraser)); 4232 4233 uninitialize_allocator_summary(allocator); 4234 uninitialize_scrubber_vio(&allocator->scrubber); 4235 free_vio_pool(vdo_forget(allocator->vio_pool)); 4236 vdo_free_priority_table(vdo_forget(allocator->prioritized_slabs)); 4237 } 4238 4239 if (depot->slabs != NULL) { 4240 slab_count_t i; 4241 4242 for (i = 0; i < depot->slab_count; i++) 4243 free_slab(vdo_forget(depot->slabs[i])); 4244 } 4245 4246 vdo_free(vdo_forget(depot->slabs)); 4247 vdo_free(vdo_forget(depot->action_manager)); 4248 vdo_free(vdo_forget(depot->summary_entries)); 4249 vdo_free(depot); 4250 } 4251 4252 /** 4253 * vdo_record_slab_depot() - Record the state of a slab depot for encoding into the super block. 4254 * @depot: The depot to encode. 4255 * 4256 * Return: The depot state. 4257 */ 4258 struct slab_depot_state_2_0 vdo_record_slab_depot(const struct slab_depot *depot) 4259 { 4260 /* 4261 * If this depot is currently using 0 zones, it must have been synchronously loaded by a 4262 * tool and is now being saved. We did not load and combine the slab summary, so we still 4263 * need to do that next time we load with the old zone count rather than 0. 
4264 */ 4265 struct slab_depot_state_2_0 state; 4266 zone_count_t zones_to_record = depot->zone_count; 4267 4268 if (depot->zone_count == 0) 4269 zones_to_record = depot->old_zone_count; 4270 4271 state = (struct slab_depot_state_2_0) { 4272 .slab_config = depot->slab_config, 4273 .first_block = depot->first_block, 4274 .last_block = depot->last_block, 4275 .zone_count = zones_to_record, 4276 }; 4277 4278 return state; 4279 } 4280 4281 /** 4282 * vdo_allocate_reference_counters() - Allocate the reference counters for all slabs in the depot. 4283 * 4284 * Context: This method may be called only before entering normal operation from the load thread. 4285 * 4286 * Return: VDO_SUCCESS or an error. 4287 */ 4288 int vdo_allocate_reference_counters(struct slab_depot *depot) 4289 { 4290 struct slab_iterator iterator = 4291 get_depot_slab_iterator(depot, depot->slab_count - 1, 0, 1); 4292 4293 while (iterator.next != NULL) { 4294 int result = allocate_slab_counters(next_slab(&iterator)); 4295 4296 if (result != VDO_SUCCESS) 4297 return result; 4298 } 4299 4300 return VDO_SUCCESS; 4301 } 4302 4303 /** 4304 * get_slab_number() - Get the number of the slab that contains a specified block. 4305 * @depot: The slab depot. 4306 * @pbn: The physical block number. 4307 * @slab_number_ptr: A pointer to hold the slab number. 4308 * 4309 * Return: VDO_SUCCESS or an error. 4310 */ 4311 static int __must_check get_slab_number(const struct slab_depot *depot, 4312 physical_block_number_t pbn, 4313 slab_count_t *slab_number_ptr) 4314 { 4315 slab_count_t slab_number; 4316 4317 if (pbn < depot->first_block) 4318 return VDO_OUT_OF_RANGE; 4319 4320 slab_number = (pbn - depot->first_block) >> depot->slab_size_shift; 4321 if (slab_number >= depot->slab_count) 4322 return VDO_OUT_OF_RANGE; 4323 4324 *slab_number_ptr = slab_number; 4325 return VDO_SUCCESS; 4326 } 4327 4328 /** 4329 * vdo_get_slab() - Get the slab object for the slab that contains a specified block. 4330 * @depot: The slab depot. 
4331 * @pbn: The physical block number. 4332 * 4333 * Will put the VDO in read-only mode if the PBN is not a valid data block nor the zero block. 4334 * 4335 * Return: The slab containing the block, or NULL if the block number is the zero block or 4336 * otherwise out of range. 4337 */ 4338 struct vdo_slab *vdo_get_slab(const struct slab_depot *depot, 4339 physical_block_number_t pbn) 4340 { 4341 slab_count_t slab_number; 4342 int result; 4343 4344 if (pbn == VDO_ZERO_BLOCK) 4345 return NULL; 4346 4347 result = get_slab_number(depot, pbn, &slab_number); 4348 if (result != VDO_SUCCESS) { 4349 vdo_enter_read_only_mode(depot->vdo, result); 4350 return NULL; 4351 } 4352 4353 return depot->slabs[slab_number]; 4354 } 4355 4356 /** 4357 * vdo_get_increment_limit() - Determine how many new references a block can acquire. 4358 * @depot: The slab depot. 4359 * @pbn: The physical block number that is being queried. 4360 * 4361 * Context: This method must be called from the physical zone thread of the PBN. 4362 * 4363 * Return: The number of available references. 4364 */ 4365 u8 vdo_get_increment_limit(struct slab_depot *depot, physical_block_number_t pbn) 4366 { 4367 struct vdo_slab *slab = vdo_get_slab(depot, pbn); 4368 vdo_refcount_t *counter_ptr = NULL; 4369 int result; 4370 4371 if ((slab == NULL) || (slab->status != VDO_SLAB_REBUILT)) 4372 return 0; 4373 4374 result = get_reference_counter(slab, pbn, &counter_ptr); 4375 if (result != VDO_SUCCESS) 4376 return 0; 4377 4378 if (*counter_ptr == PROVISIONAL_REFERENCE_COUNT) 4379 return (MAXIMUM_REFERENCE_COUNT - 1); 4380 4381 return (MAXIMUM_REFERENCE_COUNT - *counter_ptr); 4382 } 4383 4384 /** 4385 * vdo_is_physical_data_block() - Determine whether the given PBN refers to a data block. 4386 * @depot: The depot. 4387 * @pbn: The physical block number to ask about. 4388 * 4389 * Return: True if the PBN corresponds to a data block. 
4390 */ 4391 bool vdo_is_physical_data_block(const struct slab_depot *depot, 4392 physical_block_number_t pbn) 4393 { 4394 slab_count_t slab_number; 4395 slab_block_number sbn; 4396 4397 return ((pbn == VDO_ZERO_BLOCK) || 4398 ((get_slab_number(depot, pbn, &slab_number) == VDO_SUCCESS) && 4399 (slab_block_number_from_pbn(depot->slabs[slab_number], pbn, &sbn) == 4400 VDO_SUCCESS))); 4401 } 4402 4403 /** 4404 * vdo_get_slab_depot_allocated_blocks() - Get the total number of data blocks allocated across all 4405 * the slabs in the depot. 4406 * @depot: The slab depot. 4407 * 4408 * This is the total number of blocks with a non-zero reference count. 4409 * 4410 * Context: This may be called from any thread. 4411 * 4412 * Return: The total number of blocks with a non-zero reference count. 4413 */ 4414 block_count_t vdo_get_slab_depot_allocated_blocks(const struct slab_depot *depot) 4415 { 4416 block_count_t total = 0; 4417 zone_count_t zone; 4418 4419 for (zone = 0; zone < depot->zone_count; zone++) { 4420 /* The allocators are responsible for thread safety. */ 4421 total += READ_ONCE(depot->allocators[zone].allocated_blocks); 4422 } 4423 4424 return total; 4425 } 4426 4427 /** 4428 * vdo_get_slab_depot_data_blocks() - Get the total number of data blocks in all the slabs in the 4429 * depot. 4430 * @depot: The slab depot. 4431 * 4432 * Context: This may be called from any thread. 4433 * 4434 * Return: The total number of data blocks in all slabs. 4435 */ 4436 block_count_t vdo_get_slab_depot_data_blocks(const struct slab_depot *depot) 4437 { 4438 return (READ_ONCE(depot->slab_count) * depot->slab_config.data_blocks); 4439 } 4440 4441 /** 4442 * finish_combining_zones() - Clean up after saving out the combined slab summary. 4443 * @completion: The vio which was used to write the summary data. 
4444 */ 4445 static void finish_combining_zones(struct vdo_completion *completion) 4446 { 4447 int result = completion->result; 4448 struct vdo_completion *parent = completion->parent; 4449 4450 free_vio(as_vio(vdo_forget(completion))); 4451 vdo_fail_completion(parent, result); 4452 } 4453 4454 static void handle_combining_error(struct vdo_completion *completion) 4455 { 4456 vio_record_metadata_io_error(as_vio(completion)); 4457 finish_combining_zones(completion); 4458 } 4459 4460 static void write_summary_endio(struct bio *bio) 4461 { 4462 struct vio *vio = bio->bi_private; 4463 struct vdo *vdo = vio->completion.vdo; 4464 4465 continue_vio_after_io(vio, finish_combining_zones, 4466 vdo->thread_config.admin_thread); 4467 } 4468 4469 /** 4470 * combine_summaries() - Treating the current entries buffer as the on-disk value of all zones, 4471 * update every zone to the correct values for every slab. 4472 * @depot: The depot whose summary entries should be combined. 4473 */ 4474 static void combine_summaries(struct slab_depot *depot) 4475 { 4476 /* 4477 * Combine all the old summary data into the portion of the buffer corresponding to the 4478 * first zone. 4479 */ 4480 zone_count_t zone = 0; 4481 struct slab_summary_entry *entries = depot->summary_entries; 4482 4483 if (depot->old_zone_count > 1) { 4484 slab_count_t entry_number; 4485 4486 for (entry_number = 0; entry_number < MAX_VDO_SLABS; entry_number++) { 4487 if (zone != 0) { 4488 memcpy(entries + entry_number, 4489 entries + (zone * MAX_VDO_SLABS) + entry_number, 4490 sizeof(struct slab_summary_entry)); 4491 } 4492 4493 zone++; 4494 if (zone == depot->old_zone_count) 4495 zone = 0; 4496 } 4497 } 4498 4499 /* Copy the combined data to each zones's region of the buffer. 
*/ 4500 for (zone = 1; zone < MAX_VDO_PHYSICAL_ZONES; zone++) { 4501 memcpy(entries + (zone * MAX_VDO_SLABS), entries, 4502 MAX_VDO_SLABS * sizeof(struct slab_summary_entry)); 4503 } 4504 } 4505 4506 /** 4507 * finish_loading_summary() - Finish loading slab summary data. 4508 * @completion: The vio which was used to read the summary data. 4509 * 4510 * Combines the slab summary data from all the previously written zones and copies the combined 4511 * summary to each partition's data region. Then writes the combined summary back out to disk. This 4512 * callback is registered in load_summary_endio(). 4513 */ 4514 static void finish_loading_summary(struct vdo_completion *completion) 4515 { 4516 struct slab_depot *depot = completion->vdo->depot; 4517 4518 /* Combine the summary from each zone so each zone is correct for all slabs. */ 4519 combine_summaries(depot); 4520 4521 /* Write the combined summary back out. */ 4522 vdo_submit_metadata_vio(as_vio(completion), depot->summary_origin, 4523 write_summary_endio, handle_combining_error, 4524 REQ_OP_WRITE); 4525 } 4526 4527 static void load_summary_endio(struct bio *bio) 4528 { 4529 struct vio *vio = bio->bi_private; 4530 struct vdo *vdo = vio->completion.vdo; 4531 4532 continue_vio_after_io(vio, finish_loading_summary, 4533 vdo->thread_config.admin_thread); 4534 } 4535 4536 /** 4537 * load_slab_summary() - The preamble of a load operation. 4538 * 4539 * Implements vdo_action_preamble_fn. 
4540 */ 4541 static void load_slab_summary(void *context, struct vdo_completion *parent) 4542 { 4543 int result; 4544 struct vio *vio; 4545 struct slab_depot *depot = context; 4546 const struct admin_state_code *operation = 4547 vdo_get_current_manager_operation(depot->action_manager); 4548 4549 result = create_multi_block_metadata_vio(depot->vdo, VIO_TYPE_SLAB_SUMMARY, 4550 VIO_PRIORITY_METADATA, parent, 4551 VDO_SLAB_SUMMARY_BLOCKS, 4552 (char *) depot->summary_entries, &vio); 4553 if (result != VDO_SUCCESS) { 4554 vdo_fail_completion(parent, result); 4555 return; 4556 } 4557 4558 if ((operation == VDO_ADMIN_STATE_FORMATTING) || 4559 (operation == VDO_ADMIN_STATE_LOADING_FOR_REBUILD)) { 4560 finish_loading_summary(&vio->completion); 4561 return; 4562 } 4563 4564 vdo_submit_metadata_vio(vio, depot->summary_origin, load_summary_endio, 4565 handle_combining_error, REQ_OP_READ); 4566 } 4567 4568 /* Implements vdo_zone_action_fn. */ 4569 static void load_allocator(void *context, zone_count_t zone_number, 4570 struct vdo_completion *parent) 4571 { 4572 struct slab_depot *depot = context; 4573 4574 vdo_start_loading(&depot->allocators[zone_number].state, 4575 vdo_get_current_manager_operation(depot->action_manager), 4576 parent, initiate_load); 4577 } 4578 4579 /** 4580 * vdo_load_slab_depot() - Asynchronously load any slab depot state that isn't included in the 4581 * super_block component. 4582 * @depot: The depot to load. 4583 * @operation: The type of load to perform. 4584 * @parent: The completion to notify when the load is complete. 4585 * @context: Additional context for the load operation; may be NULL. 4586 * 4587 * This method may be called only before entering normal operation from the load thread. 
4588 */ 4589 void vdo_load_slab_depot(struct slab_depot *depot, 4590 const struct admin_state_code *operation, 4591 struct vdo_completion *parent, void *context) 4592 { 4593 if (!vdo_assert_load_operation(operation, parent)) 4594 return; 4595 4596 vdo_schedule_operation_with_context(depot->action_manager, operation, 4597 load_slab_summary, load_allocator, 4598 NULL, context, parent); 4599 } 4600 4601 /* Implements vdo_zone_action_fn. */ 4602 static void prepare_to_allocate(void *context, zone_count_t zone_number, 4603 struct vdo_completion *parent) 4604 { 4605 struct slab_depot *depot = context; 4606 struct block_allocator *allocator = &depot->allocators[zone_number]; 4607 int result; 4608 4609 result = vdo_prepare_slabs_for_allocation(allocator); 4610 if (result != VDO_SUCCESS) { 4611 vdo_fail_completion(parent, result); 4612 return; 4613 } 4614 4615 scrub_slabs(allocator, parent); 4616 } 4617 4618 /** 4619 * vdo_prepare_slab_depot_to_allocate() - Prepare the slab depot to come online and start 4620 * allocating blocks. 4621 * @depot: The depot to prepare. 4622 * @load_type: The load type. 4623 * @parent: The completion to notify when the operation is complete. 4624 * 4625 * This method may be called only before entering normal operation from the load thread. It must be 4626 * called before allocation may proceed. 4627 */ 4628 void vdo_prepare_slab_depot_to_allocate(struct slab_depot *depot, 4629 enum slab_depot_load_type load_type, 4630 struct vdo_completion *parent) 4631 { 4632 depot->load_type = load_type; 4633 atomic_set(&depot->zones_to_scrub, depot->zone_count); 4634 vdo_schedule_action(depot->action_manager, NULL, 4635 prepare_to_allocate, NULL, parent); 4636 } 4637 4638 /** 4639 * vdo_update_slab_depot_size() - Update the slab depot to reflect its new size in memory. 4640 * @depot: The depot to update. 4641 * 4642 * This size is saved to disk as part of the super block. 
4643 */ 4644 void vdo_update_slab_depot_size(struct slab_depot *depot) 4645 { 4646 depot->last_block = depot->new_last_block; 4647 } 4648 4649 /** 4650 * vdo_prepare_to_grow_slab_depot() - Allocate new memory needed for a resize of a slab depot to 4651 * the given size. 4652 * @depot: The depot to prepare to resize. 4653 * @partition: The new depot partition 4654 * 4655 * Return: VDO_SUCCESS or an error. 4656 */ 4657 int vdo_prepare_to_grow_slab_depot(struct slab_depot *depot, 4658 const struct partition *partition) 4659 { 4660 struct slab_depot_state_2_0 new_state; 4661 int result; 4662 slab_count_t new_slab_count; 4663 4664 if ((partition->count >> depot->slab_size_shift) <= depot->slab_count) 4665 return VDO_INCREMENT_TOO_SMALL; 4666 4667 /* Generate the depot configuration for the new block count. */ 4668 VDO_ASSERT_LOG_ONLY(depot->first_block == partition->offset, 4669 "New slab depot partition doesn't change origin"); 4670 result = vdo_configure_slab_depot(partition, depot->slab_config, 4671 depot->zone_count, &new_state); 4672 if (result != VDO_SUCCESS) 4673 return result; 4674 4675 new_slab_count = vdo_compute_slab_count(depot->first_block, 4676 new_state.last_block, 4677 depot->slab_size_shift); 4678 if (new_slab_count <= depot->slab_count) 4679 return vdo_log_error_strerror(VDO_INCREMENT_TOO_SMALL, 4680 "Depot can only grow"); 4681 if (new_slab_count == depot->new_slab_count) { 4682 /* Check it out, we've already got all the new slabs allocated! 
*/ 4683 return VDO_SUCCESS; 4684 } 4685 4686 vdo_abandon_new_slabs(depot); 4687 result = allocate_slabs(depot, new_slab_count); 4688 if (result != VDO_SUCCESS) { 4689 vdo_abandon_new_slabs(depot); 4690 return result; 4691 } 4692 4693 depot->new_size = partition->count; 4694 depot->old_last_block = depot->last_block; 4695 depot->new_last_block = new_state.last_block; 4696 4697 return VDO_SUCCESS; 4698 } 4699 4700 /** 4701 * finish_registration() - Finish registering new slabs now that all of the allocators have 4702 * received their new slabs. 4703 * 4704 * Implements vdo_action_conclusion_fn. 4705 */ 4706 static int finish_registration(void *context) 4707 { 4708 struct slab_depot *depot = context; 4709 4710 WRITE_ONCE(depot->slab_count, depot->new_slab_count); 4711 vdo_free(depot->slabs); 4712 depot->slabs = depot->new_slabs; 4713 depot->new_slabs = NULL; 4714 depot->new_slab_count = 0; 4715 return VDO_SUCCESS; 4716 } 4717 4718 /* Implements vdo_zone_action_fn. */ 4719 static void register_new_slabs(void *context, zone_count_t zone_number, 4720 struct vdo_completion *parent) 4721 { 4722 struct slab_depot *depot = context; 4723 struct block_allocator *allocator = &depot->allocators[zone_number]; 4724 slab_count_t i; 4725 4726 for (i = depot->slab_count; i < depot->new_slab_count; i++) { 4727 struct vdo_slab *slab = depot->new_slabs[i]; 4728 4729 if (slab->allocator == allocator) 4730 register_slab_with_allocator(allocator, slab); 4731 } 4732 4733 vdo_finish_completion(parent); 4734 } 4735 4736 /** 4737 * vdo_use_new_slabs() - Use the new slabs allocated for resize. 4738 * @depot: The depot. 4739 * @parent: The object to notify when complete. 
4740 */ 4741 void vdo_use_new_slabs(struct slab_depot *depot, struct vdo_completion *parent) 4742 { 4743 VDO_ASSERT_LOG_ONLY(depot->new_slabs != NULL, "Must have new slabs to use"); 4744 vdo_schedule_operation(depot->action_manager, 4745 VDO_ADMIN_STATE_SUSPENDED_OPERATION, 4746 NULL, register_new_slabs, 4747 finish_registration, parent); 4748 } 4749 4750 /** 4751 * stop_scrubbing() - Tell the scrubber to stop scrubbing after it finishes the slab it is 4752 * currently working on. 4753 * @scrubber: The scrubber to stop. 4754 * @parent: The completion to notify when scrubbing has stopped. 4755 */ 4756 static void stop_scrubbing(struct block_allocator *allocator) 4757 { 4758 struct slab_scrubber *scrubber = &allocator->scrubber; 4759 4760 if (vdo_is_state_quiescent(&scrubber->admin_state)) { 4761 vdo_finish_completion(&allocator->completion); 4762 } else { 4763 vdo_start_draining(&scrubber->admin_state, 4764 VDO_ADMIN_STATE_SUSPENDING, 4765 &allocator->completion, NULL); 4766 } 4767 } 4768 4769 /* Implements vdo_admin_initiator_fn. 
*/ 4770 static void initiate_summary_drain(struct admin_state *state) 4771 { 4772 check_summary_drain_complete(container_of(state, struct block_allocator, 4773 summary_state)); 4774 } 4775 4776 static void do_drain_step(struct vdo_completion *completion) 4777 { 4778 struct block_allocator *allocator = vdo_as_block_allocator(completion); 4779 4780 vdo_prepare_completion_for_requeue(&allocator->completion, do_drain_step, 4781 handle_operation_error, allocator->thread_id, 4782 NULL); 4783 switch (++allocator->drain_step) { 4784 case VDO_DRAIN_ALLOCATOR_STEP_SCRUBBER: 4785 stop_scrubbing(allocator); 4786 return; 4787 4788 case VDO_DRAIN_ALLOCATOR_STEP_SLABS: 4789 apply_to_slabs(allocator, do_drain_step); 4790 return; 4791 4792 case VDO_DRAIN_ALLOCATOR_STEP_SUMMARY: 4793 vdo_start_draining(&allocator->summary_state, 4794 vdo_get_admin_state_code(&allocator->state), 4795 completion, initiate_summary_drain); 4796 return; 4797 4798 case VDO_DRAIN_ALLOCATOR_STEP_FINISHED: 4799 VDO_ASSERT_LOG_ONLY(!is_vio_pool_busy(allocator->vio_pool), 4800 "vio pool not busy"); 4801 vdo_finish_draining_with_result(&allocator->state, completion->result); 4802 return; 4803 4804 default: 4805 vdo_finish_draining_with_result(&allocator->state, UDS_BAD_STATE); 4806 } 4807 } 4808 4809 /* Implements vdo_admin_initiator_fn. */ 4810 static void initiate_drain(struct admin_state *state) 4811 { 4812 struct block_allocator *allocator = 4813 container_of(state, struct block_allocator, state); 4814 4815 allocator->drain_step = VDO_DRAIN_ALLOCATOR_START; 4816 do_drain_step(&allocator->completion); 4817 } 4818 4819 /* 4820 * Drain all allocator I/O. Depending upon the type of drain, some or all dirty metadata may be 4821 * written to disk. The type of drain will be determined from the state of the allocator's depot. 4822 * 4823 * Implements vdo_zone_action_fn. 
4824 */ 4825 static void drain_allocator(void *context, zone_count_t zone_number, 4826 struct vdo_completion *parent) 4827 { 4828 struct slab_depot *depot = context; 4829 4830 vdo_start_draining(&depot->allocators[zone_number].state, 4831 vdo_get_current_manager_operation(depot->action_manager), 4832 parent, initiate_drain); 4833 } 4834 4835 /** 4836 * vdo_drain_slab_depot() - Drain all slab depot I/O. 4837 * @depot: The depot to drain. 4838 * @operation: The drain operation (flush, rebuild, suspend, or save). 4839 * @parent: The completion to finish when the drain is complete. 4840 * 4841 * If saving, or flushing, all dirty depot metadata will be written out. If saving or suspending, 4842 * the depot will be left in a suspended state. 4843 */ 4844 void vdo_drain_slab_depot(struct slab_depot *depot, 4845 const struct admin_state_code *operation, 4846 struct vdo_completion *parent) 4847 { 4848 vdo_schedule_operation(depot->action_manager, operation, 4849 NULL, drain_allocator, NULL, parent); 4850 } 4851 4852 /** 4853 * resume_scrubbing() - Tell the scrubber to resume scrubbing if it has been stopped. 4854 * @allocator: The allocator being resumed. 
4855 */ 4856 static void resume_scrubbing(struct block_allocator *allocator) 4857 { 4858 int result; 4859 struct slab_scrubber *scrubber = &allocator->scrubber; 4860 4861 if (!has_slabs_to_scrub(scrubber)) { 4862 vdo_finish_completion(&allocator->completion); 4863 return; 4864 } 4865 4866 result = vdo_resume_if_quiescent(&scrubber->admin_state); 4867 if (result != VDO_SUCCESS) { 4868 vdo_fail_completion(&allocator->completion, result); 4869 return; 4870 } 4871 4872 scrub_next_slab(scrubber); 4873 vdo_finish_completion(&allocator->completion); 4874 } 4875 4876 static void do_resume_step(struct vdo_completion *completion) 4877 { 4878 struct block_allocator *allocator = vdo_as_block_allocator(completion); 4879 4880 vdo_prepare_completion_for_requeue(&allocator->completion, do_resume_step, 4881 handle_operation_error, 4882 allocator->thread_id, NULL); 4883 switch (--allocator->drain_step) { 4884 case VDO_DRAIN_ALLOCATOR_STEP_SUMMARY: 4885 vdo_fail_completion(completion, 4886 vdo_resume_if_quiescent(&allocator->summary_state)); 4887 return; 4888 4889 case VDO_DRAIN_ALLOCATOR_STEP_SLABS: 4890 apply_to_slabs(allocator, do_resume_step); 4891 return; 4892 4893 case VDO_DRAIN_ALLOCATOR_STEP_SCRUBBER: 4894 resume_scrubbing(allocator); 4895 return; 4896 4897 case VDO_DRAIN_ALLOCATOR_START: 4898 vdo_finish_resuming_with_result(&allocator->state, completion->result); 4899 return; 4900 4901 default: 4902 vdo_finish_resuming_with_result(&allocator->state, UDS_BAD_STATE); 4903 } 4904 } 4905 4906 /* Implements vdo_admin_initiator_fn. */ 4907 static void initiate_resume(struct admin_state *state) 4908 { 4909 struct block_allocator *allocator = 4910 container_of(state, struct block_allocator, state); 4911 4912 allocator->drain_step = VDO_DRAIN_ALLOCATOR_STEP_FINISHED; 4913 do_resume_step(&allocator->completion); 4914 } 4915 4916 /* Implements vdo_zone_action_fn. 
*/ 4917 static void resume_allocator(void *context, zone_count_t zone_number, 4918 struct vdo_completion *parent) 4919 { 4920 struct slab_depot *depot = context; 4921 4922 vdo_start_resuming(&depot->allocators[zone_number].state, 4923 vdo_get_current_manager_operation(depot->action_manager), 4924 parent, initiate_resume); 4925 } 4926 4927 /** 4928 * vdo_resume_slab_depot() - Resume a suspended slab depot. 4929 * @depot: The depot to resume. 4930 * @parent: The completion to finish when the depot has resumed. 4931 */ 4932 void vdo_resume_slab_depot(struct slab_depot *depot, struct vdo_completion *parent) 4933 { 4934 if (vdo_is_read_only(depot->vdo)) { 4935 vdo_continue_completion(parent, VDO_READ_ONLY); 4936 return; 4937 } 4938 4939 vdo_schedule_operation(depot->action_manager, VDO_ADMIN_STATE_RESUMING, 4940 NULL, resume_allocator, NULL, parent); 4941 } 4942 4943 /** 4944 * vdo_commit_oldest_slab_journal_tail_blocks() - Commit all dirty tail blocks which are locking a 4945 * given recovery journal block. 4946 * @depot: The depot. 4947 * @recovery_block_number: The sequence number of the recovery journal block whose locks should be 4948 * released. 4949 * 4950 * Context: This method must be called from the journal zone thread. 4951 */ 4952 void vdo_commit_oldest_slab_journal_tail_blocks(struct slab_depot *depot, 4953 sequence_number_t recovery_block_number) 4954 { 4955 if (depot == NULL) 4956 return; 4957 4958 depot->new_release_request = recovery_block_number; 4959 vdo_schedule_default_action(depot->action_manager); 4960 } 4961 4962 /* Implements vdo_zone_action_fn. */ 4963 static void scrub_all_unrecovered_slabs(void *context, zone_count_t zone_number, 4964 struct vdo_completion *parent) 4965 { 4966 struct slab_depot *depot = context; 4967 4968 scrub_slabs(&depot->allocators[zone_number], NULL); 4969 vdo_launch_completion(parent); 4970 } 4971 4972 /** 4973 * vdo_scrub_all_unrecovered_slabs() - Scrub all unrecovered slabs. 4974 * @depot: The depot to scrub. 
4975 * @parent: The object to notify when scrubbing has been launched for all zones. 4976 */ 4977 void vdo_scrub_all_unrecovered_slabs(struct slab_depot *depot, 4978 struct vdo_completion *parent) 4979 { 4980 vdo_schedule_action(depot->action_manager, NULL, 4981 scrub_all_unrecovered_slabs, 4982 NULL, parent); 4983 } 4984 4985 /** 4986 * get_block_allocator_statistics() - Get the total of the statistics from all the block allocators 4987 * in the depot. 4988 * @depot: The slab depot. 4989 * 4990 * Return: The statistics from all block allocators in the depot. 4991 */ 4992 static struct block_allocator_statistics __must_check 4993 get_block_allocator_statistics(const struct slab_depot *depot) 4994 { 4995 struct block_allocator_statistics totals; 4996 zone_count_t zone; 4997 4998 memset(&totals, 0, sizeof(totals)); 4999 5000 for (zone = 0; zone < depot->zone_count; zone++) { 5001 const struct block_allocator *allocator = &depot->allocators[zone]; 5002 const struct block_allocator_statistics *stats = &allocator->statistics; 5003 5004 totals.slab_count += allocator->slab_count; 5005 totals.slabs_opened += READ_ONCE(stats->slabs_opened); 5006 totals.slabs_reopened += READ_ONCE(stats->slabs_reopened); 5007 } 5008 5009 return totals; 5010 } 5011 5012 /** 5013 * get_ref_counts_statistics() - Get the cumulative ref_counts statistics for the depot. 5014 * @depot: The slab depot. 5015 * 5016 * Return: The cumulative statistics for all ref_counts in the depot. 
5017 */ 5018 static struct ref_counts_statistics __must_check 5019 get_ref_counts_statistics(const struct slab_depot *depot) 5020 { 5021 struct ref_counts_statistics totals; 5022 zone_count_t zone; 5023 5024 memset(&totals, 0, sizeof(totals)); 5025 5026 for (zone = 0; zone < depot->zone_count; zone++) { 5027 totals.blocks_written += 5028 READ_ONCE(depot->allocators[zone].ref_counts_statistics.blocks_written); 5029 } 5030 5031 return totals; 5032 } 5033 5034 /** 5035 * get_slab_journal_statistics() - Get the aggregated slab journal statistics for the depot. 5036 * @depot: The slab depot. 5037 * 5038 * Return: The aggregated statistics for all slab journals in the depot. 5039 */ 5040 static struct slab_journal_statistics __must_check 5041 get_slab_journal_statistics(const struct slab_depot *depot) 5042 { 5043 struct slab_journal_statistics totals; 5044 zone_count_t zone; 5045 5046 memset(&totals, 0, sizeof(totals)); 5047 5048 for (zone = 0; zone < depot->zone_count; zone++) { 5049 const struct slab_journal_statistics *stats = 5050 &depot->allocators[zone].slab_journal_statistics; 5051 5052 totals.disk_full_count += READ_ONCE(stats->disk_full_count); 5053 totals.flush_count += READ_ONCE(stats->flush_count); 5054 totals.blocked_count += READ_ONCE(stats->blocked_count); 5055 totals.blocks_written += READ_ONCE(stats->blocks_written); 5056 totals.tail_busy_count += READ_ONCE(stats->tail_busy_count); 5057 } 5058 5059 return totals; 5060 } 5061 5062 /** 5063 * vdo_get_slab_depot_statistics() - Get all the vdo_statistics fields that are properties of the 5064 * slab depot. 5065 * @depot: The slab depot. 5066 * @stats: The vdo statistics structure to partially fill. 
5067 */ 5068 void vdo_get_slab_depot_statistics(const struct slab_depot *depot, 5069 struct vdo_statistics *stats) 5070 { 5071 slab_count_t slab_count = READ_ONCE(depot->slab_count); 5072 slab_count_t unrecovered = 0; 5073 zone_count_t zone; 5074 5075 for (zone = 0; zone < depot->zone_count; zone++) { 5076 /* The allocators are responsible for thread safety. */ 5077 unrecovered += READ_ONCE(depot->allocators[zone].scrubber.slab_count); 5078 } 5079 5080 stats->recovery_percentage = (slab_count - unrecovered) * 100 / slab_count; 5081 stats->allocator = get_block_allocator_statistics(depot); 5082 stats->ref_counts = get_ref_counts_statistics(depot); 5083 stats->slab_journal = get_slab_journal_statistics(depot); 5084 stats->slab_summary = (struct slab_summary_statistics) { 5085 .blocks_written = atomic64_read(&depot->summary_statistics.blocks_written), 5086 }; 5087 } 5088 5089 /** 5090 * vdo_dump_slab_depot() - Dump the slab depot, in a thread-unsafe fashion. 5091 * @depot: The slab depot. 5092 */ 5093 void vdo_dump_slab_depot(const struct slab_depot *depot) 5094 { 5095 vdo_log_info("vdo slab depot"); 5096 vdo_log_info(" zone_count=%u old_zone_count=%u slabCount=%u active_release_request=%llu new_release_request=%llu", 5097 (unsigned int) depot->zone_count, 5098 (unsigned int) depot->old_zone_count, READ_ONCE(depot->slab_count), 5099 (unsigned long long) depot->active_release_request, 5100 (unsigned long long) depot->new_release_request); 5101 } 5102