// SPDX-License-Identifier: GPL-2.0-only
/*
 * Copyright 2023 Red Hat
 */

#include "data-vio.h"

#include <linux/atomic.h>
#include <linux/bio.h>
#include <linux/blkdev.h>
#include <linux/delay.h>
#include <linux/device-mapper.h>
#include <linux/jiffies.h>
#include <linux/kernel.h>
#include <linux/list.h>
#include <linux/lz4.h>
#include <linux/minmax.h>
#include <linux/sched.h>
#include <linux/spinlock.h>
#include <linux/wait.h>

#include "logger.h"
#include "memory-alloc.h"
#include "murmurhash3.h"
#include "permassert.h"

#include "block-map.h"
#include "dump.h"
#include "encodings.h"
#include "int-map.h"
#include "io-submitter.h"
#include "logical-zone.h"
#include "packer.h"
#include "recovery-journal.h"
#include "slab-depot.h"
#include "status-codes.h"
#include "types.h"
#include "vdo.h"
#include "vio.h"
#include "wait-queue.h"

/**
 * DOC: Bio flags.
 *
 * For certain flags set on user bios, if the user bio has not yet been acknowledged, setting those
 * flags on our own bio(s) for that request may help underlying layers better fulfill the user
 * bio's needs. This constant contains the aggregate of those flags; VDO strips all the other
 * flags, as they convey incorrect information.
 *
 * These flags are always irrelevant if we have already finished the user bio, as they are only
 * hints on IO importance. Once VDO has finished the user bio, any remaining IO it performs for
 * that request no longer needs special prioritization.
 *
 * Note that bio.c contains the complete list of flags we believe may be set; the following list
 * explains the action taken with each of those flags VDO could receive:
 *
 * * REQ_SYNC: Passed down if the user bio is not yet completed, since it indicates the user bio
 *   completion is required for further work to be done by the issuer.
 * * REQ_META: Passed down if the user bio is not yet completed, since it may mean the lower layer
 *   treats it as more urgent, similar to REQ_SYNC.
 * * REQ_PRIO: Passed down if the user bio is not yet completed, since it indicates the user bio is
 *   important.
 * * REQ_NOMERGE: Set only if the incoming bio was split; irrelevant to VDO IO.
 * * REQ_IDLE: Set if the incoming bio had more IO quickly following; VDO's IO pattern doesn't
 *   match incoming IO, so this flag is incorrect for it.
 * * REQ_FUA: Handled separately, and irrelevant to VDO IO otherwise.
 * * REQ_RAHEAD: Passed down, since for reads it indicates the IO is of trivial importance.
 * * REQ_BACKGROUND: Not passed down, as VIOs are a limited resource and VDO needs them recycled
 *   ASAP to service heavy load, which is the only place where REQ_BACKGROUND might aid in load
 *   prioritization.
 */
static blk_opf_t PASSTHROUGH_FLAGS = (REQ_PRIO | REQ_META | REQ_SYNC | REQ_RAHEAD);

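/*
 * A minimal illustrative sketch, not part of VDO itself (the helper name is hypothetical): this is
 * how the passthrough flags are intended to be used. When VDO issues its own read on behalf of a
 * not-yet-acknowledged user bio, only the flags above survive from the user bio's op flags, and
 * VDO's own operation is added, exactly as read_block() does later in this file.
 */
static inline blk_opf_t __maybe_unused example_filtered_read_opf(struct bio *user_bio)
{
        /* Strip everything except PASSTHROUGH_FLAGS, then select the read operation. */
        return (user_bio->bi_opf & PASSTHROUGH_FLAGS) | REQ_OP_READ;
}
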
/**
 * DOC:
 *
 * The data_vio_pool maintains the pool of data_vios which a vdo uses to service incoming bios. For
 * correctness, and in order to avoid potentially expensive or blocking memory allocations during
 * normal operation, the number of concurrently active data_vios is capped. Furthermore, in order
 * to avoid starvation of reads and writes, at most 75% of the data_vios may be used for discards.
 * The data_vio_pool is responsible for enforcing these limits. Threads submitting bios for which a
 * data_vio or discard permit is not available will block until the necessary resources are
 * available. The pool is also responsible for distributing resources to blocked threads and waking
 * them. Finally, the pool attempts to batch the work of recycling data_vios by performing the work
 * of actually assigning resources to blocked threads, or placing data_vios back into the pool, on
 * a single cpu at a time.
 *
 * The pool contains two "limiters", one for tracking data_vios and one for tracking discard
 * permits. The limiters also provide safe cross-thread access to pool statistics without the need
 * to take the pool's lock. When a thread submits a bio to a vdo device, it will first attempt to
 * get a discard permit if it is a discard, and then to get a data_vio. If the necessary resources
 * are available, the incoming bio will be assigned to the acquired data_vio, and it will be
 * launched. However, if either of these is unavailable, the arrival time of the bio is recorded in
 * the bio's bi_private field, the bio and its submitter are both queued on the appropriate
 * limiter, and the submitting thread will then put itself to sleep. (Note that this mechanism will
 * break if jiffies are only 32 bits.)
 *
 * Whenever a data_vio has completed processing for the bio it was servicing, release_data_vio()
 * will be called on it. This function will add the data_vio to a funnel queue, and then check the
 * state of the pool. If the pool is not currently processing released data_vios, the pool's
 * completion will be enqueued on a cpu queue. This obviates the need for the releasing threads to
 * hold the pool's lock, and also batches release work while avoiding starvation of the cpu
 * threads.
 *
 * Whenever the pool's completion is run on a cpu thread, it calls process_release_callback(),
 * which processes a batch of returned data_vios (at most DATA_VIO_RELEASE_BATCH_SIZE, currently
 * 128) from the pool's funnel queue. For each data_vio, it first checks whether that data_vio was
 * processing a discard. If so, and there is a blocked bio waiting for a discard permit, that
 * permit is notionally transferred to the eldest discard waiter, and that waiter is moved to the
 * end of the list of discard bios waiting for a data_vio. If there are no discard waiters, the
 * discard permit is returned to the pool. Next, the data_vio is assigned to the oldest blocked bio
 * which either has a discard permit or doesn't need one, and is relaunched. If no such bio exists,
 * the data_vio is returned to the pool. Finally, if any waiting bios were launched, the threads
 * which blocked trying to submit them are awakened.
 */

#define DATA_VIO_RELEASE_BATCH_SIZE 128

static const unsigned int VDO_SECTORS_PER_BLOCK_MASK = VDO_SECTORS_PER_BLOCK - 1;
static const u32 COMPRESSION_STATUS_MASK = 0xff;
static const u32 MAY_NOT_COMPRESS_MASK = 0x80000000;

struct limiter;
typedef void (*assigner_fn)(struct limiter *limiter);

/* Bookkeeping structure for a single type of resource.
*/ 127 struct limiter { 128 /* The data_vio_pool to which this limiter belongs */ 129 struct data_vio_pool *pool; 130 /* The maximum number of data_vios available */ 131 data_vio_count_t limit; 132 /* The number of resources in use */ 133 data_vio_count_t busy; 134 /* The maximum number of resources ever simultaneously in use */ 135 data_vio_count_t max_busy; 136 /* The number of resources to release */ 137 data_vio_count_t release_count; 138 /* The number of waiters to wake */ 139 data_vio_count_t wake_count; 140 /* The list of waiting bios which are known to process_release_callback() */ 141 struct bio_list waiters; 142 /* The list of waiting bios which are not yet known to process_release_callback() */ 143 struct bio_list new_waiters; 144 /* The list of waiters which have their permits */ 145 struct bio_list *permitted_waiters; 146 /* The function for assigning a resource to a waiter */ 147 assigner_fn assigner; 148 /* The queue of blocked threads */ 149 wait_queue_head_t blocked_threads; 150 /* The arrival time of the eldest waiter */ 151 u64 arrival; 152 }; 153 154 /* 155 * A data_vio_pool is a collection of preallocated data_vios which may be acquired from any thread, 156 * and are released in batches. 157 */ 158 struct data_vio_pool { 159 /* Completion for scheduling releases */ 160 struct vdo_completion completion; 161 /* The administrative state of the pool */ 162 struct admin_state state; 163 /* Lock protecting the pool */ 164 spinlock_t lock; 165 /* The main limiter controlling the total data_vios in the pool. */ 166 struct limiter limiter; 167 /* The limiter controlling data_vios for discard */ 168 struct limiter discard_limiter; 169 /* The list of bios which have discard permits but still need a data_vio */ 170 struct bio_list permitted_discards; 171 /* The list of available data_vios */ 172 struct list_head available; 173 /* The queue of data_vios waiting to be returned to the pool */ 174 struct funnel_queue *queue; 175 /* Whether the pool is processing, or scheduled to process releases */ 176 atomic_t processing; 177 /* The data vios in the pool */ 178 struct data_vio data_vios[]; 179 }; 180 181 static const char * const ASYNC_OPERATION_NAMES[] = { 182 "launch", 183 "acknowledge_write", 184 "acquire_hash_lock", 185 "attempt_logical_block_lock", 186 "lock_duplicate_pbn", 187 "check_for_duplication", 188 "cleanup", 189 "compress_data_vio", 190 "find_block_map_slot", 191 "get_mapped_block_for_read", 192 "get_mapped_block_for_write", 193 "hash_data_vio", 194 "journal_remapping", 195 "vdo_attempt_packing", 196 "put_mapped_block", 197 "read_data_vio", 198 "update_dedupe_index", 199 "update_reference_counts", 200 "verify_duplication", 201 "write_data_vio", 202 }; 203 204 /* The steps taken cleaning up a VIO, in the order they are performed. 
 */
enum data_vio_cleanup_stage {
        VIO_CLEANUP_START,
        VIO_RELEASE_HASH_LOCK = VIO_CLEANUP_START,
        VIO_RELEASE_ALLOCATED,
        VIO_RELEASE_RECOVERY_LOCKS,
        VIO_RELEASE_LOGICAL,
        VIO_CLEANUP_DONE
};

static inline struct data_vio_pool * __must_check
as_data_vio_pool(struct vdo_completion *completion)
{
        vdo_assert_completion_type(completion, VDO_DATA_VIO_POOL_COMPLETION);
        return container_of(completion, struct data_vio_pool, completion);
}

static inline u64 get_arrival_time(struct bio *bio)
{
        return (u64) bio->bi_private;
}

/**
 * check_for_drain_complete_locked() - Check whether a data_vio_pool has no outstanding data_vios
 *                                     or waiters while holding the pool's lock.
 */
static bool check_for_drain_complete_locked(struct data_vio_pool *pool)
{
        if (pool->limiter.busy > 0)
                return false;

        VDO_ASSERT_LOG_ONLY((pool->discard_limiter.busy == 0),
                            "no outstanding discard permits");

        return (bio_list_empty(&pool->limiter.new_waiters) &&
                bio_list_empty(&pool->discard_limiter.new_waiters));
}

static void initialize_lbn_lock(struct data_vio *data_vio, logical_block_number_t lbn)
{
        struct vdo *vdo = vdo_from_data_vio(data_vio);
        zone_count_t zone_number;
        struct lbn_lock *lock = &data_vio->logical;

        lock->lbn = lbn;
        lock->locked = false;
        vdo_waitq_init(&lock->waiters);
        zone_number = vdo_compute_logical_zone(data_vio);
        lock->zone = &vdo->logical_zones->zones[zone_number];
}

static void launch_locked_request(struct data_vio *data_vio)
{
        data_vio->logical.locked = true;
        if (data_vio->write) {
                struct vdo *vdo = vdo_from_data_vio(data_vio);

                if (vdo_is_read_only(vdo)) {
                        continue_data_vio_with_error(data_vio, VDO_READ_ONLY);
                        return;
                }
        }

        data_vio->last_async_operation = VIO_ASYNC_OP_FIND_BLOCK_MAP_SLOT;
        vdo_find_block_map_slot(data_vio);
}

static void acknowledge_data_vio(struct data_vio *data_vio)
{
        struct vdo *vdo = vdo_from_data_vio(data_vio);
        struct bio *bio = data_vio->user_bio;
        int error = vdo_status_to_errno(data_vio->vio.completion.result);

        if (bio == NULL)
                return;

        VDO_ASSERT_LOG_ONLY((data_vio->remaining_discard <=
                             (u32) (VDO_BLOCK_SIZE - data_vio->offset)),
                            "data_vio to acknowledge is not an incomplete discard");

        data_vio->user_bio = NULL;
        vdo_count_bios(&vdo->stats.bios_acknowledged, bio);
        if (data_vio->is_partial)
                vdo_count_bios(&vdo->stats.bios_acknowledged_partial, bio);

        bio->bi_status = errno_to_blk_status(error);
        bio_endio(bio);
}

static void copy_to_bio(struct bio *bio, char *data_ptr)
{
        struct bio_vec biovec;
        struct bvec_iter iter;

        bio_for_each_segment(biovec, bio, iter) {
                memcpy_to_bvec(&biovec, data_ptr);
                data_ptr += biovec.bv_len;
        }
}

struct data_vio_compression_status get_data_vio_compression_status(struct data_vio *data_vio)
{
        u32 packed = atomic_read(&data_vio->compression.status);

        /* pairs with cmpxchg in set_data_vio_compression_status */
        smp_rmb();
        return (struct data_vio_compression_status) {
                .stage = packed & COMPRESSION_STATUS_MASK,
                .may_not_compress = ((packed & MAY_NOT_COMPRESS_MASK) != 0),
        };
}

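/*
 * A minimal illustrative sketch, not used by VDO itself (the helper name is hypothetical): a
 * compression status round-trips through its packed u32 form. The stage occupies the low byte
 * (COMPRESSION_STATUS_MASK) and the may-not-compress flag the high bit (MAY_NOT_COMPRESS_MASK),
 * mirroring get_data_vio_compression_status() above and pack_status() below.
 */
static inline bool __maybe_unused example_compression_status_round_trip(void)
{
        struct data_vio_compression_status in = {
                .stage = DATA_VIO_PACKING,
                .may_not_compress = true,
        };
        /* Pack the same way pack_status() does. */
        u32 packed = in.stage | (in.may_not_compress ? MAY_NOT_COMPRESS_MASK : 0);
        /* Unpack the same way get_data_vio_compression_status() does. */
        struct data_vio_compression_status out = {
                .stage = packed & COMPRESSION_STATUS_MASK,
                .may_not_compress = ((packed & MAY_NOT_COMPRESS_MASK) != 0),
        };

        return ((out.stage == in.stage) && (out.may_not_compress == in.may_not_compress));
}
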
/**
 * pack_status() - Convert a data_vio_compression_status into a u32 which may be stored
 *                 atomically.
 * @status: The state to convert.
 *
 * Return: The compression state packed into a u32.
 */
static u32 __must_check pack_status(struct data_vio_compression_status status)
{
        return status.stage | (status.may_not_compress ? MAY_NOT_COMPRESS_MASK : 0);
}

/**
 * set_data_vio_compression_status() - Set the compression status of a data_vio.
 * @data_vio: The data_vio to change.
 * @status: The expected current status of the data_vio.
 * @new_status: The status to set.
 *
 * Return: true if the new status was set, false if the data_vio's compression status did not
 *         match the expected state, and so was left unchanged.
 */
static bool __must_check
set_data_vio_compression_status(struct data_vio *data_vio,
                                struct data_vio_compression_status status,
                                struct data_vio_compression_status new_status)
{
        u32 actual;
        u32 expected = pack_status(status);
        u32 replacement = pack_status(new_status);

        /*
         * Extra barriers because this was originally developed using a CAS operation that
         * implicitly had them.
         */
        smp_mb__before_atomic();
        actual = atomic_cmpxchg(&data_vio->compression.status, expected, replacement);
        /* same as before_atomic */
        smp_mb__after_atomic();
        return (expected == actual);
}

struct data_vio_compression_status advance_data_vio_compression_stage(struct data_vio *data_vio)
{
        for (;;) {
                struct data_vio_compression_status status =
                        get_data_vio_compression_status(data_vio);
                struct data_vio_compression_status new_status = status;

                if (status.stage == DATA_VIO_POST_PACKER) {
                        /* We're already in the last stage. */
                        return status;
                }

                if (status.may_not_compress) {
                        /*
                         * Compression has been disallowed for this VIO, so skip the rest of the
                         * path and go to the end.
                         */
                        new_status.stage = DATA_VIO_POST_PACKER;
                } else {
                        /* Go to the next stage. */
                        new_status.stage++;
                }

                if (set_data_vio_compression_status(data_vio, status, new_status))
                        return new_status;

                /* Another thread changed the status out from under us so try again. */
        }
}

/**
 * cancel_data_vio_compression() - Prevent this data_vio from being compressed or packed.
 *
 * Return: true if the data_vio is in the packer and the caller was the first caller to cancel it.
 */
bool cancel_data_vio_compression(struct data_vio *data_vio)
{
        struct data_vio_compression_status status, new_status;

        for (;;) {
                status = get_data_vio_compression_status(data_vio);
                if (status.may_not_compress || (status.stage == DATA_VIO_POST_PACKER)) {
                        /* This data_vio is already set up to not block in the packer. */
                        break;
                }

                new_status.stage = status.stage;
                new_status.may_not_compress = true;

                if (set_data_vio_compression_status(data_vio, status, new_status))
                        break;
        }

        return ((status.stage == DATA_VIO_PACKING) && !status.may_not_compress);
}

/**
 * attempt_logical_block_lock() - Attempt to acquire the lock on a logical block.
 * @completion: The data_vio for an external data request as a completion.
 *
 * This is the start of the path for all external requests. It is registered in launch_data_vio().
418 */ 419 static void attempt_logical_block_lock(struct vdo_completion *completion) 420 { 421 struct data_vio *data_vio = as_data_vio(completion); 422 struct lbn_lock *lock = &data_vio->logical; 423 struct vdo *vdo = vdo_from_data_vio(data_vio); 424 struct data_vio *lock_holder; 425 int result; 426 427 assert_data_vio_in_logical_zone(data_vio); 428 429 if (data_vio->logical.lbn >= vdo->states.vdo.config.logical_blocks) { 430 continue_data_vio_with_error(data_vio, VDO_OUT_OF_RANGE); 431 return; 432 } 433 434 result = vdo_int_map_put(lock->zone->lbn_operations, lock->lbn, 435 data_vio, false, (void **) &lock_holder); 436 if (result != VDO_SUCCESS) { 437 continue_data_vio_with_error(data_vio, result); 438 return; 439 } 440 441 if (lock_holder == NULL) { 442 /* We got the lock */ 443 launch_locked_request(data_vio); 444 return; 445 } 446 447 result = VDO_ASSERT(lock_holder->logical.locked, "logical block lock held"); 448 if (result != VDO_SUCCESS) { 449 continue_data_vio_with_error(data_vio, result); 450 return; 451 } 452 453 /* 454 * If the new request is a pure read request (not read-modify-write) and the lock_holder is 455 * writing and has received an allocation, service the read request immediately by copying 456 * data from the lock_holder to avoid having to flush the write out of the packer just to 457 * prevent the read from waiting indefinitely. If the lock_holder does not yet have an 458 * allocation, prevent it from blocking in the packer and wait on it. This is necessary in 459 * order to prevent returning data that may not have actually been written. 460 */ 461 if (!data_vio->write && READ_ONCE(lock_holder->allocation_succeeded)) { 462 copy_to_bio(data_vio->user_bio, lock_holder->vio.data + data_vio->offset); 463 acknowledge_data_vio(data_vio); 464 complete_data_vio(completion); 465 return; 466 } 467 468 data_vio->last_async_operation = VIO_ASYNC_OP_ATTEMPT_LOGICAL_BLOCK_LOCK; 469 vdo_waitq_enqueue_waiter(&lock_holder->logical.waiters, &data_vio->waiter); 470 471 /* 472 * Prevent writes and read-modify-writes from blocking indefinitely on lock holders in the 473 * packer. 474 */ 475 if (lock_holder->write && cancel_data_vio_compression(lock_holder)) { 476 data_vio->compression.lock_holder = lock_holder; 477 launch_data_vio_packer_callback(data_vio, 478 vdo_remove_lock_holder_from_packer); 479 } 480 } 481 482 /** 483 * launch_data_vio() - (Re)initialize a data_vio to have a new logical block number, keeping the 484 * same parent and other state and send it on its way. 485 */ 486 static void launch_data_vio(struct data_vio *data_vio, logical_block_number_t lbn) 487 { 488 struct vdo_completion *completion = &data_vio->vio.completion; 489 490 /* 491 * Clearing the tree lock must happen before initializing the LBN lock, which also adds 492 * information to the tree lock. 
493 */ 494 memset(&data_vio->tree_lock, 0, sizeof(data_vio->tree_lock)); 495 initialize_lbn_lock(data_vio, lbn); 496 INIT_LIST_HEAD(&data_vio->hash_lock_entry); 497 INIT_LIST_HEAD(&data_vio->write_entry); 498 499 memset(&data_vio->allocation, 0, sizeof(data_vio->allocation)); 500 501 data_vio->is_duplicate = false; 502 503 memset(&data_vio->record_name, 0, sizeof(data_vio->record_name)); 504 memset(&data_vio->duplicate, 0, sizeof(data_vio->duplicate)); 505 vdo_reset_completion(&data_vio->decrement_completion); 506 vdo_reset_completion(completion); 507 completion->error_handler = handle_data_vio_error; 508 set_data_vio_logical_callback(data_vio, attempt_logical_block_lock); 509 vdo_enqueue_completion(completion, VDO_DEFAULT_Q_MAP_BIO_PRIORITY); 510 } 511 512 static bool is_zero_block(char *block) 513 { 514 int i; 515 516 for (i = 0; i < VDO_BLOCK_SIZE; i += sizeof(u64)) { 517 if (*((u64 *) &block[i])) 518 return false; 519 } 520 521 return true; 522 } 523 524 static void copy_from_bio(struct bio *bio, char *data_ptr) 525 { 526 struct bio_vec biovec; 527 struct bvec_iter iter; 528 529 bio_for_each_segment(biovec, bio, iter) { 530 memcpy_from_bvec(data_ptr, &biovec); 531 data_ptr += biovec.bv_len; 532 } 533 } 534 535 static void launch_bio(struct vdo *vdo, struct data_vio *data_vio, struct bio *bio) 536 { 537 logical_block_number_t lbn; 538 /* 539 * Zero out the fields which don't need to be preserved (i.e. which are not pointers to 540 * separately allocated objects). 541 */ 542 memset(data_vio, 0, offsetof(struct data_vio, vio)); 543 memset(&data_vio->compression, 0, offsetof(struct compression_state, block)); 544 545 data_vio->user_bio = bio; 546 data_vio->offset = to_bytes(bio->bi_iter.bi_sector & VDO_SECTORS_PER_BLOCK_MASK); 547 data_vio->is_partial = (bio->bi_iter.bi_size < VDO_BLOCK_SIZE) || (data_vio->offset != 0); 548 549 /* 550 * Discards behave very differently than other requests when coming in from device-mapper. 551 * We have to be able to handle any size discards and various sector offsets within a 552 * block. 553 */ 554 if (bio_op(bio) == REQ_OP_DISCARD) { 555 data_vio->remaining_discard = bio->bi_iter.bi_size; 556 data_vio->write = true; 557 data_vio->is_discard = true; 558 if (data_vio->is_partial) { 559 vdo_count_bios(&vdo->stats.bios_in_partial, bio); 560 data_vio->read = true; 561 } 562 } else if (data_vio->is_partial) { 563 vdo_count_bios(&vdo->stats.bios_in_partial, bio); 564 data_vio->read = true; 565 if (bio_data_dir(bio) == WRITE) 566 data_vio->write = true; 567 } else if (bio_data_dir(bio) == READ) { 568 data_vio->read = true; 569 } else { 570 /* 571 * Copy the bio data to a char array so that we can continue to use the data after 572 * we acknowledge the bio. 573 */ 574 copy_from_bio(bio, data_vio->vio.data); 575 data_vio->is_zero = is_zero_block(data_vio->vio.data); 576 data_vio->write = true; 577 } 578 579 if (data_vio->user_bio->bi_opf & REQ_FUA) 580 data_vio->fua = true; 581 582 lbn = (bio->bi_iter.bi_sector - vdo->starting_sector_offset) / VDO_SECTORS_PER_BLOCK; 583 launch_data_vio(data_vio, lbn); 584 } 585 586 static void assign_data_vio(struct limiter *limiter, struct data_vio *data_vio) 587 { 588 struct bio *bio = bio_list_pop(limiter->permitted_waiters); 589 590 launch_bio(limiter->pool->completion.vdo, data_vio, bio); 591 limiter->wake_count++; 592 593 bio = bio_list_peek(limiter->permitted_waiters); 594 limiter->arrival = ((bio == NULL) ? 
U64_MAX : get_arrival_time(bio)); 595 } 596 597 static void assign_discard_permit(struct limiter *limiter) 598 { 599 struct bio *bio = bio_list_pop(&limiter->waiters); 600 601 if (limiter->arrival == U64_MAX) 602 limiter->arrival = get_arrival_time(bio); 603 604 bio_list_add(limiter->permitted_waiters, bio); 605 } 606 607 static void get_waiters(struct limiter *limiter) 608 { 609 bio_list_merge_init(&limiter->waiters, &limiter->new_waiters); 610 } 611 612 static inline struct data_vio *get_available_data_vio(struct data_vio_pool *pool) 613 { 614 struct data_vio *data_vio = 615 list_first_entry(&pool->available, struct data_vio, pool_entry); 616 617 list_del_init(&data_vio->pool_entry); 618 return data_vio; 619 } 620 621 static void assign_data_vio_to_waiter(struct limiter *limiter) 622 { 623 assign_data_vio(limiter, get_available_data_vio(limiter->pool)); 624 } 625 626 static void update_limiter(struct limiter *limiter) 627 { 628 struct bio_list *waiters = &limiter->waiters; 629 data_vio_count_t available = limiter->limit - limiter->busy; 630 631 VDO_ASSERT_LOG_ONLY((limiter->release_count <= limiter->busy), 632 "Release count %u is not more than busy count %u", 633 limiter->release_count, limiter->busy); 634 635 get_waiters(limiter); 636 for (; (limiter->release_count > 0) && !bio_list_empty(waiters); limiter->release_count--) 637 limiter->assigner(limiter); 638 639 if (limiter->release_count > 0) { 640 WRITE_ONCE(limiter->busy, limiter->busy - limiter->release_count); 641 limiter->release_count = 0; 642 return; 643 } 644 645 for (; (available > 0) && !bio_list_empty(waiters); available--) 646 limiter->assigner(limiter); 647 648 WRITE_ONCE(limiter->busy, limiter->limit - available); 649 if (limiter->max_busy < limiter->busy) 650 WRITE_ONCE(limiter->max_busy, limiter->busy); 651 } 652 653 /** 654 * schedule_releases() - Ensure that release processing is scheduled. 655 * 656 * If this call switches the state to processing, enqueue. Otherwise, some other thread has already 657 * done so. 658 */ 659 static void schedule_releases(struct data_vio_pool *pool) 660 { 661 /* Pairs with the barrier in process_release_callback(). */ 662 smp_mb__before_atomic(); 663 if (atomic_cmpxchg(&pool->processing, false, true)) 664 return; 665 666 pool->completion.requeue = true; 667 vdo_launch_completion_with_priority(&pool->completion, 668 CPU_Q_COMPLETE_VIO_PRIORITY); 669 } 670 671 static void reuse_or_release_resources(struct data_vio_pool *pool, 672 struct data_vio *data_vio, 673 struct list_head *returned) 674 { 675 if (data_vio->remaining_discard > 0) { 676 if (bio_list_empty(&pool->discard_limiter.waiters)) { 677 /* Return the data_vio's discard permit. */ 678 pool->discard_limiter.release_count++; 679 } else { 680 assign_discard_permit(&pool->discard_limiter); 681 } 682 } 683 684 if (pool->limiter.arrival < pool->discard_limiter.arrival) { 685 assign_data_vio(&pool->limiter, data_vio); 686 } else if (pool->discard_limiter.arrival < U64_MAX) { 687 assign_data_vio(&pool->discard_limiter, data_vio); 688 } else { 689 list_add(&data_vio->pool_entry, returned); 690 pool->limiter.release_count++; 691 } 692 } 693 694 /** 695 * process_release_callback() - Process a batch of data_vio releases. 696 * @completion: The pool with data_vios to release. 
697 */ 698 static void process_release_callback(struct vdo_completion *completion) 699 { 700 struct data_vio_pool *pool = as_data_vio_pool(completion); 701 bool reschedule; 702 bool drained; 703 data_vio_count_t processed; 704 data_vio_count_t to_wake; 705 data_vio_count_t discards_to_wake; 706 LIST_HEAD(returned); 707 708 spin_lock(&pool->lock); 709 get_waiters(&pool->discard_limiter); 710 get_waiters(&pool->limiter); 711 spin_unlock(&pool->lock); 712 713 if (pool->limiter.arrival == U64_MAX) { 714 struct bio *bio = bio_list_peek(&pool->limiter.waiters); 715 716 if (bio != NULL) 717 pool->limiter.arrival = get_arrival_time(bio); 718 } 719 720 for (processed = 0; processed < DATA_VIO_RELEASE_BATCH_SIZE; processed++) { 721 struct data_vio *data_vio; 722 struct funnel_queue_entry *entry = vdo_funnel_queue_poll(pool->queue); 723 724 if (entry == NULL) 725 break; 726 727 data_vio = as_data_vio(container_of(entry, struct vdo_completion, 728 work_queue_entry_link)); 729 acknowledge_data_vio(data_vio); 730 reuse_or_release_resources(pool, data_vio, &returned); 731 } 732 733 spin_lock(&pool->lock); 734 /* 735 * There is a race where waiters could be added while we are in the unlocked section above. 736 * Those waiters could not see the resources we are now about to release, so we assign 737 * those resources now as we have no guarantee of being rescheduled. This is handled in 738 * update_limiter(). 739 */ 740 update_limiter(&pool->discard_limiter); 741 list_splice(&returned, &pool->available); 742 update_limiter(&pool->limiter); 743 to_wake = pool->limiter.wake_count; 744 pool->limiter.wake_count = 0; 745 discards_to_wake = pool->discard_limiter.wake_count; 746 pool->discard_limiter.wake_count = 0; 747 748 atomic_set(&pool->processing, false); 749 /* Pairs with the barrier in schedule_releases(). */ 750 smp_mb(); 751 752 reschedule = !vdo_is_funnel_queue_empty(pool->queue); 753 drained = (!reschedule && 754 vdo_is_state_draining(&pool->state) && 755 check_for_drain_complete_locked(pool)); 756 spin_unlock(&pool->lock); 757 758 if (to_wake > 0) 759 wake_up_nr(&pool->limiter.blocked_threads, to_wake); 760 761 if (discards_to_wake > 0) 762 wake_up_nr(&pool->discard_limiter.blocked_threads, discards_to_wake); 763 764 if (reschedule) 765 schedule_releases(pool); 766 else if (drained) 767 vdo_finish_draining(&pool->state); 768 } 769 770 static void initialize_limiter(struct limiter *limiter, struct data_vio_pool *pool, 771 assigner_fn assigner, data_vio_count_t limit) 772 { 773 limiter->pool = pool; 774 limiter->assigner = assigner; 775 limiter->limit = limit; 776 limiter->arrival = U64_MAX; 777 init_waitqueue_head(&limiter->blocked_threads); 778 } 779 780 /** 781 * initialize_data_vio() - Allocate the components of a data_vio. 782 * 783 * The caller is responsible for cleaning up the data_vio on error. 784 * 785 * Return: VDO_SUCCESS or an error. 
786 */ 787 static int initialize_data_vio(struct data_vio *data_vio, struct vdo *vdo) 788 { 789 struct bio *bio; 790 int result; 791 792 BUILD_BUG_ON(VDO_BLOCK_SIZE > PAGE_SIZE); 793 result = vdo_allocate_memory(VDO_BLOCK_SIZE, 0, "data_vio data", 794 &data_vio->vio.data); 795 if (result != VDO_SUCCESS) 796 return vdo_log_error_strerror(result, 797 "data_vio data allocation failure"); 798 799 result = vdo_allocate_memory(VDO_BLOCK_SIZE, 0, "compressed block", 800 &data_vio->compression.block); 801 if (result != VDO_SUCCESS) { 802 return vdo_log_error_strerror(result, 803 "data_vio compressed block allocation failure"); 804 } 805 806 result = vdo_allocate_memory(VDO_BLOCK_SIZE, 0, "vio scratch", 807 &data_vio->scratch_block); 808 if (result != VDO_SUCCESS) 809 return vdo_log_error_strerror(result, 810 "data_vio scratch allocation failure"); 811 812 result = vdo_create_bio(&bio); 813 if (result != VDO_SUCCESS) 814 return vdo_log_error_strerror(result, 815 "data_vio data bio allocation failure"); 816 817 vdo_initialize_completion(&data_vio->decrement_completion, vdo, 818 VDO_DECREMENT_COMPLETION); 819 initialize_vio(&data_vio->vio, bio, 1, VIO_TYPE_DATA, VIO_PRIORITY_DATA, vdo); 820 821 return VDO_SUCCESS; 822 } 823 824 static void destroy_data_vio(struct data_vio *data_vio) 825 { 826 if (data_vio == NULL) 827 return; 828 829 vdo_free_bio(vdo_forget(data_vio->vio.bio)); 830 vdo_free(vdo_forget(data_vio->vio.data)); 831 vdo_free(vdo_forget(data_vio->compression.block)); 832 vdo_free(vdo_forget(data_vio->scratch_block)); 833 } 834 835 /** 836 * make_data_vio_pool() - Initialize a data_vio pool. 837 * @vdo: The vdo to which the pool will belong. 838 * @pool_size: The number of data_vios in the pool. 839 * @discard_limit: The maximum number of data_vios which may be used for discards. 840 * @pool_ptr: A pointer to hold the newly allocated pool. 
841 */ 842 int make_data_vio_pool(struct vdo *vdo, data_vio_count_t pool_size, 843 data_vio_count_t discard_limit, struct data_vio_pool **pool_ptr) 844 { 845 int result; 846 struct data_vio_pool *pool; 847 data_vio_count_t i; 848 849 result = vdo_allocate_extended(struct data_vio_pool, pool_size, struct data_vio, 850 __func__, &pool); 851 if (result != VDO_SUCCESS) 852 return result; 853 854 VDO_ASSERT_LOG_ONLY((discard_limit <= pool_size), 855 "discard limit does not exceed pool size"); 856 initialize_limiter(&pool->discard_limiter, pool, assign_discard_permit, 857 discard_limit); 858 pool->discard_limiter.permitted_waiters = &pool->permitted_discards; 859 initialize_limiter(&pool->limiter, pool, assign_data_vio_to_waiter, pool_size); 860 pool->limiter.permitted_waiters = &pool->limiter.waiters; 861 INIT_LIST_HEAD(&pool->available); 862 spin_lock_init(&pool->lock); 863 vdo_set_admin_state_code(&pool->state, VDO_ADMIN_STATE_NORMAL_OPERATION); 864 vdo_initialize_completion(&pool->completion, vdo, VDO_DATA_VIO_POOL_COMPLETION); 865 vdo_prepare_completion(&pool->completion, process_release_callback, 866 process_release_callback, vdo->thread_config.cpu_thread, 867 NULL); 868 869 result = vdo_make_funnel_queue(&pool->queue); 870 if (result != VDO_SUCCESS) { 871 free_data_vio_pool(vdo_forget(pool)); 872 return result; 873 } 874 875 for (i = 0; i < pool_size; i++) { 876 struct data_vio *data_vio = &pool->data_vios[i]; 877 878 result = initialize_data_vio(data_vio, vdo); 879 if (result != VDO_SUCCESS) { 880 destroy_data_vio(data_vio); 881 free_data_vio_pool(pool); 882 return result; 883 } 884 885 list_add(&data_vio->pool_entry, &pool->available); 886 } 887 888 *pool_ptr = pool; 889 return VDO_SUCCESS; 890 } 891 892 /** 893 * free_data_vio_pool() - Free a data_vio_pool and the data_vios in it. 894 * 895 * All data_vios must be returned to the pool before calling this function. 896 */ 897 void free_data_vio_pool(struct data_vio_pool *pool) 898 { 899 struct data_vio *data_vio, *tmp; 900 901 if (pool == NULL) 902 return; 903 904 /* 905 * Pairs with the barrier in process_release_callback(). Possibly not needed since it 906 * caters to an enqueue vs. free race. 
907 */ 908 smp_mb(); 909 BUG_ON(atomic_read(&pool->processing)); 910 911 spin_lock(&pool->lock); 912 VDO_ASSERT_LOG_ONLY((pool->limiter.busy == 0), 913 "data_vio pool must not have %u busy entries when being freed", 914 pool->limiter.busy); 915 VDO_ASSERT_LOG_ONLY((bio_list_empty(&pool->limiter.waiters) && 916 bio_list_empty(&pool->limiter.new_waiters)), 917 "data_vio pool must not have threads waiting to read or write when being freed"); 918 VDO_ASSERT_LOG_ONLY((bio_list_empty(&pool->discard_limiter.waiters) && 919 bio_list_empty(&pool->discard_limiter.new_waiters)), 920 "data_vio pool must not have threads waiting to discard when being freed"); 921 spin_unlock(&pool->lock); 922 923 list_for_each_entry_safe(data_vio, tmp, &pool->available, pool_entry) { 924 list_del_init(&data_vio->pool_entry); 925 destroy_data_vio(data_vio); 926 } 927 928 vdo_free_funnel_queue(vdo_forget(pool->queue)); 929 vdo_free(pool); 930 } 931 932 static bool acquire_permit(struct limiter *limiter) 933 { 934 if (limiter->busy >= limiter->limit) 935 return false; 936 937 WRITE_ONCE(limiter->busy, limiter->busy + 1); 938 if (limiter->max_busy < limiter->busy) 939 WRITE_ONCE(limiter->max_busy, limiter->busy); 940 return true; 941 } 942 943 static void wait_permit(struct limiter *limiter, struct bio *bio) 944 __releases(&limiter->pool->lock) 945 { 946 DEFINE_WAIT(wait); 947 948 bio_list_add(&limiter->new_waiters, bio); 949 prepare_to_wait_exclusive(&limiter->blocked_threads, &wait, 950 TASK_UNINTERRUPTIBLE); 951 spin_unlock(&limiter->pool->lock); 952 io_schedule(); 953 finish_wait(&limiter->blocked_threads, &wait); 954 } 955 956 /** 957 * vdo_launch_bio() - Acquire a data_vio from the pool, assign the bio to it, and launch it. 958 * 959 * This will block if data_vios or discard permits are not available. 960 */ 961 void vdo_launch_bio(struct data_vio_pool *pool, struct bio *bio) 962 { 963 struct data_vio *data_vio; 964 965 VDO_ASSERT_LOG_ONLY(!vdo_is_state_quiescent(&pool->state), 966 "data_vio_pool not quiescent on acquire"); 967 968 bio->bi_private = (void *) jiffies; 969 spin_lock(&pool->lock); 970 if ((bio_op(bio) == REQ_OP_DISCARD) && 971 !acquire_permit(&pool->discard_limiter)) { 972 wait_permit(&pool->discard_limiter, bio); 973 return; 974 } 975 976 if (!acquire_permit(&pool->limiter)) { 977 wait_permit(&pool->limiter, bio); 978 return; 979 } 980 981 data_vio = get_available_data_vio(pool); 982 spin_unlock(&pool->lock); 983 launch_bio(pool->completion.vdo, data_vio, bio); 984 } 985 986 /* Implements vdo_admin_initiator_fn. */ 987 static void initiate_drain(struct admin_state *state) 988 { 989 bool drained; 990 struct data_vio_pool *pool = container_of(state, struct data_vio_pool, state); 991 992 spin_lock(&pool->lock); 993 drained = check_for_drain_complete_locked(pool); 994 spin_unlock(&pool->lock); 995 996 if (drained) 997 vdo_finish_draining(state); 998 } 999 1000 static void assert_on_vdo_cpu_thread(const struct vdo *vdo, const char *name) 1001 { 1002 VDO_ASSERT_LOG_ONLY((vdo_get_callback_thread_id() == vdo->thread_config.cpu_thread), 1003 "%s called on cpu thread", name); 1004 } 1005 1006 /** 1007 * drain_data_vio_pool() - Wait asynchronously for all data_vios to be returned to the pool. 1008 * @completion: The completion to notify when the pool has drained. 
1009 */ 1010 void drain_data_vio_pool(struct data_vio_pool *pool, struct vdo_completion *completion) 1011 { 1012 assert_on_vdo_cpu_thread(completion->vdo, __func__); 1013 vdo_start_draining(&pool->state, VDO_ADMIN_STATE_SUSPENDING, completion, 1014 initiate_drain); 1015 } 1016 1017 /** 1018 * resume_data_vio_pool() - Resume a data_vio pool. 1019 * @completion: The completion to notify when the pool has resumed. 1020 */ 1021 void resume_data_vio_pool(struct data_vio_pool *pool, struct vdo_completion *completion) 1022 { 1023 assert_on_vdo_cpu_thread(completion->vdo, __func__); 1024 vdo_continue_completion(completion, vdo_resume_if_quiescent(&pool->state)); 1025 } 1026 1027 static void dump_limiter(const char *name, struct limiter *limiter) 1028 { 1029 vdo_log_info("%s: %u of %u busy (max %u), %s", name, limiter->busy, 1030 limiter->limit, limiter->max_busy, 1031 ((bio_list_empty(&limiter->waiters) && 1032 bio_list_empty(&limiter->new_waiters)) ? 1033 "no waiters" : "has waiters")); 1034 } 1035 1036 /** 1037 * dump_data_vio_pool() - Dump a data_vio pool to the log. 1038 * @dump_vios: Whether to dump the details of each busy data_vio as well. 1039 */ 1040 void dump_data_vio_pool(struct data_vio_pool *pool, bool dump_vios) 1041 { 1042 /* 1043 * In order that syslog can empty its buffer, sleep after 35 elements for 4ms (till the 1044 * second clock tick). These numbers were picked based on experiments with lab machines. 1045 */ 1046 static const int ELEMENTS_PER_BATCH = 35; 1047 static const int SLEEP_FOR_SYSLOG = 4000; 1048 1049 if (pool == NULL) 1050 return; 1051 1052 spin_lock(&pool->lock); 1053 dump_limiter("data_vios", &pool->limiter); 1054 dump_limiter("discard permits", &pool->discard_limiter); 1055 if (dump_vios) { 1056 int i; 1057 int dumped = 0; 1058 1059 for (i = 0; i < pool->limiter.limit; i++) { 1060 struct data_vio *data_vio = &pool->data_vios[i]; 1061 1062 if (!list_empty(&data_vio->pool_entry)) 1063 continue; 1064 1065 dump_data_vio(data_vio); 1066 if (++dumped >= ELEMENTS_PER_BATCH) { 1067 spin_unlock(&pool->lock); 1068 dumped = 0; 1069 fsleep(SLEEP_FOR_SYSLOG); 1070 spin_lock(&pool->lock); 1071 } 1072 } 1073 } 1074 1075 spin_unlock(&pool->lock); 1076 } 1077 1078 data_vio_count_t get_data_vio_pool_active_requests(struct data_vio_pool *pool) 1079 { 1080 return READ_ONCE(pool->limiter.busy); 1081 } 1082 1083 data_vio_count_t get_data_vio_pool_request_limit(struct data_vio_pool *pool) 1084 { 1085 return READ_ONCE(pool->limiter.limit); 1086 } 1087 1088 data_vio_count_t get_data_vio_pool_maximum_requests(struct data_vio_pool *pool) 1089 { 1090 return READ_ONCE(pool->limiter.max_busy); 1091 } 1092 1093 static void update_data_vio_error_stats(struct data_vio *data_vio) 1094 { 1095 u8 index = 0; 1096 static const char * const operations[] = { 1097 [0] = "empty", 1098 [1] = "read", 1099 [2] = "write", 1100 [3] = "read-modify-write", 1101 [5] = "read+fua", 1102 [6] = "write+fua", 1103 [7] = "read-modify-write+fua", 1104 }; 1105 1106 if (data_vio->read) 1107 index = 1; 1108 1109 if (data_vio->write) 1110 index += 2; 1111 1112 if (data_vio->fua) 1113 index += 4; 1114 1115 update_vio_error_stats(&data_vio->vio, 1116 "Completing %s vio for LBN %llu with error after %s", 1117 operations[index], 1118 (unsigned long long) data_vio->logical.lbn, 1119 get_data_vio_operation_name(data_vio)); 1120 } 1121 1122 static void perform_cleanup_stage(struct data_vio *data_vio, 1123 enum data_vio_cleanup_stage stage); 1124 1125 /** 1126 * release_allocated_lock() - Release the PBN lock and/or the reference 
on the allocated block at 1127 * the end of processing a data_vio. 1128 */ 1129 static void release_allocated_lock(struct vdo_completion *completion) 1130 { 1131 struct data_vio *data_vio = as_data_vio(completion); 1132 1133 assert_data_vio_in_allocated_zone(data_vio); 1134 release_data_vio_allocation_lock(data_vio, false); 1135 perform_cleanup_stage(data_vio, VIO_RELEASE_RECOVERY_LOCKS); 1136 } 1137 1138 /** release_lock() - Release an uncontended LBN lock. */ 1139 static void release_lock(struct data_vio *data_vio, struct lbn_lock *lock) 1140 { 1141 struct int_map *lock_map = lock->zone->lbn_operations; 1142 struct data_vio *lock_holder; 1143 1144 if (!lock->locked) { 1145 /* The lock is not locked, so it had better not be registered in the lock map. */ 1146 struct data_vio *lock_holder = vdo_int_map_get(lock_map, lock->lbn); 1147 1148 VDO_ASSERT_LOG_ONLY((data_vio != lock_holder), 1149 "no logical block lock held for block %llu", 1150 (unsigned long long) lock->lbn); 1151 return; 1152 } 1153 1154 /* Release the lock by removing the lock from the map. */ 1155 lock_holder = vdo_int_map_remove(lock_map, lock->lbn); 1156 VDO_ASSERT_LOG_ONLY((data_vio == lock_holder), 1157 "logical block lock mismatch for block %llu", 1158 (unsigned long long) lock->lbn); 1159 lock->locked = false; 1160 } 1161 1162 /** transfer_lock() - Transfer a contended LBN lock to the eldest waiter. */ 1163 static void transfer_lock(struct data_vio *data_vio, struct lbn_lock *lock) 1164 { 1165 struct data_vio *lock_holder, *next_lock_holder; 1166 int result; 1167 1168 VDO_ASSERT_LOG_ONLY(lock->locked, "lbn_lock with waiters is not locked"); 1169 1170 /* Another data_vio is waiting for the lock, transfer it in a single lock map operation. */ 1171 next_lock_holder = 1172 vdo_waiter_as_data_vio(vdo_waitq_dequeue_waiter(&lock->waiters)); 1173 1174 /* Transfer the remaining lock waiters to the next lock holder. */ 1175 vdo_waitq_transfer_all_waiters(&lock->waiters, 1176 &next_lock_holder->logical.waiters); 1177 1178 result = vdo_int_map_put(lock->zone->lbn_operations, lock->lbn, 1179 next_lock_holder, true, (void **) &lock_holder); 1180 if (result != VDO_SUCCESS) { 1181 continue_data_vio_with_error(next_lock_holder, result); 1182 return; 1183 } 1184 1185 VDO_ASSERT_LOG_ONLY((lock_holder == data_vio), 1186 "logical block lock mismatch for block %llu", 1187 (unsigned long long) lock->lbn); 1188 lock->locked = false; 1189 1190 /* 1191 * If there are still waiters, other data_vios must be trying to get the lock we just 1192 * transferred. We must ensure that the new lock holder doesn't block in the packer. 1193 */ 1194 if (vdo_waitq_has_waiters(&next_lock_holder->logical.waiters)) 1195 cancel_data_vio_compression(next_lock_holder); 1196 1197 /* 1198 * Avoid stack overflow on lock transfer. 1199 * FIXME: this is only an issue in the 1 thread config. 1200 */ 1201 next_lock_holder->vio.completion.requeue = true; 1202 launch_locked_request(next_lock_holder); 1203 } 1204 1205 /** 1206 * release_logical_lock() - Release the logical block lock and flush generation lock at the end of 1207 * processing a data_vio. 
1208 */ 1209 static void release_logical_lock(struct vdo_completion *completion) 1210 { 1211 struct data_vio *data_vio = as_data_vio(completion); 1212 struct lbn_lock *lock = &data_vio->logical; 1213 1214 assert_data_vio_in_logical_zone(data_vio); 1215 1216 if (vdo_waitq_has_waiters(&lock->waiters)) 1217 transfer_lock(data_vio, lock); 1218 else 1219 release_lock(data_vio, lock); 1220 1221 vdo_release_flush_generation_lock(data_vio); 1222 perform_cleanup_stage(data_vio, VIO_CLEANUP_DONE); 1223 } 1224 1225 /** clean_hash_lock() - Release the hash lock at the end of processing a data_vio. */ 1226 static void clean_hash_lock(struct vdo_completion *completion) 1227 { 1228 struct data_vio *data_vio = as_data_vio(completion); 1229 1230 assert_data_vio_in_hash_zone(data_vio); 1231 if (completion->result != VDO_SUCCESS) { 1232 vdo_clean_failed_hash_lock(data_vio); 1233 return; 1234 } 1235 1236 vdo_release_hash_lock(data_vio); 1237 perform_cleanup_stage(data_vio, VIO_RELEASE_LOGICAL); 1238 } 1239 1240 /** 1241 * finish_cleanup() - Make some assertions about a data_vio which has finished cleaning up. 1242 * 1243 * If it is part of a multi-block discard, starts on the next block, otherwise, returns it to the 1244 * pool. 1245 */ 1246 static void finish_cleanup(struct data_vio *data_vio) 1247 { 1248 struct vdo_completion *completion = &data_vio->vio.completion; 1249 u32 discard_size = min_t(u32, data_vio->remaining_discard, 1250 VDO_BLOCK_SIZE - data_vio->offset); 1251 1252 VDO_ASSERT_LOG_ONLY(data_vio->allocation.lock == NULL, 1253 "complete data_vio has no allocation lock"); 1254 VDO_ASSERT_LOG_ONLY(data_vio->hash_lock == NULL, 1255 "complete data_vio has no hash lock"); 1256 if ((data_vio->remaining_discard <= discard_size) || 1257 (completion->result != VDO_SUCCESS)) { 1258 struct data_vio_pool *pool = completion->vdo->data_vio_pool; 1259 1260 vdo_funnel_queue_put(pool->queue, &completion->work_queue_entry_link); 1261 schedule_releases(pool); 1262 return; 1263 } 1264 1265 data_vio->remaining_discard -= discard_size; 1266 data_vio->is_partial = (data_vio->remaining_discard < VDO_BLOCK_SIZE); 1267 data_vio->read = data_vio->is_partial; 1268 data_vio->offset = 0; 1269 completion->requeue = true; 1270 data_vio->first_reference_operation_complete = false; 1271 launch_data_vio(data_vio, data_vio->logical.lbn + 1); 1272 } 1273 1274 /** perform_cleanup_stage() - Perform the next step in the process of cleaning up a data_vio. 
*/ 1275 static void perform_cleanup_stage(struct data_vio *data_vio, 1276 enum data_vio_cleanup_stage stage) 1277 { 1278 struct vdo *vdo = vdo_from_data_vio(data_vio); 1279 1280 switch (stage) { 1281 case VIO_RELEASE_HASH_LOCK: 1282 if (data_vio->hash_lock != NULL) { 1283 launch_data_vio_hash_zone_callback(data_vio, clean_hash_lock); 1284 return; 1285 } 1286 fallthrough; 1287 1288 case VIO_RELEASE_ALLOCATED: 1289 if (data_vio_has_allocation(data_vio)) { 1290 launch_data_vio_allocated_zone_callback(data_vio, 1291 release_allocated_lock); 1292 return; 1293 } 1294 fallthrough; 1295 1296 case VIO_RELEASE_RECOVERY_LOCKS: 1297 if ((data_vio->recovery_sequence_number > 0) && 1298 (READ_ONCE(vdo->read_only_notifier.read_only_error) == VDO_SUCCESS) && 1299 (data_vio->vio.completion.result != VDO_READ_ONLY)) 1300 vdo_log_warning("VDO not read-only when cleaning data_vio with RJ lock"); 1301 fallthrough; 1302 1303 case VIO_RELEASE_LOGICAL: 1304 launch_data_vio_logical_callback(data_vio, release_logical_lock); 1305 return; 1306 1307 default: 1308 finish_cleanup(data_vio); 1309 } 1310 } 1311 1312 void complete_data_vio(struct vdo_completion *completion) 1313 { 1314 struct data_vio *data_vio = as_data_vio(completion); 1315 1316 completion->error_handler = NULL; 1317 data_vio->last_async_operation = VIO_ASYNC_OP_CLEANUP; 1318 perform_cleanup_stage(data_vio, 1319 (data_vio->write ? VIO_CLEANUP_START : VIO_RELEASE_LOGICAL)); 1320 } 1321 1322 static void enter_read_only_mode(struct vdo_completion *completion) 1323 { 1324 if (vdo_is_read_only(completion->vdo)) 1325 return; 1326 1327 if (completion->result != VDO_READ_ONLY) { 1328 struct data_vio *data_vio = as_data_vio(completion); 1329 1330 vdo_log_error_strerror(completion->result, 1331 "Preparing to enter read-only mode: data_vio for LBN %llu (becoming mapped to %llu, previously mapped to %llu, allocated %llu) is completing with a fatal error after operation %s", 1332 (unsigned long long) data_vio->logical.lbn, 1333 (unsigned long long) data_vio->new_mapped.pbn, 1334 (unsigned long long) data_vio->mapped.pbn, 1335 (unsigned long long) data_vio->allocation.pbn, 1336 get_data_vio_operation_name(data_vio)); 1337 } 1338 1339 vdo_enter_read_only_mode(completion->vdo, completion->result); 1340 } 1341 1342 void handle_data_vio_error(struct vdo_completion *completion) 1343 { 1344 struct data_vio *data_vio = as_data_vio(completion); 1345 1346 if ((completion->result == VDO_READ_ONLY) || (data_vio->user_bio == NULL)) 1347 enter_read_only_mode(completion); 1348 1349 update_data_vio_error_stats(data_vio); 1350 complete_data_vio(completion); 1351 } 1352 1353 /** 1354 * get_data_vio_operation_name() - Get the name of the last asynchronous operation performed on a 1355 * data_vio. 1356 */ 1357 const char *get_data_vio_operation_name(struct data_vio *data_vio) 1358 { 1359 BUILD_BUG_ON((MAX_VIO_ASYNC_OPERATION_NUMBER - MIN_VIO_ASYNC_OPERATION_NUMBER) != 1360 ARRAY_SIZE(ASYNC_OPERATION_NAMES)); 1361 1362 return ((data_vio->last_async_operation < MAX_VIO_ASYNC_OPERATION_NUMBER) ? 1363 ASYNC_OPERATION_NAMES[data_vio->last_async_operation] : 1364 "unknown async operation"); 1365 } 1366 1367 /** 1368 * data_vio_allocate_data_block() - Allocate a data block. 1369 * 1370 * @write_lock_type: The type of write lock to obtain on the block. 1371 * @callback: The callback which will attempt an allocation in the current zone and continue if it 1372 * succeeds. 1373 * @error_handler: The handler for errors while allocating. 
1374 */ 1375 void data_vio_allocate_data_block(struct data_vio *data_vio, 1376 enum pbn_lock_type write_lock_type, 1377 vdo_action_fn callback, vdo_action_fn error_handler) 1378 { 1379 struct allocation *allocation = &data_vio->allocation; 1380 1381 VDO_ASSERT_LOG_ONLY((allocation->pbn == VDO_ZERO_BLOCK), 1382 "data_vio does not have an allocation"); 1383 allocation->write_lock_type = write_lock_type; 1384 allocation->zone = vdo_get_next_allocation_zone(data_vio->logical.zone); 1385 allocation->first_allocation_zone = allocation->zone->zone_number; 1386 1387 data_vio->vio.completion.error_handler = error_handler; 1388 launch_data_vio_allocated_zone_callback(data_vio, callback); 1389 } 1390 1391 /** 1392 * release_data_vio_allocation_lock() - Release the PBN lock on a data_vio's allocated block. 1393 * @reset: If true, the allocation will be reset (i.e. any allocated pbn will be forgotten). 1394 * 1395 * If the reference to the locked block is still provisional, it will be released as well. 1396 */ 1397 void release_data_vio_allocation_lock(struct data_vio *data_vio, bool reset) 1398 { 1399 struct allocation *allocation = &data_vio->allocation; 1400 physical_block_number_t locked_pbn = allocation->pbn; 1401 1402 assert_data_vio_in_allocated_zone(data_vio); 1403 1404 if (reset || vdo_pbn_lock_has_provisional_reference(allocation->lock)) 1405 allocation->pbn = VDO_ZERO_BLOCK; 1406 1407 vdo_release_physical_zone_pbn_lock(allocation->zone, locked_pbn, 1408 vdo_forget(allocation->lock)); 1409 } 1410 1411 /** 1412 * uncompress_data_vio() - Uncompress the data a data_vio has just read. 1413 * @mapping_state: The mapping state indicating which fragment to decompress. 1414 * @buffer: The buffer to receive the uncompressed data. 1415 */ 1416 int uncompress_data_vio(struct data_vio *data_vio, 1417 enum block_mapping_state mapping_state, char *buffer) 1418 { 1419 int size; 1420 u16 fragment_offset, fragment_size; 1421 struct compressed_block *block = data_vio->compression.block; 1422 int result = vdo_get_compressed_block_fragment(mapping_state, block, 1423 &fragment_offset, &fragment_size); 1424 1425 if (result != VDO_SUCCESS) { 1426 vdo_log_debug("%s: compressed fragment error %d", __func__, result); 1427 return result; 1428 } 1429 1430 size = LZ4_decompress_safe((block->data + fragment_offset), buffer, 1431 fragment_size, VDO_BLOCK_SIZE); 1432 if (size != VDO_BLOCK_SIZE) { 1433 vdo_log_debug("%s: lz4 error", __func__); 1434 return VDO_INVALID_FRAGMENT; 1435 } 1436 1437 return VDO_SUCCESS; 1438 } 1439 1440 /** 1441 * modify_for_partial_write() - Do the modify-write part of a read-modify-write cycle. 1442 * @completion: The data_vio which has just finished its read. 1443 * 1444 * This callback is registered in read_block(). 
1445 */ 1446 static void modify_for_partial_write(struct vdo_completion *completion) 1447 { 1448 struct data_vio *data_vio = as_data_vio(completion); 1449 char *data = data_vio->vio.data; 1450 struct bio *bio = data_vio->user_bio; 1451 1452 assert_data_vio_on_cpu_thread(data_vio); 1453 1454 if (bio_op(bio) == REQ_OP_DISCARD) { 1455 memset(data + data_vio->offset, '\0', min_t(u32, 1456 data_vio->remaining_discard, 1457 VDO_BLOCK_SIZE - data_vio->offset)); 1458 } else { 1459 copy_from_bio(bio, data + data_vio->offset); 1460 } 1461 1462 data_vio->is_zero = is_zero_block(data); 1463 data_vio->read = false; 1464 launch_data_vio_logical_callback(data_vio, 1465 continue_data_vio_with_block_map_slot); 1466 } 1467 1468 static void complete_read(struct vdo_completion *completion) 1469 { 1470 struct data_vio *data_vio = as_data_vio(completion); 1471 char *data = data_vio->vio.data; 1472 bool compressed = vdo_is_state_compressed(data_vio->mapped.state); 1473 1474 assert_data_vio_on_cpu_thread(data_vio); 1475 1476 if (compressed) { 1477 int result = uncompress_data_vio(data_vio, data_vio->mapped.state, data); 1478 1479 if (result != VDO_SUCCESS) { 1480 continue_data_vio_with_error(data_vio, result); 1481 return; 1482 } 1483 } 1484 1485 if (data_vio->write) { 1486 modify_for_partial_write(completion); 1487 return; 1488 } 1489 1490 if (compressed || data_vio->is_partial) 1491 copy_to_bio(data_vio->user_bio, data + data_vio->offset); 1492 1493 acknowledge_data_vio(data_vio); 1494 complete_data_vio(completion); 1495 } 1496 1497 static void read_endio(struct bio *bio) 1498 { 1499 struct data_vio *data_vio = vio_as_data_vio(bio->bi_private); 1500 int result = blk_status_to_errno(bio->bi_status); 1501 1502 vdo_count_completed_bios(bio); 1503 if (result != VDO_SUCCESS) { 1504 continue_data_vio_with_error(data_vio, result); 1505 return; 1506 } 1507 1508 launch_data_vio_cpu_callback(data_vio, complete_read, 1509 CPU_Q_COMPLETE_READ_PRIORITY); 1510 } 1511 1512 static void complete_zero_read(struct vdo_completion *completion) 1513 { 1514 struct data_vio *data_vio = as_data_vio(completion); 1515 1516 assert_data_vio_on_cpu_thread(data_vio); 1517 1518 if (data_vio->is_partial) { 1519 memset(data_vio->vio.data, 0, VDO_BLOCK_SIZE); 1520 if (data_vio->write) { 1521 modify_for_partial_write(completion); 1522 return; 1523 } 1524 } else { 1525 zero_fill_bio(data_vio->user_bio); 1526 } 1527 1528 complete_read(completion); 1529 } 1530 1531 /** 1532 * read_block() - Read a block asynchronously. 1533 * 1534 * This is the callback registered in read_block_mapping(). 1535 */ 1536 static void read_block(struct vdo_completion *completion) 1537 { 1538 struct data_vio *data_vio = as_data_vio(completion); 1539 struct vio *vio = as_vio(completion); 1540 int result = VDO_SUCCESS; 1541 1542 if (data_vio->mapped.pbn == VDO_ZERO_BLOCK) { 1543 launch_data_vio_cpu_callback(data_vio, complete_zero_read, 1544 CPU_Q_COMPLETE_VIO_PRIORITY); 1545 return; 1546 } 1547 1548 data_vio->last_async_operation = VIO_ASYNC_OP_READ_DATA_VIO; 1549 if (vdo_is_state_compressed(data_vio->mapped.state)) { 1550 result = vio_reset_bio(vio, (char *) data_vio->compression.block, 1551 read_endio, REQ_OP_READ, data_vio->mapped.pbn); 1552 } else { 1553 blk_opf_t opf = ((data_vio->user_bio->bi_opf & PASSTHROUGH_FLAGS) | REQ_OP_READ); 1554 1555 if (data_vio->is_partial) { 1556 result = vio_reset_bio(vio, vio->data, read_endio, opf, 1557 data_vio->mapped.pbn); 1558 } else { 1559 /* A full 4k read. 
Use the incoming bio to avoid having to copy the data */ 1560 bio_reset(vio->bio, vio->bio->bi_bdev, opf); 1561 bio_init_clone(data_vio->user_bio->bi_bdev, vio->bio, 1562 data_vio->user_bio, GFP_KERNEL); 1563 1564 /* Copy over the original bio iovec and opflags. */ 1565 vdo_set_bio_properties(vio->bio, vio, read_endio, opf, 1566 data_vio->mapped.pbn); 1567 } 1568 } 1569 1570 if (result != VDO_SUCCESS) { 1571 continue_data_vio_with_error(data_vio, result); 1572 return; 1573 } 1574 1575 vdo_submit_data_vio(data_vio); 1576 } 1577 1578 static inline struct data_vio * 1579 reference_count_update_completion_as_data_vio(struct vdo_completion *completion) 1580 { 1581 if (completion->type == VIO_COMPLETION) 1582 return as_data_vio(completion); 1583 1584 return container_of(completion, struct data_vio, decrement_completion); 1585 } 1586 1587 /** 1588 * update_block_map() - Rendezvous of the data_vio and decrement completions after each has 1589 * made its reference updates. Handle any error from either, or proceed 1590 * to updating the block map. 1591 * @completion: The completion of the write in progress. 1592 */ 1593 static void update_block_map(struct vdo_completion *completion) 1594 { 1595 struct data_vio *data_vio = reference_count_update_completion_as_data_vio(completion); 1596 1597 assert_data_vio_in_logical_zone(data_vio); 1598 1599 if (!data_vio->first_reference_operation_complete) { 1600 /* Rendezvous, we're first */ 1601 data_vio->first_reference_operation_complete = true; 1602 return; 1603 } 1604 1605 completion = &data_vio->vio.completion; 1606 vdo_set_completion_result(completion, data_vio->decrement_completion.result); 1607 if (completion->result != VDO_SUCCESS) { 1608 handle_data_vio_error(completion); 1609 return; 1610 } 1611 1612 completion->error_handler = handle_data_vio_error; 1613 if (data_vio->hash_lock != NULL) 1614 set_data_vio_hash_zone_callback(data_vio, vdo_continue_hash_lock); 1615 else 1616 completion->callback = complete_data_vio; 1617 1618 data_vio->last_async_operation = VIO_ASYNC_OP_PUT_MAPPED_BLOCK; 1619 vdo_put_mapped_block(data_vio); 1620 } 1621 1622 static void decrement_reference_count(struct vdo_completion *completion) 1623 { 1624 struct data_vio *data_vio = container_of(completion, struct data_vio, 1625 decrement_completion); 1626 1627 assert_data_vio_in_mapped_zone(data_vio); 1628 1629 vdo_set_completion_callback(completion, update_block_map, 1630 data_vio->logical.zone->thread_id); 1631 completion->error_handler = update_block_map; 1632 vdo_modify_reference_count(completion, &data_vio->decrement_updater); 1633 } 1634 1635 static void increment_reference_count(struct vdo_completion *completion) 1636 { 1637 struct data_vio *data_vio = as_data_vio(completion); 1638 1639 assert_data_vio_in_new_mapped_zone(data_vio); 1640 1641 if (data_vio->downgrade_allocation_lock) { 1642 /* 1643 * Now that the data has been written, it's safe to deduplicate against the 1644 * block. Downgrade the allocation lock to a read lock so it can be used later by 1645 * the hash lock. This is done here since it needs to happen sometime before we 1646 * return to the hash zone, and we are currently on the correct thread. For 1647 * compressed blocks, the downgrade will have already been done. 
		 */
		vdo_downgrade_pbn_write_lock(data_vio->allocation.lock, false);
	}

	set_data_vio_logical_callback(data_vio, update_block_map);
	completion->error_handler = update_block_map;
	vdo_modify_reference_count(completion, &data_vio->increment_updater);
}

/** journal_remapping() - Add a recovery journal entry for a data remapping. */
static void journal_remapping(struct vdo_completion *completion)
{
	struct data_vio *data_vio = as_data_vio(completion);

	assert_data_vio_in_journal_zone(data_vio);

	data_vio->decrement_updater.operation = VDO_JOURNAL_DATA_REMAPPING;
	data_vio->decrement_updater.zpbn = data_vio->mapped;
	if (data_vio->new_mapped.pbn == VDO_ZERO_BLOCK) {
		data_vio->first_reference_operation_complete = true;
		if (data_vio->mapped.pbn == VDO_ZERO_BLOCK)
			set_data_vio_logical_callback(data_vio, update_block_map);
	} else {
		set_data_vio_new_mapped_zone_callback(data_vio,
						      increment_reference_count);
	}

	if (data_vio->mapped.pbn == VDO_ZERO_BLOCK) {
		data_vio->first_reference_operation_complete = true;
	} else {
		vdo_set_completion_callback(&data_vio->decrement_completion,
					    decrement_reference_count,
					    data_vio->mapped.zone->thread_id);
	}

	data_vio->last_async_operation = VIO_ASYNC_OP_JOURNAL_REMAPPING;
	vdo_add_recovery_journal_entry(completion->vdo->recovery_journal, data_vio);
}

/**
 * read_old_block_mapping() - Get the previous PBN/LBN mapping of an in-progress write.
 *
 * Gets the previous PBN mapped to this LBN from the block map, so as to make an appropriate
 * journal entry referencing the removal of this LBN->PBN mapping.
 */
static void read_old_block_mapping(struct vdo_completion *completion)
{
	struct data_vio *data_vio = as_data_vio(completion);

	assert_data_vio_in_logical_zone(data_vio);

	data_vio->last_async_operation = VIO_ASYNC_OP_GET_MAPPED_BLOCK_FOR_WRITE;
	set_data_vio_journal_callback(data_vio, journal_remapping);
	vdo_get_mapped_block(data_vio);
}

void update_metadata_for_data_vio_write(struct data_vio *data_vio, struct pbn_lock *lock)
{
	data_vio->increment_updater = (struct reference_updater) {
		.operation = VDO_JOURNAL_DATA_REMAPPING,
		.increment = true,
		.zpbn = data_vio->new_mapped,
		.lock = lock,
	};

	launch_data_vio_logical_callback(data_vio, read_old_block_mapping);
}

/**
 * pack_compressed_data() - Attempt to pack the compressed data_vio into a block.
 *
 * This is the callback registered in launch_compress_data_vio().
 */
static void pack_compressed_data(struct vdo_completion *completion)
{
	struct data_vio *data_vio = as_data_vio(completion);

	assert_data_vio_in_packer_zone(data_vio);

	if (!vdo_get_compressing(vdo_from_data_vio(data_vio)) ||
	    get_data_vio_compression_status(data_vio).may_not_compress) {
		write_data_vio(data_vio);
		return;
	}

	data_vio->last_async_operation = VIO_ASYNC_OP_ATTEMPT_PACKING;
	vdo_attempt_packing(data_vio);
}

/**
 * compress_data_vio() - Do the actual work of compressing the data on a CPU queue.
 *
 * This callback is registered in launch_compress_data_vio().
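 *
 * If the data does not compress enough to fit in a compressed block fragment, the
 * data_vio falls back to an ordinary uncompressed write.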
 */
static void compress_data_vio(struct vdo_completion *completion)
{
	struct data_vio *data_vio = as_data_vio(completion);
	int size;

	assert_data_vio_on_cpu_thread(data_vio);

	/*
	 * By putting the compressed data at the start of the compressed block data field, we won't
	 * need to copy it if this data_vio becomes a compressed write agent.
	 */
	size = LZ4_compress_default(data_vio->vio.data,
				    data_vio->compression.block->data, VDO_BLOCK_SIZE,
				    VDO_MAX_COMPRESSED_FRAGMENT_SIZE,
				    (char *) vdo_get_work_queue_private_data());
	if ((size > 0) && (size < VDO_COMPRESSED_BLOCK_DATA_SIZE)) {
		data_vio->compression.size = size;
		launch_data_vio_packer_callback(data_vio, pack_compressed_data);
		return;
	}

	write_data_vio(data_vio);
}

/**
 * launch_compress_data_vio() - Continue a write by attempting to compress the data.
 *
 * This is a re-entry point to vio_write used by hash locks.
 */
void launch_compress_data_vio(struct data_vio *data_vio)
{
	VDO_ASSERT_LOG_ONLY(!data_vio->is_duplicate, "compressing a non-duplicate block");
	VDO_ASSERT_LOG_ONLY(data_vio->hash_lock != NULL,
			    "data_vio to compress has a hash_lock");
	VDO_ASSERT_LOG_ONLY(data_vio_has_allocation(data_vio),
			    "data_vio to compress has an allocation");

	/*
	 * There are 4 reasons why a data_vio which has reached this point will not be eligible for
	 * compression:
	 *
	 * 1) Since data_vios can block indefinitely in the packer, it would be bad to do so if the
	 * write request also requests FUA.
	 *
	 * 2) A data_vio should not be compressed when compression is disabled for the vdo.
	 *
	 * 3) A data_vio could be doing a partial write on behalf of a larger discard which has not
	 * yet been acknowledged and hence blocking in the packer would be bad.
	 *
	 * 4) Some other data_vio may be waiting on this data_vio in which case blocking in the
	 * packer would also be bad.
	 */
	if (data_vio->fua ||
	    !vdo_get_compressing(vdo_from_data_vio(data_vio)) ||
	    ((data_vio->user_bio != NULL) && (bio_op(data_vio->user_bio) == REQ_OP_DISCARD)) ||
	    (advance_data_vio_compression_stage(data_vio).stage != DATA_VIO_COMPRESSING)) {
		write_data_vio(data_vio);
		return;
	}

	data_vio->last_async_operation = VIO_ASYNC_OP_COMPRESS_DATA_VIO;
	launch_data_vio_cpu_callback(data_vio, compress_data_vio,
				     CPU_Q_COMPRESS_BLOCK_PRIORITY);
}

/**
 * hash_data_vio() - Hash the data in a data_vio and set the hash zone (which also flags the record
 *                   name as set).
 *
 * This callback is registered in prepare_for_dedupe().
 */
static void hash_data_vio(struct vdo_completion *completion)
{
	struct data_vio *data_vio = as_data_vio(completion);

	assert_data_vio_on_cpu_thread(data_vio);
	VDO_ASSERT_LOG_ONLY(!data_vio->is_zero, "zero blocks should not be hashed");

	murmurhash3_128(data_vio->vio.data, VDO_BLOCK_SIZE, 0x62ea60be,
			&data_vio->record_name);

	data_vio->hash_zone = vdo_select_hash_zone(vdo_from_data_vio(data_vio)->hash_zones,
						   &data_vio->record_name);
	data_vio->last_async_operation = VIO_ASYNC_OP_ACQUIRE_VDO_HASH_LOCK;
	launch_data_vio_hash_zone_callback(data_vio, vdo_acquire_hash_lock);
}

/**
 * prepare_for_dedupe() - Prepare for the dedupe path after attempting to get an allocation.
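 *
 * The block is hashed on a CPU queue; the resulting record name determines which hash
 * zone will handle deduplication for this data_vio.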
 */
static void prepare_for_dedupe(struct data_vio *data_vio)
{
	/* We don't care what thread we are on. */
	VDO_ASSERT_LOG_ONLY(!data_vio->is_zero, "must not prepare to dedupe zero blocks");

	/*
	 * Before we can dedupe, we need to know the record name, so the first
	 * step is to hash the block data.
	 */
	data_vio->last_async_operation = VIO_ASYNC_OP_HASH_DATA_VIO;
	launch_data_vio_cpu_callback(data_vio, hash_data_vio, CPU_Q_HASH_BLOCK_PRIORITY);
}

/**
 * write_bio_finished() - This is the bio_end_io function registered in write_block() to be called
 *                        when a data_vio's write to the underlying storage has completed.
 */
static void write_bio_finished(struct bio *bio)
{
	struct data_vio *data_vio = vio_as_data_vio((struct vio *) bio->bi_private);

	vdo_count_completed_bios(bio);
	vdo_set_completion_result(&data_vio->vio.completion,
				  blk_status_to_errno(bio->bi_status));
	data_vio->downgrade_allocation_lock = true;
	update_metadata_for_data_vio_write(data_vio, data_vio->allocation.lock);
}

/** write_data_vio() - Write a data block to storage without compression. */
void write_data_vio(struct data_vio *data_vio)
{
	struct data_vio_compression_status status, new_status;
	int result;

	if (!data_vio_has_allocation(data_vio)) {
		/*
		 * There was no space to write this block and we failed to deduplicate or compress
		 * it.
		 */
		continue_data_vio_with_error(data_vio, VDO_NO_SPACE);
		return;
	}

	new_status = (struct data_vio_compression_status) {
		.stage = DATA_VIO_POST_PACKER,
		.may_not_compress = true,
	};

	do {
		status = get_data_vio_compression_status(data_vio);
	} while ((status.stage != DATA_VIO_POST_PACKER) &&
		 !set_data_vio_compression_status(data_vio, status, new_status));

	/* Write the data from the data block buffer. */
	result = vio_reset_bio(&data_vio->vio, data_vio->vio.data,
			       write_bio_finished, REQ_OP_WRITE,
			       data_vio->allocation.pbn);
	if (result != VDO_SUCCESS) {
		continue_data_vio_with_error(data_vio, result);
		return;
	}

	data_vio->last_async_operation = VIO_ASYNC_OP_WRITE_DATA_VIO;
	vdo_submit_data_vio(data_vio);
}

/**
 * acknowledge_write_callback() - Acknowledge a write to the requestor.
 *
 * This callback is registered in allocate_block() and continue_write_with_block_map_slot().
 */
static void acknowledge_write_callback(struct vdo_completion *completion)
{
	struct data_vio *data_vio = as_data_vio(completion);
	struct vdo *vdo = completion->vdo;

	VDO_ASSERT_LOG_ONLY((!vdo_uses_bio_ack_queue(vdo) ||
			     (vdo_get_callback_thread_id() == vdo->thread_config.bio_ack_thread)),
			    "%s() called on bio ack queue", __func__);
	VDO_ASSERT_LOG_ONLY(data_vio_has_flush_generation_lock(data_vio),
			    "write VIO to be acknowledged has a flush generation lock");
	acknowledge_data_vio(data_vio);
	if (data_vio->new_mapped.pbn == VDO_ZERO_BLOCK) {
		/* This is a zero write or discard */
		update_metadata_for_data_vio_write(data_vio, NULL);
		return;
	}

	prepare_for_dedupe(data_vio);
}

/**
 * allocate_block() - Attempt to allocate a block in the current allocation zone.
 *
 * This callback is registered in continue_write_with_block_map_slot().
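 *
 * An allocation failure is not necessarily fatal: if the result is VDO_NO_SPACE,
 * handle_allocation_error() will continue the write along the dedupe path instead.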
 */
static void allocate_block(struct vdo_completion *completion)
{
	struct data_vio *data_vio = as_data_vio(completion);

	assert_data_vio_in_allocated_zone(data_vio);

	if (!vdo_allocate_block_in_zone(data_vio))
		return;

	completion->error_handler = handle_data_vio_error;
	WRITE_ONCE(data_vio->allocation_succeeded, true);
	data_vio->new_mapped = (struct zoned_pbn) {
		.zone = data_vio->allocation.zone,
		.pbn = data_vio->allocation.pbn,
		.state = VDO_MAPPING_STATE_UNCOMPRESSED,
	};

	if (data_vio->fua ||
	    data_vio->remaining_discard > (u32) (VDO_BLOCK_SIZE - data_vio->offset)) {
		prepare_for_dedupe(data_vio);
		return;
	}

	data_vio->last_async_operation = VIO_ASYNC_OP_ACKNOWLEDGE_WRITE;
	launch_data_vio_on_bio_ack_queue(data_vio, acknowledge_write_callback);
}

/**
 * handle_allocation_error() - Handle an error attempting to allocate a block.
 *
 * This error handler is registered in continue_write_with_block_map_slot().
 */
static void handle_allocation_error(struct vdo_completion *completion)
{
	struct data_vio *data_vio = as_data_vio(completion);

	if (completion->result == VDO_NO_SPACE) {
		/* We failed to get an allocation, but we can try to dedupe. */
		vdo_reset_completion(completion);
		completion->error_handler = handle_data_vio_error;
		prepare_for_dedupe(data_vio);
		return;
	}

	/* We got a "real" error, not just a failure to allocate, so fail the request. */
	handle_data_vio_error(completion);
}

static int assert_is_discard(struct data_vio *data_vio)
{
	int result = VDO_ASSERT(data_vio->is_discard,
				"data_vio with no block map page is a discard");

	return ((result == VDO_SUCCESS) ? result : VDO_READ_ONLY);
}

/**
 * continue_data_vio_with_block_map_slot() - Read the data_vio's mapping from the block map.
 *
 * This callback is registered in launch_read_data_vio().
 */
void continue_data_vio_with_block_map_slot(struct vdo_completion *completion)
{
	struct data_vio *data_vio = as_data_vio(completion);

	assert_data_vio_in_logical_zone(data_vio);
	if (data_vio->read) {
		set_data_vio_logical_callback(data_vio, read_block);
		data_vio->last_async_operation = VIO_ASYNC_OP_GET_MAPPED_BLOCK_FOR_READ;
		vdo_get_mapped_block(data_vio);
		return;
	}

	vdo_acquire_flush_generation_lock(data_vio);

	if (data_vio->tree_lock.tree_slots[0].block_map_slot.pbn == VDO_ZERO_BLOCK) {
		/*
		 * This is a discard for a block on a block map page which has not been allocated,
		 * so there's nothing more we need to do.
		 */
		completion->callback = complete_data_vio;
		continue_data_vio_with_error(data_vio, assert_is_discard(data_vio));
		return;
	}

	/*
	 * We need an allocation if this is neither a full-block discard nor a
	 * full-block zero write.
	 */
	if (!data_vio->is_zero && (!data_vio->is_discard || data_vio->is_partial)) {
		data_vio_allocate_data_block(data_vio, VIO_WRITE_LOCK, allocate_block,
					     handle_allocation_error);
		return;
	}

	/*
	 * We don't need to write any data, so skip allocation and just update the block map and
	 * reference counts (via the journal).
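	 * Zero writes map the LBN to the zero block; discards leave it unmapped.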
	 */
	data_vio->new_mapped.pbn = VDO_ZERO_BLOCK;
	if (data_vio->is_zero)
		data_vio->new_mapped.state = VDO_MAPPING_STATE_UNCOMPRESSED;

	if (data_vio->remaining_discard > (u32) (VDO_BLOCK_SIZE - data_vio->offset)) {
		/* This is not the final block of a discard so we can't acknowledge it yet. */
		update_metadata_for_data_vio_write(data_vio, NULL);
		return;
	}

	data_vio->last_async_operation = VIO_ASYNC_OP_ACKNOWLEDGE_WRITE;
	launch_data_vio_on_bio_ack_queue(data_vio, acknowledge_write_callback);
}