// SPDX-License-Identifier: GPL-2.0-only
/*
 * Copyright 2023 Red Hat
 */

#include "data-vio.h"

#include <linux/atomic.h>
#include <linux/bio.h>
#include <linux/blkdev.h>
#include <linux/delay.h>
#include <linux/device-mapper.h>
#include <linux/jiffies.h>
#include <linux/kernel.h>
#include <linux/list.h>
#include <linux/lz4.h>
#include <linux/minmax.h>
#include <linux/sched.h>
#include <linux/spinlock.h>
#include <linux/string.h>
#include <linux/wait.h>

#include "logger.h"
#include "memory-alloc.h"
#include "murmurhash3.h"
#include "permassert.h"

#include "block-map.h"
#include "dump.h"
#include "encodings.h"
#include "int-map.h"
#include "io-submitter.h"
#include "logical-zone.h"
#include "packer.h"
#include "recovery-journal.h"
#include "slab-depot.h"
#include "status-codes.h"
#include "types.h"
#include "vdo.h"
#include "vio.h"
#include "wait-queue.h"

/**
 * DOC: Bio flags.
 *
 * For certain flags set on user bios, if the user bio has not yet been acknowledged, setting those
 * flags on our own bio(s) for that request may help underlying layers better fulfill the user
 * bio's needs. This constant contains the aggregate of those flags; VDO strips all the other
 * flags, as they convey incorrect information.
 *
 * These flags are only hints about IO importance, so they become irrelevant once VDO has already
 * finished the user bio; any IO still outstanding for an acknowledged bio no longer benefits from
 * them.
 *
 * Note that bio.c contains the complete list of flags we believe may be set; the following list
 * explains the action taken with each of those flags VDO could receive:
 *
 * * REQ_SYNC: Passed down if the user bio is not yet completed, since it indicates the user bio
 * completion is required for further work to be done by the issuer.
 * * REQ_META: Passed down if the user bio is not yet completed, since the lower layer may treat
 * it as more urgent, similar to REQ_SYNC.
 * * REQ_PRIO: Passed down if the user bio is not yet completed, since it indicates the user bio is
 * important.
 * * REQ_NOMERGE: Set only if the incoming bio was split; irrelevant to VDO IO.
 * * REQ_IDLE: Set if the incoming bio had more IO quickly following; VDO's IO pattern doesn't
 * match incoming IO, so this flag is incorrect for it.
 * * REQ_FUA: Handled separately, and irrelevant to VDO IO otherwise.
 * * REQ_RAHEAD: Passed down for reads, since it indicates the IO is of only trivial importance.
 * * REQ_BACKGROUND: Not passed down, as VIOs are a limited resource and VDO needs them recycled
 * ASAP to service heavy load, which is the only place where REQ_BACKGROUND might aid in load
 * prioritization.
 */
static blk_opf_t PASSTHROUGH_FLAGS = (REQ_PRIO | REQ_META | REQ_SYNC | REQ_RAHEAD);

/**
 * DOC:
 *
 * The data_vio_pool maintains the pool of data_vios which a vdo uses to service incoming bios. For
 * correctness, and in order to avoid potentially expensive or blocking memory allocations during
 * normal operation, the number of concurrently active data_vios is capped. Furthermore, in order
 * to avoid starvation of reads and writes, at most 75% of the data_vios may be used for
 * discards. The data_vio_pool is responsible for enforcing these limits.
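 *
 * Concretely, the per-limiter check which enforces these caps (performed under the pool's lock on
 * the submission path; see acquire_permit() and vdo_launch_bio() below) amounts to:
 *
 *	if (limiter->busy >= limiter->limit)
 *		return false;
 *	WRITE_ONCE(limiter->busy, limiter->busy + 1);
 *	return true;
 *
 * A false return causes the submitting thread to queue its bio on the limiter and sleep, as
 * described next.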
 * Threads submitting bios for which a data_vio or discard permit is not available will block
 * until the necessary resources are available. The pool is also responsible for distributing
 * resources to blocked threads and waking them. Finally, the pool attempts to batch the work of
 * recycling data_vios by performing the work of actually assigning resources to blocked threads
 * or placing data_vios back into the pool on a single cpu at a time.
 *
 * The pool contains two "limiters", one for tracking data_vios and one for tracking discard
 * permits. The limiters also provide safe cross-thread access to pool statistics without the need
 * to take the pool's lock. When a thread submits a bio to a vdo device, it will first attempt to
 * get a discard permit if it is a discard, and then to get a data_vio. If the necessary resources
 * are available, the incoming bio will be assigned to the acquired data_vio, and it will be
 * launched. However, if either of these is unavailable, the arrival time of the bio is recorded
 * in the bio's bi_private field, the bio and its submitter are both queued on the appropriate
 * limiter, and the submitting thread will then put itself to sleep. (Note that this mechanism
 * will break if jiffies are only 32 bits.)
 *
 * Whenever a data_vio has completed processing for the bio it was servicing, release_data_vio()
 * will be called on it. This function will add the data_vio to a funnel queue, and then check the
 * state of the pool. If the pool is not currently processing released data_vios, the pool's
 * completion will be enqueued on a cpu queue. This obviates the need for the releasing threads to
 * hold the pool's lock, and also batches release work while avoiding starvation of the cpu
 * threads.
 *
 * Whenever the pool's completion is run on a cpu thread, it calls process_release_callback(),
 * which processes a batch of returned data_vios (at most DATA_VIO_RELEASE_BATCH_SIZE of them)
 * from the pool's funnel queue. For each data_vio, it first checks whether that data_vio was
 * processing a discard. If so, and there is a blocked bio waiting for a discard permit, that
 * permit is notionally transferred to the eldest discard waiter, and that waiter is moved to the
 * end of the list of discard bios waiting for a data_vio. If there are no discard waiters, the
 * discard permit is returned to the pool. Next, the data_vio is assigned to the oldest blocked
 * bio which either has a discard permit or doesn't need one, and is relaunched. If no such bio
 * exists, the data_vio is returned to the pool. Finally, if any waiting bios were launched, the
 * threads which blocked trying to submit them are awakened.
 */

#define DATA_VIO_RELEASE_BATCH_SIZE 128

static const unsigned int VDO_SECTORS_PER_BLOCK_MASK = VDO_SECTORS_PER_BLOCK - 1;
static const u32 COMPRESSION_STATUS_MASK = 0xff;
static const u32 MAY_NOT_COMPRESS_MASK = 0x80000000;

struct limiter;
typedef void (*assigner_fn)(struct limiter *limiter);

/* Bookkeeping structure for a single type of resource.
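 *
 * All fields are manipulated under the owning pool's lock; busy and max_busy are additionally
 * published with WRITE_ONCE() so that the statistics accessors at the bottom of this file can
 * READ_ONCE() them without taking the lock.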
*/ 128 struct limiter { 129 /* The data_vio_pool to which this limiter belongs */ 130 struct data_vio_pool *pool; 131 /* The maximum number of data_vios available */ 132 data_vio_count_t limit; 133 /* The number of resources in use */ 134 data_vio_count_t busy; 135 /* The maximum number of resources ever simultaneously in use */ 136 data_vio_count_t max_busy; 137 /* The number of resources to release */ 138 data_vio_count_t release_count; 139 /* The number of waiters to wake */ 140 data_vio_count_t wake_count; 141 /* The list of waiting bios which are known to process_release_callback() */ 142 struct bio_list waiters; 143 /* The list of waiting bios which are not yet known to process_release_callback() */ 144 struct bio_list new_waiters; 145 /* The list of waiters which have their permits */ 146 struct bio_list *permitted_waiters; 147 /* The function for assigning a resource to a waiter */ 148 assigner_fn assigner; 149 /* The queue of blocked threads */ 150 wait_queue_head_t blocked_threads; 151 /* The arrival time of the eldest waiter */ 152 u64 arrival; 153 }; 154 155 /* 156 * A data_vio_pool is a collection of preallocated data_vios which may be acquired from any thread, 157 * and are released in batches. 158 */ 159 struct data_vio_pool { 160 /* Completion for scheduling releases */ 161 struct vdo_completion completion; 162 /* The administrative state of the pool */ 163 struct admin_state state; 164 /* Lock protecting the pool */ 165 spinlock_t lock; 166 /* The main limiter controlling the total data_vios in the pool. */ 167 struct limiter limiter; 168 /* The limiter controlling data_vios for discard */ 169 struct limiter discard_limiter; 170 /* The list of bios which have discard permits but still need a data_vio */ 171 struct bio_list permitted_discards; 172 /* The list of available data_vios */ 173 struct list_head available; 174 /* The queue of data_vios waiting to be returned to the pool */ 175 struct funnel_queue *queue; 176 /* Whether the pool is processing, or scheduled to process releases */ 177 atomic_t processing; 178 /* The data vios in the pool */ 179 struct data_vio data_vios[]; 180 }; 181 182 static const char * const ASYNC_OPERATION_NAMES[] = { 183 "launch", 184 "acknowledge_write", 185 "acquire_hash_lock", 186 "attempt_logical_block_lock", 187 "lock_duplicate_pbn", 188 "check_for_duplication", 189 "cleanup", 190 "compress_data_vio", 191 "find_block_map_slot", 192 "get_mapped_block_for_read", 193 "get_mapped_block_for_write", 194 "hash_data_vio", 195 "journal_remapping", 196 "vdo_attempt_packing", 197 "put_mapped_block", 198 "read_data_vio", 199 "update_dedupe_index", 200 "update_reference_counts", 201 "verify_duplication", 202 "write_data_vio", 203 }; 204 205 /* The steps taken cleaning up a VIO, in the order they are performed. 
*/ 206 enum data_vio_cleanup_stage { 207 VIO_CLEANUP_START, 208 VIO_RELEASE_HASH_LOCK = VIO_CLEANUP_START, 209 VIO_RELEASE_ALLOCATED, 210 VIO_RELEASE_RECOVERY_LOCKS, 211 VIO_RELEASE_LOGICAL, 212 VIO_CLEANUP_DONE 213 }; 214 215 static inline struct data_vio_pool * __must_check 216 as_data_vio_pool(struct vdo_completion *completion) 217 { 218 vdo_assert_completion_type(completion, VDO_DATA_VIO_POOL_COMPLETION); 219 return container_of(completion, struct data_vio_pool, completion); 220 } 221 222 static inline u64 get_arrival_time(struct bio *bio) 223 { 224 return (u64) bio->bi_private; 225 } 226 227 /** 228 * check_for_drain_complete_locked() - Check whether a data_vio_pool has no outstanding data_vios 229 * or waiters while holding the pool's lock. 230 * @pool: The data_vio pool. 231 */ 232 static bool check_for_drain_complete_locked(struct data_vio_pool *pool) 233 { 234 if (pool->limiter.busy > 0) 235 return false; 236 237 VDO_ASSERT_LOG_ONLY((pool->discard_limiter.busy == 0), 238 "no outstanding discard permits"); 239 240 return (bio_list_empty(&pool->limiter.new_waiters) && 241 bio_list_empty(&pool->discard_limiter.new_waiters)); 242 } 243 244 static void initialize_lbn_lock(struct data_vio *data_vio, logical_block_number_t lbn) 245 { 246 struct vdo *vdo = vdo_from_data_vio(data_vio); 247 zone_count_t zone_number; 248 struct lbn_lock *lock = &data_vio->logical; 249 250 lock->lbn = lbn; 251 lock->locked = false; 252 vdo_waitq_init(&lock->waiters); 253 zone_number = vdo_compute_logical_zone(data_vio); 254 lock->zone = &vdo->logical_zones->zones[zone_number]; 255 } 256 257 static void launch_locked_request(struct data_vio *data_vio) 258 { 259 data_vio->logical.locked = true; 260 if (data_vio->write) { 261 struct vdo *vdo = vdo_from_data_vio(data_vio); 262 263 if (vdo_is_read_only(vdo)) { 264 continue_data_vio_with_error(data_vio, VDO_READ_ONLY); 265 return; 266 } 267 } 268 269 data_vio->last_async_operation = VIO_ASYNC_OP_FIND_BLOCK_MAP_SLOT; 270 vdo_find_block_map_slot(data_vio); 271 } 272 273 static void acknowledge_data_vio(struct data_vio *data_vio) 274 { 275 struct vdo *vdo = vdo_from_data_vio(data_vio); 276 struct bio *bio = data_vio->user_bio; 277 int error = vdo_status_to_errno(data_vio->vio.completion.result); 278 279 if (bio == NULL) 280 return; 281 282 VDO_ASSERT_LOG_ONLY((data_vio->remaining_discard <= 283 (u32) (VDO_BLOCK_SIZE - data_vio->offset)), 284 "data_vio to acknowledge is not an incomplete discard"); 285 286 data_vio->user_bio = NULL; 287 vdo_count_bios(&vdo->stats.bios_acknowledged, bio); 288 if (data_vio->is_partial) 289 vdo_count_bios(&vdo->stats.bios_acknowledged_partial, bio); 290 291 bio->bi_status = errno_to_blk_status(error); 292 bio_endio(bio); 293 } 294 295 static void copy_to_bio(struct bio *bio, char *data_ptr) 296 { 297 struct bio_vec biovec; 298 struct bvec_iter iter; 299 300 bio_for_each_segment(biovec, bio, iter) { 301 memcpy_to_bvec(&biovec, data_ptr); 302 data_ptr += biovec.bv_len; 303 } 304 } 305 306 struct data_vio_compression_status get_data_vio_compression_status(struct data_vio *data_vio) 307 { 308 u32 packed = atomic_read(&data_vio->compression.status); 309 310 /* pairs with cmpxchg in set_data_vio_compression_status */ 311 smp_rmb(); 312 return (struct data_vio_compression_status) { 313 .stage = packed & COMPRESSION_STATUS_MASK, 314 .may_not_compress = ((packed & MAY_NOT_COMPRESS_MASK) != 0), 315 }; 316 } 317 318 /** 319 * pack_status() - Convert a data_vio_compression_status into a u32 which may be stored 320 * atomically. 
 * @status: The state to convert.
 *
 * Return: The compression state packed into a u32.
 */
static u32 __must_check pack_status(struct data_vio_compression_status status)
{
	return status.stage | (status.may_not_compress ? MAY_NOT_COMPRESS_MASK : 0);
}

/**
 * set_data_vio_compression_status() - Set the compression status of a data_vio.
 * @data_vio: The data_vio to change.
 * @status: The expected current status of the data_vio.
 * @new_status: The status to set.
 *
 * Return: true if the new status was set, false if the data_vio's compression status did not
 * match the expected state, and so was left unchanged.
 */
static bool __must_check
set_data_vio_compression_status(struct data_vio *data_vio,
				struct data_vio_compression_status status,
				struct data_vio_compression_status new_status)
{
	u32 actual;
	u32 expected = pack_status(status);
	u32 replacement = pack_status(new_status);

	/*
	 * Extra barriers because this was originally developed using a CAS operation that
	 * implicitly had them.
	 */
	smp_mb__before_atomic();
	actual = atomic_cmpxchg(&data_vio->compression.status, expected, replacement);
	/* same as before_atomic */
	smp_mb__after_atomic();
	return (expected == actual);
}

struct data_vio_compression_status advance_data_vio_compression_stage(struct data_vio *data_vio)
{
	for (;;) {
		struct data_vio_compression_status status =
			get_data_vio_compression_status(data_vio);
		struct data_vio_compression_status new_status = status;

		if (status.stage == DATA_VIO_POST_PACKER) {
			/* We're already in the last stage. */
			return status;
		}

		if (status.may_not_compress) {
			/*
			 * Compression has been disallowed for this VIO, so skip the rest of the
			 * path and go to the end.
			 */
			new_status.stage = DATA_VIO_POST_PACKER;
		} else {
			/* Go to the next stage. */
			new_status.stage++;
		}

		if (set_data_vio_compression_status(data_vio, status, new_status))
			return new_status;

		/* Another thread changed the status out from under us so try again. */
	}
}

/**
 * cancel_data_vio_compression() - Prevent this data_vio from being compressed or packed.
 * @data_vio: The data_vio.
 *
 * Return: true if the data_vio is in the packer and the caller was the first caller to cancel it.
 */
bool cancel_data_vio_compression(struct data_vio *data_vio)
{
	struct data_vio_compression_status status, new_status;

	for (;;) {
		status = get_data_vio_compression_status(data_vio);
		if (status.may_not_compress || (status.stage == DATA_VIO_POST_PACKER)) {
			/* This data_vio is already set up to not block in the packer. */
			break;
		}

		new_status.stage = status.stage;
		new_status.may_not_compress = true;

		if (set_data_vio_compression_status(data_vio, status, new_status))
			break;
	}

	return ((status.stage == DATA_VIO_PACKING) && !status.may_not_compress);
}

/**
 * attempt_logical_block_lock() - Attempt to acquire the lock on a logical block.
 * @completion: The data_vio for an external data request as a completion.
 *
 * This is the start of the path for all external requests. It is registered in launch_data_vio().
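 *
 * As a point of reference, the registration in launch_data_vio() is simply:
 *
 *	set_data_vio_logical_callback(data_vio, attempt_logical_block_lock);
 *	vdo_enqueue_completion(completion, VDO_DEFAULT_Q_MAP_BIO_PRIORITY);
 *
 * so this function always runs on the data_vio's logical zone thread.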
421 */ 422 static void attempt_logical_block_lock(struct vdo_completion *completion) 423 { 424 struct data_vio *data_vio = as_data_vio(completion); 425 struct lbn_lock *lock = &data_vio->logical; 426 struct vdo *vdo = vdo_from_data_vio(data_vio); 427 struct data_vio *lock_holder; 428 int result; 429 430 assert_data_vio_in_logical_zone(data_vio); 431 432 if (data_vio->logical.lbn >= vdo->states.vdo.config.logical_blocks) { 433 continue_data_vio_with_error(data_vio, VDO_OUT_OF_RANGE); 434 return; 435 } 436 437 result = vdo_int_map_put(lock->zone->lbn_operations, lock->lbn, 438 data_vio, false, (void **) &lock_holder); 439 if (result != VDO_SUCCESS) { 440 continue_data_vio_with_error(data_vio, result); 441 return; 442 } 443 444 if (lock_holder == NULL) { 445 /* We got the lock */ 446 launch_locked_request(data_vio); 447 return; 448 } 449 450 result = VDO_ASSERT(lock_holder->logical.locked, "logical block lock held"); 451 if (result != VDO_SUCCESS) { 452 continue_data_vio_with_error(data_vio, result); 453 return; 454 } 455 456 /* 457 * If the new request is a pure read request (not read-modify-write) and the lock_holder is 458 * writing and has received an allocation, service the read request immediately by copying 459 * data from the lock_holder to avoid having to flush the write out of the packer just to 460 * prevent the read from waiting indefinitely. If the lock_holder does not yet have an 461 * allocation, prevent it from blocking in the packer and wait on it. This is necessary in 462 * order to prevent returning data that may not have actually been written. 463 */ 464 if (!data_vio->write && READ_ONCE(lock_holder->allocation_succeeded)) { 465 copy_to_bio(data_vio->user_bio, lock_holder->vio.data + data_vio->offset); 466 acknowledge_data_vio(data_vio); 467 complete_data_vio(completion); 468 return; 469 } 470 471 data_vio->last_async_operation = VIO_ASYNC_OP_ATTEMPT_LOGICAL_BLOCK_LOCK; 472 vdo_waitq_enqueue_waiter(&lock_holder->logical.waiters, &data_vio->waiter); 473 474 /* 475 * Prevent writes and read-modify-writes from blocking indefinitely on lock holders in the 476 * packer. 477 */ 478 if (lock_holder->write && cancel_data_vio_compression(lock_holder)) { 479 data_vio->compression.lock_holder = lock_holder; 480 launch_data_vio_packer_callback(data_vio, 481 vdo_remove_lock_holder_from_packer); 482 } 483 } 484 485 /** 486 * launch_data_vio() - (Re)initialize a data_vio to have a new logical block number, keeping the 487 * same parent and other state and send it on its way. 488 * @data_vio: The data_vio to launch. 489 * @lbn: The logical block number. 490 */ 491 static void launch_data_vio(struct data_vio *data_vio, logical_block_number_t lbn) 492 { 493 struct vdo_completion *completion = &data_vio->vio.completion; 494 495 /* 496 * Clearing the tree lock must happen before initializing the LBN lock, which also adds 497 * information to the tree lock. 
498 */ 499 memset(&data_vio->tree_lock, 0, sizeof(data_vio->tree_lock)); 500 initialize_lbn_lock(data_vio, lbn); 501 INIT_LIST_HEAD(&data_vio->hash_lock_entry); 502 INIT_LIST_HEAD(&data_vio->write_entry); 503 504 memset(&data_vio->allocation, 0, sizeof(data_vio->allocation)); 505 506 data_vio->is_duplicate = false; 507 508 memset(&data_vio->record_name, 0, sizeof(data_vio->record_name)); 509 memset(&data_vio->duplicate, 0, sizeof(data_vio->duplicate)); 510 vdo_reset_completion(&data_vio->decrement_completion); 511 vdo_reset_completion(completion); 512 completion->error_handler = handle_data_vio_error; 513 set_data_vio_logical_callback(data_vio, attempt_logical_block_lock); 514 vdo_enqueue_completion(completion, VDO_DEFAULT_Q_MAP_BIO_PRIORITY); 515 } 516 517 static void copy_from_bio(struct bio *bio, char *data_ptr) 518 { 519 struct bio_vec biovec; 520 struct bvec_iter iter; 521 522 bio_for_each_segment(biovec, bio, iter) { 523 memcpy_from_bvec(data_ptr, &biovec); 524 data_ptr += biovec.bv_len; 525 } 526 } 527 528 static void launch_bio(struct vdo *vdo, struct data_vio *data_vio, struct bio *bio) 529 { 530 logical_block_number_t lbn; 531 /* 532 * Zero out the fields which don't need to be preserved (i.e. which are not pointers to 533 * separately allocated objects). 534 */ 535 memset(data_vio, 0, offsetof(struct data_vio, vio)); 536 memset(&data_vio->compression, 0, offsetof(struct compression_state, block)); 537 538 data_vio->user_bio = bio; 539 data_vio->offset = to_bytes(bio->bi_iter.bi_sector & VDO_SECTORS_PER_BLOCK_MASK); 540 data_vio->is_partial = (bio->bi_iter.bi_size < VDO_BLOCK_SIZE) || (data_vio->offset != 0); 541 542 /* 543 * Discards behave very differently than other requests when coming in from device-mapper. 544 * We have to be able to handle any size discards and various sector offsets within a 545 * block. 546 */ 547 if (bio_op(bio) == REQ_OP_DISCARD) { 548 data_vio->remaining_discard = bio->bi_iter.bi_size; 549 data_vio->write = true; 550 data_vio->is_discard = true; 551 if (data_vio->is_partial) { 552 vdo_count_bios(&vdo->stats.bios_in_partial, bio); 553 data_vio->read = true; 554 } 555 } else if (data_vio->is_partial) { 556 vdo_count_bios(&vdo->stats.bios_in_partial, bio); 557 data_vio->read = true; 558 if (bio_data_dir(bio) == WRITE) 559 data_vio->write = true; 560 } else if (bio_data_dir(bio) == READ) { 561 data_vio->read = true; 562 } else { 563 /* 564 * Copy the bio data to a char array so that we can continue to use the data after 565 * we acknowledge the bio. 566 */ 567 copy_from_bio(bio, data_vio->vio.data); 568 data_vio->is_zero = mem_is_zero(data_vio->vio.data, VDO_BLOCK_SIZE); 569 data_vio->write = true; 570 } 571 572 if (data_vio->user_bio->bi_opf & REQ_FUA) 573 data_vio->fua = true; 574 575 lbn = (bio->bi_iter.bi_sector - vdo->starting_sector_offset) / VDO_SECTORS_PER_BLOCK; 576 launch_data_vio(data_vio, lbn); 577 } 578 579 static void assign_data_vio(struct limiter *limiter, struct data_vio *data_vio) 580 { 581 struct bio *bio = bio_list_pop(limiter->permitted_waiters); 582 583 launch_bio(limiter->pool->completion.vdo, data_vio, bio); 584 limiter->wake_count++; 585 586 bio = bio_list_peek(limiter->permitted_waiters); 587 limiter->arrival = ((bio == NULL) ? 
U64_MAX : get_arrival_time(bio)); 588 } 589 590 static void assign_discard_permit(struct limiter *limiter) 591 { 592 struct bio *bio = bio_list_pop(&limiter->waiters); 593 594 if (limiter->arrival == U64_MAX) 595 limiter->arrival = get_arrival_time(bio); 596 597 bio_list_add(limiter->permitted_waiters, bio); 598 } 599 600 static void get_waiters(struct limiter *limiter) 601 { 602 bio_list_merge_init(&limiter->waiters, &limiter->new_waiters); 603 } 604 605 static inline struct data_vio *get_available_data_vio(struct data_vio_pool *pool) 606 { 607 struct data_vio *data_vio = 608 list_first_entry(&pool->available, struct data_vio, pool_entry); 609 610 list_del_init(&data_vio->pool_entry); 611 return data_vio; 612 } 613 614 static void assign_data_vio_to_waiter(struct limiter *limiter) 615 { 616 assign_data_vio(limiter, get_available_data_vio(limiter->pool)); 617 } 618 619 static void update_limiter(struct limiter *limiter) 620 { 621 struct bio_list *waiters = &limiter->waiters; 622 data_vio_count_t available = limiter->limit - limiter->busy; 623 624 VDO_ASSERT_LOG_ONLY((limiter->release_count <= limiter->busy), 625 "Release count %u is not more than busy count %u", 626 limiter->release_count, limiter->busy); 627 628 get_waiters(limiter); 629 for (; (limiter->release_count > 0) && !bio_list_empty(waiters); limiter->release_count--) 630 limiter->assigner(limiter); 631 632 if (limiter->release_count > 0) { 633 WRITE_ONCE(limiter->busy, limiter->busy - limiter->release_count); 634 limiter->release_count = 0; 635 return; 636 } 637 638 for (; (available > 0) && !bio_list_empty(waiters); available--) 639 limiter->assigner(limiter); 640 641 WRITE_ONCE(limiter->busy, limiter->limit - available); 642 if (limiter->max_busy < limiter->busy) 643 WRITE_ONCE(limiter->max_busy, limiter->busy); 644 } 645 646 /** 647 * schedule_releases() - Ensure that release processing is scheduled. 648 * @pool: The data_vio pool. 649 * 650 * If this call switches the state to processing, enqueue. Otherwise, some other thread has already 651 * done so. 652 */ 653 static void schedule_releases(struct data_vio_pool *pool) 654 { 655 /* Pairs with the barrier in process_release_callback(). */ 656 smp_mb__before_atomic(); 657 if (atomic_cmpxchg(&pool->processing, false, true)) 658 return; 659 660 pool->completion.requeue = true; 661 vdo_launch_completion_with_priority(&pool->completion, 662 CPU_Q_COMPLETE_VIO_PRIORITY); 663 } 664 665 static void reuse_or_release_resources(struct data_vio_pool *pool, 666 struct data_vio *data_vio, 667 struct list_head *returned) 668 { 669 if (data_vio->remaining_discard > 0) { 670 if (bio_list_empty(&pool->discard_limiter.waiters)) { 671 /* Return the data_vio's discard permit. */ 672 pool->discard_limiter.release_count++; 673 } else { 674 assign_discard_permit(&pool->discard_limiter); 675 } 676 } 677 678 if (pool->limiter.arrival < pool->discard_limiter.arrival) { 679 assign_data_vio(&pool->limiter, data_vio); 680 } else if (pool->discard_limiter.arrival < U64_MAX) { 681 assign_data_vio(&pool->discard_limiter, data_vio); 682 } else { 683 list_add(&data_vio->pool_entry, returned); 684 pool->limiter.release_count++; 685 } 686 } 687 688 /** 689 * process_release_callback() - Process a batch of data_vio releases. 690 * @completion: The pool with data_vios to release. 
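 *
 * For context, the producing side of this handoff is in finish_cleanup(), which hands a finished
 * data_vio back with:
 *
 *	vdo_funnel_queue_put(pool->queue, &completion->work_queue_entry_link);
 *	schedule_releases(pool);
 *
 * so the recycling work itself always runs here, on the pool's cpu thread, rather than on
 * whichever thread finished the data_vio.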
691 */ 692 static void process_release_callback(struct vdo_completion *completion) 693 { 694 struct data_vio_pool *pool = as_data_vio_pool(completion); 695 bool reschedule; 696 bool drained; 697 data_vio_count_t processed; 698 data_vio_count_t to_wake; 699 data_vio_count_t discards_to_wake; 700 LIST_HEAD(returned); 701 702 spin_lock(&pool->lock); 703 get_waiters(&pool->discard_limiter); 704 get_waiters(&pool->limiter); 705 spin_unlock(&pool->lock); 706 707 if (pool->limiter.arrival == U64_MAX) { 708 struct bio *bio = bio_list_peek(&pool->limiter.waiters); 709 710 if (bio != NULL) 711 pool->limiter.arrival = get_arrival_time(bio); 712 } 713 714 for (processed = 0; processed < DATA_VIO_RELEASE_BATCH_SIZE; processed++) { 715 struct data_vio *data_vio; 716 struct funnel_queue_entry *entry = vdo_funnel_queue_poll(pool->queue); 717 718 if (entry == NULL) 719 break; 720 721 data_vio = as_data_vio(container_of(entry, struct vdo_completion, 722 work_queue_entry_link)); 723 acknowledge_data_vio(data_vio); 724 reuse_or_release_resources(pool, data_vio, &returned); 725 } 726 727 spin_lock(&pool->lock); 728 /* 729 * There is a race where waiters could be added while we are in the unlocked section above. 730 * Those waiters could not see the resources we are now about to release, so we assign 731 * those resources now as we have no guarantee of being rescheduled. This is handled in 732 * update_limiter(). 733 */ 734 update_limiter(&pool->discard_limiter); 735 list_splice(&returned, &pool->available); 736 update_limiter(&pool->limiter); 737 to_wake = pool->limiter.wake_count; 738 pool->limiter.wake_count = 0; 739 discards_to_wake = pool->discard_limiter.wake_count; 740 pool->discard_limiter.wake_count = 0; 741 742 atomic_set(&pool->processing, false); 743 /* Pairs with the barrier in schedule_releases(). */ 744 smp_mb(); 745 746 reschedule = !vdo_is_funnel_queue_empty(pool->queue); 747 drained = (!reschedule && 748 vdo_is_state_draining(&pool->state) && 749 check_for_drain_complete_locked(pool)); 750 spin_unlock(&pool->lock); 751 752 if (to_wake > 0) 753 wake_up_nr(&pool->limiter.blocked_threads, to_wake); 754 755 if (discards_to_wake > 0) 756 wake_up_nr(&pool->discard_limiter.blocked_threads, discards_to_wake); 757 758 if (reschedule) 759 schedule_releases(pool); 760 else if (drained) 761 vdo_finish_draining(&pool->state); 762 } 763 764 static void initialize_limiter(struct limiter *limiter, struct data_vio_pool *pool, 765 assigner_fn assigner, data_vio_count_t limit) 766 { 767 limiter->pool = pool; 768 limiter->assigner = assigner; 769 limiter->limit = limit; 770 limiter->arrival = U64_MAX; 771 init_waitqueue_head(&limiter->blocked_threads); 772 } 773 774 /** 775 * initialize_data_vio() - Allocate the components of a data_vio. 776 * @data_vio: The data_vio to initialize. 777 * @vdo: The vdo containing the data_vio. 778 * 779 * The caller is responsible for cleaning up the data_vio on error. 780 * 781 * Return: VDO_SUCCESS or an error. 
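 *
 * make_data_vio_pool() below, for example, pairs a failure here with destroy_data_vio() (and then
 * frees the partially constructed pool):
 *
 *	result = initialize_data_vio(data_vio, vdo);
 *	if (result != VDO_SUCCESS) {
 *		destroy_data_vio(data_vio);
 *		free_data_vio_pool(pool);
 *		return result;
 *	}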
782 */ 783 static int initialize_data_vio(struct data_vio *data_vio, struct vdo *vdo) 784 { 785 struct bio *bio; 786 int result; 787 788 BUILD_BUG_ON(VDO_BLOCK_SIZE > PAGE_SIZE); 789 result = vdo_allocate_memory(VDO_BLOCK_SIZE, 0, "data_vio data", 790 &data_vio->vio.data); 791 if (result != VDO_SUCCESS) 792 return vdo_log_error_strerror(result, 793 "data_vio data allocation failure"); 794 795 result = vdo_allocate_memory(VDO_BLOCK_SIZE, 0, "compressed block", 796 &data_vio->compression.block); 797 if (result != VDO_SUCCESS) { 798 return vdo_log_error_strerror(result, 799 "data_vio compressed block allocation failure"); 800 } 801 802 result = vdo_allocate_memory(VDO_BLOCK_SIZE, 0, "vio scratch", 803 &data_vio->scratch_block); 804 if (result != VDO_SUCCESS) 805 return vdo_log_error_strerror(result, 806 "data_vio scratch allocation failure"); 807 808 result = vdo_create_bio(&bio); 809 if (result != VDO_SUCCESS) 810 return vdo_log_error_strerror(result, 811 "data_vio data bio allocation failure"); 812 813 vdo_initialize_completion(&data_vio->decrement_completion, vdo, 814 VDO_DECREMENT_COMPLETION); 815 initialize_vio(&data_vio->vio, bio, 1, VIO_TYPE_DATA, VIO_PRIORITY_DATA, vdo); 816 817 return VDO_SUCCESS; 818 } 819 820 static void destroy_data_vio(struct data_vio *data_vio) 821 { 822 if (data_vio == NULL) 823 return; 824 825 vdo_free_bio(vdo_forget(data_vio->vio.bio)); 826 vdo_free(vdo_forget(data_vio->vio.data)); 827 vdo_free(vdo_forget(data_vio->compression.block)); 828 vdo_free(vdo_forget(data_vio->scratch_block)); 829 } 830 831 /** 832 * make_data_vio_pool() - Initialize a data_vio pool. 833 * @vdo: The vdo to which the pool will belong. 834 * @pool_size: The number of data_vios in the pool. 835 * @discard_limit: The maximum number of data_vios which may be used for discards. 836 * @pool_ptr: A pointer to hold the newly allocated pool. 
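 *
 * Return: VDO_SUCCESS or an error code.
 *
 * A minimal usage sketch (the pool size shown is hypothetical; the 75% discard share mentioned in
 * the DOC comment above is the caller's policy, and is only checked here as an assertion):
 *
 *	data_vio_count_t pool_size = 2048;
 *	struct data_vio_pool *pool;
 *	int result = make_data_vio_pool(vdo, pool_size, (pool_size * 3) / 4, &pool);
 *
 *	if (result != VDO_SUCCESS)
 *		return result;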
837 */ 838 int make_data_vio_pool(struct vdo *vdo, data_vio_count_t pool_size, 839 data_vio_count_t discard_limit, struct data_vio_pool **pool_ptr) 840 { 841 int result; 842 struct data_vio_pool *pool; 843 data_vio_count_t i; 844 845 result = vdo_allocate_extended(struct data_vio_pool, pool_size, struct data_vio, 846 __func__, &pool); 847 if (result != VDO_SUCCESS) 848 return result; 849 850 VDO_ASSERT_LOG_ONLY((discard_limit <= pool_size), 851 "discard limit does not exceed pool size"); 852 initialize_limiter(&pool->discard_limiter, pool, assign_discard_permit, 853 discard_limit); 854 pool->discard_limiter.permitted_waiters = &pool->permitted_discards; 855 initialize_limiter(&pool->limiter, pool, assign_data_vio_to_waiter, pool_size); 856 pool->limiter.permitted_waiters = &pool->limiter.waiters; 857 INIT_LIST_HEAD(&pool->available); 858 spin_lock_init(&pool->lock); 859 vdo_set_admin_state_code(&pool->state, VDO_ADMIN_STATE_NORMAL_OPERATION); 860 vdo_initialize_completion(&pool->completion, vdo, VDO_DATA_VIO_POOL_COMPLETION); 861 vdo_prepare_completion(&pool->completion, process_release_callback, 862 process_release_callback, vdo->thread_config.cpu_thread, 863 NULL); 864 865 result = vdo_make_funnel_queue(&pool->queue); 866 if (result != VDO_SUCCESS) { 867 free_data_vio_pool(vdo_forget(pool)); 868 return result; 869 } 870 871 for (i = 0; i < pool_size; i++) { 872 struct data_vio *data_vio = &pool->data_vios[i]; 873 874 result = initialize_data_vio(data_vio, vdo); 875 if (result != VDO_SUCCESS) { 876 destroy_data_vio(data_vio); 877 free_data_vio_pool(pool); 878 return result; 879 } 880 881 list_add(&data_vio->pool_entry, &pool->available); 882 } 883 884 *pool_ptr = pool; 885 return VDO_SUCCESS; 886 } 887 888 /** 889 * free_data_vio_pool() - Free a data_vio_pool and the data_vios in it. 890 * @pool: The data_vio pool to free. 891 * 892 * All data_vios must be returned to the pool before calling this function. 893 */ 894 void free_data_vio_pool(struct data_vio_pool *pool) 895 { 896 struct data_vio *data_vio, *tmp; 897 898 if (pool == NULL) 899 return; 900 901 /* 902 * Pairs with the barrier in process_release_callback(). Possibly not needed since it 903 * caters to an enqueue vs. free race. 
904 */ 905 smp_mb(); 906 BUG_ON(atomic_read(&pool->processing)); 907 908 spin_lock(&pool->lock); 909 VDO_ASSERT_LOG_ONLY((pool->limiter.busy == 0), 910 "data_vio pool must not have %u busy entries when being freed", 911 pool->limiter.busy); 912 VDO_ASSERT_LOG_ONLY((bio_list_empty(&pool->limiter.waiters) && 913 bio_list_empty(&pool->limiter.new_waiters)), 914 "data_vio pool must not have threads waiting to read or write when being freed"); 915 VDO_ASSERT_LOG_ONLY((bio_list_empty(&pool->discard_limiter.waiters) && 916 bio_list_empty(&pool->discard_limiter.new_waiters)), 917 "data_vio pool must not have threads waiting to discard when being freed"); 918 spin_unlock(&pool->lock); 919 920 list_for_each_entry_safe(data_vio, tmp, &pool->available, pool_entry) { 921 list_del_init(&data_vio->pool_entry); 922 destroy_data_vio(data_vio); 923 } 924 925 vdo_free_funnel_queue(vdo_forget(pool->queue)); 926 vdo_free(pool); 927 } 928 929 static bool acquire_permit(struct limiter *limiter) 930 { 931 if (limiter->busy >= limiter->limit) 932 return false; 933 934 WRITE_ONCE(limiter->busy, limiter->busy + 1); 935 if (limiter->max_busy < limiter->busy) 936 WRITE_ONCE(limiter->max_busy, limiter->busy); 937 return true; 938 } 939 940 static void wait_permit(struct limiter *limiter, struct bio *bio) 941 __releases(&limiter->pool->lock) 942 { 943 DEFINE_WAIT(wait); 944 945 bio_list_add(&limiter->new_waiters, bio); 946 prepare_to_wait_exclusive(&limiter->blocked_threads, &wait, 947 TASK_UNINTERRUPTIBLE); 948 spin_unlock(&limiter->pool->lock); 949 io_schedule(); 950 finish_wait(&limiter->blocked_threads, &wait); 951 } 952 953 /** 954 * vdo_launch_bio() - Acquire a data_vio from the pool, assign the bio to it, and launch it. 955 * @pool: The data_vio pool. 956 * @bio: The bio to launch. 957 * 958 * This will block if data_vios or discard permits are not available. 959 */ 960 void vdo_launch_bio(struct data_vio_pool *pool, struct bio *bio) 961 { 962 struct data_vio *data_vio; 963 964 VDO_ASSERT_LOG_ONLY(!vdo_is_state_quiescent(&pool->state), 965 "data_vio_pool not quiescent on acquire"); 966 967 bio->bi_private = (void *) jiffies; 968 spin_lock(&pool->lock); 969 if ((bio_op(bio) == REQ_OP_DISCARD) && 970 !acquire_permit(&pool->discard_limiter)) { 971 wait_permit(&pool->discard_limiter, bio); 972 return; 973 } 974 975 if (!acquire_permit(&pool->limiter)) { 976 wait_permit(&pool->limiter, bio); 977 return; 978 } 979 980 data_vio = get_available_data_vio(pool); 981 spin_unlock(&pool->lock); 982 launch_bio(pool->completion.vdo, data_vio, bio); 983 } 984 985 /* Implements vdo_admin_initiator_fn. */ 986 static void initiate_drain(struct admin_state *state) 987 { 988 bool drained; 989 struct data_vio_pool *pool = container_of(state, struct data_vio_pool, state); 990 991 spin_lock(&pool->lock); 992 drained = check_for_drain_complete_locked(pool); 993 spin_unlock(&pool->lock); 994 995 if (drained) 996 vdo_finish_draining(state); 997 } 998 999 static void assert_on_vdo_cpu_thread(const struct vdo *vdo, const char *name) 1000 { 1001 VDO_ASSERT_LOG_ONLY((vdo_get_callback_thread_id() == vdo->thread_config.cpu_thread), 1002 "%s called on cpu thread", name); 1003 } 1004 1005 /** 1006 * drain_data_vio_pool() - Wait asynchronously for all data_vios to be returned to the pool. 1007 * @pool: The data_vio pool. 1008 * @completion: The completion to notify when the pool has drained. 
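 *
 * A rough sketch of how suspend and resume drive the pool (the surrounding admin-state and
 * completion handling is elided):
 *
 *	drain_data_vio_pool(vdo->data_vio_pool, completion);
 *	...
 *	resume_data_vio_pool(vdo->data_vio_pool, completion);
 *
 * Both must be called on the vdo's cpu thread; the supplied completion is notified when the
 * operation finishes.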
1009 */ 1010 void drain_data_vio_pool(struct data_vio_pool *pool, struct vdo_completion *completion) 1011 { 1012 assert_on_vdo_cpu_thread(completion->vdo, __func__); 1013 vdo_start_draining(&pool->state, VDO_ADMIN_STATE_SUSPENDING, completion, 1014 initiate_drain); 1015 } 1016 1017 /** 1018 * resume_data_vio_pool() - Resume a data_vio pool. 1019 * @pool: The data_vio pool. 1020 * @completion: The completion to notify when the pool has resumed. 1021 */ 1022 void resume_data_vio_pool(struct data_vio_pool *pool, struct vdo_completion *completion) 1023 { 1024 assert_on_vdo_cpu_thread(completion->vdo, __func__); 1025 vdo_continue_completion(completion, vdo_resume_if_quiescent(&pool->state)); 1026 } 1027 1028 static void dump_limiter(const char *name, struct limiter *limiter) 1029 { 1030 vdo_log_info("%s: %u of %u busy (max %u), %s", name, limiter->busy, 1031 limiter->limit, limiter->max_busy, 1032 ((bio_list_empty(&limiter->waiters) && 1033 bio_list_empty(&limiter->new_waiters)) ? 1034 "no waiters" : "has waiters")); 1035 } 1036 1037 /** 1038 * dump_data_vio_pool() - Dump a data_vio pool to the log. 1039 * @pool: The data_vio pool. 1040 * @dump_vios: Whether to dump the details of each busy data_vio as well. 1041 */ 1042 void dump_data_vio_pool(struct data_vio_pool *pool, bool dump_vios) 1043 { 1044 /* 1045 * In order that syslog can empty its buffer, sleep after 35 elements for 4ms (till the 1046 * second clock tick). These numbers were picked based on experiments with lab machines. 1047 */ 1048 static const int ELEMENTS_PER_BATCH = 35; 1049 static const int SLEEP_FOR_SYSLOG = 4000; 1050 1051 if (pool == NULL) 1052 return; 1053 1054 spin_lock(&pool->lock); 1055 dump_limiter("data_vios", &pool->limiter); 1056 dump_limiter("discard permits", &pool->discard_limiter); 1057 if (dump_vios) { 1058 int i; 1059 int dumped = 0; 1060 1061 for (i = 0; i < pool->limiter.limit; i++) { 1062 struct data_vio *data_vio = &pool->data_vios[i]; 1063 1064 if (!list_empty(&data_vio->pool_entry)) 1065 continue; 1066 1067 dump_data_vio(data_vio); 1068 if (++dumped >= ELEMENTS_PER_BATCH) { 1069 spin_unlock(&pool->lock); 1070 dumped = 0; 1071 fsleep(SLEEP_FOR_SYSLOG); 1072 spin_lock(&pool->lock); 1073 } 1074 } 1075 } 1076 1077 spin_unlock(&pool->lock); 1078 } 1079 1080 data_vio_count_t get_data_vio_pool_active_requests(struct data_vio_pool *pool) 1081 { 1082 return READ_ONCE(pool->limiter.busy); 1083 } 1084 1085 data_vio_count_t get_data_vio_pool_request_limit(struct data_vio_pool *pool) 1086 { 1087 return READ_ONCE(pool->limiter.limit); 1088 } 1089 1090 data_vio_count_t get_data_vio_pool_maximum_requests(struct data_vio_pool *pool) 1091 { 1092 return READ_ONCE(pool->limiter.max_busy); 1093 } 1094 1095 static void update_data_vio_error_stats(struct data_vio *data_vio) 1096 { 1097 u8 index = 0; 1098 static const char * const operations[] = { 1099 [0] = "empty", 1100 [1] = "read", 1101 [2] = "write", 1102 [3] = "read-modify-write", 1103 [5] = "read+fua", 1104 [6] = "write+fua", 1105 [7] = "read-modify-write+fua", 1106 }; 1107 1108 if (data_vio->read) 1109 index = 1; 1110 1111 if (data_vio->write) 1112 index += 2; 1113 1114 if (data_vio->fua) 1115 index += 4; 1116 1117 update_vio_error_stats(&data_vio->vio, 1118 "Completing %s vio for LBN %llu with error after %s", 1119 operations[index], 1120 (unsigned long long) data_vio->logical.lbn, 1121 get_data_vio_operation_name(data_vio)); 1122 } 1123 1124 static void perform_cleanup_stage(struct data_vio *data_vio, 1125 enum data_vio_cleanup_stage stage); 1126 1127 /** 1128 * 
release_allocated_lock() - Release the PBN lock and/or the reference on the allocated block at 1129 * the end of processing a data_vio. 1130 * @completion: The data_vio holding the lock. 1131 */ 1132 static void release_allocated_lock(struct vdo_completion *completion) 1133 { 1134 struct data_vio *data_vio = as_data_vio(completion); 1135 1136 assert_data_vio_in_allocated_zone(data_vio); 1137 release_data_vio_allocation_lock(data_vio, false); 1138 perform_cleanup_stage(data_vio, VIO_RELEASE_RECOVERY_LOCKS); 1139 } 1140 1141 /** release_lock() - Release an uncontended LBN lock. */ 1142 static void release_lock(struct data_vio *data_vio, struct lbn_lock *lock) 1143 { 1144 struct int_map *lock_map = lock->zone->lbn_operations; 1145 struct data_vio *lock_holder; 1146 1147 if (!lock->locked) { 1148 /* The lock is not locked, so it had better not be registered in the lock map. */ 1149 struct data_vio *lock_holder = vdo_int_map_get(lock_map, lock->lbn); 1150 1151 VDO_ASSERT_LOG_ONLY((data_vio != lock_holder), 1152 "no logical block lock held for block %llu", 1153 (unsigned long long) lock->lbn); 1154 return; 1155 } 1156 1157 /* Release the lock by removing the lock from the map. */ 1158 lock_holder = vdo_int_map_remove(lock_map, lock->lbn); 1159 VDO_ASSERT_LOG_ONLY((data_vio == lock_holder), 1160 "logical block lock mismatch for block %llu", 1161 (unsigned long long) lock->lbn); 1162 lock->locked = false; 1163 } 1164 1165 /** transfer_lock() - Transfer a contended LBN lock to the eldest waiter. */ 1166 static void transfer_lock(struct data_vio *data_vio, struct lbn_lock *lock) 1167 { 1168 struct data_vio *lock_holder, *next_lock_holder; 1169 int result; 1170 1171 VDO_ASSERT_LOG_ONLY(lock->locked, "lbn_lock with waiters is not locked"); 1172 1173 /* Another data_vio is waiting for the lock, transfer it in a single lock map operation. */ 1174 next_lock_holder = 1175 vdo_waiter_as_data_vio(vdo_waitq_dequeue_waiter(&lock->waiters)); 1176 1177 /* Transfer the remaining lock waiters to the next lock holder. */ 1178 vdo_waitq_transfer_all_waiters(&lock->waiters, 1179 &next_lock_holder->logical.waiters); 1180 1181 result = vdo_int_map_put(lock->zone->lbn_operations, lock->lbn, 1182 next_lock_holder, true, (void **) &lock_holder); 1183 if (result != VDO_SUCCESS) { 1184 continue_data_vio_with_error(next_lock_holder, result); 1185 return; 1186 } 1187 1188 VDO_ASSERT_LOG_ONLY((lock_holder == data_vio), 1189 "logical block lock mismatch for block %llu", 1190 (unsigned long long) lock->lbn); 1191 lock->locked = false; 1192 1193 /* 1194 * If there are still waiters, other data_vios must be trying to get the lock we just 1195 * transferred. We must ensure that the new lock holder doesn't block in the packer. 1196 */ 1197 if (vdo_waitq_has_waiters(&next_lock_holder->logical.waiters)) 1198 cancel_data_vio_compression(next_lock_holder); 1199 1200 /* 1201 * Avoid stack overflow on lock transfer. 1202 * FIXME: this is only an issue in the 1 thread config. 1203 */ 1204 next_lock_holder->vio.completion.requeue = true; 1205 launch_locked_request(next_lock_holder); 1206 } 1207 1208 /** 1209 * release_logical_lock() - Release the logical block lock and flush generation lock at the end of 1210 * processing a data_vio. 1211 * @completion: The data_vio holding the lock. 
1212 */ 1213 static void release_logical_lock(struct vdo_completion *completion) 1214 { 1215 struct data_vio *data_vio = as_data_vio(completion); 1216 struct lbn_lock *lock = &data_vio->logical; 1217 1218 assert_data_vio_in_logical_zone(data_vio); 1219 1220 if (vdo_waitq_has_waiters(&lock->waiters)) 1221 transfer_lock(data_vio, lock); 1222 else 1223 release_lock(data_vio, lock); 1224 1225 vdo_release_flush_generation_lock(data_vio); 1226 perform_cleanup_stage(data_vio, VIO_CLEANUP_DONE); 1227 } 1228 1229 /** clean_hash_lock() - Release the hash lock at the end of processing a data_vio. */ 1230 static void clean_hash_lock(struct vdo_completion *completion) 1231 { 1232 struct data_vio *data_vio = as_data_vio(completion); 1233 1234 assert_data_vio_in_hash_zone(data_vio); 1235 if (completion->result != VDO_SUCCESS) { 1236 vdo_clean_failed_hash_lock(data_vio); 1237 return; 1238 } 1239 1240 vdo_release_hash_lock(data_vio); 1241 perform_cleanup_stage(data_vio, VIO_RELEASE_LOGICAL); 1242 } 1243 1244 /** 1245 * finish_cleanup() - Make some assertions about a data_vio which has finished cleaning up. 1246 * @data_vio: The data_vio. 1247 * 1248 * If it is part of a multi-block discard, starts on the next block, otherwise, returns it to the 1249 * pool. 1250 */ 1251 static void finish_cleanup(struct data_vio *data_vio) 1252 { 1253 struct vdo_completion *completion = &data_vio->vio.completion; 1254 u32 discard_size = min_t(u32, data_vio->remaining_discard, 1255 VDO_BLOCK_SIZE - data_vio->offset); 1256 1257 VDO_ASSERT_LOG_ONLY(data_vio->allocation.lock == NULL, 1258 "complete data_vio has no allocation lock"); 1259 VDO_ASSERT_LOG_ONLY(data_vio->hash_lock == NULL, 1260 "complete data_vio has no hash lock"); 1261 if ((data_vio->remaining_discard <= discard_size) || 1262 (completion->result != VDO_SUCCESS)) { 1263 struct data_vio_pool *pool = completion->vdo->data_vio_pool; 1264 1265 vdo_funnel_queue_put(pool->queue, &completion->work_queue_entry_link); 1266 schedule_releases(pool); 1267 return; 1268 } 1269 1270 data_vio->remaining_discard -= discard_size; 1271 data_vio->is_partial = (data_vio->remaining_discard < VDO_BLOCK_SIZE); 1272 data_vio->read = data_vio->is_partial; 1273 data_vio->offset = 0; 1274 completion->requeue = true; 1275 data_vio->first_reference_operation_complete = false; 1276 launch_data_vio(data_vio, data_vio->logical.lbn + 1); 1277 } 1278 1279 /** perform_cleanup_stage() - Perform the next step in the process of cleaning up a data_vio. 
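 *
 * Cleanup is entered from complete_data_vio() below. Writes start at VIO_CLEANUP_START and walk
 * every stage, while reads skip straight to releasing the logical lock, since the earlier stages
 * only release resources that writes hold:
 *
 *	perform_cleanup_stage(data_vio,
 *			      (data_vio->write ? VIO_CLEANUP_START : VIO_RELEASE_LOGICAL));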
*/ 1280 static void perform_cleanup_stage(struct data_vio *data_vio, 1281 enum data_vio_cleanup_stage stage) 1282 { 1283 struct vdo *vdo = vdo_from_data_vio(data_vio); 1284 1285 switch (stage) { 1286 case VIO_RELEASE_HASH_LOCK: 1287 if (data_vio->hash_lock != NULL) { 1288 launch_data_vio_hash_zone_callback(data_vio, clean_hash_lock); 1289 return; 1290 } 1291 fallthrough; 1292 1293 case VIO_RELEASE_ALLOCATED: 1294 if (data_vio_has_allocation(data_vio)) { 1295 launch_data_vio_allocated_zone_callback(data_vio, 1296 release_allocated_lock); 1297 return; 1298 } 1299 fallthrough; 1300 1301 case VIO_RELEASE_RECOVERY_LOCKS: 1302 if ((data_vio->recovery_sequence_number > 0) && 1303 (READ_ONCE(vdo->read_only_notifier.read_only_error) == VDO_SUCCESS) && 1304 (data_vio->vio.completion.result != VDO_READ_ONLY)) 1305 vdo_log_warning("VDO not read-only when cleaning data_vio with RJ lock"); 1306 fallthrough; 1307 1308 case VIO_RELEASE_LOGICAL: 1309 launch_data_vio_logical_callback(data_vio, release_logical_lock); 1310 return; 1311 1312 default: 1313 finish_cleanup(data_vio); 1314 } 1315 } 1316 1317 void complete_data_vio(struct vdo_completion *completion) 1318 { 1319 struct data_vio *data_vio = as_data_vio(completion); 1320 1321 completion->error_handler = NULL; 1322 data_vio->last_async_operation = VIO_ASYNC_OP_CLEANUP; 1323 perform_cleanup_stage(data_vio, 1324 (data_vio->write ? VIO_CLEANUP_START : VIO_RELEASE_LOGICAL)); 1325 } 1326 1327 static void enter_read_only_mode(struct vdo_completion *completion) 1328 { 1329 if (vdo_is_read_only(completion->vdo)) 1330 return; 1331 1332 if (completion->result != VDO_READ_ONLY) { 1333 struct data_vio *data_vio = as_data_vio(completion); 1334 1335 vdo_log_error_strerror(completion->result, 1336 "Preparing to enter read-only mode: data_vio for LBN %llu (becoming mapped to %llu, previously mapped to %llu, allocated %llu) is completing with a fatal error after operation %s", 1337 (unsigned long long) data_vio->logical.lbn, 1338 (unsigned long long) data_vio->new_mapped.pbn, 1339 (unsigned long long) data_vio->mapped.pbn, 1340 (unsigned long long) data_vio->allocation.pbn, 1341 get_data_vio_operation_name(data_vio)); 1342 } 1343 1344 vdo_enter_read_only_mode(completion->vdo, completion->result); 1345 } 1346 1347 void handle_data_vio_error(struct vdo_completion *completion) 1348 { 1349 struct data_vio *data_vio = as_data_vio(completion); 1350 1351 if ((completion->result == VDO_READ_ONLY) || (data_vio->user_bio == NULL)) 1352 enter_read_only_mode(completion); 1353 1354 update_data_vio_error_stats(data_vio); 1355 complete_data_vio(completion); 1356 } 1357 1358 /** 1359 * get_data_vio_operation_name() - Get the name of the last asynchronous operation performed on a 1360 * data_vio. 1361 * @data_vio: The data_vio. 1362 */ 1363 const char *get_data_vio_operation_name(struct data_vio *data_vio) 1364 { 1365 BUILD_BUG_ON((MAX_VIO_ASYNC_OPERATION_NUMBER - MIN_VIO_ASYNC_OPERATION_NUMBER) != 1366 ARRAY_SIZE(ASYNC_OPERATION_NAMES)); 1367 1368 return ((data_vio->last_async_operation < MAX_VIO_ASYNC_OPERATION_NUMBER) ? 1369 ASYNC_OPERATION_NAMES[data_vio->last_async_operation] : 1370 "unknown async operation"); 1371 } 1372 1373 /** 1374 * data_vio_allocate_data_block() - Allocate a data block. 1375 * @data_vio: The data_vio. 1376 * @write_lock_type: The type of write lock to obtain on the block. 1377 * @callback: The callback which will attempt an allocation in the current zone and continue if it 1378 * succeeds. 1379 * @error_handler: The handler for errors while allocating. 
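 *
 * A hedged sketch of a call site (allocate_block and handle_allocation_error stand in for
 * whatever callback and error handler the caller actually registers, and the lock type shown is
 * merely illustrative):
 *
 *	data_vio_allocate_data_block(data_vio, VIO_WRITE_LOCK,
 *				     allocate_block, handle_allocation_error);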
1380 */ 1381 void data_vio_allocate_data_block(struct data_vio *data_vio, 1382 enum pbn_lock_type write_lock_type, 1383 vdo_action_fn callback, vdo_action_fn error_handler) 1384 { 1385 struct allocation *allocation = &data_vio->allocation; 1386 1387 VDO_ASSERT_LOG_ONLY((allocation->pbn == VDO_ZERO_BLOCK), 1388 "data_vio does not have an allocation"); 1389 allocation->write_lock_type = write_lock_type; 1390 allocation->zone = vdo_get_next_allocation_zone(data_vio->logical.zone); 1391 allocation->first_allocation_zone = allocation->zone->zone_number; 1392 1393 data_vio->vio.completion.error_handler = error_handler; 1394 launch_data_vio_allocated_zone_callback(data_vio, callback); 1395 } 1396 1397 /** 1398 * release_data_vio_allocation_lock() - Release the PBN lock on a data_vio's allocated block. 1399 * @data_vio: The data_vio. 1400 * @reset: If true, the allocation will be reset (i.e. any allocated pbn will be forgotten). 1401 * 1402 * If the reference to the locked block is still provisional, it will be released as well. 1403 */ 1404 void release_data_vio_allocation_lock(struct data_vio *data_vio, bool reset) 1405 { 1406 struct allocation *allocation = &data_vio->allocation; 1407 physical_block_number_t locked_pbn = allocation->pbn; 1408 1409 assert_data_vio_in_allocated_zone(data_vio); 1410 1411 if (reset || vdo_pbn_lock_has_provisional_reference(allocation->lock)) 1412 allocation->pbn = VDO_ZERO_BLOCK; 1413 1414 vdo_release_physical_zone_pbn_lock(allocation->zone, locked_pbn, 1415 vdo_forget(allocation->lock)); 1416 } 1417 1418 /** 1419 * uncompress_data_vio() - Uncompress the data a data_vio has just read. 1420 * @data_vio: The data_vio. 1421 * @mapping_state: The mapping state indicating which fragment to decompress. 1422 * @buffer: The buffer to receive the uncompressed data. 1423 */ 1424 int uncompress_data_vio(struct data_vio *data_vio, 1425 enum block_mapping_state mapping_state, char *buffer) 1426 { 1427 int size; 1428 u16 fragment_offset, fragment_size; 1429 struct compressed_block *block = data_vio->compression.block; 1430 int result = vdo_get_compressed_block_fragment(mapping_state, block, 1431 &fragment_offset, &fragment_size); 1432 1433 if (result != VDO_SUCCESS) { 1434 vdo_log_debug("%s: compressed fragment error %d", __func__, result); 1435 return result; 1436 } 1437 1438 size = LZ4_decompress_safe((block->data + fragment_offset), buffer, 1439 fragment_size, VDO_BLOCK_SIZE); 1440 if (size != VDO_BLOCK_SIZE) { 1441 vdo_log_debug("%s: lz4 error", __func__); 1442 return VDO_INVALID_FRAGMENT; 1443 } 1444 1445 return VDO_SUCCESS; 1446 } 1447 1448 /** 1449 * modify_for_partial_write() - Do the modify-write part of a read-modify-write cycle. 1450 * @completion: The data_vio which has just finished its read. 1451 * 1452 * This callback is registered in read_block(). 
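 *
 * As a worked example of the partial-write bookkeeping (with the 4096-byte VDO_BLOCK_SIZE and
 * 512-byte sectors): a 512-byte user write whose starting sector is 3 sectors into its block has
 *
 *	data_vio->offset = to_bytes(3) = 1536
 *
 * so only bytes 1536..2047 of the block read in by read_block() are overwritten here before the
 * full block is written back.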
1453 */ 1454 static void modify_for_partial_write(struct vdo_completion *completion) 1455 { 1456 struct data_vio *data_vio = as_data_vio(completion); 1457 char *data = data_vio->vio.data; 1458 struct bio *bio = data_vio->user_bio; 1459 1460 assert_data_vio_on_cpu_thread(data_vio); 1461 1462 if (bio_op(bio) == REQ_OP_DISCARD) { 1463 memset(data + data_vio->offset, '\0', min_t(u32, 1464 data_vio->remaining_discard, 1465 VDO_BLOCK_SIZE - data_vio->offset)); 1466 } else { 1467 copy_from_bio(bio, data + data_vio->offset); 1468 } 1469 1470 data_vio->is_zero = mem_is_zero(data, VDO_BLOCK_SIZE); 1471 data_vio->read = false; 1472 launch_data_vio_logical_callback(data_vio, 1473 continue_data_vio_with_block_map_slot); 1474 } 1475 1476 static void complete_read(struct vdo_completion *completion) 1477 { 1478 struct data_vio *data_vio = as_data_vio(completion); 1479 char *data = data_vio->vio.data; 1480 bool compressed = vdo_is_state_compressed(data_vio->mapped.state); 1481 1482 assert_data_vio_on_cpu_thread(data_vio); 1483 1484 if (compressed) { 1485 int result = uncompress_data_vio(data_vio, data_vio->mapped.state, data); 1486 1487 if (result != VDO_SUCCESS) { 1488 continue_data_vio_with_error(data_vio, result); 1489 return; 1490 } 1491 } 1492 1493 if (data_vio->write) { 1494 modify_for_partial_write(completion); 1495 return; 1496 } 1497 1498 if (compressed || data_vio->is_partial) 1499 copy_to_bio(data_vio->user_bio, data + data_vio->offset); 1500 1501 acknowledge_data_vio(data_vio); 1502 complete_data_vio(completion); 1503 } 1504 1505 static void read_endio(struct bio *bio) 1506 { 1507 struct data_vio *data_vio = vio_as_data_vio(bio->bi_private); 1508 int result = blk_status_to_errno(bio->bi_status); 1509 1510 vdo_count_completed_bios(bio); 1511 if (result != VDO_SUCCESS) { 1512 continue_data_vio_with_error(data_vio, result); 1513 return; 1514 } 1515 1516 launch_data_vio_cpu_callback(data_vio, complete_read, 1517 CPU_Q_COMPLETE_READ_PRIORITY); 1518 } 1519 1520 static void complete_zero_read(struct vdo_completion *completion) 1521 { 1522 struct data_vio *data_vio = as_data_vio(completion); 1523 1524 assert_data_vio_on_cpu_thread(data_vio); 1525 1526 if (data_vio->is_partial) { 1527 memset(data_vio->vio.data, 0, VDO_BLOCK_SIZE); 1528 if (data_vio->write) { 1529 modify_for_partial_write(completion); 1530 return; 1531 } 1532 } else { 1533 zero_fill_bio(data_vio->user_bio); 1534 } 1535 1536 complete_read(completion); 1537 } 1538 1539 /** 1540 * read_block() - Read a block asynchronously. 1541 * @completion: The data_vio doing the read. 1542 * 1543 * This is the callback registered in read_block_mapping(). 1544 */ 1545 static void read_block(struct vdo_completion *completion) 1546 { 1547 struct data_vio *data_vio = as_data_vio(completion); 1548 struct vio *vio = as_vio(completion); 1549 int result = VDO_SUCCESS; 1550 1551 if (data_vio->mapped.pbn == VDO_ZERO_BLOCK) { 1552 launch_data_vio_cpu_callback(data_vio, complete_zero_read, 1553 CPU_Q_COMPLETE_VIO_PRIORITY); 1554 return; 1555 } 1556 1557 data_vio->last_async_operation = VIO_ASYNC_OP_READ_DATA_VIO; 1558 if (vdo_is_state_compressed(data_vio->mapped.state)) { 1559 result = vio_reset_bio(vio, (char *) data_vio->compression.block, 1560 read_endio, REQ_OP_READ, data_vio->mapped.pbn); 1561 } else { 1562 blk_opf_t opf = ((data_vio->user_bio->bi_opf & PASSTHROUGH_FLAGS) | REQ_OP_READ); 1563 1564 if (data_vio->is_partial) { 1565 result = vio_reset_bio(vio, vio->data, read_endio, opf, 1566 data_vio->mapped.pbn); 1567 } else { 1568 /* A full 4k read. 
Use the incoming bio to avoid having to copy the data */ 1569 bio_reset(vio->bio, vio->bio->bi_bdev, opf); 1570 bio_init_clone(data_vio->user_bio->bi_bdev, vio->bio, 1571 data_vio->user_bio, GFP_KERNEL); 1572 1573 /* Copy over the original bio iovec and opflags. */ 1574 vdo_set_bio_properties(vio->bio, vio, read_endio, opf, 1575 data_vio->mapped.pbn); 1576 } 1577 } 1578 1579 if (result != VDO_SUCCESS) { 1580 continue_data_vio_with_error(data_vio, result); 1581 return; 1582 } 1583 1584 vdo_submit_data_vio(data_vio); 1585 } 1586 1587 static inline struct data_vio * 1588 reference_count_update_completion_as_data_vio(struct vdo_completion *completion) 1589 { 1590 if (completion->type == VIO_COMPLETION) 1591 return as_data_vio(completion); 1592 1593 return container_of(completion, struct data_vio, decrement_completion); 1594 } 1595 1596 /** 1597 * update_block_map() - Rendezvous of the data_vio and decrement completions after each has 1598 * made its reference updates. Handle any error from either, or proceed 1599 * to updating the block map. 1600 * @completion: The completion of the write in progress. 1601 */ 1602 static void update_block_map(struct vdo_completion *completion) 1603 { 1604 struct data_vio *data_vio = reference_count_update_completion_as_data_vio(completion); 1605 1606 assert_data_vio_in_logical_zone(data_vio); 1607 1608 if (!data_vio->first_reference_operation_complete) { 1609 /* Rendezvous, we're first */ 1610 data_vio->first_reference_operation_complete = true; 1611 return; 1612 } 1613 1614 completion = &data_vio->vio.completion; 1615 vdo_set_completion_result(completion, data_vio->decrement_completion.result); 1616 if (completion->result != VDO_SUCCESS) { 1617 handle_data_vio_error(completion); 1618 return; 1619 } 1620 1621 completion->error_handler = handle_data_vio_error; 1622 if (data_vio->hash_lock != NULL) 1623 set_data_vio_hash_zone_callback(data_vio, vdo_continue_hash_lock); 1624 else 1625 completion->callback = complete_data_vio; 1626 1627 data_vio->last_async_operation = VIO_ASYNC_OP_PUT_MAPPED_BLOCK; 1628 vdo_put_mapped_block(data_vio); 1629 } 1630 1631 static void decrement_reference_count(struct vdo_completion *completion) 1632 { 1633 struct data_vio *data_vio = container_of(completion, struct data_vio, 1634 decrement_completion); 1635 1636 assert_data_vio_in_mapped_zone(data_vio); 1637 1638 vdo_set_completion_callback(completion, update_block_map, 1639 data_vio->logical.zone->thread_id); 1640 completion->error_handler = update_block_map; 1641 vdo_modify_reference_count(completion, &data_vio->decrement_updater); 1642 } 1643 1644 static void increment_reference_count(struct vdo_completion *completion) 1645 { 1646 struct data_vio *data_vio = as_data_vio(completion); 1647 1648 assert_data_vio_in_new_mapped_zone(data_vio); 1649 1650 if (data_vio->downgrade_allocation_lock) { 1651 /* 1652 * Now that the data has been written, it's safe to deduplicate against the 1653 * block. Downgrade the allocation lock to a read lock so it can be used later by 1654 * the hash lock. This is done here since it needs to happen sometime before we 1655 * return to the hash zone, and we are currently on the correct thread. For 1656 * compressed blocks, the downgrade will have already been done. 
1657 */ 1658 vdo_downgrade_pbn_write_lock(data_vio->allocation.lock, false); 1659 } 1660 1661 set_data_vio_logical_callback(data_vio, update_block_map); 1662 completion->error_handler = update_block_map; 1663 vdo_modify_reference_count(completion, &data_vio->increment_updater); 1664 } 1665 1666 /** journal_remapping() - Add a recovery journal entry for a data remapping. */ 1667 static void journal_remapping(struct vdo_completion *completion) 1668 { 1669 struct data_vio *data_vio = as_data_vio(completion); 1670 1671 assert_data_vio_in_journal_zone(data_vio); 1672 1673 data_vio->decrement_updater.operation = VDO_JOURNAL_DATA_REMAPPING; 1674 data_vio->decrement_updater.zpbn = data_vio->mapped; 1675 if (data_vio->new_mapped.pbn == VDO_ZERO_BLOCK) { 1676 data_vio->first_reference_operation_complete = true; 1677 if (data_vio->mapped.pbn == VDO_ZERO_BLOCK) 1678 set_data_vio_logical_callback(data_vio, update_block_map); 1679 } else { 1680 set_data_vio_new_mapped_zone_callback(data_vio, 1681 increment_reference_count); 1682 } 1683 1684 if (data_vio->mapped.pbn == VDO_ZERO_BLOCK) { 1685 data_vio->first_reference_operation_complete = true; 1686 } else { 1687 vdo_set_completion_callback(&data_vio->decrement_completion, 1688 decrement_reference_count, 1689 data_vio->mapped.zone->thread_id); 1690 } 1691 1692 data_vio->last_async_operation = VIO_ASYNC_OP_JOURNAL_REMAPPING; 1693 vdo_add_recovery_journal_entry(completion->vdo->recovery_journal, data_vio); 1694 } 1695 1696 /** 1697 * read_old_block_mapping() - Get the previous PBN/LBN mapping of an in-progress write. 1698 * @completion: The data_vio doing the read. 1699 * 1700 * Gets the previous PBN mapped to this LBN from the block map, so as to make an appropriate 1701 * journal entry referencing the removal of this LBN->PBN mapping. 1702 */ 1703 static void read_old_block_mapping(struct vdo_completion *completion) 1704 { 1705 struct data_vio *data_vio = as_data_vio(completion); 1706 1707 assert_data_vio_in_logical_zone(data_vio); 1708 1709 data_vio->last_async_operation = VIO_ASYNC_OP_GET_MAPPED_BLOCK_FOR_WRITE; 1710 set_data_vio_journal_callback(data_vio, journal_remapping); 1711 vdo_get_mapped_block(data_vio); 1712 } 1713 1714 void update_metadata_for_data_vio_write(struct data_vio *data_vio, struct pbn_lock *lock) 1715 { 1716 data_vio->increment_updater = (struct reference_updater) { 1717 .operation = VDO_JOURNAL_DATA_REMAPPING, 1718 .increment = true, 1719 .zpbn = data_vio->new_mapped, 1720 .lock = lock, 1721 }; 1722 1723 launch_data_vio_logical_callback(data_vio, read_old_block_mapping); 1724 } 1725 1726 /** 1727 * pack_compressed_data() - Attempt to pack the compressed data_vio into a block. 1728 * @completion: The data_vio. 1729 * 1730 * This is the callback registered in launch_compress_data_vio(). 1731 */ 1732 static void pack_compressed_data(struct vdo_completion *completion) 1733 { 1734 struct data_vio *data_vio = as_data_vio(completion); 1735 1736 assert_data_vio_in_packer_zone(data_vio); 1737 1738 if (!vdo_get_compressing(vdo_from_data_vio(data_vio)) || 1739 get_data_vio_compression_status(data_vio).may_not_compress) { 1740 write_data_vio(data_vio); 1741 return; 1742 } 1743 1744 data_vio->last_async_operation = VIO_ASYNC_OP_ATTEMPT_PACKING; 1745 vdo_attempt_packing(data_vio); 1746 } 1747 1748 /** 1749 * compress_data_vio() - Do the actual work of compressing the data on a CPU queue. 1750 * @completion: The data_vio. 1751 * 1752 * This callback is registered in launch_compress_data_vio(). 
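* If the compressed size does not fit in a compressed block fragment, the data_vio falls back to an uncompressed write via write_data_vio().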
1753 */ 1754 static void compress_data_vio(struct vdo_completion *completion) 1755 { 1756 struct data_vio *data_vio = as_data_vio(completion); 1757 int size; 1758 1759 assert_data_vio_on_cpu_thread(data_vio); 1760 1761 /* 1762 * By putting the compressed data at the start of the compressed block data field, we won't 1763 * need to copy it if this data_vio becomes a compressed write agent. 1764 */ 1765 size = LZ4_compress_default(data_vio->vio.data, 1766 data_vio->compression.block->data, VDO_BLOCK_SIZE, 1767 VDO_MAX_COMPRESSED_FRAGMENT_SIZE, 1768 (char *) vdo_get_work_queue_private_data()); 1769 if ((size > 0) && (size < VDO_COMPRESSED_BLOCK_DATA_SIZE)) { 1770 data_vio->compression.size = size; 1771 launch_data_vio_packer_callback(data_vio, pack_compressed_data); 1772 return; 1773 } 1774 1775 write_data_vio(data_vio); 1776 } 1777 1778 /** 1779 * launch_compress_data_vio() - Continue a write by attempting to compress the data. 1780 * @data_vio: The data_vio. 1781 * 1782 * This is a re-entry point to vio_write used by hash locks. 1783 */ 1784 void launch_compress_data_vio(struct data_vio *data_vio) 1785 { 1786 VDO_ASSERT_LOG_ONLY(!data_vio->is_duplicate, "compressing a non-duplicate block"); 1787 VDO_ASSERT_LOG_ONLY(data_vio->hash_lock != NULL, 1788 "data_vio to compress has a hash_lock"); 1789 VDO_ASSERT_LOG_ONLY(data_vio_has_allocation(data_vio), 1790 "data_vio to compress has an allocation"); 1791 1792 /* 1793 * There are 4 reasons why a data_vio which has reached this point will not be eligible for 1794 * compression: 1795 * 1796 * 1) Since data_vios can block indefinitely in the packer, it would be bad to do so if the 1797 * write request also requests FUA. 1798 * 1799 * 2) A data_vio should not be compressed when compression is disabled for the vdo. 1800 * 1801 * 3) A data_vio could be doing a partial write on behalf of a larger discard which has not 1802 * yet been acknowledged and hence blocking in the packer would be bad. 1803 * 1804 * 4) Some other data_vio may be waiting on this data_vio in which case blocking in the 1805 * packer would also be bad. 1806 */ 1807 if (data_vio->fua || 1808 !vdo_get_compressing(vdo_from_data_vio(data_vio)) || 1809 ((data_vio->user_bio != NULL) && (bio_op(data_vio->user_bio) == REQ_OP_DISCARD)) || 1810 (advance_data_vio_compression_stage(data_vio).stage != DATA_VIO_COMPRESSING)) { 1811 write_data_vio(data_vio); 1812 return; 1813 } 1814 1815 data_vio->last_async_operation = VIO_ASYNC_OP_COMPRESS_DATA_VIO; 1816 launch_data_vio_cpu_callback(data_vio, compress_data_vio, 1817 CPU_Q_COMPRESS_BLOCK_PRIORITY); 1818 } 1819 1820 /** 1821 * hash_data_vio() - Hash the data in a data_vio and set the hash zone (which also flags the record 1822 * name as set). 1823 * @completion: The data_vio. 1824 * 1825 * This callback is registered in prepare_for_dedupe(). 
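* Zero blocks are never hashed; they are mapped directly to VDO_ZERO_BLOCK without entering the dedupe path.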
1826 */ 1827 static void hash_data_vio(struct vdo_completion *completion) 1828 { 1829 struct data_vio *data_vio = as_data_vio(completion); 1830 1831 assert_data_vio_on_cpu_thread(data_vio); 1832 VDO_ASSERT_LOG_ONLY(!data_vio->is_zero, "zero blocks should not be hashed"); 1833 1834 murmurhash3_128(data_vio->vio.data, VDO_BLOCK_SIZE, 0x62ea60be, 1835 &data_vio->record_name); 1836 1837 data_vio->hash_zone = vdo_select_hash_zone(vdo_from_data_vio(data_vio)->hash_zones, 1838 &data_vio->record_name); 1839 data_vio->last_async_operation = VIO_ASYNC_OP_ACQUIRE_VDO_HASH_LOCK; 1840 launch_data_vio_hash_zone_callback(data_vio, vdo_acquire_hash_lock); 1841 } 1842 1843 /** prepare_for_dedupe() - Prepare for the dedupe path after attempting to get an allocation. */ 1844 static void prepare_for_dedupe(struct data_vio *data_vio) 1845 { 1846 /* We don't care what thread we are on. */ 1847 VDO_ASSERT_LOG_ONLY(!data_vio->is_zero, "must not prepare to dedupe zero blocks"); 1848 1849 /* 1850 * Before we can dedupe, we need to know the record name, so the first 1851 * step is to hash the block data. 1852 */ 1853 data_vio->last_async_operation = VIO_ASYNC_OP_HASH_DATA_VIO; 1854 launch_data_vio_cpu_callback(data_vio, hash_data_vio, CPU_Q_HASH_BLOCK_PRIORITY); 1855 } 1856 1857 /** 1858 * write_bio_finished() - This is the bio_end_io function registered in write_block() to be called 1859 * when a data_vio's write to the underlying storage has completed. 1860 * @bio: The bio to update. 1861 */ 1862 static void write_bio_finished(struct bio *bio) 1863 { 1864 struct data_vio *data_vio = vio_as_data_vio((struct vio *) bio->bi_private); 1865 1866 vdo_count_completed_bios(bio); 1867 vdo_set_completion_result(&data_vio->vio.completion, 1868 blk_status_to_errno(bio->bi_status)); 1869 data_vio->downgrade_allocation_lock = true; 1870 update_metadata_for_data_vio_write(data_vio, data_vio->allocation.lock); 1871 } 1872 1873 /** write_data_vio() - Write a data block to storage without compression. */ 1874 void write_data_vio(struct data_vio *data_vio) 1875 { 1876 struct data_vio_compression_status status, new_status; 1877 int result; 1878 1879 if (!data_vio_has_allocation(data_vio)) { 1880 /* 1881 * There was no space to write this block and we failed to deduplicate or compress 1882 * it. 1883 */ 1884 continue_data_vio_with_error(data_vio, VDO_NO_SPACE); 1885 return; 1886 } 1887 1888 new_status = (struct data_vio_compression_status) { 1889 .stage = DATA_VIO_POST_PACKER, 1890 .may_not_compress = true, 1891 }; 1892 1893 do { 1894 status = get_data_vio_compression_status(data_vio); 1895 } while ((status.stage != DATA_VIO_POST_PACKER) && 1896 !set_data_vio_compression_status(data_vio, status, new_status)); 1897 1898 /* Write the data from the data block buffer. */ 1899 result = vio_reset_bio(&data_vio->vio, data_vio->vio.data, 1900 write_bio_finished, REQ_OP_WRITE, 1901 data_vio->allocation.pbn); 1902 if (result != VDO_SUCCESS) { 1903 continue_data_vio_with_error(data_vio, result); 1904 return; 1905 } 1906 1907 data_vio->last_async_operation = VIO_ASYNC_OP_WRITE_DATA_VIO; 1908 vdo_submit_data_vio(data_vio); 1909 } 1910 1911 /** 1912 * acknowledge_write_callback() - Acknowledge a write to the requestor. 1913 * @completion: The data_vio. 1914 * 1915 * This callback is registered in allocate_block() and continue_write_with_block_map_slot(). 
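* If the vdo is configured with a bio-ack queue, this must run on the bio-ack thread.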
1916 */ 1917 static void acknowledge_write_callback(struct vdo_completion *completion) 1918 { 1919 struct data_vio *data_vio = as_data_vio(completion); 1920 struct vdo *vdo = completion->vdo; 1921 1922 VDO_ASSERT_LOG_ONLY((!vdo_uses_bio_ack_queue(vdo) || 1923 (vdo_get_callback_thread_id() == vdo->thread_config.bio_ack_thread)), 1924 "%s() called on bio ack queue", __func__); 1925 VDO_ASSERT_LOG_ONLY(data_vio_has_flush_generation_lock(data_vio), 1926 "write VIO to be acknowledged has a flush generation lock"); 1927 acknowledge_data_vio(data_vio); 1928 if (data_vio->new_mapped.pbn == VDO_ZERO_BLOCK) { 1929 /* This is a zero write or discard */ 1930 update_metadata_for_data_vio_write(data_vio, NULL); 1931 return; 1932 } 1933 1934 prepare_for_dedupe(data_vio); 1935 } 1936 1937 /** 1938 * allocate_block() - Attempt to allocate a block in the current allocation zone. 1939 * @completion: The data_vio. 1940 * 1941 * This callback is registered in continue_write_with_block_map_slot(). 1942 */ 1943 static void allocate_block(struct vdo_completion *completion) 1944 { 1945 struct data_vio *data_vio = as_data_vio(completion); 1946 1947 assert_data_vio_in_allocated_zone(data_vio); 1948 1949 if (!vdo_allocate_block_in_zone(data_vio)) 1950 return; 1951 1952 completion->error_handler = handle_data_vio_error; 1953 WRITE_ONCE(data_vio->allocation_succeeded, true); 1954 data_vio->new_mapped = (struct zoned_pbn) { 1955 .zone = data_vio->allocation.zone, 1956 .pbn = data_vio->allocation.pbn, 1957 .state = VDO_MAPPING_STATE_UNCOMPRESSED, 1958 }; 1959 1960 if (data_vio->fua || 1961 data_vio->remaining_discard > (u32) (VDO_BLOCK_SIZE - data_vio->offset)) { 1962 prepare_for_dedupe(data_vio); 1963 return; 1964 } 1965 1966 data_vio->last_async_operation = VIO_ASYNC_OP_ACKNOWLEDGE_WRITE; 1967 launch_data_vio_on_bio_ack_queue(data_vio, acknowledge_write_callback); 1968 } 1969 1970 /** 1971 * handle_allocation_error() - Handle an error attempting to allocate a block. 1972 * @completion: The data_vio. 1973 * 1974 * This error handler is registered in continue_write_with_block_map_slot(). 1975 */ 1976 static void handle_allocation_error(struct vdo_completion *completion) 1977 { 1978 struct data_vio *data_vio = as_data_vio(completion); 1979 1980 if (completion->result == VDO_NO_SPACE) { 1981 /* We failed to get an allocation, but we can try to dedupe. */ 1982 vdo_reset_completion(completion); 1983 completion->error_handler = handle_data_vio_error; 1984 prepare_for_dedupe(data_vio); 1985 return; 1986 } 1987 1988 /* We got a "real" error, not just a failure to allocate, so fail the request. */ 1989 handle_data_vio_error(completion); 1990 } 1991 1992 static int assert_is_discard(struct data_vio *data_vio) 1993 { 1994 int result = VDO_ASSERT(data_vio->is_discard, 1995 "data_vio with no block map page is a discard"); 1996 1997 return ((result == VDO_SUCCESS) ? result : VDO_READ_ONLY); 1998 } 1999 2000 /** 2001 * continue_data_vio_with_block_map_slot() - Read the data_vio's mapping from the block map. 2002 * @completion: The data_vio to continue. 2003 * 2004 * This callback is registered in launch_read_data_vio(). 
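* It is also launched from modify_for_partial_write() once the caller's data has been merged into the block just read.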
2005 */ 2006 void continue_data_vio_with_block_map_slot(struct vdo_completion *completion) 2007 { 2008 struct data_vio *data_vio = as_data_vio(completion); 2009 2010 assert_data_vio_in_logical_zone(data_vio); 2011 if (data_vio->read) { 2012 set_data_vio_logical_callback(data_vio, read_block); 2013 data_vio->last_async_operation = VIO_ASYNC_OP_GET_MAPPED_BLOCK_FOR_READ; 2014 vdo_get_mapped_block(data_vio); 2015 return; 2016 } 2017 2018 vdo_acquire_flush_generation_lock(data_vio); 2019 2020 if (data_vio->tree_lock.tree_slots[0].block_map_slot.pbn == VDO_ZERO_BLOCK) { 2021 /* 2022 * This is a discard for a block on a block map page which has not been allocated, so 2023 * there's nothing more we need to do. 2024 */ 2025 completion->callback = complete_data_vio; 2026 continue_data_vio_with_error(data_vio, assert_is_discard(data_vio)); 2027 return; 2028 } 2029 2030 /* 2031 * We need an allocation if this is neither a full-block discard nor a 2032 * full-block zero write. 2033 */ 2034 if (!data_vio->is_zero && (!data_vio->is_discard || data_vio->is_partial)) { 2035 data_vio_allocate_data_block(data_vio, VIO_WRITE_LOCK, allocate_block, 2036 handle_allocation_error); 2037 return; 2038 } 2039 2040 /* 2041 * We don't need to write any data, so skip allocation and just update the block map and 2042 * reference counts (via the journal). 2043 */ 2044 data_vio->new_mapped.pbn = VDO_ZERO_BLOCK; 2045 if (data_vio->is_zero) 2046 data_vio->new_mapped.state = VDO_MAPPING_STATE_UNCOMPRESSED; 2047 2048 if (data_vio->remaining_discard > (u32) (VDO_BLOCK_SIZE - data_vio->offset)) { 2049 /* This is not the final block of a discard so we can't acknowledge it yet. */ 2050 update_metadata_for_data_vio_write(data_vio, NULL); 2051 return; 2052 } 2053 2054 data_vio->last_async_operation = VIO_ASYNC_OP_ACKNOWLEDGE_WRITE; 2055 launch_data_vio_on_bio_ack_queue(data_vio, acknowledge_write_callback); 2056 } 2057
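/*
 * Illustrative sketch, not part of the driver: the allocation decision made in
 * continue_data_vio_with_block_map_slot() above, restated as a standalone predicate with a
 * small userspace harness. The helper name sketch_needs_allocation() and the harness are
 * hypothetical; the driver evaluates the condition inline on the data_vio fields.
 */
#include <assert.h>
#include <stdbool.h>

/* "We need an allocation if this is neither a full-block discard nor a full-block zero write." */
static bool sketch_needs_allocation(bool is_zero, bool is_discard, bool is_partial)
{
	return !is_zero && (!is_discard || is_partial);
}

int main(void)
{
	/* A full-block zero write needs no new physical block; it maps to VDO_ZERO_BLOCK. */
	assert(!sketch_needs_allocation(true, false, false));
	/* A full-block discard likewise just removes the old mapping. */
	assert(!sketch_needs_allocation(false, true, false));
	/* A partial discard must read-modify-write the block, so it does need an allocation. */
	assert(sketch_needs_allocation(false, true, true));
	/* An ordinary write of non-zero data needs an allocation. */
	assert(sketch_needs_allocation(false, false, false));
	return 0;
}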