xref: /linux/drivers/md/dm-vdo/data-vio.c (revision 90d32e92011eaae8e70a9169b4e7acf4ca8f9d3a)
1 // SPDX-License-Identifier: GPL-2.0-only
2 /*
3  * Copyright 2023 Red Hat
4  */
5 
6 #include "data-vio.h"
7 
8 #include <linux/atomic.h>
9 #include <linux/bio.h>
10 #include <linux/blkdev.h>
11 #include <linux/delay.h>
12 #include <linux/device-mapper.h>
13 #include <linux/jiffies.h>
14 #include <linux/kernel.h>
15 #include <linux/list.h>
16 #include <linux/lz4.h>
17 #include <linux/minmax.h>
18 #include <linux/sched.h>
19 #include <linux/spinlock.h>
20 #include <linux/wait.h>
21 
22 #include "logger.h"
23 #include "memory-alloc.h"
24 #include "murmurhash3.h"
25 #include "permassert.h"
26 
27 #include "block-map.h"
28 #include "dump.h"
29 #include "encodings.h"
30 #include "int-map.h"
31 #include "io-submitter.h"
32 #include "logical-zone.h"
33 #include "packer.h"
34 #include "recovery-journal.h"
35 #include "slab-depot.h"
36 #include "status-codes.h"
37 #include "types.h"
38 #include "vdo.h"
39 #include "vio.h"
40 #include "wait-queue.h"
41 
42 /**
43  * DOC: Bio flags.
44  *
45  * For certain flags set on user bios, if the user bio has not yet been acknowledged, setting those
46  * flags on our own bio(s) for that request may help underlying layers better fulfill the user
47  * bio's needs. This constant contains the aggregate of those flags; VDO strips all the other
48  * flags, as they convey incorrect information.
49  *
50  * These flags are always irrelevant if we have already finished the user bio as they are only
51  * hints on IO importance. If VDO has finished the user bio, any remaining IO done doesn't care how
52  * important finishing the finished bio was.
53  *
54  * Note that bio.c contains the complete list of flags we believe may be set; the following list
55  * explains the action taken with each of those flags VDO could receive:
56  *
57  * * REQ_SYNC: Passed down if the user bio is not yet completed, since it indicates the user bio
58  *   completion is required for further work to be done by the issuer.
59  * * REQ_META: Passed down if the user bio is not yet completed, since it may mean the lower layer
60  *   treats it as more urgent, similar to REQ_SYNC.
61  * * REQ_PRIO: Passed down if the user bio is not yet completed, since it indicates the user bio is
62  *   important.
63  * * REQ_NOMERGE: Set only if the incoming bio was split; irrelevant to VDO IO.
64  * * REQ_IDLE: Set if the incoming bio had more IO quickly following; VDO's IO pattern doesn't
65  *   match incoming IO, so this flag is incorrect for it.
66  * * REQ_FUA: Handled separately, and irrelevant to VDO IO otherwise.
67  * * REQ_RAHEAD: Passed down, as, for reads, it indicates trivial importance.
68  * * REQ_BACKGROUND: Not passed down, as VIOs are a limited resource and VDO needs them recycled
69  *   ASAP to service heavy load, which is the only place where REQ_BACKGROUND might aid in load
70  *   prioritization.
71  */
72 static blk_opf_t PASSTHROUGH_FLAGS = (REQ_PRIO | REQ_META | REQ_SYNC | REQ_RAHEAD);
73 
74 /**
75  * DOC:
76  *
77  * The data_vio_pool maintains the pool of data_vios which a vdo uses to service incoming bios. For
78  * correctness, and in order to avoid potentially expensive or blocking memory allocations during
79  * normal operation, the number of concurrently active data_vios is capped. Furthermore, in order
80  * to avoid starvation of reads and writes, at most 75% of the data_vios may be used for
81  * discards. The data_vio_pool is responsible for enforcing these limits. Threads submitting bios
82  * for which a data_vio or discard permit are not available will block until the necessary
83  * resources are available. The pool is also responsible for distributing resources to blocked
84  * threads and waking them. Finally, the pool attempts to batch the work of recycling data_vios by
85  * performing the work of actually assigning resources to blocked threads or placing data_vios back
86  * into the pool on a single cpu at a time.
87  *
88  * The pool contains two "limiters", one for tracking data_vios and one for tracking discard
89  * permits. The limiters also provide safe cross-thread access to pool statistics without the need
90  * to take the pool's lock. When a thread submits a bio to a vdo device, it will first attempt to
91  * get a discard permit if it is a discard, and then to get a data_vio. If the necessary resources
92  * are available, the incoming bio will be assigned to the acquired data_vio, and it will be
93  * launched. However, if either of these are unavailable, the arrival time of the bio is recorded
94  * in the bio's bi_private field, the bio and its submitter are both queued on the appropriate
95  * limiter and the submitting thread will then put itself to sleep. (note that this mechanism will
96  * break if jiffies are only 32 bits.)
97  *
98  * Whenever a data_vio has completed processing for the bio it was servicing, release_data_vio()
99  * will be called on it. This function will add the data_vio to a funnel queue, and then check the
100  * state of the pool. If the pool is not currently processing released data_vios, the pool's
101  * completion will be enqueued on a cpu queue. This obviates the need for the releasing threads to
102  * hold the pool's lock, and also batches release work while avoiding starvation of the cpu
103  * threads.
104  *
105  * Whenever the pool's completion is run on a cpu thread, it calls process_release_callback() which
106  * processes a batch of returned data_vios (currently at most 32) from the pool's funnel queue. For
107  * each data_vio, it first checks whether that data_vio was processing a discard. If so, and there
108  * is a blocked bio waiting for a discard permit, that permit is notionally transferred to the
109  * eldest discard waiter, and that waiter is moved to the end of the list of discard bios waiting
110  * for a data_vio. If there are no discard waiters, the discard permit is returned to the pool.
111  * Next, the data_vio is assigned to the oldest blocked bio which either has a discard permit, or
112  * doesn't need one and relaunched. If neither of these exist, the data_vio is returned to the
113  * pool. Finally, if any waiting bios were launched, the threads which blocked trying to submit
114  * them are awakened.
115  */
116 
117 #define DATA_VIO_RELEASE_BATCH_SIZE 128
118 
119 static const unsigned int VDO_SECTORS_PER_BLOCK_MASK = VDO_SECTORS_PER_BLOCK - 1;
120 static const u32 COMPRESSION_STATUS_MASK = 0xff;
121 static const u32 MAY_NOT_COMPRESS_MASK = 0x80000000;
122 
123 struct limiter;
124 typedef void (*assigner_fn)(struct limiter *limiter);
125 
126 /* Bookkeeping structure for a single type of resource. */
127 struct limiter {
128 	/* The data_vio_pool to which this limiter belongs */
129 	struct data_vio_pool *pool;
130 	/* The maximum number of data_vios available */
131 	data_vio_count_t limit;
132 	/* The number of resources in use */
133 	data_vio_count_t busy;
134 	/* The maximum number of resources ever simultaneously in use */
135 	data_vio_count_t max_busy;
136 	/* The number of resources to release */
137 	data_vio_count_t release_count;
138 	/* The number of waiters to wake */
139 	data_vio_count_t wake_count;
140 	/* The list of waiting bios which are known to process_release_callback() */
141 	struct bio_list waiters;
142 	/* The list of waiting bios which are not yet known to process_release_callback() */
143 	struct bio_list new_waiters;
144 	/* The list of waiters which have their permits */
145 	struct bio_list *permitted_waiters;
146 	/* The function for assigning a resource to a waiter */
147 	assigner_fn assigner;
148 	/* The queue of blocked threads */
149 	wait_queue_head_t blocked_threads;
150 	/* The arrival time of the eldest waiter */
151 	u64 arrival;
152 };
153 
154 /*
155  * A data_vio_pool is a collection of preallocated data_vios which may be acquired from any thread,
156  * and are released in batches.
157  */
158 struct data_vio_pool {
159 	/* Completion for scheduling releases */
160 	struct vdo_completion completion;
161 	/* The administrative state of the pool */
162 	struct admin_state state;
163 	/* Lock protecting the pool */
164 	spinlock_t lock;
165 	/* The main limiter controlling the total data_vios in the pool. */
166 	struct limiter limiter;
167 	/* The limiter controlling data_vios for discard */
168 	struct limiter discard_limiter;
169 	/* The list of bios which have discard permits but still need a data_vio */
170 	struct bio_list permitted_discards;
171 	/* The list of available data_vios */
172 	struct list_head available;
173 	/* The queue of data_vios waiting to be returned to the pool */
174 	struct funnel_queue *queue;
175 	/* Whether the pool is processing, or scheduled to process releases */
176 	atomic_t processing;
177 	/* The data vios in the pool */
178 	struct data_vio data_vios[];
179 };
180 
181 static const char * const ASYNC_OPERATION_NAMES[] = {
182 	"launch",
183 	"acknowledge_write",
184 	"acquire_hash_lock",
185 	"attempt_logical_block_lock",
186 	"lock_duplicate_pbn",
187 	"check_for_duplication",
188 	"cleanup",
189 	"compress_data_vio",
190 	"find_block_map_slot",
191 	"get_mapped_block_for_read",
192 	"get_mapped_block_for_write",
193 	"hash_data_vio",
194 	"journal_remapping",
195 	"vdo_attempt_packing",
196 	"put_mapped_block",
197 	"read_data_vio",
198 	"update_dedupe_index",
199 	"update_reference_counts",
200 	"verify_duplication",
201 	"write_data_vio",
202 };
203 
204 /* The steps taken cleaning up a VIO, in the order they are performed. */
205 enum data_vio_cleanup_stage {
206 	VIO_CLEANUP_START,
207 	VIO_RELEASE_HASH_LOCK = VIO_CLEANUP_START,
208 	VIO_RELEASE_ALLOCATED,
209 	VIO_RELEASE_RECOVERY_LOCKS,
210 	VIO_RELEASE_LOGICAL,
211 	VIO_CLEANUP_DONE
212 };
213 
214 static inline struct data_vio_pool * __must_check
215 as_data_vio_pool(struct vdo_completion *completion)
216 {
217 	vdo_assert_completion_type(completion, VDO_DATA_VIO_POOL_COMPLETION);
218 	return container_of(completion, struct data_vio_pool, completion);
219 }
220 
221 static inline u64 get_arrival_time(struct bio *bio)
222 {
223 	return (u64) bio->bi_private;
224 }
225 
226 /**
227  * check_for_drain_complete_locked() - Check whether a data_vio_pool has no outstanding data_vios
228  *				       or waiters while holding the pool's lock.
229  */
230 static bool check_for_drain_complete_locked(struct data_vio_pool *pool)
231 {
232 	if (pool->limiter.busy > 0)
233 		return false;
234 
235 	VDO_ASSERT_LOG_ONLY((pool->discard_limiter.busy == 0),
236 			    "no outstanding discard permits");
237 
238 	return (bio_list_empty(&pool->limiter.new_waiters) &&
239 		bio_list_empty(&pool->discard_limiter.new_waiters));
240 }
241 
242 static void initialize_lbn_lock(struct data_vio *data_vio, logical_block_number_t lbn)
243 {
244 	struct vdo *vdo = vdo_from_data_vio(data_vio);
245 	zone_count_t zone_number;
246 	struct lbn_lock *lock = &data_vio->logical;
247 
248 	lock->lbn = lbn;
249 	lock->locked = false;
250 	vdo_waitq_init(&lock->waiters);
251 	zone_number = vdo_compute_logical_zone(data_vio);
252 	lock->zone = &vdo->logical_zones->zones[zone_number];
253 }
254 
255 static void launch_locked_request(struct data_vio *data_vio)
256 {
257 	data_vio->logical.locked = true;
258 	if (data_vio->write) {
259 		struct vdo *vdo = vdo_from_data_vio(data_vio);
260 
261 		if (vdo_is_read_only(vdo)) {
262 			continue_data_vio_with_error(data_vio, VDO_READ_ONLY);
263 			return;
264 		}
265 	}
266 
267 	data_vio->last_async_operation = VIO_ASYNC_OP_FIND_BLOCK_MAP_SLOT;
268 	vdo_find_block_map_slot(data_vio);
269 }
270 
271 static void acknowledge_data_vio(struct data_vio *data_vio)
272 {
273 	struct vdo *vdo = vdo_from_data_vio(data_vio);
274 	struct bio *bio = data_vio->user_bio;
275 	int error = vdo_status_to_errno(data_vio->vio.completion.result);
276 
277 	if (bio == NULL)
278 		return;
279 
280 	VDO_ASSERT_LOG_ONLY((data_vio->remaining_discard <=
281 			     (u32) (VDO_BLOCK_SIZE - data_vio->offset)),
282 			    "data_vio to acknowledge is not an incomplete discard");
283 
284 	data_vio->user_bio = NULL;
285 	vdo_count_bios(&vdo->stats.bios_acknowledged, bio);
286 	if (data_vio->is_partial)
287 		vdo_count_bios(&vdo->stats.bios_acknowledged_partial, bio);
288 
289 	bio->bi_status = errno_to_blk_status(error);
290 	bio_endio(bio);
291 }
292 
293 static void copy_to_bio(struct bio *bio, char *data_ptr)
294 {
295 	struct bio_vec biovec;
296 	struct bvec_iter iter;
297 
298 	bio_for_each_segment(biovec, bio, iter) {
299 		memcpy_to_bvec(&biovec, data_ptr);
300 		data_ptr += biovec.bv_len;
301 	}
302 }
303 
304 struct data_vio_compression_status get_data_vio_compression_status(struct data_vio *data_vio)
305 {
306 	u32 packed = atomic_read(&data_vio->compression.status);
307 
308 	/* pairs with cmpxchg in set_data_vio_compression_status */
309 	smp_rmb();
310 	return (struct data_vio_compression_status) {
311 		.stage = packed & COMPRESSION_STATUS_MASK,
312 		.may_not_compress = ((packed & MAY_NOT_COMPRESS_MASK) != 0),
313 	};
314 }
315 
316 /**
317  * pack_status() - Convert a data_vio_compression_status into a u32 which may be stored
318  *                 atomically.
319  * @status: The state to convert.
320  *
321  * Return: The compression state packed into a u32.
322  */
323 static u32 __must_check pack_status(struct data_vio_compression_status status)
324 {
325 	return status.stage | (status.may_not_compress ? MAY_NOT_COMPRESS_MASK : 0);
326 }
327 
328 /**
329  * set_data_vio_compression_status() - Set the compression status of a data_vio.
330  * @state: The expected current status of the data_vio.
331  * @new_state: The status to set.
332  *
333  * Return: true if the new status was set, false if the data_vio's compression status did not
334  *         match the expected state, and so was left unchanged.
335  */
336 static bool __must_check
337 set_data_vio_compression_status(struct data_vio *data_vio,
338 				struct data_vio_compression_status status,
339 				struct data_vio_compression_status new_status)
340 {
341 	u32 actual;
342 	u32 expected = pack_status(status);
343 	u32 replacement = pack_status(new_status);
344 
345 	/*
346 	 * Extra barriers because this was original developed using a CAS operation that implicitly
347 	 * had them.
348 	 */
349 	smp_mb__before_atomic();
350 	actual = atomic_cmpxchg(&data_vio->compression.status, expected, replacement);
351 	/* same as before_atomic */
352 	smp_mb__after_atomic();
353 	return (expected == actual);
354 }
355 
356 struct data_vio_compression_status advance_data_vio_compression_stage(struct data_vio *data_vio)
357 {
358 	for (;;) {
359 		struct data_vio_compression_status status =
360 			get_data_vio_compression_status(data_vio);
361 		struct data_vio_compression_status new_status = status;
362 
363 		if (status.stage == DATA_VIO_POST_PACKER) {
364 			/* We're already in the last stage. */
365 			return status;
366 		}
367 
368 		if (status.may_not_compress) {
369 			/*
370 			 * Compression has been dis-allowed for this VIO, so skip the rest of the
371 			 * path and go to the end.
372 			 */
373 			new_status.stage = DATA_VIO_POST_PACKER;
374 		} else {
375 			/* Go to the next state. */
376 			new_status.stage++;
377 		}
378 
379 		if (set_data_vio_compression_status(data_vio, status, new_status))
380 			return new_status;
381 
382 		/* Another thread changed the status out from under us so try again. */
383 	}
384 }
385 
386 /**
387  * cancel_data_vio_compression() - Prevent this data_vio from being compressed or packed.
388  *
389  * Return: true if the data_vio is in the packer and the caller was the first caller to cancel it.
390  */
391 bool cancel_data_vio_compression(struct data_vio *data_vio)
392 {
393 	struct data_vio_compression_status status, new_status;
394 
395 	for (;;) {
396 		status = get_data_vio_compression_status(data_vio);
397 		if (status.may_not_compress || (status.stage == DATA_VIO_POST_PACKER)) {
398 			/* This data_vio is already set up to not block in the packer. */
399 			break;
400 		}
401 
402 		new_status.stage = status.stage;
403 		new_status.may_not_compress = true;
404 
405 		if (set_data_vio_compression_status(data_vio, status, new_status))
406 			break;
407 	}
408 
409 	return ((status.stage == DATA_VIO_PACKING) && !status.may_not_compress);
410 }
411 
412 /**
413  * attempt_logical_block_lock() - Attempt to acquire the lock on a logical block.
414  * @completion: The data_vio for an external data request as a completion.
415  *
416  * This is the start of the path for all external requests. It is registered in launch_data_vio().
417  */
418 static void attempt_logical_block_lock(struct vdo_completion *completion)
419 {
420 	struct data_vio *data_vio = as_data_vio(completion);
421 	struct lbn_lock *lock = &data_vio->logical;
422 	struct vdo *vdo = vdo_from_data_vio(data_vio);
423 	struct data_vio *lock_holder;
424 	int result;
425 
426 	assert_data_vio_in_logical_zone(data_vio);
427 
428 	if (data_vio->logical.lbn >= vdo->states.vdo.config.logical_blocks) {
429 		continue_data_vio_with_error(data_vio, VDO_OUT_OF_RANGE);
430 		return;
431 	}
432 
433 	result = vdo_int_map_put(lock->zone->lbn_operations, lock->lbn,
434 				 data_vio, false, (void **) &lock_holder);
435 	if (result != VDO_SUCCESS) {
436 		continue_data_vio_with_error(data_vio, result);
437 		return;
438 	}
439 
440 	if (lock_holder == NULL) {
441 		/* We got the lock */
442 		launch_locked_request(data_vio);
443 		return;
444 	}
445 
446 	result = VDO_ASSERT(lock_holder->logical.locked, "logical block lock held");
447 	if (result != VDO_SUCCESS) {
448 		continue_data_vio_with_error(data_vio, result);
449 		return;
450 	}
451 
452 	/*
453 	 * If the new request is a pure read request (not read-modify-write) and the lock_holder is
454 	 * writing and has received an allocation, service the read request immediately by copying
455 	 * data from the lock_holder to avoid having to flush the write out of the packer just to
456 	 * prevent the read from waiting indefinitely. If the lock_holder does not yet have an
457 	 * allocation, prevent it from blocking in the packer and wait on it. This is necessary in
458 	 * order to prevent returning data that may not have actually been written.
459 	 */
460 	if (!data_vio->write && READ_ONCE(lock_holder->allocation_succeeded)) {
461 		copy_to_bio(data_vio->user_bio, lock_holder->vio.data + data_vio->offset);
462 		acknowledge_data_vio(data_vio);
463 		complete_data_vio(completion);
464 		return;
465 	}
466 
467 	data_vio->last_async_operation = VIO_ASYNC_OP_ATTEMPT_LOGICAL_BLOCK_LOCK;
468 	vdo_waitq_enqueue_waiter(&lock_holder->logical.waiters, &data_vio->waiter);
469 
470 	/*
471 	 * Prevent writes and read-modify-writes from blocking indefinitely on lock holders in the
472 	 * packer.
473 	 */
474 	if (lock_holder->write && cancel_data_vio_compression(lock_holder)) {
475 		data_vio->compression.lock_holder = lock_holder;
476 		launch_data_vio_packer_callback(data_vio,
477 						vdo_remove_lock_holder_from_packer);
478 	}
479 }
480 
481 /**
482  * launch_data_vio() - (Re)initialize a data_vio to have a new logical block number, keeping the
483  *		       same parent and other state and send it on its way.
484  */
485 static void launch_data_vio(struct data_vio *data_vio, logical_block_number_t lbn)
486 {
487 	struct vdo_completion *completion = &data_vio->vio.completion;
488 
489 	/*
490 	 * Clearing the tree lock must happen before initializing the LBN lock, which also adds
491 	 * information to the tree lock.
492 	 */
493 	memset(&data_vio->tree_lock, 0, sizeof(data_vio->tree_lock));
494 	initialize_lbn_lock(data_vio, lbn);
495 	INIT_LIST_HEAD(&data_vio->hash_lock_entry);
496 	INIT_LIST_HEAD(&data_vio->write_entry);
497 
498 	memset(&data_vio->allocation, 0, sizeof(data_vio->allocation));
499 
500 	data_vio->is_duplicate = false;
501 
502 	memset(&data_vio->record_name, 0, sizeof(data_vio->record_name));
503 	memset(&data_vio->duplicate, 0, sizeof(data_vio->duplicate));
504 	vdo_reset_completion(completion);
505 	completion->error_handler = handle_data_vio_error;
506 	set_data_vio_logical_callback(data_vio, attempt_logical_block_lock);
507 	vdo_enqueue_completion(completion, VDO_DEFAULT_Q_MAP_BIO_PRIORITY);
508 }
509 
510 static bool is_zero_block(char *block)
511 {
512 	int i;
513 
514 	for (i = 0; i < VDO_BLOCK_SIZE; i += sizeof(u64)) {
515 		if (*((u64 *) &block[i]))
516 			return false;
517 	}
518 
519 	return true;
520 }
521 
522 static void copy_from_bio(struct bio *bio, char *data_ptr)
523 {
524 	struct bio_vec biovec;
525 	struct bvec_iter iter;
526 
527 	bio_for_each_segment(biovec, bio, iter) {
528 		memcpy_from_bvec(data_ptr, &biovec);
529 		data_ptr += biovec.bv_len;
530 	}
531 }
532 
533 static void launch_bio(struct vdo *vdo, struct data_vio *data_vio, struct bio *bio)
534 {
535 	logical_block_number_t lbn;
536 	/*
537 	 * Zero out the fields which don't need to be preserved (i.e. which are not pointers to
538 	 * separately allocated objects).
539 	 */
540 	memset(data_vio, 0, offsetof(struct data_vio, vio));
541 	memset(&data_vio->compression, 0, offsetof(struct compression_state, block));
542 
543 	data_vio->user_bio = bio;
544 	data_vio->offset = to_bytes(bio->bi_iter.bi_sector & VDO_SECTORS_PER_BLOCK_MASK);
545 	data_vio->is_partial = (bio->bi_iter.bi_size < VDO_BLOCK_SIZE) || (data_vio->offset != 0);
546 
547 	/*
548 	 * Discards behave very differently than other requests when coming in from device-mapper.
549 	 * We have to be able to handle any size discards and various sector offsets within a
550 	 * block.
551 	 */
552 	if (bio_op(bio) == REQ_OP_DISCARD) {
553 		data_vio->remaining_discard = bio->bi_iter.bi_size;
554 		data_vio->write = true;
555 		data_vio->is_discard = true;
556 		if (data_vio->is_partial) {
557 			vdo_count_bios(&vdo->stats.bios_in_partial, bio);
558 			data_vio->read = true;
559 		}
560 	} else if (data_vio->is_partial) {
561 		vdo_count_bios(&vdo->stats.bios_in_partial, bio);
562 		data_vio->read = true;
563 		if (bio_data_dir(bio) == WRITE)
564 			data_vio->write = true;
565 	} else if (bio_data_dir(bio) == READ) {
566 		data_vio->read = true;
567 	} else {
568 		/*
569 		 * Copy the bio data to a char array so that we can continue to use the data after
570 		 * we acknowledge the bio.
571 		 */
572 		copy_from_bio(bio, data_vio->vio.data);
573 		data_vio->is_zero = is_zero_block(data_vio->vio.data);
574 		data_vio->write = true;
575 	}
576 
577 	if (data_vio->user_bio->bi_opf & REQ_FUA)
578 		data_vio->fua = true;
579 
580 	lbn = (bio->bi_iter.bi_sector - vdo->starting_sector_offset) / VDO_SECTORS_PER_BLOCK;
581 	launch_data_vio(data_vio, lbn);
582 }
583 
584 static void assign_data_vio(struct limiter *limiter, struct data_vio *data_vio)
585 {
586 	struct bio *bio = bio_list_pop(limiter->permitted_waiters);
587 
588 	launch_bio(limiter->pool->completion.vdo, data_vio, bio);
589 	limiter->wake_count++;
590 
591 	bio = bio_list_peek(limiter->permitted_waiters);
592 	limiter->arrival = ((bio == NULL) ? U64_MAX : get_arrival_time(bio));
593 }
594 
595 static void assign_discard_permit(struct limiter *limiter)
596 {
597 	struct bio *bio = bio_list_pop(&limiter->waiters);
598 
599 	if (limiter->arrival == U64_MAX)
600 		limiter->arrival = get_arrival_time(bio);
601 
602 	bio_list_add(limiter->permitted_waiters, bio);
603 }
604 
605 static void get_waiters(struct limiter *limiter)
606 {
607 	bio_list_merge_init(&limiter->waiters, &limiter->new_waiters);
608 }
609 
610 static inline struct data_vio *get_available_data_vio(struct data_vio_pool *pool)
611 {
612 	struct data_vio *data_vio =
613 		list_first_entry(&pool->available, struct data_vio, pool_entry);
614 
615 	list_del_init(&data_vio->pool_entry);
616 	return data_vio;
617 }
618 
619 static void assign_data_vio_to_waiter(struct limiter *limiter)
620 {
621 	assign_data_vio(limiter, get_available_data_vio(limiter->pool));
622 }
623 
624 static void update_limiter(struct limiter *limiter)
625 {
626 	struct bio_list *waiters = &limiter->waiters;
627 	data_vio_count_t available = limiter->limit - limiter->busy;
628 
629 	VDO_ASSERT_LOG_ONLY((limiter->release_count <= limiter->busy),
630 			    "Release count %u is not more than busy count %u",
631 			    limiter->release_count, limiter->busy);
632 
633 	get_waiters(limiter);
634 	for (; (limiter->release_count > 0) && !bio_list_empty(waiters); limiter->release_count--)
635 		limiter->assigner(limiter);
636 
637 	if (limiter->release_count > 0) {
638 		WRITE_ONCE(limiter->busy, limiter->busy - limiter->release_count);
639 		limiter->release_count = 0;
640 		return;
641 	}
642 
643 	for (; (available > 0) && !bio_list_empty(waiters); available--)
644 		limiter->assigner(limiter);
645 
646 	WRITE_ONCE(limiter->busy, limiter->limit - available);
647 	if (limiter->max_busy < limiter->busy)
648 		WRITE_ONCE(limiter->max_busy, limiter->busy);
649 }
650 
651 /**
652  * schedule_releases() - Ensure that release processing is scheduled.
653  *
654  * If this call switches the state to processing, enqueue. Otherwise, some other thread has already
655  * done so.
656  */
657 static void schedule_releases(struct data_vio_pool *pool)
658 {
659 	/* Pairs with the barrier in process_release_callback(). */
660 	smp_mb__before_atomic();
661 	if (atomic_cmpxchg(&pool->processing, false, true))
662 		return;
663 
664 	pool->completion.requeue = true;
665 	vdo_launch_completion_with_priority(&pool->completion,
666 					    CPU_Q_COMPLETE_VIO_PRIORITY);
667 }
668 
669 static void reuse_or_release_resources(struct data_vio_pool *pool,
670 				       struct data_vio *data_vio,
671 				       struct list_head *returned)
672 {
673 	if (data_vio->remaining_discard > 0) {
674 		if (bio_list_empty(&pool->discard_limiter.waiters)) {
675 			/* Return the data_vio's discard permit. */
676 			pool->discard_limiter.release_count++;
677 		} else {
678 			assign_discard_permit(&pool->discard_limiter);
679 		}
680 	}
681 
682 	if (pool->limiter.arrival < pool->discard_limiter.arrival) {
683 		assign_data_vio(&pool->limiter, data_vio);
684 	} else if (pool->discard_limiter.arrival < U64_MAX) {
685 		assign_data_vio(&pool->discard_limiter, data_vio);
686 	} else {
687 		list_add(&data_vio->pool_entry, returned);
688 		pool->limiter.release_count++;
689 	}
690 }
691 
692 /**
693  * process_release_callback() - Process a batch of data_vio releases.
694  * @completion: The pool with data_vios to release.
695  */
696 static void process_release_callback(struct vdo_completion *completion)
697 {
698 	struct data_vio_pool *pool = as_data_vio_pool(completion);
699 	bool reschedule;
700 	bool drained;
701 	data_vio_count_t processed;
702 	data_vio_count_t to_wake;
703 	data_vio_count_t discards_to_wake;
704 	LIST_HEAD(returned);
705 
706 	spin_lock(&pool->lock);
707 	get_waiters(&pool->discard_limiter);
708 	get_waiters(&pool->limiter);
709 	spin_unlock(&pool->lock);
710 
711 	if (pool->limiter.arrival == U64_MAX) {
712 		struct bio *bio = bio_list_peek(&pool->limiter.waiters);
713 
714 		if (bio != NULL)
715 			pool->limiter.arrival = get_arrival_time(bio);
716 	}
717 
718 	for (processed = 0; processed < DATA_VIO_RELEASE_BATCH_SIZE; processed++) {
719 		struct data_vio *data_vio;
720 		struct funnel_queue_entry *entry = vdo_funnel_queue_poll(pool->queue);
721 
722 		if (entry == NULL)
723 			break;
724 
725 		data_vio = as_data_vio(container_of(entry, struct vdo_completion,
726 						    work_queue_entry_link));
727 		acknowledge_data_vio(data_vio);
728 		reuse_or_release_resources(pool, data_vio, &returned);
729 	}
730 
731 	spin_lock(&pool->lock);
732 	/*
733 	 * There is a race where waiters could be added while we are in the unlocked section above.
734 	 * Those waiters could not see the resources we are now about to release, so we assign
735 	 * those resources now as we have no guarantee of being rescheduled. This is handled in
736 	 * update_limiter().
737 	 */
738 	update_limiter(&pool->discard_limiter);
739 	list_splice(&returned, &pool->available);
740 	update_limiter(&pool->limiter);
741 	to_wake = pool->limiter.wake_count;
742 	pool->limiter.wake_count = 0;
743 	discards_to_wake = pool->discard_limiter.wake_count;
744 	pool->discard_limiter.wake_count = 0;
745 
746 	atomic_set(&pool->processing, false);
747 	/* Pairs with the barrier in schedule_releases(). */
748 	smp_mb();
749 
750 	reschedule = !vdo_is_funnel_queue_empty(pool->queue);
751 	drained = (!reschedule &&
752 		   vdo_is_state_draining(&pool->state) &&
753 		   check_for_drain_complete_locked(pool));
754 	spin_unlock(&pool->lock);
755 
756 	if (to_wake > 0)
757 		wake_up_nr(&pool->limiter.blocked_threads, to_wake);
758 
759 	if (discards_to_wake > 0)
760 		wake_up_nr(&pool->discard_limiter.blocked_threads, discards_to_wake);
761 
762 	if (reschedule)
763 		schedule_releases(pool);
764 	else if (drained)
765 		vdo_finish_draining(&pool->state);
766 }
767 
768 static void initialize_limiter(struct limiter *limiter, struct data_vio_pool *pool,
769 			       assigner_fn assigner, data_vio_count_t limit)
770 {
771 	limiter->pool = pool;
772 	limiter->assigner = assigner;
773 	limiter->limit = limit;
774 	limiter->arrival = U64_MAX;
775 	init_waitqueue_head(&limiter->blocked_threads);
776 }
777 
778 /**
779  * initialize_data_vio() - Allocate the components of a data_vio.
780  *
781  * The caller is responsible for cleaning up the data_vio on error.
782  *
783  * Return: VDO_SUCCESS or an error.
784  */
785 static int initialize_data_vio(struct data_vio *data_vio, struct vdo *vdo)
786 {
787 	struct bio *bio;
788 	int result;
789 
790 	BUILD_BUG_ON(VDO_BLOCK_SIZE > PAGE_SIZE);
791 	result = vdo_allocate_memory(VDO_BLOCK_SIZE, 0, "data_vio data",
792 				     &data_vio->vio.data);
793 	if (result != VDO_SUCCESS)
794 		return vdo_log_error_strerror(result,
795 					      "data_vio data allocation failure");
796 
797 	result = vdo_allocate_memory(VDO_BLOCK_SIZE, 0, "compressed block",
798 				     &data_vio->compression.block);
799 	if (result != VDO_SUCCESS) {
800 		return vdo_log_error_strerror(result,
801 					      "data_vio compressed block allocation failure");
802 	}
803 
804 	result = vdo_allocate_memory(VDO_BLOCK_SIZE, 0, "vio scratch",
805 				     &data_vio->scratch_block);
806 	if (result != VDO_SUCCESS)
807 		return vdo_log_error_strerror(result,
808 					      "data_vio scratch allocation failure");
809 
810 	result = vdo_create_bio(&bio);
811 	if (result != VDO_SUCCESS)
812 		return vdo_log_error_strerror(result,
813 					      "data_vio data bio allocation failure");
814 
815 	vdo_initialize_completion(&data_vio->decrement_completion, vdo,
816 				  VDO_DECREMENT_COMPLETION);
817 	initialize_vio(&data_vio->vio, bio, 1, VIO_TYPE_DATA, VIO_PRIORITY_DATA, vdo);
818 
819 	return VDO_SUCCESS;
820 }
821 
822 static void destroy_data_vio(struct data_vio *data_vio)
823 {
824 	if (data_vio == NULL)
825 		return;
826 
827 	vdo_free_bio(vdo_forget(data_vio->vio.bio));
828 	vdo_free(vdo_forget(data_vio->vio.data));
829 	vdo_free(vdo_forget(data_vio->compression.block));
830 	vdo_free(vdo_forget(data_vio->scratch_block));
831 }
832 
833 /**
834  * make_data_vio_pool() - Initialize a data_vio pool.
835  * @vdo: The vdo to which the pool will belong.
836  * @pool_size: The number of data_vios in the pool.
837  * @discard_limit: The maximum number of data_vios which may be used for discards.
838  * @pool: A pointer to hold the newly allocated pool.
839  */
840 int make_data_vio_pool(struct vdo *vdo, data_vio_count_t pool_size,
841 		       data_vio_count_t discard_limit, struct data_vio_pool **pool_ptr)
842 {
843 	int result;
844 	struct data_vio_pool *pool;
845 	data_vio_count_t i;
846 
847 	result = vdo_allocate_extended(struct data_vio_pool, pool_size, struct data_vio,
848 				       __func__, &pool);
849 	if (result != VDO_SUCCESS)
850 		return result;
851 
852 	VDO_ASSERT_LOG_ONLY((discard_limit <= pool_size),
853 			    "discard limit does not exceed pool size");
854 	initialize_limiter(&pool->discard_limiter, pool, assign_discard_permit,
855 			   discard_limit);
856 	pool->discard_limiter.permitted_waiters = &pool->permitted_discards;
857 	initialize_limiter(&pool->limiter, pool, assign_data_vio_to_waiter, pool_size);
858 	pool->limiter.permitted_waiters = &pool->limiter.waiters;
859 	INIT_LIST_HEAD(&pool->available);
860 	spin_lock_init(&pool->lock);
861 	vdo_set_admin_state_code(&pool->state, VDO_ADMIN_STATE_NORMAL_OPERATION);
862 	vdo_initialize_completion(&pool->completion, vdo, VDO_DATA_VIO_POOL_COMPLETION);
863 	vdo_prepare_completion(&pool->completion, process_release_callback,
864 			       process_release_callback, vdo->thread_config.cpu_thread,
865 			       NULL);
866 
867 	result = vdo_make_funnel_queue(&pool->queue);
868 	if (result != VDO_SUCCESS) {
869 		free_data_vio_pool(vdo_forget(pool));
870 		return result;
871 	}
872 
873 	for (i = 0; i < pool_size; i++) {
874 		struct data_vio *data_vio = &pool->data_vios[i];
875 
876 		result = initialize_data_vio(data_vio, vdo);
877 		if (result != VDO_SUCCESS) {
878 			destroy_data_vio(data_vio);
879 			free_data_vio_pool(pool);
880 			return result;
881 		}
882 
883 		list_add(&data_vio->pool_entry, &pool->available);
884 	}
885 
886 	*pool_ptr = pool;
887 	return VDO_SUCCESS;
888 }
889 
890 /**
891  * free_data_vio_pool() - Free a data_vio_pool and the data_vios in it.
892  *
893  * All data_vios must be returned to the pool before calling this function.
894  */
895 void free_data_vio_pool(struct data_vio_pool *pool)
896 {
897 	struct data_vio *data_vio, *tmp;
898 
899 	if (pool == NULL)
900 		return;
901 
902 	/*
903 	 * Pairs with the barrier in process_release_callback(). Possibly not needed since it
904 	 * caters to an enqueue vs. free race.
905 	 */
906 	smp_mb();
907 	BUG_ON(atomic_read(&pool->processing));
908 
909 	spin_lock(&pool->lock);
910 	VDO_ASSERT_LOG_ONLY((pool->limiter.busy == 0),
911 			    "data_vio pool must not have %u busy entries when being freed",
912 			    pool->limiter.busy);
913 	VDO_ASSERT_LOG_ONLY((bio_list_empty(&pool->limiter.waiters) &&
914 			     bio_list_empty(&pool->limiter.new_waiters)),
915 			    "data_vio pool must not have threads waiting to read or write when being freed");
916 	VDO_ASSERT_LOG_ONLY((bio_list_empty(&pool->discard_limiter.waiters) &&
917 			     bio_list_empty(&pool->discard_limiter.new_waiters)),
918 			    "data_vio pool must not have threads waiting to discard when being freed");
919 	spin_unlock(&pool->lock);
920 
921 	list_for_each_entry_safe(data_vio, tmp, &pool->available, pool_entry) {
922 		list_del_init(&data_vio->pool_entry);
923 		destroy_data_vio(data_vio);
924 	}
925 
926 	vdo_free_funnel_queue(vdo_forget(pool->queue));
927 	vdo_free(pool);
928 }
929 
930 static bool acquire_permit(struct limiter *limiter)
931 {
932 	if (limiter->busy >= limiter->limit)
933 		return false;
934 
935 	WRITE_ONCE(limiter->busy, limiter->busy + 1);
936 	if (limiter->max_busy < limiter->busy)
937 		WRITE_ONCE(limiter->max_busy, limiter->busy);
938 	return true;
939 }
940 
941 static void wait_permit(struct limiter *limiter, struct bio *bio)
942 	__releases(&limiter->pool->lock)
943 {
944 	DEFINE_WAIT(wait);
945 
946 	bio_list_add(&limiter->new_waiters, bio);
947 	prepare_to_wait_exclusive(&limiter->blocked_threads, &wait,
948 				  TASK_UNINTERRUPTIBLE);
949 	spin_unlock(&limiter->pool->lock);
950 	io_schedule();
951 	finish_wait(&limiter->blocked_threads, &wait);
952 }
953 
954 /**
955  * vdo_launch_bio() - Acquire a data_vio from the pool, assign the bio to it, and launch it.
956  *
957  * This will block if data_vios or discard permits are not available.
958  */
959 void vdo_launch_bio(struct data_vio_pool *pool, struct bio *bio)
960 {
961 	struct data_vio *data_vio;
962 
963 	VDO_ASSERT_LOG_ONLY(!vdo_is_state_quiescent(&pool->state),
964 			    "data_vio_pool not quiescent on acquire");
965 
966 	bio->bi_private = (void *) jiffies;
967 	spin_lock(&pool->lock);
968 	if ((bio_op(bio) == REQ_OP_DISCARD) &&
969 	    !acquire_permit(&pool->discard_limiter)) {
970 		wait_permit(&pool->discard_limiter, bio);
971 		return;
972 	}
973 
974 	if (!acquire_permit(&pool->limiter)) {
975 		wait_permit(&pool->limiter, bio);
976 		return;
977 	}
978 
979 	data_vio = get_available_data_vio(pool);
980 	spin_unlock(&pool->lock);
981 	launch_bio(pool->completion.vdo, data_vio, bio);
982 }
983 
984 /* Implements vdo_admin_initiator_fn. */
985 static void initiate_drain(struct admin_state *state)
986 {
987 	bool drained;
988 	struct data_vio_pool *pool = container_of(state, struct data_vio_pool, state);
989 
990 	spin_lock(&pool->lock);
991 	drained = check_for_drain_complete_locked(pool);
992 	spin_unlock(&pool->lock);
993 
994 	if (drained)
995 		vdo_finish_draining(state);
996 }
997 
998 static void assert_on_vdo_cpu_thread(const struct vdo *vdo, const char *name)
999 {
1000 	VDO_ASSERT_LOG_ONLY((vdo_get_callback_thread_id() == vdo->thread_config.cpu_thread),
1001 			    "%s called on cpu thread", name);
1002 }
1003 
1004 /**
1005  * drain_data_vio_pool() - Wait asynchronously for all data_vios to be returned to the pool.
1006  * @completion: The completion to notify when the pool has drained.
1007  */
1008 void drain_data_vio_pool(struct data_vio_pool *pool, struct vdo_completion *completion)
1009 {
1010 	assert_on_vdo_cpu_thread(completion->vdo, __func__);
1011 	vdo_start_draining(&pool->state, VDO_ADMIN_STATE_SUSPENDING, completion,
1012 			   initiate_drain);
1013 }
1014 
1015 /**
1016  * resume_data_vio_pool() - Resume a data_vio pool.
1017  * @completion: The completion to notify when the pool has resumed.
1018  */
1019 void resume_data_vio_pool(struct data_vio_pool *pool, struct vdo_completion *completion)
1020 {
1021 	assert_on_vdo_cpu_thread(completion->vdo, __func__);
1022 	vdo_continue_completion(completion, vdo_resume_if_quiescent(&pool->state));
1023 }
1024 
1025 static void dump_limiter(const char *name, struct limiter *limiter)
1026 {
1027 	vdo_log_info("%s: %u of %u busy (max %u), %s", name, limiter->busy,
1028 		     limiter->limit, limiter->max_busy,
1029 		     ((bio_list_empty(&limiter->waiters) &&
1030 		       bio_list_empty(&limiter->new_waiters)) ?
1031 		      "no waiters" : "has waiters"));
1032 }
1033 
1034 /**
1035  * dump_data_vio_pool() - Dump a data_vio pool to the log.
1036  * @dump_vios: Whether to dump the details of each busy data_vio as well.
1037  */
1038 void dump_data_vio_pool(struct data_vio_pool *pool, bool dump_vios)
1039 {
1040 	/*
1041 	 * In order that syslog can empty its buffer, sleep after 35 elements for 4ms (till the
1042 	 * second clock tick).  These numbers were picked based on experiments with lab machines.
1043 	 */
1044 	static const int ELEMENTS_PER_BATCH = 35;
1045 	static const int SLEEP_FOR_SYSLOG = 4000;
1046 
1047 	if (pool == NULL)
1048 		return;
1049 
1050 	spin_lock(&pool->lock);
1051 	dump_limiter("data_vios", &pool->limiter);
1052 	dump_limiter("discard permits", &pool->discard_limiter);
1053 	if (dump_vios) {
1054 		int i;
1055 		int dumped = 0;
1056 
1057 		for (i = 0; i < pool->limiter.limit; i++) {
1058 			struct data_vio *data_vio = &pool->data_vios[i];
1059 
1060 			if (!list_empty(&data_vio->pool_entry))
1061 				continue;
1062 
1063 			dump_data_vio(data_vio);
1064 			if (++dumped >= ELEMENTS_PER_BATCH) {
1065 				spin_unlock(&pool->lock);
1066 				dumped = 0;
1067 				fsleep(SLEEP_FOR_SYSLOG);
1068 				spin_lock(&pool->lock);
1069 			}
1070 		}
1071 	}
1072 
1073 	spin_unlock(&pool->lock);
1074 }
1075 
1076 data_vio_count_t get_data_vio_pool_active_discards(struct data_vio_pool *pool)
1077 {
1078 	return READ_ONCE(pool->discard_limiter.busy);
1079 }
1080 
1081 data_vio_count_t get_data_vio_pool_discard_limit(struct data_vio_pool *pool)
1082 {
1083 	return READ_ONCE(pool->discard_limiter.limit);
1084 }
1085 
1086 data_vio_count_t get_data_vio_pool_maximum_discards(struct data_vio_pool *pool)
1087 {
1088 	return READ_ONCE(pool->discard_limiter.max_busy);
1089 }
1090 
1091 int set_data_vio_pool_discard_limit(struct data_vio_pool *pool, data_vio_count_t limit)
1092 {
1093 	if (get_data_vio_pool_request_limit(pool) < limit) {
1094 		// The discard limit may not be higher than the data_vio limit.
1095 		return -EINVAL;
1096 	}
1097 
1098 	spin_lock(&pool->lock);
1099 	pool->discard_limiter.limit = limit;
1100 	spin_unlock(&pool->lock);
1101 
1102 	return VDO_SUCCESS;
1103 }
1104 
1105 data_vio_count_t get_data_vio_pool_active_requests(struct data_vio_pool *pool)
1106 {
1107 	return READ_ONCE(pool->limiter.busy);
1108 }
1109 
1110 data_vio_count_t get_data_vio_pool_request_limit(struct data_vio_pool *pool)
1111 {
1112 	return READ_ONCE(pool->limiter.limit);
1113 }
1114 
1115 data_vio_count_t get_data_vio_pool_maximum_requests(struct data_vio_pool *pool)
1116 {
1117 	return READ_ONCE(pool->limiter.max_busy);
1118 }
1119 
1120 static void update_data_vio_error_stats(struct data_vio *data_vio)
1121 {
1122 	u8 index = 0;
1123 	static const char * const operations[] = {
1124 		[0] = "empty",
1125 		[1] = "read",
1126 		[2] = "write",
1127 		[3] = "read-modify-write",
1128 		[5] = "read+fua",
1129 		[6] = "write+fua",
1130 		[7] = "read-modify-write+fua",
1131 	};
1132 
1133 	if (data_vio->read)
1134 		index = 1;
1135 
1136 	if (data_vio->write)
1137 		index += 2;
1138 
1139 	if (data_vio->fua)
1140 		index += 4;
1141 
1142 	update_vio_error_stats(&data_vio->vio,
1143 			       "Completing %s vio for LBN %llu with error after %s",
1144 			       operations[index],
1145 			       (unsigned long long) data_vio->logical.lbn,
1146 			       get_data_vio_operation_name(data_vio));
1147 }
1148 
1149 static void perform_cleanup_stage(struct data_vio *data_vio,
1150 				  enum data_vio_cleanup_stage stage);
1151 
1152 /**
1153  * release_allocated_lock() - Release the PBN lock and/or the reference on the allocated block at
1154  *			      the end of processing a data_vio.
1155  */
1156 static void release_allocated_lock(struct vdo_completion *completion)
1157 {
1158 	struct data_vio *data_vio = as_data_vio(completion);
1159 
1160 	assert_data_vio_in_allocated_zone(data_vio);
1161 	release_data_vio_allocation_lock(data_vio, false);
1162 	perform_cleanup_stage(data_vio, VIO_RELEASE_RECOVERY_LOCKS);
1163 }
1164 
1165 /** release_lock() - Release an uncontended LBN lock. */
1166 static void release_lock(struct data_vio *data_vio, struct lbn_lock *lock)
1167 {
1168 	struct int_map *lock_map = lock->zone->lbn_operations;
1169 	struct data_vio *lock_holder;
1170 
1171 	if (!lock->locked) {
1172 		/*  The lock is not locked, so it had better not be registered in the lock map. */
1173 		struct data_vio *lock_holder = vdo_int_map_get(lock_map, lock->lbn);
1174 
1175 		VDO_ASSERT_LOG_ONLY((data_vio != lock_holder),
1176 				    "no logical block lock held for block %llu",
1177 				    (unsigned long long) lock->lbn);
1178 		return;
1179 	}
1180 
1181 	/* Release the lock by removing the lock from the map. */
1182 	lock_holder = vdo_int_map_remove(lock_map, lock->lbn);
1183 	VDO_ASSERT_LOG_ONLY((data_vio == lock_holder),
1184 			    "logical block lock mismatch for block %llu",
1185 			    (unsigned long long) lock->lbn);
1186 	lock->locked = false;
1187 }
1188 
1189 /** transfer_lock() - Transfer a contended LBN lock to the eldest waiter. */
1190 static void transfer_lock(struct data_vio *data_vio, struct lbn_lock *lock)
1191 {
1192 	struct data_vio *lock_holder, *next_lock_holder;
1193 	int result;
1194 
1195 	VDO_ASSERT_LOG_ONLY(lock->locked, "lbn_lock with waiters is not locked");
1196 
1197 	/* Another data_vio is waiting for the lock, transfer it in a single lock map operation. */
1198 	next_lock_holder =
1199 		vdo_waiter_as_data_vio(vdo_waitq_dequeue_waiter(&lock->waiters));
1200 
1201 	/* Transfer the remaining lock waiters to the next lock holder. */
1202 	vdo_waitq_transfer_all_waiters(&lock->waiters,
1203 				       &next_lock_holder->logical.waiters);
1204 
1205 	result = vdo_int_map_put(lock->zone->lbn_operations, lock->lbn,
1206 				 next_lock_holder, true, (void **) &lock_holder);
1207 	if (result != VDO_SUCCESS) {
1208 		continue_data_vio_with_error(next_lock_holder, result);
1209 		return;
1210 	}
1211 
1212 	VDO_ASSERT_LOG_ONLY((lock_holder == data_vio),
1213 			    "logical block lock mismatch for block %llu",
1214 			    (unsigned long long) lock->lbn);
1215 	lock->locked = false;
1216 
1217 	/*
1218 	 * If there are still waiters, other data_vios must be trying to get the lock we just
1219 	 * transferred. We must ensure that the new lock holder doesn't block in the packer.
1220 	 */
1221 	if (vdo_waitq_has_waiters(&next_lock_holder->logical.waiters))
1222 		cancel_data_vio_compression(next_lock_holder);
1223 
1224 	/*
1225 	 * Avoid stack overflow on lock transfer.
1226 	 * FIXME: this is only an issue in the 1 thread config.
1227 	 */
1228 	next_lock_holder->vio.completion.requeue = true;
1229 	launch_locked_request(next_lock_holder);
1230 }
1231 
1232 /**
1233  * release_logical_lock() - Release the logical block lock and flush generation lock at the end of
1234  *			    processing a data_vio.
1235  */
1236 static void release_logical_lock(struct vdo_completion *completion)
1237 {
1238 	struct data_vio *data_vio = as_data_vio(completion);
1239 	struct lbn_lock *lock = &data_vio->logical;
1240 
1241 	assert_data_vio_in_logical_zone(data_vio);
1242 
1243 	if (vdo_waitq_has_waiters(&lock->waiters))
1244 		transfer_lock(data_vio, lock);
1245 	else
1246 		release_lock(data_vio, lock);
1247 
1248 	vdo_release_flush_generation_lock(data_vio);
1249 	perform_cleanup_stage(data_vio, VIO_CLEANUP_DONE);
1250 }
1251 
1252 /** clean_hash_lock() - Release the hash lock at the end of processing a data_vio. */
1253 static void clean_hash_lock(struct vdo_completion *completion)
1254 {
1255 	struct data_vio *data_vio = as_data_vio(completion);
1256 
1257 	assert_data_vio_in_hash_zone(data_vio);
1258 	if (completion->result != VDO_SUCCESS) {
1259 		vdo_clean_failed_hash_lock(data_vio);
1260 		return;
1261 	}
1262 
1263 	vdo_release_hash_lock(data_vio);
1264 	perform_cleanup_stage(data_vio, VIO_RELEASE_LOGICAL);
1265 }
1266 
1267 /**
1268  * finish_cleanup() - Make some assertions about a data_vio which has finished cleaning up.
1269  *
1270  * If it is part of a multi-block discard, starts on the next block, otherwise, returns it to the
1271  * pool.
1272  */
1273 static void finish_cleanup(struct data_vio *data_vio)
1274 {
1275 	struct vdo_completion *completion = &data_vio->vio.completion;
1276 
1277 	VDO_ASSERT_LOG_ONLY(data_vio->allocation.lock == NULL,
1278 			    "complete data_vio has no allocation lock");
1279 	VDO_ASSERT_LOG_ONLY(data_vio->hash_lock == NULL,
1280 			    "complete data_vio has no hash lock");
1281 	if ((data_vio->remaining_discard <= VDO_BLOCK_SIZE) ||
1282 	    (completion->result != VDO_SUCCESS)) {
1283 		struct data_vio_pool *pool = completion->vdo->data_vio_pool;
1284 
1285 		vdo_funnel_queue_put(pool->queue, &completion->work_queue_entry_link);
1286 		schedule_releases(pool);
1287 		return;
1288 	}
1289 
1290 	data_vio->remaining_discard -= min_t(u32, data_vio->remaining_discard,
1291 					     VDO_BLOCK_SIZE - data_vio->offset);
1292 	data_vio->is_partial = (data_vio->remaining_discard < VDO_BLOCK_SIZE);
1293 	data_vio->read = data_vio->is_partial;
1294 	data_vio->offset = 0;
1295 	completion->requeue = true;
1296 	launch_data_vio(data_vio, data_vio->logical.lbn + 1);
1297 }
1298 
1299 /** perform_cleanup_stage() - Perform the next step in the process of cleaning up a data_vio. */
1300 static void perform_cleanup_stage(struct data_vio *data_vio,
1301 				  enum data_vio_cleanup_stage stage)
1302 {
1303 	struct vdo *vdo = vdo_from_data_vio(data_vio);
1304 
1305 	switch (stage) {
1306 	case VIO_RELEASE_HASH_LOCK:
1307 		if (data_vio->hash_lock != NULL) {
1308 			launch_data_vio_hash_zone_callback(data_vio, clean_hash_lock);
1309 			return;
1310 		}
1311 		fallthrough;
1312 
1313 	case VIO_RELEASE_ALLOCATED:
1314 		if (data_vio_has_allocation(data_vio)) {
1315 			launch_data_vio_allocated_zone_callback(data_vio,
1316 								release_allocated_lock);
1317 			return;
1318 		}
1319 		fallthrough;
1320 
1321 	case VIO_RELEASE_RECOVERY_LOCKS:
1322 		if ((data_vio->recovery_sequence_number > 0) &&
1323 		    (READ_ONCE(vdo->read_only_notifier.read_only_error) == VDO_SUCCESS) &&
1324 		    (data_vio->vio.completion.result != VDO_READ_ONLY))
1325 			vdo_log_warning("VDO not read-only when cleaning data_vio with RJ lock");
1326 		fallthrough;
1327 
1328 	case VIO_RELEASE_LOGICAL:
1329 		launch_data_vio_logical_callback(data_vio, release_logical_lock);
1330 		return;
1331 
1332 	default:
1333 		finish_cleanup(data_vio);
1334 	}
1335 }
1336 
1337 void complete_data_vio(struct vdo_completion *completion)
1338 {
1339 	struct data_vio *data_vio = as_data_vio(completion);
1340 
1341 	completion->error_handler = NULL;
1342 	data_vio->last_async_operation = VIO_ASYNC_OP_CLEANUP;
1343 	perform_cleanup_stage(data_vio,
1344 			      (data_vio->write ? VIO_CLEANUP_START : VIO_RELEASE_LOGICAL));
1345 }
1346 
1347 static void enter_read_only_mode(struct vdo_completion *completion)
1348 {
1349 	if (vdo_is_read_only(completion->vdo))
1350 		return;
1351 
1352 	if (completion->result != VDO_READ_ONLY) {
1353 		struct data_vio *data_vio = as_data_vio(completion);
1354 
1355 		vdo_log_error_strerror(completion->result,
1356 				       "Preparing to enter read-only mode: data_vio for LBN %llu (becoming mapped to %llu, previously mapped to %llu, allocated %llu) is completing with a fatal error after operation %s",
1357 				       (unsigned long long) data_vio->logical.lbn,
1358 				       (unsigned long long) data_vio->new_mapped.pbn,
1359 				       (unsigned long long) data_vio->mapped.pbn,
1360 				       (unsigned long long) data_vio->allocation.pbn,
1361 				       get_data_vio_operation_name(data_vio));
1362 	}
1363 
1364 	vdo_enter_read_only_mode(completion->vdo, completion->result);
1365 }
1366 
1367 void handle_data_vio_error(struct vdo_completion *completion)
1368 {
1369 	struct data_vio *data_vio = as_data_vio(completion);
1370 
1371 	if ((completion->result == VDO_READ_ONLY) || (data_vio->user_bio == NULL))
1372 		enter_read_only_mode(completion);
1373 
1374 	update_data_vio_error_stats(data_vio);
1375 	complete_data_vio(completion);
1376 }
1377 
1378 /**
1379  * get_data_vio_operation_name() - Get the name of the last asynchronous operation performed on a
1380  *				   data_vio.
1381  */
1382 const char *get_data_vio_operation_name(struct data_vio *data_vio)
1383 {
1384 	BUILD_BUG_ON((MAX_VIO_ASYNC_OPERATION_NUMBER - MIN_VIO_ASYNC_OPERATION_NUMBER) !=
1385 		     ARRAY_SIZE(ASYNC_OPERATION_NAMES));
1386 
1387 	return ((data_vio->last_async_operation < MAX_VIO_ASYNC_OPERATION_NUMBER) ?
1388 		ASYNC_OPERATION_NAMES[data_vio->last_async_operation] :
1389 		"unknown async operation");
1390 }
1391 
1392 /**
1393  * data_vio_allocate_data_block() - Allocate a data block.
1394  *
1395  * @write_lock_type: The type of write lock to obtain on the block.
1396  * @callback: The callback which will attempt an allocation in the current zone and continue if it
1397  *	      succeeds.
1398  * @error_handler: The handler for errors while allocating.
1399  */
1400 void data_vio_allocate_data_block(struct data_vio *data_vio,
1401 				  enum pbn_lock_type write_lock_type,
1402 				  vdo_action_fn callback, vdo_action_fn error_handler)
1403 {
1404 	struct allocation *allocation = &data_vio->allocation;
1405 
1406 	VDO_ASSERT_LOG_ONLY((allocation->pbn == VDO_ZERO_BLOCK),
1407 			    "data_vio does not have an allocation");
1408 	allocation->write_lock_type = write_lock_type;
1409 	allocation->zone = vdo_get_next_allocation_zone(data_vio->logical.zone);
1410 	allocation->first_allocation_zone = allocation->zone->zone_number;
1411 
1412 	data_vio->vio.completion.error_handler = error_handler;
1413 	launch_data_vio_allocated_zone_callback(data_vio, callback);
1414 }
1415 
1416 /**
1417  * release_data_vio_allocation_lock() - Release the PBN lock on a data_vio's allocated block.
1418  * @reset: If true, the allocation will be reset (i.e. any allocated pbn will be forgotten).
1419  *
1420  * If the reference to the locked block is still provisional, it will be released as well.
1421  */
1422 void release_data_vio_allocation_lock(struct data_vio *data_vio, bool reset)
1423 {
1424 	struct allocation *allocation = &data_vio->allocation;
1425 	physical_block_number_t locked_pbn = allocation->pbn;
1426 
1427 	assert_data_vio_in_allocated_zone(data_vio);
1428 
1429 	if (reset || vdo_pbn_lock_has_provisional_reference(allocation->lock))
1430 		allocation->pbn = VDO_ZERO_BLOCK;
1431 
1432 	vdo_release_physical_zone_pbn_lock(allocation->zone, locked_pbn,
1433 					   vdo_forget(allocation->lock));
1434 }
1435 
1436 /**
1437  * uncompress_data_vio() - Uncompress the data a data_vio has just read.
1438  * @mapping_state: The mapping state indicating which fragment to decompress.
1439  * @buffer: The buffer to receive the uncompressed data.
1440  */
1441 int uncompress_data_vio(struct data_vio *data_vio,
1442 			enum block_mapping_state mapping_state, char *buffer)
1443 {
1444 	int size;
1445 	u16 fragment_offset, fragment_size;
1446 	struct compressed_block *block = data_vio->compression.block;
1447 	int result = vdo_get_compressed_block_fragment(mapping_state, block,
1448 						       &fragment_offset, &fragment_size);
1449 
1450 	if (result != VDO_SUCCESS) {
1451 		vdo_log_debug("%s: compressed fragment error %d", __func__, result);
1452 		return result;
1453 	}
1454 
1455 	size = LZ4_decompress_safe((block->data + fragment_offset), buffer,
1456 				   fragment_size, VDO_BLOCK_SIZE);
1457 	if (size != VDO_BLOCK_SIZE) {
1458 		vdo_log_debug("%s: lz4 error", __func__);
1459 		return VDO_INVALID_FRAGMENT;
1460 	}
1461 
1462 	return VDO_SUCCESS;
1463 }
1464 
1465 /**
1466  * modify_for_partial_write() - Do the modify-write part of a read-modify-write cycle.
1467  * @completion: The data_vio which has just finished its read.
1468  *
1469  * This callback is registered in read_block().
1470  */
1471 static void modify_for_partial_write(struct vdo_completion *completion)
1472 {
1473 	struct data_vio *data_vio = as_data_vio(completion);
1474 	char *data = data_vio->vio.data;
1475 	struct bio *bio = data_vio->user_bio;
1476 
1477 	assert_data_vio_on_cpu_thread(data_vio);
1478 
1479 	if (bio_op(bio) == REQ_OP_DISCARD) {
1480 		memset(data + data_vio->offset, '\0', min_t(u32,
1481 							    data_vio->remaining_discard,
1482 							    VDO_BLOCK_SIZE - data_vio->offset));
1483 	} else {
1484 		copy_from_bio(bio, data + data_vio->offset);
1485 	}
1486 
1487 	data_vio->is_zero = is_zero_block(data);
1488 	data_vio->read = false;
1489 	launch_data_vio_logical_callback(data_vio,
1490 					 continue_data_vio_with_block_map_slot);
1491 }
1492 
1493 static void complete_read(struct vdo_completion *completion)
1494 {
1495 	struct data_vio *data_vio = as_data_vio(completion);
1496 	char *data = data_vio->vio.data;
1497 	bool compressed = vdo_is_state_compressed(data_vio->mapped.state);
1498 
1499 	assert_data_vio_on_cpu_thread(data_vio);
1500 
1501 	if (compressed) {
1502 		int result = uncompress_data_vio(data_vio, data_vio->mapped.state, data);
1503 
1504 		if (result != VDO_SUCCESS) {
1505 			continue_data_vio_with_error(data_vio, result);
1506 			return;
1507 		}
1508 	}
1509 
1510 	if (data_vio->write) {
1511 		modify_for_partial_write(completion);
1512 		return;
1513 	}
1514 
1515 	if (compressed || data_vio->is_partial)
1516 		copy_to_bio(data_vio->user_bio, data + data_vio->offset);
1517 
1518 	acknowledge_data_vio(data_vio);
1519 	complete_data_vio(completion);
1520 }
1521 
1522 static void read_endio(struct bio *bio)
1523 {
1524 	struct data_vio *data_vio = vio_as_data_vio(bio->bi_private);
1525 	int result = blk_status_to_errno(bio->bi_status);
1526 
1527 	vdo_count_completed_bios(bio);
1528 	if (result != VDO_SUCCESS) {
1529 		continue_data_vio_with_error(data_vio, result);
1530 		return;
1531 	}
1532 
1533 	launch_data_vio_cpu_callback(data_vio, complete_read,
1534 				     CPU_Q_COMPLETE_READ_PRIORITY);
1535 }
1536 
1537 static void complete_zero_read(struct vdo_completion *completion)
1538 {
1539 	struct data_vio *data_vio = as_data_vio(completion);
1540 
1541 	assert_data_vio_on_cpu_thread(data_vio);
1542 
1543 	if (data_vio->is_partial) {
1544 		memset(data_vio->vio.data, 0, VDO_BLOCK_SIZE);
1545 		if (data_vio->write) {
1546 			modify_for_partial_write(completion);
1547 			return;
1548 		}
1549 	} else {
1550 		zero_fill_bio(data_vio->user_bio);
1551 	}
1552 
1553 	complete_read(completion);
1554 }
1555 
1556 /**
1557  * read_block() - Read a block asynchronously.
1558  *
1559  * This is the callback registered in read_block_mapping().
1560  */
1561 static void read_block(struct vdo_completion *completion)
1562 {
1563 	struct data_vio *data_vio = as_data_vio(completion);
1564 	struct vio *vio = as_vio(completion);
1565 	int result = VDO_SUCCESS;
1566 
1567 	if (data_vio->mapped.pbn == VDO_ZERO_BLOCK) {
1568 		launch_data_vio_cpu_callback(data_vio, complete_zero_read,
1569 					     CPU_Q_COMPLETE_VIO_PRIORITY);
1570 		return;
1571 	}
1572 
1573 	data_vio->last_async_operation = VIO_ASYNC_OP_READ_DATA_VIO;
1574 	if (vdo_is_state_compressed(data_vio->mapped.state)) {
1575 		result = vio_reset_bio(vio, (char *) data_vio->compression.block,
1576 				       read_endio, REQ_OP_READ, data_vio->mapped.pbn);
1577 	} else {
1578 		blk_opf_t opf = ((data_vio->user_bio->bi_opf & PASSTHROUGH_FLAGS) | REQ_OP_READ);
1579 
1580 		if (data_vio->is_partial) {
1581 			result = vio_reset_bio(vio, vio->data, read_endio, opf,
1582 					       data_vio->mapped.pbn);
1583 		} else {
1584 			/* A full 4k read. Use the incoming bio to avoid having to copy the data */
1585 			bio_reset(vio->bio, vio->bio->bi_bdev, opf);
1586 			bio_init_clone(data_vio->user_bio->bi_bdev, vio->bio,
1587 				       data_vio->user_bio, GFP_KERNEL);
1588 
1589 			/* Copy over the original bio iovec and opflags. */
1590 			vdo_set_bio_properties(vio->bio, vio, read_endio, opf,
1591 					       data_vio->mapped.pbn);
1592 		}
1593 	}
1594 
1595 	if (result != VDO_SUCCESS) {
1596 		continue_data_vio_with_error(data_vio, result);
1597 		return;
1598 	}
1599 
1600 	vdo_submit_data_vio(data_vio);
1601 }
1602 
1603 static inline struct data_vio *
1604 reference_count_update_completion_as_data_vio(struct vdo_completion *completion)
1605 {
1606 	if (completion->type == VIO_COMPLETION)
1607 		return as_data_vio(completion);
1608 
1609 	return container_of(completion, struct data_vio, decrement_completion);
1610 }
1611 
1612 /**
1613  * update_block_map() - Rendezvous of the data_vio and decrement completions after each has
1614  *                      made its reference updates. Handle any error from either, or proceed
1615  *                      to updating the block map.
1616  * @completion: The completion of the write in progress.
1617  */
1618 static void update_block_map(struct vdo_completion *completion)
1619 {
1620 	struct data_vio *data_vio = reference_count_update_completion_as_data_vio(completion);
1621 
1622 	assert_data_vio_in_logical_zone(data_vio);
1623 
1624 	if (!data_vio->first_reference_operation_complete) {
1625 		/* Rendezvous, we're first */
1626 		data_vio->first_reference_operation_complete = true;
1627 		return;
1628 	}
1629 
1630 	completion = &data_vio->vio.completion;
1631 	vdo_set_completion_result(completion, data_vio->decrement_completion.result);
1632 	if (completion->result != VDO_SUCCESS) {
1633 		handle_data_vio_error(completion);
1634 		return;
1635 	}
1636 
1637 	completion->error_handler = handle_data_vio_error;
1638 	if (data_vio->hash_lock != NULL)
1639 		set_data_vio_hash_zone_callback(data_vio, vdo_continue_hash_lock);
1640 	else
1641 		completion->callback = complete_data_vio;
1642 
1643 	data_vio->last_async_operation = VIO_ASYNC_OP_PUT_MAPPED_BLOCK;
1644 	vdo_put_mapped_block(data_vio);
1645 }
1646 
1647 static void decrement_reference_count(struct vdo_completion *completion)
1648 {
1649 	struct data_vio *data_vio = container_of(completion, struct data_vio,
1650 						 decrement_completion);
1651 
1652 	assert_data_vio_in_mapped_zone(data_vio);
1653 
1654 	vdo_set_completion_callback(completion, update_block_map,
1655 				    data_vio->logical.zone->thread_id);
1656 	completion->error_handler = update_block_map;
1657 	vdo_modify_reference_count(completion, &data_vio->decrement_updater);
1658 }
1659 
1660 static void increment_reference_count(struct vdo_completion *completion)
1661 {
1662 	struct data_vio *data_vio = as_data_vio(completion);
1663 
1664 	assert_data_vio_in_new_mapped_zone(data_vio);
1665 
1666 	if (data_vio->downgrade_allocation_lock) {
1667 		/*
1668 		 * Now that the data has been written, it's safe to deduplicate against the
1669 		 * block. Downgrade the allocation lock to a read lock so it can be used later by
1670 		 * the hash lock. This is done here since it needs to happen sometime before we
1671 		 * return to the hash zone, and we are currently on the correct thread. For
1672 		 * compressed blocks, the downgrade will have already been done.
1673 		 */
1674 		vdo_downgrade_pbn_write_lock(data_vio->allocation.lock, false);
1675 	}
1676 
1677 	set_data_vio_logical_callback(data_vio, update_block_map);
1678 	completion->error_handler = update_block_map;
1679 	vdo_modify_reference_count(completion, &data_vio->increment_updater);
1680 }
1681 
1682 /** journal_remapping() - Add a recovery journal entry for a data remapping. */
1683 static void journal_remapping(struct vdo_completion *completion)
1684 {
1685 	struct data_vio *data_vio = as_data_vio(completion);
1686 
1687 	assert_data_vio_in_journal_zone(data_vio);
1688 
1689 	data_vio->decrement_updater.operation = VDO_JOURNAL_DATA_REMAPPING;
1690 	data_vio->decrement_updater.zpbn = data_vio->mapped;
1691 	if (data_vio->new_mapped.pbn == VDO_ZERO_BLOCK) {
1692 		data_vio->first_reference_operation_complete = true;
1693 		if (data_vio->mapped.pbn == VDO_ZERO_BLOCK)
1694 			set_data_vio_logical_callback(data_vio, update_block_map);
1695 	} else {
1696 		set_data_vio_new_mapped_zone_callback(data_vio,
1697 						      increment_reference_count);
1698 	}
1699 
1700 	if (data_vio->mapped.pbn == VDO_ZERO_BLOCK) {
1701 		data_vio->first_reference_operation_complete = true;
1702 	} else {
1703 		vdo_set_completion_callback(&data_vio->decrement_completion,
1704 					    decrement_reference_count,
1705 					    data_vio->mapped.zone->thread_id);
1706 	}
1707 
1708 	data_vio->last_async_operation = VIO_ASYNC_OP_JOURNAL_REMAPPING;
1709 	vdo_add_recovery_journal_entry(completion->vdo->recovery_journal, data_vio);
1710 }
1711 
1712 /**
1713  * read_old_block_mapping() - Get the previous PBN/LBN mapping of an in-progress write.
1714  *
1715  * Gets the previous PBN mapped to this LBN from the block map, so as to make an appropriate
1716  * journal entry referencing the removal of this LBN->PBN mapping.
1717  */
1718 static void read_old_block_mapping(struct vdo_completion *completion)
1719 {
1720 	struct data_vio *data_vio = as_data_vio(completion);
1721 
1722 	assert_data_vio_in_logical_zone(data_vio);
1723 
1724 	data_vio->last_async_operation = VIO_ASYNC_OP_GET_MAPPED_BLOCK_FOR_WRITE;
1725 	set_data_vio_journal_callback(data_vio, journal_remapping);
1726 	vdo_get_mapped_block(data_vio);
1727 }
1728 
1729 void update_metadata_for_data_vio_write(struct data_vio *data_vio, struct pbn_lock *lock)
1730 {
1731 	data_vio->increment_updater = (struct reference_updater) {
1732 		.operation = VDO_JOURNAL_DATA_REMAPPING,
1733 		.increment = true,
1734 		.zpbn = data_vio->new_mapped,
1735 		.lock = lock,
1736 	};
1737 
1738 	launch_data_vio_logical_callback(data_vio, read_old_block_mapping);
1739 }
1740 
1741 /**
1742  * pack_compressed_data() - Attempt to pack the compressed data_vio into a block.
1743  *
1744  * This is the callback registered in launch_compress_data_vio().
1745  */
1746 static void pack_compressed_data(struct vdo_completion *completion)
1747 {
1748 	struct data_vio *data_vio = as_data_vio(completion);
1749 
1750 	assert_data_vio_in_packer_zone(data_vio);
1751 
1752 	if (!vdo_get_compressing(vdo_from_data_vio(data_vio)) ||
1753 	    get_data_vio_compression_status(data_vio).may_not_compress) {
1754 		write_data_vio(data_vio);
1755 		return;
1756 	}
1757 
1758 	data_vio->last_async_operation = VIO_ASYNC_OP_ATTEMPT_PACKING;
1759 	vdo_attempt_packing(data_vio);
1760 }
1761 
1762 /**
1763  * compress_data_vio() - Do the actual work of compressing the data on a CPU queue.
1764  *
1765  * This callback is registered in launch_compress_data_vio().
1766  */
1767 static void compress_data_vio(struct vdo_completion *completion)
1768 {
1769 	struct data_vio *data_vio = as_data_vio(completion);
1770 	int size;
1771 
1772 	assert_data_vio_on_cpu_thread(data_vio);
1773 
1774 	/*
1775 	 * By putting the compressed data at the start of the compressed block data field, we won't
1776 	 * need to copy it if this data_vio becomes a compressed write agent.
1777 	 */
1778 	size = LZ4_compress_default(data_vio->vio.data,
1779 				    data_vio->compression.block->data, VDO_BLOCK_SIZE,
1780 				    VDO_MAX_COMPRESSED_FRAGMENT_SIZE,
1781 				    (char *) vdo_get_work_queue_private_data());
1782 	if ((size > 0) && (size < VDO_COMPRESSED_BLOCK_DATA_SIZE)) {
1783 		data_vio->compression.size = size;
1784 		launch_data_vio_packer_callback(data_vio, pack_compressed_data);
1785 		return;
1786 	}
1787 
1788 	write_data_vio(data_vio);
1789 }
1790 
1791 /**
1792  * launch_compress_data_vio() - Continue a write by attempting to compress the data.
1793  *
1794  * This is a re-entry point to vio_write used by hash locks.
1795  */
1796 void launch_compress_data_vio(struct data_vio *data_vio)
1797 {
1798 	VDO_ASSERT_LOG_ONLY(!data_vio->is_duplicate, "compressing a non-duplicate block");
1799 	VDO_ASSERT_LOG_ONLY(data_vio->hash_lock != NULL,
1800 			    "data_vio to compress has a hash_lock");
1801 	VDO_ASSERT_LOG_ONLY(data_vio_has_allocation(data_vio),
1802 			    "data_vio to compress has an allocation");
1803 
1804 	/*
1805 	 * There are 4 reasons why a data_vio which has reached this point will not be eligible for
1806 	 * compression:
1807 	 *
1808 	 * 1) Since data_vios can block indefinitely in the packer, it would be bad to do so if the
1809 	 * write request also requests FUA.
1810 	 *
1811 	 * 2) A data_vio should not be compressed when compression is disabled for the vdo.
1812 	 *
1813 	 * 3) A data_vio could be doing a partial write on behalf of a larger discard which has not
1814 	 * yet been acknowledged and hence blocking in the packer would be bad.
1815 	 *
1816 	 * 4) Some other data_vio may be waiting on this data_vio in which case blocking in the
1817 	 * packer would also be bad.
1818 	 */
1819 	if (data_vio->fua ||
1820 	    !vdo_get_compressing(vdo_from_data_vio(data_vio)) ||
1821 	    ((data_vio->user_bio != NULL) && (bio_op(data_vio->user_bio) == REQ_OP_DISCARD)) ||
1822 	    (advance_data_vio_compression_stage(data_vio).stage != DATA_VIO_COMPRESSING)) {
1823 		write_data_vio(data_vio);
1824 		return;
1825 	}
1826 
1827 	data_vio->last_async_operation = VIO_ASYNC_OP_COMPRESS_DATA_VIO;
1828 	launch_data_vio_cpu_callback(data_vio, compress_data_vio,
1829 				     CPU_Q_COMPRESS_BLOCK_PRIORITY);
1830 }
1831 
1832 /**
1833  * hash_data_vio() - Hash the data in a data_vio and set the hash zone (which also flags the record
1834  *		     name as set).
1835 
1836  * This callback is registered in prepare_for_dedupe().
1837  */
1838 static void hash_data_vio(struct vdo_completion *completion)
1839 {
1840 	struct data_vio *data_vio = as_data_vio(completion);
1841 
1842 	assert_data_vio_on_cpu_thread(data_vio);
1843 	VDO_ASSERT_LOG_ONLY(!data_vio->is_zero, "zero blocks should not be hashed");
1844 
1845 	murmurhash3_128(data_vio->vio.data, VDO_BLOCK_SIZE, 0x62ea60be,
1846 			&data_vio->record_name);
1847 
1848 	data_vio->hash_zone = vdo_select_hash_zone(vdo_from_data_vio(data_vio)->hash_zones,
1849 						   &data_vio->record_name);
1850 	data_vio->last_async_operation = VIO_ASYNC_OP_ACQUIRE_VDO_HASH_LOCK;
1851 	launch_data_vio_hash_zone_callback(data_vio, vdo_acquire_hash_lock);
1852 }
1853 
1854 /** prepare_for_dedupe() - Prepare for the dedupe path after attempting to get an allocation. */
1855 static void prepare_for_dedupe(struct data_vio *data_vio)
1856 {
1857 	/* We don't care what thread we are on. */
1858 	VDO_ASSERT_LOG_ONLY(!data_vio->is_zero, "must not prepare to dedupe zero blocks");
1859 
1860 	/*
1861 	 * Before we can dedupe, we need to know the record name, so the first
1862 	 * step is to hash the block data.
1863 	 */
1864 	data_vio->last_async_operation = VIO_ASYNC_OP_HASH_DATA_VIO;
1865 	launch_data_vio_cpu_callback(data_vio, hash_data_vio, CPU_Q_HASH_BLOCK_PRIORITY);
1866 }
1867 
1868 /**
1869  * write_bio_finished() - This is the bio_end_io function registered in write_block() to be called
1870  *			  when a data_vio's write to the underlying storage has completed.
1871  */
1872 static void write_bio_finished(struct bio *bio)
1873 {
1874 	struct data_vio *data_vio = vio_as_data_vio((struct vio *) bio->bi_private);
1875 
1876 	vdo_count_completed_bios(bio);
1877 	vdo_set_completion_result(&data_vio->vio.completion,
1878 				  blk_status_to_errno(bio->bi_status));
1879 	data_vio->downgrade_allocation_lock = true;
1880 	update_metadata_for_data_vio_write(data_vio, data_vio->allocation.lock);
1881 }
1882 
1883 /** write_data_vio() - Write a data block to storage without compression. */
1884 void write_data_vio(struct data_vio *data_vio)
1885 {
1886 	struct data_vio_compression_status status, new_status;
1887 	int result;
1888 
1889 	if (!data_vio_has_allocation(data_vio)) {
1890 		/*
1891 		 * There was no space to write this block and we failed to deduplicate or compress
1892 		 * it.
1893 		 */
1894 		continue_data_vio_with_error(data_vio, VDO_NO_SPACE);
1895 		return;
1896 	}
1897 
1898 	new_status = (struct data_vio_compression_status) {
1899 		.stage = DATA_VIO_POST_PACKER,
1900 		.may_not_compress = true,
1901 	};
1902 
1903 	do {
1904 		status = get_data_vio_compression_status(data_vio);
1905 	} while ((status.stage != DATA_VIO_POST_PACKER) &&
1906 		 !set_data_vio_compression_status(data_vio, status, new_status));
1907 
1908 	/* Write the data from the data block buffer. */
1909 	result = vio_reset_bio(&data_vio->vio, data_vio->vio.data,
1910 			       write_bio_finished, REQ_OP_WRITE,
1911 			       data_vio->allocation.pbn);
1912 	if (result != VDO_SUCCESS) {
1913 		continue_data_vio_with_error(data_vio, result);
1914 		return;
1915 	}
1916 
1917 	data_vio->last_async_operation = VIO_ASYNC_OP_WRITE_DATA_VIO;
1918 	vdo_submit_data_vio(data_vio);
1919 }
1920 
1921 /**
1922  * acknowledge_write_callback() - Acknowledge a write to the requestor.
1923  *
1924  * This callback is registered in allocate_block() and continue_write_with_block_map_slot().
1925  */
1926 static void acknowledge_write_callback(struct vdo_completion *completion)
1927 {
1928 	struct data_vio *data_vio = as_data_vio(completion);
1929 	struct vdo *vdo = completion->vdo;
1930 
1931 	VDO_ASSERT_LOG_ONLY((!vdo_uses_bio_ack_queue(vdo) ||
1932 			     (vdo_get_callback_thread_id() == vdo->thread_config.bio_ack_thread)),
1933 			    "%s() called on bio ack queue", __func__);
1934 	VDO_ASSERT_LOG_ONLY(data_vio_has_flush_generation_lock(data_vio),
1935 			    "write VIO to be acknowledged has a flush generation lock");
1936 	acknowledge_data_vio(data_vio);
1937 	if (data_vio->new_mapped.pbn == VDO_ZERO_BLOCK) {
1938 		/* This is a zero write or discard */
1939 		update_metadata_for_data_vio_write(data_vio, NULL);
1940 		return;
1941 	}
1942 
1943 	prepare_for_dedupe(data_vio);
1944 }
1945 
1946 /**
1947  * allocate_block() - Attempt to allocate a block in the current allocation zone.
1948  *
1949  * This callback is registered in continue_write_with_block_map_slot().
1950  */
1951 static void allocate_block(struct vdo_completion *completion)
1952 {
1953 	struct data_vio *data_vio = as_data_vio(completion);
1954 
1955 	assert_data_vio_in_allocated_zone(data_vio);
1956 
1957 	if (!vdo_allocate_block_in_zone(data_vio))
1958 		return;
1959 
1960 	completion->error_handler = handle_data_vio_error;
1961 	WRITE_ONCE(data_vio->allocation_succeeded, true);
1962 	data_vio->new_mapped = (struct zoned_pbn) {
1963 		.zone = data_vio->allocation.zone,
1964 		.pbn = data_vio->allocation.pbn,
1965 		.state = VDO_MAPPING_STATE_UNCOMPRESSED,
1966 	};
1967 
1968 	if (data_vio->fua) {
1969 		prepare_for_dedupe(data_vio);
1970 		return;
1971 	}
1972 
1973 	data_vio->last_async_operation = VIO_ASYNC_OP_ACKNOWLEDGE_WRITE;
1974 	launch_data_vio_on_bio_ack_queue(data_vio, acknowledge_write_callback);
1975 }
1976 
1977 /**
1978  * handle_allocation_error() - Handle an error attempting to allocate a block.
1979  *
1980  * This error handler is registered in continue_write_with_block_map_slot().
1981  */
1982 static void handle_allocation_error(struct vdo_completion *completion)
1983 {
1984 	struct data_vio *data_vio = as_data_vio(completion);
1985 
1986 	if (completion->result == VDO_NO_SPACE) {
1987 		/* We failed to get an allocation, but we can try to dedupe. */
1988 		vdo_reset_completion(completion);
1989 		completion->error_handler = handle_data_vio_error;
1990 		prepare_for_dedupe(data_vio);
1991 		return;
1992 	}
1993 
1994 	/* We got a "real" error, not just a failure to allocate, so fail the request. */
1995 	handle_data_vio_error(completion);
1996 }
1997 
1998 static int assert_is_discard(struct data_vio *data_vio)
1999 {
2000 	int result = VDO_ASSERT(data_vio->is_discard,
2001 				"data_vio with no block map page is a discard");
2002 
2003 	return ((result == VDO_SUCCESS) ? result : VDO_READ_ONLY);
2004 }
2005 
2006 /**
2007  * continue_data_vio_with_block_map_slot() - Read the data_vio's mapping from the block map.
2008  *
2009  * This callback is registered in launch_read_data_vio().
2010  */
2011 void continue_data_vio_with_block_map_slot(struct vdo_completion *completion)
2012 {
2013 	struct data_vio *data_vio = as_data_vio(completion);
2014 
2015 	assert_data_vio_in_logical_zone(data_vio);
2016 	if (data_vio->read) {
2017 		set_data_vio_logical_callback(data_vio, read_block);
2018 		data_vio->last_async_operation = VIO_ASYNC_OP_GET_MAPPED_BLOCK_FOR_READ;
2019 		vdo_get_mapped_block(data_vio);
2020 		return;
2021 	}
2022 
2023 	vdo_acquire_flush_generation_lock(data_vio);
2024 
2025 	if (data_vio->tree_lock.tree_slots[0].block_map_slot.pbn == VDO_ZERO_BLOCK) {
2026 		/*
2027 		 * This is a discard for a block on a block map page which has not been allocated, so
2028 		 * there's nothing more we need to do.
2029 		 */
2030 		completion->callback = complete_data_vio;
2031 		continue_data_vio_with_error(data_vio, assert_is_discard(data_vio));
2032 		return;
2033 	}
2034 
2035 	/*
2036 	 * We need an allocation if this is neither a full-block discard nor a
2037 	 * full-block zero write.
2038 	 */
2039 	if (!data_vio->is_zero && (!data_vio->is_discard || data_vio->is_partial)) {
2040 		data_vio_allocate_data_block(data_vio, VIO_WRITE_LOCK, allocate_block,
2041 					     handle_allocation_error);
2042 		return;
2043 	}
2044 
2045 
2046 	/*
2047 	 * We don't need to write any data, so skip allocation and just update the block map and
2048 	 * reference counts (via the journal).
2049 	 */
2050 	data_vio->new_mapped.pbn = VDO_ZERO_BLOCK;
2051 	if (data_vio->is_zero)
2052 		data_vio->new_mapped.state = VDO_MAPPING_STATE_UNCOMPRESSED;
2053 
2054 	if (data_vio->remaining_discard > VDO_BLOCK_SIZE) {
2055 		/* This is not the final block of a discard so we can't acknowledge it yet. */
2056 		update_metadata_for_data_vio_write(data_vio, NULL);
2057 		return;
2058 	}
2059 
2060 	data_vio->last_async_operation = VIO_ASYNC_OP_ACKNOWLEDGE_WRITE;
2061 	launch_data_vio_on_bio_ack_queue(data_vio, acknowledge_write_callback);
2062 }
2063