Lines Matching +full:current +full:- +full:limiter
1 // SPDX-License-Identifier: GPL-2.0-only
6 #include "data-vio.h"
12 #include <linux/device-mapper.h>
24 #include "memory-alloc.h"
28 #include "block-map.h"
31 #include "int-map.h"
32 #include "io-submitter.h"
33 #include "logical-zone.h"
35 #include "recovery-journal.h"
36 #include "slab-depot.h"
37 #include "status-codes.h"
41 #include "wait-queue.h"
90 * permits. The limiters also provide safe cross-thread access to pool statistics without the need
96 * limiter and the submitting thread will then put itself to sleep. (note that this mechanism will
120 static const unsigned int VDO_SECTORS_PER_BLOCK_MASK = VDO_SECTORS_PER_BLOCK - 1;
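The header comment above describes the core of the pool: a limiter is a counted set of permits, and a submitting thread that cannot get one puts itself to sleep until a release wakes it. A minimal userspace sketch of that idea follows, with a pthread mutex and condition variable standing in for the pool spinlock and the blocked_threads waitqueue; the sketch_ names are illustrative and not part of data-vio.c.

#include <pthread.h>

struct sketch_limiter {
	pthread_mutex_t lock;    /* stands in for the pool spinlock */
	pthread_cond_t released; /* stands in for the blocked_threads waitqueue */
	unsigned int limit;      /* maximum outstanding permits */
	unsigned int busy;       /* permits currently held */
	unsigned int max_busy;   /* high-water mark, exposed as a statistic */
};

/* Take a permit, sleeping until one is released if the limit is reached. */
static void sketch_acquire_permit(struct sketch_limiter *limiter)
{
	pthread_mutex_lock(&limiter->lock);
	while (limiter->busy >= limiter->limit)
		pthread_cond_wait(&limiter->released, &limiter->lock);
	limiter->busy++;
	if (limiter->busy > limiter->max_busy)
		limiter->max_busy = limiter->busy;
	pthread_mutex_unlock(&limiter->lock);
}

/* Return a permit and wake one sleeping submitter, if any. */
static void sketch_release_permit(struct sketch_limiter *limiter)
{
	pthread_mutex_lock(&limiter->lock);
	limiter->busy--;
	pthread_cond_signal(&limiter->released);
	pthread_mutex_unlock(&limiter->lock);
}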
124 struct limiter;
125 typedef void (*assigner_fn)(struct limiter *limiter);
128 struct limiter { struct
129 /* The data_vio_pool to which this limiter belongs */
166 /* The main limiter controlling the total data_vios in the pool. */
167 struct limiter limiter; member
168 /* The limiter controlling data_vios for discard */
169 struct limiter discard_limiter;
224 return (u64) bio->bi_private; in get_arrival_time()
228 * check_for_drain_complete_locked() - Check whether a data_vio_pool has no outstanding data_vios
234 if (pool->limiter.busy > 0) in check_for_drain_complete_locked()
237 VDO_ASSERT_LOG_ONLY((pool->discard_limiter.busy == 0), in check_for_drain_complete_locked()
240 return (bio_list_empty(&pool->limiter.new_waiters) && in check_for_drain_complete_locked()
241 bio_list_empty(&pool->discard_limiter.new_waiters)); in check_for_drain_complete_locked()
248 struct lbn_lock *lock = &data_vio->logical; in initialize_lbn_lock()
250 lock->lbn = lbn; in initialize_lbn_lock()
251 lock->locked = false; in initialize_lbn_lock()
252 vdo_waitq_init(&lock->waiters); in initialize_lbn_lock()
254 lock->zone = &vdo->logical_zones->zones[zone_number]; in initialize_lbn_lock()
259 data_vio->logical.locked = true; in launch_locked_request()
260 if (data_vio->write) { in launch_locked_request()
269 data_vio->last_async_operation = VIO_ASYNC_OP_FIND_BLOCK_MAP_SLOT; in launch_locked_request()
276 struct bio *bio = data_vio->user_bio; in acknowledge_data_vio()
277 int error = vdo_status_to_errno(data_vio->vio.completion.result); in acknowledge_data_vio()
282 VDO_ASSERT_LOG_ONLY((data_vio->remaining_discard <= in acknowledge_data_vio()
283 (u32) (VDO_BLOCK_SIZE - data_vio->offset)), in acknowledge_data_vio()
286 data_vio->user_bio = NULL; in acknowledge_data_vio()
287 vdo_count_bios(&vdo->stats.bios_acknowledged, bio); in acknowledge_data_vio()
288 if (data_vio->is_partial) in acknowledge_data_vio()
289 vdo_count_bios(&vdo->stats.bios_acknowledged_partial, bio); in acknowledge_data_vio()
291 bio->bi_status = errno_to_blk_status(error); in acknowledge_data_vio()
308 u32 packed = atomic_read(&data_vio->compression.status); in get_data_vio_compression_status()
319 * pack_status() - Convert a data_vio_compression_status into a u32 which may be stored
331 * set_data_vio_compression_status() - Set the compression status of a data_vio.
333 * @status: The expected current status of the data_vio.
353 actual = atomic_cmpxchg(&data_vio->compression.status, expected, replacement); in set_data_vio_compression_status()
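pack_status() and set_data_vio_compression_status() above rely on squeezing the whole compression status into a u32 so it can be read atomically and replaced with a compare-and-swap that only succeeds while the status still matches what the caller last observed. A hedged C11 sketch of that pattern, with an illustrative field layout (the real packing of data_vio_compression_status is not shown in these matching lines):

#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

/* Illustrative layout: low byte holds the stage, one bit flags "may not compress". */
struct sketch_status {
	uint32_t stage;
	bool may_not_compress;
};

static uint32_t sketch_pack(struct sketch_status status)
{
	return (status.stage & 0xff) | (status.may_not_compress ? (1u << 8) : 0);
}

static struct sketch_status sketch_unpack(uint32_t packed)
{
	return (struct sketch_status) {
		.stage = packed & 0xff,
		.may_not_compress = (packed & (1u << 8)) != 0,
	};
}

/* Swap in the new status only if the stored value still matches what the
 * caller last read, mirroring the atomic_cmpxchg() above. */
static bool sketch_set_status(_Atomic uint32_t *status, struct sketch_status expected,
			      struct sketch_status replacement)
{
	uint32_t want = sketch_pack(expected);

	return atomic_compare_exchange_strong(status, &want, sketch_pack(replacement));
}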
373 * Compression has been disallowed for this VIO, so skip the rest of the in advance_data_vio_compression_stage()
390 * cancel_data_vio_compression() - Prevent this data_vio from being compressed or packed.
417 * attempt_logical_block_lock() - Attempt to acquire the lock on a logical block.
425 struct lbn_lock *lock = &data_vio->logical; in attempt_logical_block_lock()
432 if (data_vio->logical.lbn >= vdo->states.vdo.config.logical_blocks) { in attempt_logical_block_lock()
437 result = vdo_int_map_put(lock->zone->lbn_operations, lock->lbn, in attempt_logical_block_lock()
450 result = VDO_ASSERT(lock_holder->logical.locked, "logical block lock held"); in attempt_logical_block_lock()
457 * If the new request is a pure read request (not read-modify-write) and the lock_holder is in attempt_logical_block_lock()
464 if (!data_vio->write && READ_ONCE(lock_holder->allocation_succeeded)) { in attempt_logical_block_lock()
465 copy_to_bio(data_vio->user_bio, lock_holder->vio.data + data_vio->offset); in attempt_logical_block_lock()
471 data_vio->last_async_operation = VIO_ASYNC_OP_ATTEMPT_LOGICAL_BLOCK_LOCK; in attempt_logical_block_lock()
472 vdo_waitq_enqueue_waiter(&lock_holder->logical.waiters, &data_vio->waiter); in attempt_logical_block_lock()
475 * Prevent writes and read-modify-writes from blocking indefinitely on lock holders in the in attempt_logical_block_lock()
478 if (lock_holder->write && cancel_data_vio_compression(lock_holder)) { in attempt_logical_block_lock()
479 data_vio->compression.lock_holder = lock_holder; in attempt_logical_block_lock()
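attempt_logical_block_lock() above keys a per-zone int_map by LBN: the first data_vio to insert itself becomes the lock holder, a later pure read can be served directly from a successful write holder's data, and anything else queues as a waiter (cancelling the holder's compression so it cannot stall in the packer). A toy sketch of the acquire step, using a direct-mapped array in place of the int_map; the sketch_ names are illustrative, not from data-vio.c.

#include <stddef.h>
#include <stdint.h>

#define SKETCH_LOGICAL_BLOCKS 64

struct sketch_request {
	uint64_t lbn;
};

static struct sketch_request *lbn_holders[SKETCH_LOGICAL_BLOCKS];

/* Returns NULL if the caller is now the lock holder, or the existing holder
 * if the block is contended; the caller then either copies the holder's data
 * (a pure read of a completed write) or queues behind it as a waiter. */
static struct sketch_request *sketch_lock_lbn(struct sketch_request *req)
{
	struct sketch_request *holder = lbn_holders[req->lbn];

	if (holder != NULL)
		return holder;

	lbn_holders[req->lbn] = req;
	return NULL;
}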
486 * launch_data_vio() - (Re)initialize a data_vio to have a new logical block number, keeping the
493 struct vdo_completion *completion = &data_vio->vio.completion; in launch_data_vio()
499 memset(&data_vio->tree_lock, 0, sizeof(data_vio->tree_lock)); in launch_data_vio()
501 INIT_LIST_HEAD(&data_vio->hash_lock_entry); in launch_data_vio()
502 INIT_LIST_HEAD(&data_vio->write_entry); in launch_data_vio()
504 memset(&data_vio->allocation, 0, sizeof(data_vio->allocation)); in launch_data_vio()
506 data_vio->is_duplicate = false; in launch_data_vio()
508 memset(&data_vio->record_name, 0, sizeof(data_vio->record_name)); in launch_data_vio()
509 memset(&data_vio->duplicate, 0, sizeof(data_vio->duplicate)); in launch_data_vio()
510 vdo_reset_completion(&data_vio->decrement_completion); in launch_data_vio()
512 completion->error_handler = handle_data_vio_error; in launch_data_vio()
536 memset(&data_vio->compression, 0, offsetof(struct compression_state, block)); in launch_bio()
538 data_vio->user_bio = bio; in launch_bio()
539 data_vio->offset = to_bytes(bio->bi_iter.bi_sector & VDO_SECTORS_PER_BLOCK_MASK); in launch_bio()
540 data_vio->is_partial = (bio->bi_iter.bi_size < VDO_BLOCK_SIZE) || (data_vio->offset != 0); in launch_bio()
543 * Discards behave very differently than other requests when coming in from device-mapper. in launch_bio()
548 data_vio->remaining_discard = bio->bi_iter.bi_size; in launch_bio()
549 data_vio->write = true; in launch_bio()
550 data_vio->is_discard = true; in launch_bio()
551 if (data_vio->is_partial) { in launch_bio()
552 vdo_count_bios(&vdo->stats.bios_in_partial, bio); in launch_bio()
553 data_vio->read = true; in launch_bio()
555 } else if (data_vio->is_partial) { in launch_bio()
556 vdo_count_bios(&vdo->stats.bios_in_partial, bio); in launch_bio()
557 data_vio->read = true; in launch_bio()
559 data_vio->write = true; in launch_bio()
561 data_vio->read = true; in launch_bio()
567 copy_from_bio(bio, data_vio->vio.data); in launch_bio()
568 data_vio->is_zero = mem_is_zero(data_vio->vio.data, VDO_BLOCK_SIZE); in launch_bio()
569 data_vio->write = true; in launch_bio()
572 if (data_vio->user_bio->bi_opf & REQ_FUA) in launch_bio()
573 data_vio->fua = true; in launch_bio()
575 lbn = (bio->bi_iter.bi_sector - vdo->starting_sector_offset) / VDO_SECTORS_PER_BLOCK; in launch_bio()
579 static void assign_data_vio(struct limiter *limiter, struct data_vio *data_vio) in assign_data_vio() argument
581 struct bio *bio = bio_list_pop(limiter->permitted_waiters); in assign_data_vio()
583 launch_bio(limiter->pool->completion.vdo, data_vio, bio); in assign_data_vio()
584 limiter->wake_count++; in assign_data_vio()
586 bio = bio_list_peek(limiter->permitted_waiters); in assign_data_vio()
587 limiter->arrival = ((bio == NULL) ? U64_MAX : get_arrival_time(bio)); in assign_data_vio()
590 static void assign_discard_permit(struct limiter *limiter) in assign_discard_permit() argument
592 struct bio *bio = bio_list_pop(&limiter->waiters); in assign_discard_permit()
594 if (limiter->arrival == U64_MAX) in assign_discard_permit()
595 limiter->arrival = get_arrival_time(bio); in assign_discard_permit()
597 bio_list_add(limiter->permitted_waiters, bio); in assign_discard_permit()
600 static void get_waiters(struct limiter *limiter) in get_waiters() argument
602 bio_list_merge_init(&limiter->waiters, &limiter->new_waiters); in get_waiters()
608 list_first_entry(&pool->available, struct data_vio, pool_entry); in get_available_data_vio()
610 list_del_init(&data_vio->pool_entry); in get_available_data_vio()
614 static void assign_data_vio_to_waiter(struct limiter *limiter) in assign_data_vio_to_waiter() argument
616 assign_data_vio(limiter, get_available_data_vio(limiter->pool)); in assign_data_vio_to_waiter()
619 static void update_limiter(struct limiter *limiter) in update_limiter() argument
621 struct bio_list *waiters = &limiter->waiters; in update_limiter()
622 data_vio_count_t available = limiter->limit - limiter->busy; in update_limiter()
624 VDO_ASSERT_LOG_ONLY((limiter->release_count <= limiter->busy), in update_limiter()
626 limiter->release_count, limiter->busy); in update_limiter()
628 get_waiters(limiter); in update_limiter()
629 for (; (limiter->release_count > 0) && !bio_list_empty(waiters); limiter->release_count--) in update_limiter()
630 limiter->assigner(limiter); in update_limiter()
632 if (limiter->release_count > 0) { in update_limiter()
633 WRITE_ONCE(limiter->busy, limiter->busy - limiter->release_count); in update_limiter()
634 limiter->release_count = 0; in update_limiter()
638 for (; (available > 0) && !bio_list_empty(waiters); available--) in update_limiter()
639 limiter->assigner(limiter); in update_limiter()
641 WRITE_ONCE(limiter->busy, limiter->limit - available); in update_limiter()
642 if (limiter->max_busy < limiter->busy) in update_limiter()
643 WRITE_ONCE(limiter->max_busy, limiter->busy); in update_limiter()
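update_limiter() above settles the books for a batch: a freed permit is first handed straight to a waiting bio (busy stays the same because one user replaces another), unmatched releases then lower busy, and any remaining headroom under the limit admits more waiters. A condensed sketch of that accounting, assuming plain counters in place of the bio lists; it includes the early exit taken when releases outnumber waiters, which falls on lines not matched by this search.

struct sketch_counts {
	unsigned int limit;        /* maximum permits */
	unsigned int busy;         /* permits currently held */
	unsigned int releases;     /* permits returned since the last update */
	unsigned int waiting_bios; /* bios queued for a permit */
};

static void sketch_update_limiter(struct sketch_counts *c)
{
	unsigned int available = c->limit - c->busy;

	/* A release paired with a waiter hands the permit straight over:
	 * one holder leaves, another enters, so busy does not change. */
	while (c->releases > 0 && c->waiting_bios > 0) {
		c->releases--;
		c->waiting_bios--;
	}

	if (c->releases > 0) {
		/* No waiters are left, so unmatched releases lower busy. */
		c->busy -= c->releases;
		c->releases = 0;
		return;
	}

	/* Any remaining headroom admits more waiters. */
	while (available > 0 && c->waiting_bios > 0) {
		available--;
		c->waiting_bios--;
	}
	c->busy = c->limit - available;
}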
647 * schedule_releases() - Ensure that release processing is scheduled.
657 if (atomic_cmpxchg(&pool->processing, false, true)) in schedule_releases()
660 pool->completion.requeue = true; in schedule_releases()
661 vdo_launch_completion_with_priority(&pool->completion, in schedule_releases()
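schedule_releases() uses atomic_cmpxchg on pool->processing so that only one caller at a time enqueues the release-processing completion. A C11 userspace analogue of that claim-the-work pattern (names illustrative):

#include <stdatomic.h>
#include <stdbool.h>

static _Atomic bool processing;

/* Whoever flips the flag from false to true owns scheduling the batch;
 * everyone else can return, knowing a batch is already pending. */
static bool sketch_claim_processing(void)
{
	bool expected = false;

	return atomic_compare_exchange_strong(&processing, &expected, true);
}

The worker clears the flag when the batch completes and, as process_release_callback() does, re-checks the funnel queue so that work enqueued during the window is not stranded.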
669 if (data_vio->remaining_discard > 0) { in reuse_or_release_resources()
670 if (bio_list_empty(&pool->discard_limiter.waiters)) { in reuse_or_release_resources()
672 pool->discard_limiter.release_count++; in reuse_or_release_resources()
674 assign_discard_permit(&pool->discard_limiter); in reuse_or_release_resources()
678 if (pool->limiter.arrival < pool->discard_limiter.arrival) { in reuse_or_release_resources()
679 assign_data_vio(&pool->limiter, data_vio); in reuse_or_release_resources()
680 } else if (pool->discard_limiter.arrival < U64_MAX) { in reuse_or_release_resources()
681 assign_data_vio(&pool->discard_limiter, data_vio); in reuse_or_release_resources()
683 list_add(&data_vio->pool_entry, returned); in reuse_or_release_resources()
684 pool->limiter.release_count++; in reuse_or_release_resources()
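reuse_or_release_resources() above routes a freed data_vio by arrival time: it goes to whichever limiter queue has the older waiting bio, and back onto the free list (with a permit released) only when neither queue is waiting. A small sketch of that routing decision, with U64_MAX marking an empty queue as in the real code; the enum and names are illustrative.

#include <stdint.h>

enum sketch_destination {
	SKETCH_TO_GENERAL_WAITER,
	SKETCH_TO_DISCARD_WAITER,
	SKETCH_TO_FREE_LIST,
};

/* Pick where a freed data_vio should go, favoring whichever queue has been
 * waiting longest; UINT64_MAX marks an empty queue. */
static enum sketch_destination sketch_route_freed_vio(uint64_t general_arrival,
						      uint64_t discard_arrival)
{
	if (general_arrival < discard_arrival)
		return SKETCH_TO_GENERAL_WAITER;
	if (discard_arrival != UINT64_MAX)
		return SKETCH_TO_DISCARD_WAITER;
	return SKETCH_TO_FREE_LIST;
}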
689 * process_release_callback() - Process a batch of data_vio releases.
702 spin_lock(&pool->lock); in process_release_callback()
703 get_waiters(&pool->discard_limiter); in process_release_callback()
704 get_waiters(&pool->limiter); in process_release_callback()
705 spin_unlock(&pool->lock); in process_release_callback()
707 if (pool->limiter.arrival == U64_MAX) { in process_release_callback()
708 struct bio *bio = bio_list_peek(&pool->limiter.waiters); in process_release_callback()
711 pool->limiter.arrival = get_arrival_time(bio); in process_release_callback()
716 struct funnel_queue_entry *entry = vdo_funnel_queue_poll(pool->queue); in process_release_callback()
727 spin_lock(&pool->lock); in process_release_callback()
734 update_limiter(&pool->discard_limiter); in process_release_callback()
735 list_splice(&returned, &pool->available); in process_release_callback()
736 update_limiter(&pool->limiter); in process_release_callback()
737 to_wake = pool->limiter.wake_count; in process_release_callback()
738 pool->limiter.wake_count = 0; in process_release_callback()
739 discards_to_wake = pool->discard_limiter.wake_count; in process_release_callback()
740 pool->discard_limiter.wake_count = 0; in process_release_callback()
742 atomic_set(&pool->processing, false); in process_release_callback()
746 reschedule = !vdo_is_funnel_queue_empty(pool->queue); in process_release_callback()
748 vdo_is_state_draining(&pool->state) && in process_release_callback()
750 spin_unlock(&pool->lock); in process_release_callback()
753 wake_up_nr(&pool->limiter.blocked_threads, to_wake); in process_release_callback()
756 wake_up_nr(&pool->discard_limiter.blocked_threads, discards_to_wake); in process_release_callback()
761 vdo_finish_draining(&pool->state); in process_release_callback()
764 static void initialize_limiter(struct limiter *limiter, struct data_vio_pool *pool, in initialize_limiter() argument
767 limiter->pool = pool; in initialize_limiter()
768 limiter->assigner = assigner; in initialize_limiter()
769 limiter->limit = limit; in initialize_limiter()
770 limiter->arrival = U64_MAX; in initialize_limiter()
771 init_waitqueue_head(&limiter->blocked_threads); in initialize_limiter()
775 * initialize_data_vio() - Allocate the components of a data_vio.
790 &data_vio->vio.data); in initialize_data_vio()
796 &data_vio->compression.block); in initialize_data_vio()
803 &data_vio->scratch_block); in initialize_data_vio()
813 vdo_initialize_completion(&data_vio->decrement_completion, vdo, in initialize_data_vio()
815 initialize_vio(&data_vio->vio, bio, 1, VIO_TYPE_DATA, VIO_PRIORITY_DATA, vdo); in initialize_data_vio()
825 vdo_free_bio(vdo_forget(data_vio->vio.bio)); in destroy_data_vio()
826 vdo_free(vdo_forget(data_vio->vio.data)); in destroy_data_vio()
827 vdo_free(vdo_forget(data_vio->compression.block)); in destroy_data_vio()
828 vdo_free(vdo_forget(data_vio->scratch_block)); in destroy_data_vio()
832 * make_data_vio_pool() - Initialize a data_vio pool.
852 initialize_limiter(&pool->discard_limiter, pool, assign_discard_permit, in make_data_vio_pool()
854 pool->discard_limiter.permitted_waiters = &pool->permitted_discards; in make_data_vio_pool()
855 initialize_limiter(&pool->limiter, pool, assign_data_vio_to_waiter, pool_size); in make_data_vio_pool()
856 pool->limiter.permitted_waiters = &pool->limiter.waiters; in make_data_vio_pool()
857 INIT_LIST_HEAD(&pool->available); in make_data_vio_pool()
858 spin_lock_init(&pool->lock); in make_data_vio_pool()
859 vdo_set_admin_state_code(&pool->state, VDO_ADMIN_STATE_NORMAL_OPERATION); in make_data_vio_pool()
860 vdo_initialize_completion(&pool->completion, vdo, VDO_DATA_VIO_POOL_COMPLETION); in make_data_vio_pool()
861 vdo_prepare_completion(&pool->completion, process_release_callback, in make_data_vio_pool()
862 process_release_callback, vdo->thread_config.cpu_thread, in make_data_vio_pool()
865 result = vdo_make_funnel_queue(&pool->queue); in make_data_vio_pool()
872 struct data_vio *data_vio = &pool->data_vios[i]; in make_data_vio_pool()
881 list_add(&data_vio->pool_entry, &pool->available); in make_data_vio_pool()
889 * free_data_vio_pool() - Free a data_vio_pool and the data_vios in it.
906 BUG_ON(atomic_read(&pool->processing)); in free_data_vio_pool()
908 spin_lock(&pool->lock); in free_data_vio_pool()
909 VDO_ASSERT_LOG_ONLY((pool->limiter.busy == 0), in free_data_vio_pool()
911 pool->limiter.busy); in free_data_vio_pool()
912 VDO_ASSERT_LOG_ONLY((bio_list_empty(&pool->limiter.waiters) && in free_data_vio_pool()
913 bio_list_empty(&pool->limiter.new_waiters)), in free_data_vio_pool()
915 VDO_ASSERT_LOG_ONLY((bio_list_empty(&pool->discard_limiter.waiters) && in free_data_vio_pool()
916 bio_list_empty(&pool->discard_limiter.new_waiters)), in free_data_vio_pool()
918 spin_unlock(&pool->lock); in free_data_vio_pool()
920 list_for_each_entry_safe(data_vio, tmp, &pool->available, pool_entry) { in free_data_vio_pool()
921 list_del_init(&data_vio->pool_entry); in free_data_vio_pool()
925 vdo_free_funnel_queue(vdo_forget(pool->queue)); in free_data_vio_pool()
929 static bool acquire_permit(struct limiter *limiter) in acquire_permit() argument
931 if (limiter->busy >= limiter->limit) in acquire_permit()
934 WRITE_ONCE(limiter->busy, limiter->busy + 1); in acquire_permit()
935 if (limiter->max_busy < limiter->busy) in acquire_permit()
936 WRITE_ONCE(limiter->max_busy, limiter->busy); in acquire_permit()
940 static void wait_permit(struct limiter *limiter, struct bio *bio) in wait_permit() argument
941 __releases(&limiter->pool->lock) in wait_permit()
945 bio_list_add(&limiter->new_waiters, bio); in wait_permit()
946 prepare_to_wait_exclusive(&limiter->blocked_threads, &wait, in wait_permit()
948 spin_unlock(&limiter->pool->lock); in wait_permit()
950 finish_wait(&limiter->blocked_threads, &wait); in wait_permit()
954 * vdo_launch_bio() - Acquire a data_vio from the pool, assign the bio to it, and launch it.
964 VDO_ASSERT_LOG_ONLY(!vdo_is_state_quiescent(&pool->state), in vdo_launch_bio()
967 bio->bi_private = (void *) jiffies; in vdo_launch_bio()
968 spin_lock(&pool->lock); in vdo_launch_bio()
970 !acquire_permit(&pool->discard_limiter)) { in vdo_launch_bio()
971 wait_permit(&pool->discard_limiter, bio); in vdo_launch_bio()
975 if (!acquire_permit(&pool->limiter)) { in vdo_launch_bio()
976 wait_permit(&pool->limiter, bio); in vdo_launch_bio()
981 spin_unlock(&pool->lock); in vdo_launch_bio()
982 launch_bio(pool->completion.vdo, data_vio, bio); in vdo_launch_bio()
991 spin_lock(&pool->lock); in initiate_drain()
993 spin_unlock(&pool->lock); in initiate_drain()
1001 VDO_ASSERT_LOG_ONLY((vdo_get_callback_thread_id() == vdo->thread_config.cpu_thread), in assert_on_vdo_cpu_thread()
1006 * drain_data_vio_pool() - Wait asynchronously for all data_vios to be returned to the pool.
1012 assert_on_vdo_cpu_thread(completion->vdo, __func__); in drain_data_vio_pool()
1013 vdo_start_draining(&pool->state, VDO_ADMIN_STATE_SUSPENDING, completion, in drain_data_vio_pool()
1018 * resume_data_vio_pool() - Resume a data_vio pool.
1024 assert_on_vdo_cpu_thread(completion->vdo, __func__); in resume_data_vio_pool()
1025 vdo_continue_completion(completion, vdo_resume_if_quiescent(&pool->state)); in resume_data_vio_pool()
1028 static void dump_limiter(const char *name, struct limiter *limiter) in dump_limiter() argument
1030 vdo_log_info("%s: %u of %u busy (max %u), %s", name, limiter->busy, in dump_limiter()
1031 limiter->limit, limiter->max_busy, in dump_limiter()
1032 ((bio_list_empty(&limiter->waiters) && in dump_limiter()
1033 bio_list_empty(&limiter->new_waiters)) ? in dump_limiter()
1038 * dump_data_vio_pool() - Dump a data_vio pool to the log.
1054 spin_lock(&pool->lock); in dump_data_vio_pool()
1055 dump_limiter("data_vios", &pool->limiter); in dump_data_vio_pool()
1056 dump_limiter("discard permits", &pool->discard_limiter); in dump_data_vio_pool()
1061 for (i = 0; i < pool->limiter.limit; i++) { in dump_data_vio_pool()
1062 struct data_vio *data_vio = &pool->data_vios[i]; in dump_data_vio_pool()
1064 if (!list_empty(&data_vio->pool_entry)) in dump_data_vio_pool()
1069 spin_unlock(&pool->lock); in dump_data_vio_pool()
1072 spin_lock(&pool->lock); in dump_data_vio_pool()
1077 spin_unlock(&pool->lock); in dump_data_vio_pool()
1082 return READ_ONCE(pool->limiter.busy); in get_data_vio_pool_active_requests()
1087 return READ_ONCE(pool->limiter.limit); in get_data_vio_pool_request_limit()
1092 return READ_ONCE(pool->limiter.max_busy); in get_data_vio_pool_maximum_requests()
1102 [3] = "read-modify-write", in update_data_vio_error_stats()
1105 [7] = "read-modify-write+fua", in update_data_vio_error_stats()
1108 if (data_vio->read) in update_data_vio_error_stats()
1111 if (data_vio->write) in update_data_vio_error_stats()
1114 if (data_vio->fua) in update_data_vio_error_stats()
1117 update_vio_error_stats(&data_vio->vio, in update_data_vio_error_stats()
1120 (unsigned long long) data_vio->logical.lbn, in update_data_vio_error_stats()
1128 * release_allocated_lock() - Release the PBN lock and/or the reference on the allocated block at
1141 /** release_lock() - Release an uncontended LBN lock. */
1144 struct int_map *lock_map = lock->zone->lbn_operations; in release_lock()
1147 if (!lock->locked) { in release_lock()
1149 struct data_vio *lock_holder = vdo_int_map_get(lock_map, lock->lbn); in release_lock()
1153 (unsigned long long) lock->lbn); in release_lock()
1158 lock_holder = vdo_int_map_remove(lock_map, lock->lbn); in release_lock()
1161 (unsigned long long) lock->lbn); in release_lock()
1162 lock->locked = false; in release_lock()
1165 /** transfer_lock() - Transfer a contended LBN lock to the eldest waiter. */
1171 VDO_ASSERT_LOG_ONLY(lock->locked, "lbn_lock with waiters is not locked"); in transfer_lock()
1175 vdo_waiter_as_data_vio(vdo_waitq_dequeue_waiter(&lock->waiters)); in transfer_lock()
1178 vdo_waitq_transfer_all_waiters(&lock->waiters, in transfer_lock()
1179 &next_lock_holder->logical.waiters); in transfer_lock()
1181 result = vdo_int_map_put(lock->zone->lbn_operations, lock->lbn, in transfer_lock()
1190 (unsigned long long) lock->lbn); in transfer_lock()
1191 lock->locked = false; in transfer_lock()
1197 if (vdo_waitq_has_waiters(&next_lock_holder->logical.waiters)) in transfer_lock()
1204 next_lock_holder->vio.completion.requeue = true; in transfer_lock()
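transfer_lock() above hands a contended LBN lock to the eldest waiter: that waiter becomes the new holder, inherits the remaining waiters, and replaces the old holder in the lock map so later arrivals find it. A toy sketch with a direct-mapped array standing in for the per-zone int_map; the sketch_ names are illustrative.

#include <stddef.h>
#include <stdint.h>

#define SKETCH_LOGICAL_BLOCKS 64

struct sketch_request {
	uint64_t lbn;
	struct sketch_request *next;    /* link in the FIFO of waiters */
	struct sketch_request *waiters; /* eldest request queued behind this holder */
};

static struct sketch_request *lbn_holders[SKETCH_LOGICAL_BLOCKS];

/* Hand a contended lock to the eldest waiter: it becomes the new holder,
 * inherits the rest of the queue, and replaces the old holder in the table. */
static void sketch_transfer_lock(struct sketch_request *holder)
{
	struct sketch_request *next_holder = holder->waiters;

	holder->waiters = NULL;
	next_holder->waiters = next_holder->next;
	next_holder->next = NULL;

	lbn_holders[next_holder->lbn] = next_holder;
}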
1209 * release_logical_lock() - Release the logical block lock and flush generation lock at the end of
1216 struct lbn_lock *lock = &data_vio->logical; in release_logical_lock()
1220 if (vdo_waitq_has_waiters(&lock->waiters)) in release_logical_lock()
1229 /** clean_hash_lock() - Release the hash lock at the end of processing a data_vio. */
1235 if (completion->result != VDO_SUCCESS) { in clean_hash_lock()
1245 * finish_cleanup() - Make some assertions about a data_vio which has finished cleaning up.
1248 * If it is part of a multi-block discard, starts on the next block; otherwise, returns it to the
1253 struct vdo_completion *completion = &data_vio->vio.completion; in finish_cleanup()
1254 u32 discard_size = min_t(u32, data_vio->remaining_discard, in finish_cleanup()
1255 VDO_BLOCK_SIZE - data_vio->offset); in finish_cleanup()
1257 VDO_ASSERT_LOG_ONLY(data_vio->allocation.lock == NULL, in finish_cleanup()
1259 VDO_ASSERT_LOG_ONLY(data_vio->hash_lock == NULL, in finish_cleanup()
1261 if ((data_vio->remaining_discard <= discard_size) || in finish_cleanup()
1262 (completion->result != VDO_SUCCESS)) { in finish_cleanup()
1263 struct data_vio_pool *pool = completion->vdo->data_vio_pool; in finish_cleanup()
1265 vdo_funnel_queue_put(pool->queue, &completion->work_queue_entry_link); in finish_cleanup()
1270 data_vio->remaining_discard -= discard_size; in finish_cleanup()
1271 data_vio->is_partial = (data_vio->remaining_discard < VDO_BLOCK_SIZE); in finish_cleanup()
1272 data_vio->read = data_vio->is_partial; in finish_cleanup()
1273 data_vio->offset = 0; in finish_cleanup()
1274 completion->requeue = true; in finish_cleanup()
1275 data_vio->first_reference_operation_complete = false; in finish_cleanup()
1276 launch_data_vio(data_vio, data_vio->logical.lbn + 1); in finish_cleanup()
1279 /** perform_cleanup_stage() - Perform the next step in the process of cleaning up a data_vio. */
1287 if (data_vio->hash_lock != NULL) { in perform_cleanup_stage()
1302 if ((data_vio->recovery_sequence_number > 0) && in perform_cleanup_stage()
1303 (READ_ONCE(vdo->read_only_notifier.read_only_error) == VDO_SUCCESS) && in perform_cleanup_stage()
1304 (data_vio->vio.completion.result != VDO_READ_ONLY)) in perform_cleanup_stage()
1305 vdo_log_warning("VDO not read-only when cleaning data_vio with RJ lock"); in perform_cleanup_stage()
1321 completion->error_handler = NULL; in complete_data_vio()
1322 data_vio->last_async_operation = VIO_ASYNC_OP_CLEANUP; in complete_data_vio()
1324 (data_vio->write ? VIO_CLEANUP_START : VIO_RELEASE_LOGICAL)); in complete_data_vio()
1329 if (vdo_is_read_only(completion->vdo)) in enter_read_only_mode()
1332 if (completion->result != VDO_READ_ONLY) { in enter_read_only_mode()
1335 vdo_log_error_strerror(completion->result, in enter_read_only_mode()
1336 …"Preparing to enter read-only mode: data_vio for LBN %llu (becoming mapped to %llu, previously map… in enter_read_only_mode()
1337 (unsigned long long) data_vio->logical.lbn, in enter_read_only_mode()
1338 (unsigned long long) data_vio->new_mapped.pbn, in enter_read_only_mode()
1339 (unsigned long long) data_vio->mapped.pbn, in enter_read_only_mode()
1340 (unsigned long long) data_vio->allocation.pbn, in enter_read_only_mode()
1344 vdo_enter_read_only_mode(completion->vdo, completion->result); in enter_read_only_mode()
1351 if ((completion->result == VDO_READ_ONLY) || (data_vio->user_bio == NULL)) in handle_data_vio_error()
1359 * get_data_vio_operation_name() - Get the name of the last asynchronous operation performed on a
1365 BUILD_BUG_ON((MAX_VIO_ASYNC_OPERATION_NUMBER - MIN_VIO_ASYNC_OPERATION_NUMBER) != in get_data_vio_operation_name()
1368 return ((data_vio->last_async_operation < MAX_VIO_ASYNC_OPERATION_NUMBER) ? in get_data_vio_operation_name()
1369 ASYNC_OPERATION_NAMES[data_vio->last_async_operation] : in get_data_vio_operation_name()
1374 * data_vio_allocate_data_block() - Allocate a data block.
1377 * @callback: The callback which will attempt an allocation in the current zone and continue if it
1385 struct allocation *allocation = &data_vio->allocation; in data_vio_allocate_data_block()
1387 VDO_ASSERT_LOG_ONLY((allocation->pbn == VDO_ZERO_BLOCK), in data_vio_allocate_data_block()
1389 allocation->write_lock_type = write_lock_type; in data_vio_allocate_data_block()
1390 allocation->zone = vdo_get_next_allocation_zone(data_vio->logical.zone); in data_vio_allocate_data_block()
1391 allocation->first_allocation_zone = allocation->zone->zone_number; in data_vio_allocate_data_block()
1393 data_vio->vio.completion.error_handler = error_handler; in data_vio_allocate_data_block()
1398 * release_data_vio_allocation_lock() - Release the PBN lock on a data_vio's allocated block.
1406 struct allocation *allocation = &data_vio->allocation; in release_data_vio_allocation_lock()
1407 physical_block_number_t locked_pbn = allocation->pbn; in release_data_vio_allocation_lock()
1411 if (reset || vdo_pbn_lock_has_provisional_reference(allocation->lock)) in release_data_vio_allocation_lock()
1412 allocation->pbn = VDO_ZERO_BLOCK; in release_data_vio_allocation_lock()
1414 vdo_release_physical_zone_pbn_lock(allocation->zone, locked_pbn, in release_data_vio_allocation_lock()
1415 vdo_forget(allocation->lock)); in release_data_vio_allocation_lock()
1419 * uncompress_data_vio() - Uncompress the data a data_vio has just read.
1429 struct compressed_block *block = data_vio->compression.block; in uncompress_data_vio()
1438 size = LZ4_decompress_safe((block->data + fragment_offset), buffer, in uncompress_data_vio()
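uncompress_data_vio() above expands one fragment that was packed at an offset inside a compressed block. A hedged userspace sketch using liblz4's LZ4_decompress_safe(); the names and the 4096-byte constant are illustrative, and a fragment is treated as valid only if it expands to exactly one full block.

#include <lz4.h>

#define SKETCH_BLOCK_SIZE 4096

/* Expand the fragment at fragment_offset within a compressed block.
 * Returns 0 on success, -1 if the fragment is corrupt or does not
 * decompress to exactly one full block. */
static int sketch_uncompress_fragment(const char *block, unsigned int fragment_offset,
				      unsigned int fragment_size, char *buffer)
{
	int size = LZ4_decompress_safe(block + fragment_offset, buffer,
				       (int) fragment_size, SKETCH_BLOCK_SIZE);

	return (size == SKETCH_BLOCK_SIZE) ? 0 : -1;
}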
1449 * modify_for_partial_write() - Do the modify-write part of a read-modify-write cycle.
1457 char *data = data_vio->vio.data; in modify_for_partial_write()
1458 struct bio *bio = data_vio->user_bio; in modify_for_partial_write()
1463 memset(data + data_vio->offset, '\0', min_t(u32, in modify_for_partial_write()
1464 data_vio->remaining_discard, in modify_for_partial_write()
1465 VDO_BLOCK_SIZE - data_vio->offset)); in modify_for_partial_write()
1467 copy_from_bio(bio, data + data_vio->offset); in modify_for_partial_write()
1470 data_vio->is_zero = mem_is_zero(data, VDO_BLOCK_SIZE); in modify_for_partial_write()
1471 data_vio->read = false; in modify_for_partial_write()
1479 char *data = data_vio->vio.data; in complete_read()
1480 bool compressed = vdo_is_state_compressed(data_vio->mapped.state); in complete_read()
1485 int result = uncompress_data_vio(data_vio, data_vio->mapped.state, data); in complete_read()
1493 if (data_vio->write) { in complete_read()
1498 if (compressed || data_vio->is_partial) in complete_read()
1499 copy_to_bio(data_vio->user_bio, data + data_vio->offset); in complete_read()
1507 struct data_vio *data_vio = vio_as_data_vio(bio->bi_private); in read_endio()
1508 int result = blk_status_to_errno(bio->bi_status); in read_endio()
1526 if (data_vio->is_partial) { in complete_zero_read()
1527 memset(data_vio->vio.data, 0, VDO_BLOCK_SIZE); in complete_zero_read()
1528 if (data_vio->write) { in complete_zero_read()
1533 zero_fill_bio(data_vio->user_bio); in complete_zero_read()
1540 * read_block() - Read a block asynchronously.
1551 if (data_vio->mapped.pbn == VDO_ZERO_BLOCK) { in read_block()
1557 data_vio->last_async_operation = VIO_ASYNC_OP_READ_DATA_VIO; in read_block()
1558 if (vdo_is_state_compressed(data_vio->mapped.state)) { in read_block()
1559 result = vio_reset_bio(vio, (char *) data_vio->compression.block, in read_block()
1560 read_endio, REQ_OP_READ, data_vio->mapped.pbn); in read_block()
1562 blk_opf_t opf = ((data_vio->user_bio->bi_opf & PASSTHROUGH_FLAGS) | REQ_OP_READ); in read_block()
1564 if (data_vio->is_partial) { in read_block()
1565 result = vio_reset_bio(vio, vio->data, read_endio, opf, in read_block()
1566 data_vio->mapped.pbn); in read_block()
1569 bio_reset(vio->bio, vio->bio->bi_bdev, opf); in read_block()
1570 bio_init_clone(data_vio->user_bio->bi_bdev, vio->bio, in read_block()
1571 data_vio->user_bio, GFP_KERNEL); in read_block()
1574 vdo_set_bio_properties(vio->bio, vio, read_endio, opf, in read_block()
1575 data_vio->mapped.pbn); in read_block()
1590 if (completion->type == VIO_COMPLETION) in reference_count_update_completion_as_data_vio()
1597 * update_block_map() - Rendezvous of the data_vio and decrement completions after each has
1608 if (!data_vio->first_reference_operation_complete) { in update_block_map()
1610 data_vio->first_reference_operation_complete = true; in update_block_map()
1614 completion = &data_vio->vio.completion; in update_block_map()
1615 vdo_set_completion_result(completion, data_vio->decrement_completion.result); in update_block_map()
1616 if (completion->result != VDO_SUCCESS) { in update_block_map()
1621 completion->error_handler = handle_data_vio_error; in update_block_map()
1622 if (data_vio->hash_lock != NULL) in update_block_map()
1625 completion->callback = complete_data_vio; in update_block_map()
1627 data_vio->last_async_operation = VIO_ASYNC_OP_PUT_MAPPED_BLOCK; in update_block_map()
1639 data_vio->logical.zone->thread_id); in decrement_reference_count()
1640 completion->error_handler = update_block_map; in decrement_reference_count()
1641 vdo_modify_reference_count(completion, &data_vio->decrement_updater); in decrement_reference_count()
1650 if (data_vio->downgrade_allocation_lock) { in increment_reference_count()
1658 vdo_downgrade_pbn_write_lock(data_vio->allocation.lock, false); in increment_reference_count()
1662 completion->error_handler = update_block_map; in increment_reference_count()
1663 vdo_modify_reference_count(completion, &data_vio->increment_updater); in increment_reference_count()
1666 /** journal_remapping() - Add a recovery journal entry for a data remapping. */
1673 data_vio->decrement_updater.operation = VDO_JOURNAL_DATA_REMAPPING; in journal_remapping()
1674 data_vio->decrement_updater.zpbn = data_vio->mapped; in journal_remapping()
1675 if (data_vio->new_mapped.pbn == VDO_ZERO_BLOCK) { in journal_remapping()
1676 data_vio->first_reference_operation_complete = true; in journal_remapping()
1677 if (data_vio->mapped.pbn == VDO_ZERO_BLOCK) in journal_remapping()
1684 if (data_vio->mapped.pbn == VDO_ZERO_BLOCK) { in journal_remapping()
1685 data_vio->first_reference_operation_complete = true; in journal_remapping()
1687 vdo_set_completion_callback(&data_vio->decrement_completion, in journal_remapping()
1689 data_vio->mapped.zone->thread_id); in journal_remapping()
1692 data_vio->last_async_operation = VIO_ASYNC_OP_JOURNAL_REMAPPING; in journal_remapping()
1693 vdo_add_recovery_journal_entry(completion->vdo->recovery_journal, data_vio); in journal_remapping()
1697 * read_old_block_mapping() - Get the previous PBN/LBN mapping of an in-progress write.
1701 * journal entry referencing the removal of this LBN->PBN mapping.
1709 data_vio->last_async_operation = VIO_ASYNC_OP_GET_MAPPED_BLOCK_FOR_WRITE; in read_old_block_mapping()
1716 data_vio->increment_updater = (struct reference_updater) { in update_metadata_for_data_vio_write()
1719 .zpbn = data_vio->new_mapped, in update_metadata_for_data_vio_write()
1727 * pack_compressed_data() - Attempt to pack the compressed data_vio into a block.
1744 data_vio->last_async_operation = VIO_ASYNC_OP_ATTEMPT_PACKING; in pack_compressed_data()
1749 * compress_data_vio() - Do the actual work of compressing the data on a CPU queue.
1765 size = LZ4_compress_default(data_vio->vio.data, in compress_data_vio()
1766 data_vio->compression.block->data, VDO_BLOCK_SIZE, in compress_data_vio()
1770 data_vio->compression.size = size; in compress_data_vio()
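compress_data_vio() above compresses the 4 KB of data into the data_vio's compression block and records the resulting size; the packer later decides whether the fragment is small enough to share a physical block with others. A hedged userspace sketch using liblz4's LZ4_compress_default(), which manages its own working memory; the half-block cut-off here is purely illustrative and is not vdo's actual packing threshold.

#include <lz4.h>
#include <stdbool.h>

#define SKETCH_BLOCK_SIZE 4096

/* Compress a full block into scratch space. Returns true and the compressed
 * size when the result looks worth packing; the half-block cut-off is this
 * sketch's stand-in for the packer's real criterion. */
static bool sketch_try_compress(const char *data, char *scratch, int *compressed_size)
{
	int size = LZ4_compress_default(data, scratch, SKETCH_BLOCK_SIZE,
					SKETCH_BLOCK_SIZE);

	if (size <= 0 || size >= SKETCH_BLOCK_SIZE / 2)
		return false;

	*compressed_size = size;
	return true;
}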
1779 * launch_compress_data_vio() - Continue a write by attempting to compress the data.
1782 * This is a re-entry point to vio_write used by hash locks.
1786 VDO_ASSERT_LOG_ONLY(!data_vio->is_duplicate, "compressing a non-duplicate block"); in launch_compress_data_vio()
1787 VDO_ASSERT_LOG_ONLY(data_vio->hash_lock != NULL, in launch_compress_data_vio()
1807 if (data_vio->fua || in launch_compress_data_vio()
1809 ((data_vio->user_bio != NULL) && (bio_op(data_vio->user_bio) == REQ_OP_DISCARD)) || in launch_compress_data_vio()
1815 data_vio->last_async_operation = VIO_ASYNC_OP_COMPRESS_DATA_VIO; in launch_compress_data_vio()
1821 * hash_data_vio() - Hash the data in a data_vio and set the hash zone (which also flags the record
1832 VDO_ASSERT_LOG_ONLY(!data_vio->is_zero, "zero blocks should not be hashed"); in hash_data_vio()
1834 murmurhash3_128(data_vio->vio.data, VDO_BLOCK_SIZE, 0x62ea60be, in hash_data_vio()
1835 &data_vio->record_name); in hash_data_vio()
1837 data_vio->hash_zone = vdo_select_hash_zone(vdo_from_data_vio(data_vio)->hash_zones, in hash_data_vio()
1838 &data_vio->record_name); in hash_data_vio()
1839 data_vio->last_async_operation = VIO_ASYNC_OP_ACQUIRE_VDO_HASH_LOCK; in hash_data_vio()
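hash_data_vio() above computes a 128-bit MurmurHash3 record name for the block (with a fixed seed) and uses it to pick the hash zone that will own deduplication for that name. A loose sketch of the idea, with a 64-bit FNV-1a stand-in for the real hash and a simple modulo for zone selection, since vdo_select_hash_zone()'s actual policy is not shown in these matches.

#include <stddef.h>
#include <stdint.h>

#define SKETCH_BLOCK_SIZE 4096

/* Stand-in content hash: 64-bit FNV-1a over the block. The real code
 * computes a 128-bit MurmurHash3 record name with a fixed seed. */
static uint64_t sketch_name_block(const char *data)
{
	uint64_t name = 0xcbf29ce484222325ULL;
	size_t i;

	for (i = 0; i < SKETCH_BLOCK_SIZE; i++)
		name = (name ^ (uint8_t) data[i]) * 0x100000001b3ULL;

	return name;
}

/* Illustrative zone choice: spread record names evenly over the hash zones. */
static unsigned int sketch_select_hash_zone(uint64_t name, unsigned int zone_count)
{
	return (unsigned int) (name % zone_count);
}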
1843 /** prepare_for_dedupe() - Prepare for the dedupe path after attempting to get an allocation. */
1847 VDO_ASSERT_LOG_ONLY(!data_vio->is_zero, "must not prepare to dedupe zero blocks"); in prepare_for_dedupe()
1853 data_vio->last_async_operation = VIO_ASYNC_OP_HASH_DATA_VIO; in prepare_for_dedupe()
1858 * write_bio_finished() - This is the bio_end_io function registered in write_block() to be called
1864 struct data_vio *data_vio = vio_as_data_vio((struct vio *) bio->bi_private); in write_bio_finished()
1867 vdo_set_completion_result(&data_vio->vio.completion, in write_bio_finished()
1868 blk_status_to_errno(bio->bi_status)); in write_bio_finished()
1869 data_vio->downgrade_allocation_lock = true; in write_bio_finished()
1870 update_metadata_for_data_vio_write(data_vio, data_vio->allocation.lock); in write_bio_finished()
1873 /** write_data_vio() - Write a data block to storage without compression. */
1899 result = vio_reset_bio(&data_vio->vio, data_vio->vio.data, in write_data_vio()
1901 data_vio->allocation.pbn); in write_data_vio()
1907 data_vio->last_async_operation = VIO_ASYNC_OP_WRITE_DATA_VIO; in write_data_vio()
1912 * acknowledge_write_callback() - Acknowledge a write to the requestor.
1920 struct vdo *vdo = completion->vdo; in acknowledge_write_callback()
1923 (vdo_get_callback_thread_id() == vdo->thread_config.bio_ack_thread)), in acknowledge_write_callback()
1928 if (data_vio->new_mapped.pbn == VDO_ZERO_BLOCK) { in acknowledge_write_callback()
1938 * allocate_block() - Attempt to allocate a block in the current allocation zone.
1952 completion->error_handler = handle_data_vio_error; in allocate_block()
1953 WRITE_ONCE(data_vio->allocation_succeeded, true); in allocate_block()
1954 data_vio->new_mapped = (struct zoned_pbn) { in allocate_block()
1955 .zone = data_vio->allocation.zone, in allocate_block()
1956 .pbn = data_vio->allocation.pbn, in allocate_block()
1960 if (data_vio->fua || in allocate_block()
1961 data_vio->remaining_discard > (u32) (VDO_BLOCK_SIZE - data_vio->offset)) { in allocate_block()
1966 data_vio->last_async_operation = VIO_ASYNC_OP_ACKNOWLEDGE_WRITE; in allocate_block()
1971 * handle_allocation_error() - Handle an error attempting to allocate a block.
1980 if (completion->result == VDO_NO_SPACE) { in handle_allocation_error()
1983 completion->error_handler = handle_data_vio_error; in handle_allocation_error()
1994 int result = VDO_ASSERT(data_vio->is_discard, in assert_is_discard()
2001 * continue_data_vio_with_block_map_slot() - Read the data_vio's mapping from the block map.
2011 if (data_vio->read) { in continue_data_vio_with_block_map_slot()
2013 data_vio->last_async_operation = VIO_ASYNC_OP_GET_MAPPED_BLOCK_FOR_READ; in continue_data_vio_with_block_map_slot()
2020 if (data_vio->tree_lock.tree_slots[0].block_map_slot.pbn == VDO_ZERO_BLOCK) { in continue_data_vio_with_block_map_slot()
2025 completion->callback = complete_data_vio; in continue_data_vio_with_block_map_slot()
2031 * We need an allocation if this is neither a full-block discard nor a in continue_data_vio_with_block_map_slot()
2032 * full-block zero write. in continue_data_vio_with_block_map_slot()
2034 if (!data_vio->is_zero && (!data_vio->is_discard || data_vio->is_partial)) { in continue_data_vio_with_block_map_slot()
2044 data_vio->new_mapped.pbn = VDO_ZERO_BLOCK; in continue_data_vio_with_block_map_slot()
2045 if (data_vio->is_zero) in continue_data_vio_with_block_map_slot()
2046 data_vio->new_mapped.state = VDO_MAPPING_STATE_UNCOMPRESSED; in continue_data_vio_with_block_map_slot()
2048 if (data_vio->remaining_discard > (u32) (VDO_BLOCK_SIZE - data_vio->offset)) { in continue_data_vio_with_block_map_slot()
2054 data_vio->last_async_operation = VIO_ASYNC_OP_ACKNOWLEDGE_WRITE; in continue_data_vio_with_block_map_slot()