xref: /linux/drivers/md/dm-vdo/data-vio.c (revision 55a42f78ffd386e01a5404419f8c5ded7db70a21)
1 // SPDX-License-Identifier: GPL-2.0-only
2 /*
3  * Copyright 2023 Red Hat
4  */
5 
6 #include "data-vio.h"
7 
8 #include <linux/atomic.h>
9 #include <linux/bio.h>
10 #include <linux/blkdev.h>
11 #include <linux/delay.h>
12 #include <linux/device-mapper.h>
13 #include <linux/jiffies.h>
14 #include <linux/kernel.h>
15 #include <linux/list.h>
16 #include <linux/lz4.h>
17 #include <linux/minmax.h>
18 #include <linux/sched.h>
19 #include <linux/spinlock.h>
20 #include <linux/string.h>
21 #include <linux/wait.h>
22 
23 #include "logger.h"
24 #include "memory-alloc.h"
25 #include "murmurhash3.h"
26 #include "permassert.h"
27 
28 #include "block-map.h"
29 #include "dump.h"
30 #include "encodings.h"
31 #include "int-map.h"
32 #include "io-submitter.h"
33 #include "logical-zone.h"
34 #include "packer.h"
35 #include "recovery-journal.h"
36 #include "slab-depot.h"
37 #include "status-codes.h"
38 #include "types.h"
39 #include "vdo.h"
40 #include "vio.h"
41 #include "wait-queue.h"
42 
43 /**
44  * DOC: Bio flags.
45  *
46  * For certain flags set on user bios, if the user bio has not yet been acknowledged, setting those
47  * flags on our own bio(s) for that request may help underlying layers better fulfill the user
48  * bio's needs. This constant contains the aggregate of those flags; VDO strips all the other
49  * flags, as they convey incorrect information.
50  *
51  * These flags are only hints about IO importance, so they become irrelevant once the user bio
52  * has been acknowledged: any IO VDO still performs on its behalf no longer affects how quickly
53  * the submitter sees that bio complete.
54  *
55  * Note that bio.c contains the complete list of flags we believe may be set; the following list
56  * explains the action taken with each of those flags VDO could receive:
57  *
58  * * REQ_SYNC: Passed down if the user bio is not yet completed, since it indicates the user bio
59  *   completion is required for further work to be done by the issuer.
60  * * REQ_META: Passed down if the user bio is not yet completed, since it may mean the lower layer
61  *   treats it as more urgent, similar to REQ_SYNC.
62  * * REQ_PRIO: Passed down if the user bio is not yet completed, since it indicates the user bio is
63  *   important.
64  * * REQ_NOMERGE: Set only if the incoming bio was split; irrelevant to VDO IO.
65  * * REQ_IDLE: Set if the incoming bio had more IO quickly following; VDO's IO pattern doesn't
66  *   match incoming IO, so this flag is incorrect for it.
67  * * REQ_FUA: Handled separately, and irrelevant to VDO IO otherwise.
68  * * REQ_RAHEAD: Passed down, as, for reads, it indicates trivial importance.
69  * * REQ_BACKGROUND: Not passed down, as VIOs are a limited resource and VDO needs them recycled
70  *   ASAP to service heavy load, which is the only place where REQ_BACKGROUND might aid in load
71  *   prioritization.
72  */
73 static blk_opf_t PASSTHROUGH_FLAGS = (REQ_PRIO | REQ_META | REQ_SYNC | REQ_RAHEAD);
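
/*
 * Illustrative sketch (not compiled into the driver): roughly how the passthrough set is applied
 * when VDO builds its own bio on behalf of a not-yet-acknowledged user bio. It mirrors the flag
 * handling in read_block() below; the helper name here is hypothetical.
 *
 *	static blk_opf_t example_read_opf(struct bio *user_bio)
 *	{
 *		return (user_bio->bi_opf & PASSTHROUGH_FLAGS) | REQ_OP_READ;
 *	}
 */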
74 
75 /**
76  * DOC:
77  *
78  * The data_vio_pool maintains the pool of data_vios which a vdo uses to service incoming bios. For
79  * correctness, and in order to avoid potentially expensive or blocking memory allocations during
80  * normal operation, the number of concurrently active data_vios is capped. Furthermore, in order
81  * to avoid starvation of reads and writes, at most 75% of the data_vios may be used for
82  * discards. The data_vio_pool is responsible for enforcing these limits. Threads submitting bios
83  * for which a data_vio or discard permit is not available will block until the necessary
84  * resources are available. The pool is also responsible for distributing resources to blocked
85  * threads and waking them. Finally, the pool attempts to batch the work of recycling data_vios by
86  * performing the work of actually assigning resources to blocked threads or placing data_vios back
87  * into the pool on a single cpu at a time.
88  *
89  * The pool contains two "limiters", one for tracking data_vios and one for tracking discard
90  * permits. The limiters also provide safe cross-thread access to pool statistics without the need
91  * to take the pool's lock. When a thread submits a bio to a vdo device, it will first attempt to
92  * get a discard permit if it is a discard, and then to get a data_vio. If the necessary resources
93  * are available, the incoming bio will be assigned to the acquired data_vio, and it will be
94  * launched. However, if either of these are unavailable, the arrival time of the bio is recorded
95  * in the bio's bi_private field, the bio and its submitter are both queued on the appropriate
96  * limiter and the submitting thread will then put itself to sleep. (note that this mechanism will
97  * break if jiffies are only 32 bits.)
98  *
99  * Whenever a data_vio has completed processing for the bio it was servicing, release_data_vio()
100  * will be called on it. This function will add the data_vio to a funnel queue, and then check the
101  * state of the pool. If the pool is not currently processing released data_vios, the pool's
102  * completion will be enqueued on a cpu queue. This obviates the need for the releasing threads to
103  * hold the pool's lock, and also batches release work while avoiding starvation of the cpu
104  * threads.
105  *
106  * Whenever the pool's completion is run on a cpu thread, it calls process_release_callback() which
107  * processes a batch of returned data_vios (at most DATA_VIO_RELEASE_BATCH_SIZE) from the pool's funnel queue. For
108  * each data_vio, it first checks whether that data_vio was processing a discard. If so, and there
109  * is a blocked bio waiting for a discard permit, that permit is notionally transferred to the
110  * eldest discard waiter, and that waiter is moved to the end of the list of discard bios waiting
111  * for a data_vio. If there are no discard waiters, the discard permit is returned to the pool.
112  * Next, the data_vio is assigned to the oldest blocked bio which either has a discard permit or
113  * doesn't need one, and is relaunched. If no such bio exists, the data_vio is returned to the
114  * pool. Finally, if any waiting bios were launched, the threads which blocked trying to submit
115  * them are awakened.
116  */
117 
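/*
 * A condensed sketch of the submission path described above, assuming the submitter is on a
 * thread which may sleep. This is illustration only (is_discard stands for
 * bio_op(bio) == REQ_OP_DISCARD); the real logic is in vdo_launch_bio(), acquire_permit() and
 * wait_permit() later in this file.
 *
 *	bio->bi_private = (void *) jiffies;		// record arrival time for the limiters
 *	spin_lock(&pool->lock);
 *	if (is_discard && !acquire_permit(&pool->discard_limiter)) {
 *		wait_permit(&pool->discard_limiter, bio);	// drops the lock and sleeps
 *		return;
 *	}
 *	if (!acquire_permit(&pool->limiter)) {
 *		wait_permit(&pool->limiter, bio);		// drops the lock and sleeps
 *		return;
 *	}
 *	data_vio = get_available_data_vio(pool);
 *	spin_unlock(&pool->lock);
 *	launch_bio(vdo, data_vio, bio);
 */
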
118 #define DATA_VIO_RELEASE_BATCH_SIZE 128
119 
120 static const unsigned int VDO_SECTORS_PER_BLOCK_MASK = VDO_SECTORS_PER_BLOCK - 1;
121 static const u32 COMPRESSION_STATUS_MASK = 0xff;
122 static const u32 MAY_NOT_COMPRESS_MASK = 0x80000000;
123 
124 struct limiter;
125 typedef void (*assigner_fn)(struct limiter *limiter);
126 
127 /* Bookkeeping structure for a single type of resource. */
128 struct limiter {
129 	/* The data_vio_pool to which this limiter belongs */
130 	struct data_vio_pool *pool;
131 	/* The maximum number of data_vios available */
132 	data_vio_count_t limit;
133 	/* The number of resources in use */
134 	data_vio_count_t busy;
135 	/* The maximum number of resources ever simultaneously in use */
136 	data_vio_count_t max_busy;
137 	/* The number of resources to release */
138 	data_vio_count_t release_count;
139 	/* The number of waiters to wake */
140 	data_vio_count_t wake_count;
141 	/* The list of waiting bios which are known to process_release_callback() */
142 	struct bio_list waiters;
143 	/* The list of waiting bios which are not yet known to process_release_callback() */
144 	struct bio_list new_waiters;
145 	/* The list of waiters which have their permits */
146 	struct bio_list *permitted_waiters;
147 	/* The function for assigning a resource to a waiter */
148 	assigner_fn assigner;
149 	/* The queue of blocked threads */
150 	wait_queue_head_t blocked_threads;
151 	/* The arrival time of the eldest waiter */
152 	u64 arrival;
153 };
154 
155 /*
156  * A data_vio_pool is a collection of preallocated data_vios which may be acquired from any thread,
157  * and are released in batches.
158  */
159 struct data_vio_pool {
160 	/* Completion for scheduling releases */
161 	struct vdo_completion completion;
162 	/* The administrative state of the pool */
163 	struct admin_state state;
164 	/* Lock protecting the pool */
165 	spinlock_t lock;
166 	/* The main limiter controlling the total data_vios in the pool. */
167 	struct limiter limiter;
168 	/* The limiter controlling data_vios for discard */
169 	struct limiter discard_limiter;
170 	/* The list of bios which have discard permits but still need a data_vio */
171 	struct bio_list permitted_discards;
172 	/* The list of available data_vios */
173 	struct list_head available;
174 	/* The queue of data_vios waiting to be returned to the pool */
175 	struct funnel_queue *queue;
176 	/* Whether the pool is processing, or scheduled to process releases */
177 	atomic_t processing;
178 	/* The data vios in the pool */
179 	struct data_vio data_vios[];
180 };
181 
182 static const char * const ASYNC_OPERATION_NAMES[] = {
183 	"launch",
184 	"acknowledge_write",
185 	"acquire_hash_lock",
186 	"attempt_logical_block_lock",
187 	"lock_duplicate_pbn",
188 	"check_for_duplication",
189 	"cleanup",
190 	"compress_data_vio",
191 	"find_block_map_slot",
192 	"get_mapped_block_for_read",
193 	"get_mapped_block_for_write",
194 	"hash_data_vio",
195 	"journal_remapping",
196 	"vdo_attempt_packing",
197 	"put_mapped_block",
198 	"read_data_vio",
199 	"update_dedupe_index",
200 	"update_reference_counts",
201 	"verify_duplication",
202 	"write_data_vio",
203 };
204 
205 /* The steps taken when cleaning up a data_vio, in the order they are performed. */
206 enum data_vio_cleanup_stage {
207 	VIO_CLEANUP_START,
208 	VIO_RELEASE_HASH_LOCK = VIO_CLEANUP_START,
209 	VIO_RELEASE_ALLOCATED,
210 	VIO_RELEASE_RECOVERY_LOCKS,
211 	VIO_RELEASE_LOGICAL,
212 	VIO_CLEANUP_DONE
213 };
214 
215 static inline struct data_vio_pool * __must_check
216 as_data_vio_pool(struct vdo_completion *completion)
217 {
218 	vdo_assert_completion_type(completion, VDO_DATA_VIO_POOL_COMPLETION);
219 	return container_of(completion, struct data_vio_pool, completion);
220 }
221 
222 static inline u64 get_arrival_time(struct bio *bio)
223 {
224 	return (u64) bio->bi_private;
225 }
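
/*
 * This reads back the jiffies value which vdo_launch_bio() stores in bi_private before queueing
 * the bio on a limiter:
 *
 *	bio->bi_private = (void *) jiffies;
 *
 * The limiters only ever compare these values to find the eldest waiter (see
 * reuse_or_release_resources()), and, as the DOC comment above notes, the scheme assumes jiffies
 * is 64 bits wide.
 */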
226 
227 /**
228  * check_for_drain_complete_locked() - Check whether a data_vio_pool has no outstanding data_vios
229  *				       or waiters while holding the pool's lock.
230  */
231 static bool check_for_drain_complete_locked(struct data_vio_pool *pool)
232 {
233 	if (pool->limiter.busy > 0)
234 		return false;
235 
236 	VDO_ASSERT_LOG_ONLY((pool->discard_limiter.busy == 0),
237 			    "no outstanding discard permits");
238 
239 	return (bio_list_empty(&pool->limiter.new_waiters) &&
240 		bio_list_empty(&pool->discard_limiter.new_waiters));
241 }
242 
243 static void initialize_lbn_lock(struct data_vio *data_vio, logical_block_number_t lbn)
244 {
245 	struct vdo *vdo = vdo_from_data_vio(data_vio);
246 	zone_count_t zone_number;
247 	struct lbn_lock *lock = &data_vio->logical;
248 
249 	lock->lbn = lbn;
250 	lock->locked = false;
251 	vdo_waitq_init(&lock->waiters);
252 	zone_number = vdo_compute_logical_zone(data_vio);
253 	lock->zone = &vdo->logical_zones->zones[zone_number];
254 }
255 
256 static void launch_locked_request(struct data_vio *data_vio)
257 {
258 	data_vio->logical.locked = true;
259 	if (data_vio->write) {
260 		struct vdo *vdo = vdo_from_data_vio(data_vio);
261 
262 		if (vdo_is_read_only(vdo)) {
263 			continue_data_vio_with_error(data_vio, VDO_READ_ONLY);
264 			return;
265 		}
266 	}
267 
268 	data_vio->last_async_operation = VIO_ASYNC_OP_FIND_BLOCK_MAP_SLOT;
269 	vdo_find_block_map_slot(data_vio);
270 }
271 
272 static void acknowledge_data_vio(struct data_vio *data_vio)
273 {
274 	struct vdo *vdo = vdo_from_data_vio(data_vio);
275 	struct bio *bio = data_vio->user_bio;
276 	int error = vdo_status_to_errno(data_vio->vio.completion.result);
277 
278 	if (bio == NULL)
279 		return;
280 
281 	VDO_ASSERT_LOG_ONLY((data_vio->remaining_discard <=
282 			     (u32) (VDO_BLOCK_SIZE - data_vio->offset)),
283 			    "data_vio to acknowledge is not an incomplete discard");
284 
285 	data_vio->user_bio = NULL;
286 	vdo_count_bios(&vdo->stats.bios_acknowledged, bio);
287 	if (data_vio->is_partial)
288 		vdo_count_bios(&vdo->stats.bios_acknowledged_partial, bio);
289 
290 	bio->bi_status = errno_to_blk_status(error);
291 	bio_endio(bio);
292 }
293 
294 static void copy_to_bio(struct bio *bio, char *data_ptr)
295 {
296 	struct bio_vec biovec;
297 	struct bvec_iter iter;
298 
299 	bio_for_each_segment(biovec, bio, iter) {
300 		memcpy_to_bvec(&biovec, data_ptr);
301 		data_ptr += biovec.bv_len;
302 	}
303 }
304 
305 struct data_vio_compression_status get_data_vio_compression_status(struct data_vio *data_vio)
306 {
307 	u32 packed = atomic_read(&data_vio->compression.status);
308 
309 	/* pairs with cmpxchg in set_data_vio_compression_status */
310 	smp_rmb();
311 	return (struct data_vio_compression_status) {
312 		.stage = packed & COMPRESSION_STATUS_MASK,
313 		.may_not_compress = ((packed & MAY_NOT_COMPRESS_MASK) != 0),
314 	};
315 }
316 
317 /**
318  * pack_status() - Convert a data_vio_compression_status into a u32 which may be stored
319  *                 atomically.
320  * @status: The status to convert.
321  *
322  * Return: The compression state packed into a u32.
323  */
324 static u32 __must_check pack_status(struct data_vio_compression_status status)
325 {
326 	return status.stage | (status.may_not_compress ? MAY_NOT_COMPRESS_MASK : 0);
327 }
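
/*
 * For reference, the packed layout: bits 0-7 hold the stage (COMPRESSION_STATUS_MASK == 0xff) and
 * bit 31 holds the may_not_compress flag (MAY_NOT_COMPRESS_MASK == 0x80000000); the other bits are
 * unused. A hypothetical worked example, assuming a stage whose enum value is 2:
 *
 *	pack_status((struct data_vio_compression_status) {
 *		.stage = 2,
 *		.may_not_compress = true,
 *	}) == 0x80000002
 */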
328 
329 /**
330  * set_data_vio_compression_status() - Set the compression status of a data_vio.
331  * @data_vio: The data_vio to change.
332  * @status: The expected current status of the data_vio.
333  * @new_status: The status to set.
334  *
335  * Return: true if the new status was set, false if the data_vio's compression status did not
336  *         match the expected state, and so was left unchanged.
337  */
338 static bool __must_check
339 set_data_vio_compression_status(struct data_vio *data_vio,
340 				struct data_vio_compression_status status,
341 				struct data_vio_compression_status new_status)
342 {
343 	u32 actual;
344 	u32 expected = pack_status(status);
345 	u32 replacement = pack_status(new_status);
346 
347 	/*
348 	 * Extra barriers because this was originally developed using a CAS operation that implicitly
349 	 * had them.
350 	 */
351 	smp_mb__before_atomic();
352 	actual = atomic_cmpxchg(&data_vio->compression.status, expected, replacement);
353 	/* same as before_atomic */
354 	smp_mb__after_atomic();
355 	return (expected == actual);
356 }
357 
358 struct data_vio_compression_status advance_data_vio_compression_stage(struct data_vio *data_vio)
359 {
360 	for (;;) {
361 		struct data_vio_compression_status status =
362 			get_data_vio_compression_status(data_vio);
363 		struct data_vio_compression_status new_status = status;
364 
365 		if (status.stage == DATA_VIO_POST_PACKER) {
366 			/* We're already in the last stage. */
367 			return status;
368 		}
369 
370 		if (status.may_not_compress) {
371 			/*
372 			 * Compression has been disallowed for this data_vio, so skip the rest of the
373 			 * path and go to the end.
374 			 */
375 			new_status.stage = DATA_VIO_POST_PACKER;
376 		} else {
377 			/* Go to the next state. */
378 			new_status.stage++;
379 		}
380 
381 		if (set_data_vio_compression_status(data_vio, status, new_status))
382 			return new_status;
383 
384 		/* Another thread changed the status out from under us so try again. */
385 	}
386 }
387 
388 /**
389  * cancel_data_vio_compression() - Prevent this data_vio from being compressed or packed.
390  *
391  * Return: true if the data_vio is in the packer and the caller was the first caller to cancel it.
392  */
393 bool cancel_data_vio_compression(struct data_vio *data_vio)
394 {
395 	struct data_vio_compression_status status, new_status;
396 
397 	for (;;) {
398 		status = get_data_vio_compression_status(data_vio);
399 		if (status.may_not_compress || (status.stage == DATA_VIO_POST_PACKER)) {
400 			/* This data_vio is already set up to not block in the packer. */
401 			break;
402 		}
403 
404 		new_status.stage = status.stage;
405 		new_status.may_not_compress = true;
406 
407 		if (set_data_vio_compression_status(data_vio, status, new_status))
408 			break;
409 	}
410 
411 	return ((status.stage == DATA_VIO_PACKING) && !status.may_not_compress);
412 }
413 
414 /**
415  * attempt_logical_block_lock() - Attempt to acquire the lock on a logical block.
416  * @completion: The data_vio for an external data request as a completion.
417  *
418  * This is the start of the path for all external requests. It is registered in launch_data_vio().
419  */
420 static void attempt_logical_block_lock(struct vdo_completion *completion)
421 {
422 	struct data_vio *data_vio = as_data_vio(completion);
423 	struct lbn_lock *lock = &data_vio->logical;
424 	struct vdo *vdo = vdo_from_data_vio(data_vio);
425 	struct data_vio *lock_holder;
426 	int result;
427 
428 	assert_data_vio_in_logical_zone(data_vio);
429 
430 	if (data_vio->logical.lbn >= vdo->states.vdo.config.logical_blocks) {
431 		continue_data_vio_with_error(data_vio, VDO_OUT_OF_RANGE);
432 		return;
433 	}
434 
435 	result = vdo_int_map_put(lock->zone->lbn_operations, lock->lbn,
436 				 data_vio, false, (void **) &lock_holder);
437 	if (result != VDO_SUCCESS) {
438 		continue_data_vio_with_error(data_vio, result);
439 		return;
440 	}
441 
442 	if (lock_holder == NULL) {
443 		/* We got the lock */
444 		launch_locked_request(data_vio);
445 		return;
446 	}
447 
448 	result = VDO_ASSERT(lock_holder->logical.locked, "logical block lock held");
449 	if (result != VDO_SUCCESS) {
450 		continue_data_vio_with_error(data_vio, result);
451 		return;
452 	}
453 
454 	/*
455 	 * If the new request is a pure read request (not read-modify-write) and the lock_holder is
456 	 * writing and has received an allocation, service the read request immediately by copying
457 	 * data from the lock_holder to avoid having to flush the write out of the packer just to
458 	 * prevent the read from waiting indefinitely. If the lock_holder does not yet have an
459 	 * allocation, prevent it from blocking in the packer and wait on it. This is necessary in
460 	 * order to prevent returning data that may not have actually been written.
461 	 */
462 	if (!data_vio->write && READ_ONCE(lock_holder->allocation_succeeded)) {
463 		copy_to_bio(data_vio->user_bio, lock_holder->vio.data + data_vio->offset);
464 		acknowledge_data_vio(data_vio);
465 		complete_data_vio(completion);
466 		return;
467 	}
468 
469 	data_vio->last_async_operation = VIO_ASYNC_OP_ATTEMPT_LOGICAL_BLOCK_LOCK;
470 	vdo_waitq_enqueue_waiter(&lock_holder->logical.waiters, &data_vio->waiter);
471 
472 	/*
473 	 * Prevent writes and read-modify-writes from blocking indefinitely on lock holders in the
474 	 * packer.
475 	 */
476 	if (lock_holder->write && cancel_data_vio_compression(lock_holder)) {
477 		data_vio->compression.lock_holder = lock_holder;
478 		launch_data_vio_packer_callback(data_vio,
479 						vdo_remove_lock_holder_from_packer);
480 	}
481 }
482 
483 /**
484  * launch_data_vio() - (Re)initialize a data_vio to have a new logical block number, keeping the
485  *		       same parent and other state, and send it on its way.
486  */
487 static void launch_data_vio(struct data_vio *data_vio, logical_block_number_t lbn)
488 {
489 	struct vdo_completion *completion = &data_vio->vio.completion;
490 
491 	/*
492 	 * Clearing the tree lock must happen before initializing the LBN lock, which also adds
493 	 * information to the tree lock.
494 	 */
495 	memset(&data_vio->tree_lock, 0, sizeof(data_vio->tree_lock));
496 	initialize_lbn_lock(data_vio, lbn);
497 	INIT_LIST_HEAD(&data_vio->hash_lock_entry);
498 	INIT_LIST_HEAD(&data_vio->write_entry);
499 
500 	memset(&data_vio->allocation, 0, sizeof(data_vio->allocation));
501 
502 	data_vio->is_duplicate = false;
503 
504 	memset(&data_vio->record_name, 0, sizeof(data_vio->record_name));
505 	memset(&data_vio->duplicate, 0, sizeof(data_vio->duplicate));
506 	vdo_reset_completion(&data_vio->decrement_completion);
507 	vdo_reset_completion(completion);
508 	completion->error_handler = handle_data_vio_error;
509 	set_data_vio_logical_callback(data_vio, attempt_logical_block_lock);
510 	vdo_enqueue_completion(completion, VDO_DEFAULT_Q_MAP_BIO_PRIORITY);
511 }
512 
513 static void copy_from_bio(struct bio *bio, char *data_ptr)
514 {
515 	struct bio_vec biovec;
516 	struct bvec_iter iter;
517 
518 	bio_for_each_segment(biovec, bio, iter) {
519 		memcpy_from_bvec(data_ptr, &biovec);
520 		data_ptr += biovec.bv_len;
521 	}
522 }
523 
524 static void launch_bio(struct vdo *vdo, struct data_vio *data_vio, struct bio *bio)
525 {
526 	logical_block_number_t lbn;
527 	/*
528 	 * Zero out the fields which don't need to be preserved (i.e. which are not pointers to
529 	 * separately allocated objects).
530 	 */
531 	memset(data_vio, 0, offsetof(struct data_vio, vio));
532 	memset(&data_vio->compression, 0, offsetof(struct compression_state, block));
533 
534 	data_vio->user_bio = bio;
535 	data_vio->offset = to_bytes(bio->bi_iter.bi_sector & VDO_SECTORS_PER_BLOCK_MASK);
536 	data_vio->is_partial = (bio->bi_iter.bi_size < VDO_BLOCK_SIZE) || (data_vio->offset != 0);
537 
538 	/*
539 	 * Discards behave very differently from other requests when they come in from device-mapper.
540 	 * We have to be able to handle discards of any size and at various sector offsets within a
541 	 * block.
542 	 */
543 	if (bio_op(bio) == REQ_OP_DISCARD) {
544 		data_vio->remaining_discard = bio->bi_iter.bi_size;
545 		data_vio->write = true;
546 		data_vio->is_discard = true;
547 		if (data_vio->is_partial) {
548 			vdo_count_bios(&vdo->stats.bios_in_partial, bio);
549 			data_vio->read = true;
550 		}
551 	} else if (data_vio->is_partial) {
552 		vdo_count_bios(&vdo->stats.bios_in_partial, bio);
553 		data_vio->read = true;
554 		if (bio_data_dir(bio) == WRITE)
555 			data_vio->write = true;
556 	} else if (bio_data_dir(bio) == READ) {
557 		data_vio->read = true;
558 	} else {
559 		/*
560 		 * Copy the bio data to a char array so that we can continue to use the data after
561 		 * we acknowledge the bio.
562 		 */
563 		copy_from_bio(bio, data_vio->vio.data);
564 		data_vio->is_zero = mem_is_zero(data_vio->vio.data, VDO_BLOCK_SIZE);
565 		data_vio->write = true;
566 	}
567 
568 	if (data_vio->user_bio->bi_opf & REQ_FUA)
569 		data_vio->fua = true;
570 
571 	lbn = (bio->bi_iter.bi_sector - vdo->starting_sector_offset) / VDO_SECTORS_PER_BLOCK;
572 	launch_data_vio(data_vio, lbn);
573 }
574 
575 static void assign_data_vio(struct limiter *limiter, struct data_vio *data_vio)
576 {
577 	struct bio *bio = bio_list_pop(limiter->permitted_waiters);
578 
579 	launch_bio(limiter->pool->completion.vdo, data_vio, bio);
580 	limiter->wake_count++;
581 
582 	bio = bio_list_peek(limiter->permitted_waiters);
583 	limiter->arrival = ((bio == NULL) ? U64_MAX : get_arrival_time(bio));
584 }
585 
586 static void assign_discard_permit(struct limiter *limiter)
587 {
588 	struct bio *bio = bio_list_pop(&limiter->waiters);
589 
590 	if (limiter->arrival == U64_MAX)
591 		limiter->arrival = get_arrival_time(bio);
592 
593 	bio_list_add(limiter->permitted_waiters, bio);
594 }
595 
596 static void get_waiters(struct limiter *limiter)
597 {
598 	bio_list_merge_init(&limiter->waiters, &limiter->new_waiters);
599 }
600 
601 static inline struct data_vio *get_available_data_vio(struct data_vio_pool *pool)
602 {
603 	struct data_vio *data_vio =
604 		list_first_entry(&pool->available, struct data_vio, pool_entry);
605 
606 	list_del_init(&data_vio->pool_entry);
607 	return data_vio;
608 }
609 
610 static void assign_data_vio_to_waiter(struct limiter *limiter)
611 {
612 	assign_data_vio(limiter, get_available_data_vio(limiter->pool));
613 }
614 
615 static void update_limiter(struct limiter *limiter)
616 {
617 	struct bio_list *waiters = &limiter->waiters;
618 	data_vio_count_t available = limiter->limit - limiter->busy;
619 
620 	VDO_ASSERT_LOG_ONLY((limiter->release_count <= limiter->busy),
621 			    "Release count %u is not more than busy count %u",
622 			    limiter->release_count, limiter->busy);
623 
624 	get_waiters(limiter);
625 	for (; (limiter->release_count > 0) && !bio_list_empty(waiters); limiter->release_count--)
626 		limiter->assigner(limiter);
627 
628 	if (limiter->release_count > 0) {
629 		WRITE_ONCE(limiter->busy, limiter->busy - limiter->release_count);
630 		limiter->release_count = 0;
631 		return;
632 	}
633 
634 	for (; (available > 0) && !bio_list_empty(waiters); available--)
635 		limiter->assigner(limiter);
636 
637 	WRITE_ONCE(limiter->busy, limiter->limit - available);
638 	if (limiter->max_busy < limiter->busy)
639 		WRITE_ONCE(limiter->max_busy, limiter->busy);
640 }
641 
642 /**
643  * schedule_releases() - Ensure that release processing is scheduled.
644  *
645  * If this call switches the state to processing, enqueue. Otherwise, some other thread has already
646  * done so.
647  */
648 static void schedule_releases(struct data_vio_pool *pool)
649 {
650 	/* Pairs with the barrier in process_release_callback(). */
651 	smp_mb__before_atomic();
652 	if (atomic_cmpxchg(&pool->processing, false, true))
653 		return;
654 
655 	pool->completion.requeue = true;
656 	vdo_launch_completion_with_priority(&pool->completion,
657 					    CPU_Q_COMPLETE_VIO_PRIORITY);
658 }
659 
660 static void reuse_or_release_resources(struct data_vio_pool *pool,
661 				       struct data_vio *data_vio,
662 				       struct list_head *returned)
663 {
664 	if (data_vio->remaining_discard > 0) {
665 		if (bio_list_empty(&pool->discard_limiter.waiters)) {
666 			/* Return the data_vio's discard permit. */
667 			pool->discard_limiter.release_count++;
668 		} else {
669 			assign_discard_permit(&pool->discard_limiter);
670 		}
671 	}
672 
673 	if (pool->limiter.arrival < pool->discard_limiter.arrival) {
674 		assign_data_vio(&pool->limiter, data_vio);
675 	} else if (pool->discard_limiter.arrival < U64_MAX) {
676 		assign_data_vio(&pool->discard_limiter, data_vio);
677 	} else {
678 		list_add(&data_vio->pool_entry, returned);
679 		pool->limiter.release_count++;
680 	}
681 }
682 
683 /**
684  * process_release_callback() - Process a batch of data_vio releases.
685  * @completion: The pool with data_vios to release.
686  */
687 static void process_release_callback(struct vdo_completion *completion)
688 {
689 	struct data_vio_pool *pool = as_data_vio_pool(completion);
690 	bool reschedule;
691 	bool drained;
692 	data_vio_count_t processed;
693 	data_vio_count_t to_wake;
694 	data_vio_count_t discards_to_wake;
695 	LIST_HEAD(returned);
696 
697 	spin_lock(&pool->lock);
698 	get_waiters(&pool->discard_limiter);
699 	get_waiters(&pool->limiter);
700 	spin_unlock(&pool->lock);
701 
702 	if (pool->limiter.arrival == U64_MAX) {
703 		struct bio *bio = bio_list_peek(&pool->limiter.waiters);
704 
705 		if (bio != NULL)
706 			pool->limiter.arrival = get_arrival_time(bio);
707 	}
708 
709 	for (processed = 0; processed < DATA_VIO_RELEASE_BATCH_SIZE; processed++) {
710 		struct data_vio *data_vio;
711 		struct funnel_queue_entry *entry = vdo_funnel_queue_poll(pool->queue);
712 
713 		if (entry == NULL)
714 			break;
715 
716 		data_vio = as_data_vio(container_of(entry, struct vdo_completion,
717 						    work_queue_entry_link));
718 		acknowledge_data_vio(data_vio);
719 		reuse_or_release_resources(pool, data_vio, &returned);
720 	}
721 
722 	spin_lock(&pool->lock);
723 	/*
724 	 * There is a race where waiters could be added while we are in the unlocked section above.
725 	 * Those waiters could not see the resources we are now about to release, so we assign
726 	 * those resources now as we have no guarantee of being rescheduled. This is handled in
727 	 * update_limiter().
728 	 */
729 	update_limiter(&pool->discard_limiter);
730 	list_splice(&returned, &pool->available);
731 	update_limiter(&pool->limiter);
732 	to_wake = pool->limiter.wake_count;
733 	pool->limiter.wake_count = 0;
734 	discards_to_wake = pool->discard_limiter.wake_count;
735 	pool->discard_limiter.wake_count = 0;
736 
737 	atomic_set(&pool->processing, false);
738 	/* Pairs with the barrier in schedule_releases(). */
739 	smp_mb();
740 
741 	reschedule = !vdo_is_funnel_queue_empty(pool->queue);
742 	drained = (!reschedule &&
743 		   vdo_is_state_draining(&pool->state) &&
744 		   check_for_drain_complete_locked(pool));
745 	spin_unlock(&pool->lock);
746 
747 	if (to_wake > 0)
748 		wake_up_nr(&pool->limiter.blocked_threads, to_wake);
749 
750 	if (discards_to_wake > 0)
751 		wake_up_nr(&pool->discard_limiter.blocked_threads, discards_to_wake);
752 
753 	if (reschedule)
754 		schedule_releases(pool);
755 	else if (drained)
756 		vdo_finish_draining(&pool->state);
757 }
758 
759 static void initialize_limiter(struct limiter *limiter, struct data_vio_pool *pool,
760 			       assigner_fn assigner, data_vio_count_t limit)
761 {
762 	limiter->pool = pool;
763 	limiter->assigner = assigner;
764 	limiter->limit = limit;
765 	limiter->arrival = U64_MAX;
766 	init_waitqueue_head(&limiter->blocked_threads);
767 }
768 
769 /**
770  * initialize_data_vio() - Allocate the components of a data_vio.
771  *
772  * The caller is responsible for cleaning up the data_vio on error.
773  *
774  * Return: VDO_SUCCESS or an error.
775  */
776 static int initialize_data_vio(struct data_vio *data_vio, struct vdo *vdo)
777 {
778 	struct bio *bio;
779 	int result;
780 
781 	BUILD_BUG_ON(VDO_BLOCK_SIZE > PAGE_SIZE);
782 	result = vdo_allocate_memory(VDO_BLOCK_SIZE, 0, "data_vio data",
783 				     &data_vio->vio.data);
784 	if (result != VDO_SUCCESS)
785 		return vdo_log_error_strerror(result,
786 					      "data_vio data allocation failure");
787 
788 	result = vdo_allocate_memory(VDO_BLOCK_SIZE, 0, "compressed block",
789 				     &data_vio->compression.block);
790 	if (result != VDO_SUCCESS) {
791 		return vdo_log_error_strerror(result,
792 					      "data_vio compressed block allocation failure");
793 	}
794 
795 	result = vdo_allocate_memory(VDO_BLOCK_SIZE, 0, "vio scratch",
796 				     &data_vio->scratch_block);
797 	if (result != VDO_SUCCESS)
798 		return vdo_log_error_strerror(result,
799 					      "data_vio scratch allocation failure");
800 
801 	result = vdo_create_bio(&bio);
802 	if (result != VDO_SUCCESS)
803 		return vdo_log_error_strerror(result,
804 					      "data_vio data bio allocation failure");
805 
806 	vdo_initialize_completion(&data_vio->decrement_completion, vdo,
807 				  VDO_DECREMENT_COMPLETION);
808 	initialize_vio(&data_vio->vio, bio, 1, VIO_TYPE_DATA, VIO_PRIORITY_DATA, vdo);
809 
810 	return VDO_SUCCESS;
811 }
812 
813 static void destroy_data_vio(struct data_vio *data_vio)
814 {
815 	if (data_vio == NULL)
816 		return;
817 
818 	vdo_free_bio(vdo_forget(data_vio->vio.bio));
819 	vdo_free(vdo_forget(data_vio->vio.data));
820 	vdo_free(vdo_forget(data_vio->compression.block));
821 	vdo_free(vdo_forget(data_vio->scratch_block));
822 }
823 
824 /**
825  * make_data_vio_pool() - Initialize a data_vio pool.
826  * @vdo: The vdo to which the pool will belong.
827  * @pool_size: The number of data_vios in the pool.
828  * @discard_limit: The maximum number of data_vios which may be used for discards.
829  * @pool_ptr: A pointer to hold the newly allocated pool.
830  */
831 int make_data_vio_pool(struct vdo *vdo, data_vio_count_t pool_size,
832 		       data_vio_count_t discard_limit, struct data_vio_pool **pool_ptr)
833 {
834 	int result;
835 	struct data_vio_pool *pool;
836 	data_vio_count_t i;
837 
838 	result = vdo_allocate_extended(struct data_vio_pool, pool_size, struct data_vio,
839 				       __func__, &pool);
840 	if (result != VDO_SUCCESS)
841 		return result;
842 
843 	VDO_ASSERT_LOG_ONLY((discard_limit <= pool_size),
844 			    "discard limit does not exceed pool size");
845 	initialize_limiter(&pool->discard_limiter, pool, assign_discard_permit,
846 			   discard_limit);
847 	pool->discard_limiter.permitted_waiters = &pool->permitted_discards;
848 	initialize_limiter(&pool->limiter, pool, assign_data_vio_to_waiter, pool_size);
849 	pool->limiter.permitted_waiters = &pool->limiter.waiters;
850 	INIT_LIST_HEAD(&pool->available);
851 	spin_lock_init(&pool->lock);
852 	vdo_set_admin_state_code(&pool->state, VDO_ADMIN_STATE_NORMAL_OPERATION);
853 	vdo_initialize_completion(&pool->completion, vdo, VDO_DATA_VIO_POOL_COMPLETION);
854 	vdo_prepare_completion(&pool->completion, process_release_callback,
855 			       process_release_callback, vdo->thread_config.cpu_thread,
856 			       NULL);
857 
858 	result = vdo_make_funnel_queue(&pool->queue);
859 	if (result != VDO_SUCCESS) {
860 		free_data_vio_pool(vdo_forget(pool));
861 		return result;
862 	}
863 
864 	for (i = 0; i < pool_size; i++) {
865 		struct data_vio *data_vio = &pool->data_vios[i];
866 
867 		result = initialize_data_vio(data_vio, vdo);
868 		if (result != VDO_SUCCESS) {
869 			destroy_data_vio(data_vio);
870 			free_data_vio_pool(pool);
871 			return result;
872 		}
873 
874 		list_add(&data_vio->pool_entry, &pool->available);
875 	}
876 
877 	*pool_ptr = pool;
878 	return VDO_SUCCESS;
879 }
880 
881 /**
882  * free_data_vio_pool() - Free a data_vio_pool and the data_vios in it.
883  *
884  * All data_vios must be returned to the pool before calling this function.
885  */
886 void free_data_vio_pool(struct data_vio_pool *pool)
887 {
888 	struct data_vio *data_vio, *tmp;
889 
890 	if (pool == NULL)
891 		return;
892 
893 	/*
894 	 * Pairs with the barrier in process_release_callback(). Possibly not needed since it
895 	 * caters to an enqueue vs. free race.
896 	 */
897 	smp_mb();
898 	BUG_ON(atomic_read(&pool->processing));
899 
900 	spin_lock(&pool->lock);
901 	VDO_ASSERT_LOG_ONLY((pool->limiter.busy == 0),
902 			    "data_vio pool must not have %u busy entries when being freed",
903 			    pool->limiter.busy);
904 	VDO_ASSERT_LOG_ONLY((bio_list_empty(&pool->limiter.waiters) &&
905 			     bio_list_empty(&pool->limiter.new_waiters)),
906 			    "data_vio pool must not have threads waiting to read or write when being freed");
907 	VDO_ASSERT_LOG_ONLY((bio_list_empty(&pool->discard_limiter.waiters) &&
908 			     bio_list_empty(&pool->discard_limiter.new_waiters)),
909 			    "data_vio pool must not have threads waiting to discard when being freed");
910 	spin_unlock(&pool->lock);
911 
912 	list_for_each_entry_safe(data_vio, tmp, &pool->available, pool_entry) {
913 		list_del_init(&data_vio->pool_entry);
914 		destroy_data_vio(data_vio);
915 	}
916 
917 	vdo_free_funnel_queue(vdo_forget(pool->queue));
918 	vdo_free(pool);
919 }
920 
921 static bool acquire_permit(struct limiter *limiter)
922 {
923 	if (limiter->busy >= limiter->limit)
924 		return false;
925 
926 	WRITE_ONCE(limiter->busy, limiter->busy + 1);
927 	if (limiter->max_busy < limiter->busy)
928 		WRITE_ONCE(limiter->max_busy, limiter->busy);
929 	return true;
930 }
931 
932 static void wait_permit(struct limiter *limiter, struct bio *bio)
933 	__releases(&limiter->pool->lock)
934 {
935 	DEFINE_WAIT(wait);
936 
937 	bio_list_add(&limiter->new_waiters, bio);
938 	prepare_to_wait_exclusive(&limiter->blocked_threads, &wait,
939 				  TASK_UNINTERRUPTIBLE);
940 	spin_unlock(&limiter->pool->lock);
941 	io_schedule();
942 	finish_wait(&limiter->blocked_threads, &wait);
943 }
944 
945 /**
946  * vdo_launch_bio() - Acquire a data_vio from the pool, assign the bio to it, and launch it.
947  *
948  * This will block if data_vios or discard permits are not available.
949  */
950 void vdo_launch_bio(struct data_vio_pool *pool, struct bio *bio)
951 {
952 	struct data_vio *data_vio;
953 
954 	VDO_ASSERT_LOG_ONLY(!vdo_is_state_quiescent(&pool->state),
955 			    "data_vio_pool not quiescent on acquire");
956 
957 	bio->bi_private = (void *) jiffies;
958 	spin_lock(&pool->lock);
959 	if ((bio_op(bio) == REQ_OP_DISCARD) &&
960 	    !acquire_permit(&pool->discard_limiter)) {
961 		wait_permit(&pool->discard_limiter, bio);
962 		return;
963 	}
964 
965 	if (!acquire_permit(&pool->limiter)) {
966 		wait_permit(&pool->limiter, bio);
967 		return;
968 	}
969 
970 	data_vio = get_available_data_vio(pool);
971 	spin_unlock(&pool->lock);
972 	launch_bio(pool->completion.vdo, data_vio, bio);
973 }
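
/*
 * A minimal usage sketch, assuming a hypothetical device-mapper target whose private pointer is
 * its struct vdo (only the data_vio_pool member is taken from this file; everything else here is
 * illustrative):
 *
 *	static int example_map(struct dm_target *ti, struct bio *bio)
 *	{
 *		struct vdo *vdo = ti->private;
 *
 *		vdo_launch_bio(vdo->data_vio_pool, bio);
 *		return DM_MAPIO_SUBMITTED;
 *	}
 *
 * Since vdo_launch_bio() may sleep in wait_permit(), it must only be called from a context which
 * is allowed to block.
 */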
974 
975 /* Implements vdo_admin_initiator_fn. */
976 static void initiate_drain(struct admin_state *state)
977 {
978 	bool drained;
979 	struct data_vio_pool *pool = container_of(state, struct data_vio_pool, state);
980 
981 	spin_lock(&pool->lock);
982 	drained = check_for_drain_complete_locked(pool);
983 	spin_unlock(&pool->lock);
984 
985 	if (drained)
986 		vdo_finish_draining(state);
987 }
988 
989 static void assert_on_vdo_cpu_thread(const struct vdo *vdo, const char *name)
990 {
991 	VDO_ASSERT_LOG_ONLY((vdo_get_callback_thread_id() == vdo->thread_config.cpu_thread),
992 			    "%s called on cpu thread", name);
993 }
994 
995 /**
996  * drain_data_vio_pool() - Wait asynchronously for all data_vios to be returned to the pool.
997  * @completion: The completion to notify when the pool has drained.
998  */
999 void drain_data_vio_pool(struct data_vio_pool *pool, struct vdo_completion *completion)
1000 {
1001 	assert_on_vdo_cpu_thread(completion->vdo, __func__);
1002 	vdo_start_draining(&pool->state, VDO_ADMIN_STATE_SUSPENDING, completion,
1003 			   initiate_drain);
1004 }
1005 
1006 /**
1007  * resume_data_vio_pool() - Resume a data_vio pool.
1008  * @completion: The completion to notify when the pool has resumed.
1009  */
1010 void resume_data_vio_pool(struct data_vio_pool *pool, struct vdo_completion *completion)
1011 {
1012 	assert_on_vdo_cpu_thread(completion->vdo, __func__);
1013 	vdo_continue_completion(completion, vdo_resume_if_quiescent(&pool->state));
1014 }
1015 
1016 static void dump_limiter(const char *name, struct limiter *limiter)
1017 {
1018 	vdo_log_info("%s: %u of %u busy (max %u), %s", name, limiter->busy,
1019 		     limiter->limit, limiter->max_busy,
1020 		     ((bio_list_empty(&limiter->waiters) &&
1021 		       bio_list_empty(&limiter->new_waiters)) ?
1022 		      "no waiters" : "has waiters"));
1023 }
1024 
1025 /**
1026  * dump_data_vio_pool() - Dump a data_vio pool to the log.
1027  * @dump_vios: Whether to dump the details of each busy data_vio as well.
1028  */
1029 void dump_data_vio_pool(struct data_vio_pool *pool, bool dump_vios)
1030 {
1031 	/*
1032 	 * In order that syslog can empty its buffer, sleep after 35 elements for 4ms (till the
1033 	 * second clock tick).  These numbers were picked based on experiments with lab machines.
1034 	 */
1035 	static const int ELEMENTS_PER_BATCH = 35;
1036 	static const int SLEEP_FOR_SYSLOG = 4000;
1037 
1038 	if (pool == NULL)
1039 		return;
1040 
1041 	spin_lock(&pool->lock);
1042 	dump_limiter("data_vios", &pool->limiter);
1043 	dump_limiter("discard permits", &pool->discard_limiter);
1044 	if (dump_vios) {
1045 		int i;
1046 		int dumped = 0;
1047 
1048 		for (i = 0; i < pool->limiter.limit; i++) {
1049 			struct data_vio *data_vio = &pool->data_vios[i];
1050 
1051 			if (!list_empty(&data_vio->pool_entry))
1052 				continue;
1053 
1054 			dump_data_vio(data_vio);
1055 			if (++dumped >= ELEMENTS_PER_BATCH) {
1056 				spin_unlock(&pool->lock);
1057 				dumped = 0;
1058 				fsleep(SLEEP_FOR_SYSLOG);
1059 				spin_lock(&pool->lock);
1060 			}
1061 		}
1062 	}
1063 
1064 	spin_unlock(&pool->lock);
1065 }
1066 
1067 data_vio_count_t get_data_vio_pool_active_requests(struct data_vio_pool *pool)
1068 {
1069 	return READ_ONCE(pool->limiter.busy);
1070 }
1071 
1072 data_vio_count_t get_data_vio_pool_request_limit(struct data_vio_pool *pool)
1073 {
1074 	return READ_ONCE(pool->limiter.limit);
1075 }
1076 
1077 data_vio_count_t get_data_vio_pool_maximum_requests(struct data_vio_pool *pool)
1078 {
1079 	return READ_ONCE(pool->limiter.max_busy);
1080 }
1081 
1082 static void update_data_vio_error_stats(struct data_vio *data_vio)
1083 {
1084 	u8 index = 0;
1085 	static const char * const operations[] = {
1086 		[0] = "empty",
1087 		[1] = "read",
1088 		[2] = "write",
1089 		[3] = "read-modify-write",
1090 		[5] = "read+fua",
1091 		[6] = "write+fua",
1092 		[7] = "read-modify-write+fua",
1093 	};
1094 
1095 	if (data_vio->read)
1096 		index = 1;
1097 
1098 	if (data_vio->write)
1099 		index += 2;
1100 
1101 	if (data_vio->fua)
1102 		index += 4;
1103 
1104 	update_vio_error_stats(&data_vio->vio,
1105 			       "Completing %s vio for LBN %llu with error after %s",
1106 			       operations[index],
1107 			       (unsigned long long) data_vio->logical.lbn,
1108 			       get_data_vio_operation_name(data_vio));
1109 }
1110 
1111 static void perform_cleanup_stage(struct data_vio *data_vio,
1112 				  enum data_vio_cleanup_stage stage);
1113 
1114 /**
1115  * release_allocated_lock() - Release the PBN lock and/or the reference on the allocated block at
1116  *			      the end of processing a data_vio.
1117  */
1118 static void release_allocated_lock(struct vdo_completion *completion)
1119 {
1120 	struct data_vio *data_vio = as_data_vio(completion);
1121 
1122 	assert_data_vio_in_allocated_zone(data_vio);
1123 	release_data_vio_allocation_lock(data_vio, false);
1124 	perform_cleanup_stage(data_vio, VIO_RELEASE_RECOVERY_LOCKS);
1125 }
1126 
1127 /** release_lock() - Release an uncontended LBN lock. */
1128 static void release_lock(struct data_vio *data_vio, struct lbn_lock *lock)
1129 {
1130 	struct int_map *lock_map = lock->zone->lbn_operations;
1131 	struct data_vio *lock_holder;
1132 
1133 	if (!lock->locked) {
1134 		/*  The lock is not locked, so it had better not be registered in the lock map. */
1135 		struct data_vio *lock_holder = vdo_int_map_get(lock_map, lock->lbn);
1136 
1137 		VDO_ASSERT_LOG_ONLY((data_vio != lock_holder),
1138 				    "no logical block lock held for block %llu",
1139 				    (unsigned long long) lock->lbn);
1140 		return;
1141 	}
1142 
1143 	/* Release the lock by removing the lock from the map. */
1144 	lock_holder = vdo_int_map_remove(lock_map, lock->lbn);
1145 	VDO_ASSERT_LOG_ONLY((data_vio == lock_holder),
1146 			    "logical block lock mismatch for block %llu",
1147 			    (unsigned long long) lock->lbn);
1148 	lock->locked = false;
1149 }
1150 
1151 /** transfer_lock() - Transfer a contended LBN lock to the eldest waiter. */
1152 static void transfer_lock(struct data_vio *data_vio, struct lbn_lock *lock)
1153 {
1154 	struct data_vio *lock_holder, *next_lock_holder;
1155 	int result;
1156 
1157 	VDO_ASSERT_LOG_ONLY(lock->locked, "lbn_lock with waiters is not locked");
1158 
1159 	/* Another data_vio is waiting for the lock, transfer it in a single lock map operation. */
1160 	next_lock_holder =
1161 		vdo_waiter_as_data_vio(vdo_waitq_dequeue_waiter(&lock->waiters));
1162 
1163 	/* Transfer the remaining lock waiters to the next lock holder. */
1164 	vdo_waitq_transfer_all_waiters(&lock->waiters,
1165 				       &next_lock_holder->logical.waiters);
1166 
1167 	result = vdo_int_map_put(lock->zone->lbn_operations, lock->lbn,
1168 				 next_lock_holder, true, (void **) &lock_holder);
1169 	if (result != VDO_SUCCESS) {
1170 		continue_data_vio_with_error(next_lock_holder, result);
1171 		return;
1172 	}
1173 
1174 	VDO_ASSERT_LOG_ONLY((lock_holder == data_vio),
1175 			    "logical block lock mismatch for block %llu",
1176 			    (unsigned long long) lock->lbn);
1177 	lock->locked = false;
1178 
1179 	/*
1180 	 * If there are still waiters, other data_vios must be trying to get the lock we just
1181 	 * transferred. We must ensure that the new lock holder doesn't block in the packer.
1182 	 */
1183 	if (vdo_waitq_has_waiters(&next_lock_holder->logical.waiters))
1184 		cancel_data_vio_compression(next_lock_holder);
1185 
1186 	/*
1187 	 * Avoid stack overflow on lock transfer.
1188 	 * FIXME: this is only an issue in the 1 thread config.
1189 	 */
1190 	next_lock_holder->vio.completion.requeue = true;
1191 	launch_locked_request(next_lock_holder);
1192 }
1193 
1194 /**
1195  * release_logical_lock() - Release the logical block lock and flush generation lock at the end of
1196  *			    processing a data_vio.
1197  */
1198 static void release_logical_lock(struct vdo_completion *completion)
1199 {
1200 	struct data_vio *data_vio = as_data_vio(completion);
1201 	struct lbn_lock *lock = &data_vio->logical;
1202 
1203 	assert_data_vio_in_logical_zone(data_vio);
1204 
1205 	if (vdo_waitq_has_waiters(&lock->waiters))
1206 		transfer_lock(data_vio, lock);
1207 	else
1208 		release_lock(data_vio, lock);
1209 
1210 	vdo_release_flush_generation_lock(data_vio);
1211 	perform_cleanup_stage(data_vio, VIO_CLEANUP_DONE);
1212 }
1213 
1214 /** clean_hash_lock() - Release the hash lock at the end of processing a data_vio. */
1215 static void clean_hash_lock(struct vdo_completion *completion)
1216 {
1217 	struct data_vio *data_vio = as_data_vio(completion);
1218 
1219 	assert_data_vio_in_hash_zone(data_vio);
1220 	if (completion->result != VDO_SUCCESS) {
1221 		vdo_clean_failed_hash_lock(data_vio);
1222 		return;
1223 	}
1224 
1225 	vdo_release_hash_lock(data_vio);
1226 	perform_cleanup_stage(data_vio, VIO_RELEASE_LOGICAL);
1227 }
1228 
1229 /**
1230  * finish_cleanup() - Make some assertions about a data_vio which has finished cleaning up.
1231  *
1232  * If the data_vio is part of a multi-block discard, this starts work on the next block;
1233  * otherwise, it returns the data_vio to the pool.
1234  */
1235 static void finish_cleanup(struct data_vio *data_vio)
1236 {
1237 	struct vdo_completion *completion = &data_vio->vio.completion;
1238 	u32 discard_size = min_t(u32, data_vio->remaining_discard,
1239 				 VDO_BLOCK_SIZE - data_vio->offset);
1240 
1241 	VDO_ASSERT_LOG_ONLY(data_vio->allocation.lock == NULL,
1242 			    "complete data_vio has no allocation lock");
1243 	VDO_ASSERT_LOG_ONLY(data_vio->hash_lock == NULL,
1244 			    "complete data_vio has no hash lock");
1245 	if ((data_vio->remaining_discard <= discard_size) ||
1246 	    (completion->result != VDO_SUCCESS)) {
1247 		struct data_vio_pool *pool = completion->vdo->data_vio_pool;
1248 
1249 		vdo_funnel_queue_put(pool->queue, &completion->work_queue_entry_link);
1250 		schedule_releases(pool);
1251 		return;
1252 	}
1253 
1254 	data_vio->remaining_discard -= discard_size;
1255 	data_vio->is_partial = (data_vio->remaining_discard < VDO_BLOCK_SIZE);
1256 	data_vio->read = data_vio->is_partial;
1257 	data_vio->offset = 0;
1258 	completion->requeue = true;
1259 	data_vio->first_reference_operation_complete = false;
1260 	launch_data_vio(data_vio, data_vio->logical.lbn + 1);
1261 }
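
/*
 * Worked example (illustrative): a well-aligned 12 KiB discard arrives as one user bio. The
 * data_vio services the first 4 KiB block, then finish_cleanup() subtracts that block from
 * remaining_discard and relaunches the same data_vio at lbn + 1. This repeats until the remaining
 * discard fits in a single block, at which point the data_vio is finally returned to the pool.
 */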
1262 
1263 /** perform_cleanup_stage() - Perform the next step in the process of cleaning up a data_vio. */
1264 static void perform_cleanup_stage(struct data_vio *data_vio,
1265 				  enum data_vio_cleanup_stage stage)
1266 {
1267 	struct vdo *vdo = vdo_from_data_vio(data_vio);
1268 
1269 	switch (stage) {
1270 	case VIO_RELEASE_HASH_LOCK:
1271 		if (data_vio->hash_lock != NULL) {
1272 			launch_data_vio_hash_zone_callback(data_vio, clean_hash_lock);
1273 			return;
1274 		}
1275 		fallthrough;
1276 
1277 	case VIO_RELEASE_ALLOCATED:
1278 		if (data_vio_has_allocation(data_vio)) {
1279 			launch_data_vio_allocated_zone_callback(data_vio,
1280 								release_allocated_lock);
1281 			return;
1282 		}
1283 		fallthrough;
1284 
1285 	case VIO_RELEASE_RECOVERY_LOCKS:
1286 		if ((data_vio->recovery_sequence_number > 0) &&
1287 		    (READ_ONCE(vdo->read_only_notifier.read_only_error) == VDO_SUCCESS) &&
1288 		    (data_vio->vio.completion.result != VDO_READ_ONLY))
1289 			vdo_log_warning("VDO not read-only when cleaning data_vio with RJ lock");
1290 		fallthrough;
1291 
1292 	case VIO_RELEASE_LOGICAL:
1293 		launch_data_vio_logical_callback(data_vio, release_logical_lock);
1294 		return;
1295 
1296 	default:
1297 		finish_cleanup(data_vio);
1298 	}
1299 }
1300 
1301 void complete_data_vio(struct vdo_completion *completion)
1302 {
1303 	struct data_vio *data_vio = as_data_vio(completion);
1304 
1305 	completion->error_handler = NULL;
1306 	data_vio->last_async_operation = VIO_ASYNC_OP_CLEANUP;
1307 	perform_cleanup_stage(data_vio,
1308 			      (data_vio->write ? VIO_CLEANUP_START : VIO_RELEASE_LOGICAL));
1309 }
1310 
1311 static void enter_read_only_mode(struct vdo_completion *completion)
1312 {
1313 	if (vdo_is_read_only(completion->vdo))
1314 		return;
1315 
1316 	if (completion->result != VDO_READ_ONLY) {
1317 		struct data_vio *data_vio = as_data_vio(completion);
1318 
1319 		vdo_log_error_strerror(completion->result,
1320 				       "Preparing to enter read-only mode: data_vio for LBN %llu (becoming mapped to %llu, previously mapped to %llu, allocated %llu) is completing with a fatal error after operation %s",
1321 				       (unsigned long long) data_vio->logical.lbn,
1322 				       (unsigned long long) data_vio->new_mapped.pbn,
1323 				       (unsigned long long) data_vio->mapped.pbn,
1324 				       (unsigned long long) data_vio->allocation.pbn,
1325 				       get_data_vio_operation_name(data_vio));
1326 	}
1327 
1328 	vdo_enter_read_only_mode(completion->vdo, completion->result);
1329 }
1330 
1331 void handle_data_vio_error(struct vdo_completion *completion)
1332 {
1333 	struct data_vio *data_vio = as_data_vio(completion);
1334 
1335 	if ((completion->result == VDO_READ_ONLY) || (data_vio->user_bio == NULL))
1336 		enter_read_only_mode(completion);
1337 
1338 	update_data_vio_error_stats(data_vio);
1339 	complete_data_vio(completion);
1340 }
1341 
1342 /**
1343  * get_data_vio_operation_name() - Get the name of the last asynchronous operation performed on a
1344  *				   data_vio.
1345  */
1346 const char *get_data_vio_operation_name(struct data_vio *data_vio)
1347 {
1348 	BUILD_BUG_ON((MAX_VIO_ASYNC_OPERATION_NUMBER - MIN_VIO_ASYNC_OPERATION_NUMBER) !=
1349 		     ARRAY_SIZE(ASYNC_OPERATION_NAMES));
1350 
1351 	return ((data_vio->last_async_operation < MAX_VIO_ASYNC_OPERATION_NUMBER) ?
1352 		ASYNC_OPERATION_NAMES[data_vio->last_async_operation] :
1353 		"unknown async operation");
1354 }
1355 
1356 /**
1357  * data_vio_allocate_data_block() - Allocate a data block.
1358  *
1359  * @write_lock_type: The type of write lock to obtain on the block.
1360  * @callback: The callback which will attempt an allocation in the current zone and continue if it
1361  *	      succeeds.
1362  * @error_handler: The handler for errors while allocating.
1363  */
1364 void data_vio_allocate_data_block(struct data_vio *data_vio,
1365 				  enum pbn_lock_type write_lock_type,
1366 				  vdo_action_fn callback, vdo_action_fn error_handler)
1367 {
1368 	struct allocation *allocation = &data_vio->allocation;
1369 
1370 	VDO_ASSERT_LOG_ONLY((allocation->pbn == VDO_ZERO_BLOCK),
1371 			    "data_vio does not have an allocation");
1372 	allocation->write_lock_type = write_lock_type;
1373 	allocation->zone = vdo_get_next_allocation_zone(data_vio->logical.zone);
1374 	allocation->first_allocation_zone = allocation->zone->zone_number;
1375 
1376 	data_vio->vio.completion.error_handler = error_handler;
1377 	launch_data_vio_allocated_zone_callback(data_vio, callback);
1378 }
1379 
1380 /**
1381  * release_data_vio_allocation_lock() - Release the PBN lock on a data_vio's allocated block.
1382  * @reset: If true, the allocation will be reset (i.e. any allocated pbn will be forgotten).
1383  *
1384  * If the reference to the locked block is still provisional, it will be released as well.
1385  */
1386 void release_data_vio_allocation_lock(struct data_vio *data_vio, bool reset)
1387 {
1388 	struct allocation *allocation = &data_vio->allocation;
1389 	physical_block_number_t locked_pbn = allocation->pbn;
1390 
1391 	assert_data_vio_in_allocated_zone(data_vio);
1392 
1393 	if (reset || vdo_pbn_lock_has_provisional_reference(allocation->lock))
1394 		allocation->pbn = VDO_ZERO_BLOCK;
1395 
1396 	vdo_release_physical_zone_pbn_lock(allocation->zone, locked_pbn,
1397 					   vdo_forget(allocation->lock));
1398 }
1399 
1400 /**
1401  * uncompress_data_vio() - Uncompress the data a data_vio has just read.
1402  * @mapping_state: The mapping state indicating which fragment to decompress.
1403  * @buffer: The buffer to receive the uncompressed data.
1404  */
1405 int uncompress_data_vio(struct data_vio *data_vio,
1406 			enum block_mapping_state mapping_state, char *buffer)
1407 {
1408 	int size;
1409 	u16 fragment_offset, fragment_size;
1410 	struct compressed_block *block = data_vio->compression.block;
1411 	int result = vdo_get_compressed_block_fragment(mapping_state, block,
1412 						       &fragment_offset, &fragment_size);
1413 
1414 	if (result != VDO_SUCCESS) {
1415 		vdo_log_debug("%s: compressed fragment error %d", __func__, result);
1416 		return result;
1417 	}
1418 
1419 	size = LZ4_decompress_safe((block->data + fragment_offset), buffer,
1420 				   fragment_size, VDO_BLOCK_SIZE);
1421 	if (size != VDO_BLOCK_SIZE) {
1422 		vdo_log_debug("%s: lz4 error", __func__);
1423 		return VDO_INVALID_FRAGMENT;
1424 	}
1425 
1426 	return VDO_SUCCESS;
1427 }
1428 
1429 /**
1430  * modify_for_partial_write() - Do the modify-write part of a read-modify-write cycle.
1431  * @completion: The data_vio which has just finished its read.
1432  *
1433  * This callback is registered in read_block().
1434  */
1435 static void modify_for_partial_write(struct vdo_completion *completion)
1436 {
1437 	struct data_vio *data_vio = as_data_vio(completion);
1438 	char *data = data_vio->vio.data;
1439 	struct bio *bio = data_vio->user_bio;
1440 
1441 	assert_data_vio_on_cpu_thread(data_vio);
1442 
1443 	if (bio_op(bio) == REQ_OP_DISCARD) {
1444 		memset(data + data_vio->offset, '\0', min_t(u32,
1445 							    data_vio->remaining_discard,
1446 							    VDO_BLOCK_SIZE - data_vio->offset));
1447 	} else {
1448 		copy_from_bio(bio, data + data_vio->offset);
1449 	}
1450 
1451 	data_vio->is_zero = mem_is_zero(data, VDO_BLOCK_SIZE);
1452 	data_vio->read = false;
1453 	launch_data_vio_logical_callback(data_vio,
1454 					 continue_data_vio_with_block_map_slot);
1455 }
1456 
1457 static void complete_read(struct vdo_completion *completion)
1458 {
1459 	struct data_vio *data_vio = as_data_vio(completion);
1460 	char *data = data_vio->vio.data;
1461 	bool compressed = vdo_is_state_compressed(data_vio->mapped.state);
1462 
1463 	assert_data_vio_on_cpu_thread(data_vio);
1464 
1465 	if (compressed) {
1466 		int result = uncompress_data_vio(data_vio, data_vio->mapped.state, data);
1467 
1468 		if (result != VDO_SUCCESS) {
1469 			continue_data_vio_with_error(data_vio, result);
1470 			return;
1471 		}
1472 	}
1473 
1474 	if (data_vio->write) {
1475 		modify_for_partial_write(completion);
1476 		return;
1477 	}
1478 
1479 	if (compressed || data_vio->is_partial)
1480 		copy_to_bio(data_vio->user_bio, data + data_vio->offset);
1481 
1482 	acknowledge_data_vio(data_vio);
1483 	complete_data_vio(completion);
1484 }
1485 
1486 static void read_endio(struct bio *bio)
1487 {
1488 	struct data_vio *data_vio = vio_as_data_vio(bio->bi_private);
1489 	int result = blk_status_to_errno(bio->bi_status);
1490 
1491 	vdo_count_completed_bios(bio);
1492 	if (result != VDO_SUCCESS) {
1493 		continue_data_vio_with_error(data_vio, result);
1494 		return;
1495 	}
1496 
1497 	launch_data_vio_cpu_callback(data_vio, complete_read,
1498 				     CPU_Q_COMPLETE_READ_PRIORITY);
1499 }
1500 
1501 static void complete_zero_read(struct vdo_completion *completion)
1502 {
1503 	struct data_vio *data_vio = as_data_vio(completion);
1504 
1505 	assert_data_vio_on_cpu_thread(data_vio);
1506 
1507 	if (data_vio->is_partial) {
1508 		memset(data_vio->vio.data, 0, VDO_BLOCK_SIZE);
1509 		if (data_vio->write) {
1510 			modify_for_partial_write(completion);
1511 			return;
1512 		}
1513 	} else {
1514 		zero_fill_bio(data_vio->user_bio);
1515 	}
1516 
1517 	complete_read(completion);
1518 }
1519 
1520 /**
1521  * read_block() - Read a block asynchronously.
1522  *
1523  * This is the callback registered in read_block_mapping().
1524  */
1525 static void read_block(struct vdo_completion *completion)
1526 {
1527 	struct data_vio *data_vio = as_data_vio(completion);
1528 	struct vio *vio = as_vio(completion);
1529 	int result = VDO_SUCCESS;
1530 
1531 	if (data_vio->mapped.pbn == VDO_ZERO_BLOCK) {
1532 		launch_data_vio_cpu_callback(data_vio, complete_zero_read,
1533 					     CPU_Q_COMPLETE_VIO_PRIORITY);
1534 		return;
1535 	}
1536 
1537 	data_vio->last_async_operation = VIO_ASYNC_OP_READ_DATA_VIO;
1538 	if (vdo_is_state_compressed(data_vio->mapped.state)) {
1539 		result = vio_reset_bio(vio, (char *) data_vio->compression.block,
1540 				       read_endio, REQ_OP_READ, data_vio->mapped.pbn);
1541 	} else {
1542 		blk_opf_t opf = ((data_vio->user_bio->bi_opf & PASSTHROUGH_FLAGS) | REQ_OP_READ);
1543 
1544 		if (data_vio->is_partial) {
1545 			result = vio_reset_bio(vio, vio->data, read_endio, opf,
1546 					       data_vio->mapped.pbn);
1547 		} else {
1548 			/* A full 4k read. Use the incoming bio to avoid having to copy the data */
1549 			bio_reset(vio->bio, vio->bio->bi_bdev, opf);
1550 			bio_init_clone(data_vio->user_bio->bi_bdev, vio->bio,
1551 				       data_vio->user_bio, GFP_KERNEL);
1552 
1553 			/* Copy over the original bio iovec and opflags. */
1554 			vdo_set_bio_properties(vio->bio, vio, read_endio, opf,
1555 					       data_vio->mapped.pbn);
1556 		}
1557 	}
1558 
1559 	if (result != VDO_SUCCESS) {
1560 		continue_data_vio_with_error(data_vio, result);
1561 		return;
1562 	}
1563 
1564 	vdo_submit_data_vio(data_vio);
1565 }
1566 
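/*
 * A data_vio makes its reference count updates through two completions: its own vio completion
 * (for the increment) and its embedded decrement_completion (for the decrement). This returns the
 * data_vio which owns whichever of the two is passed in.
 */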
1567 static inline struct data_vio *
1568 reference_count_update_completion_as_data_vio(struct vdo_completion *completion)
1569 {
1570 	if (completion->type == VIO_COMPLETION)
1571 		return as_data_vio(completion);
1572 
1573 	return container_of(completion, struct data_vio, decrement_completion);
1574 }
1575 
1576 /**
1577  * update_block_map() - Rendezvous of the data_vio and decrement completions after each has
1578  *                      made its reference updates. Handle any error from either, or proceed
1579  *                      to updating the block map.
1580  * @completion: The completion of the write in progress.
1581  */
1582 static void update_block_map(struct vdo_completion *completion)
1583 {
1584 	struct data_vio *data_vio = reference_count_update_completion_as_data_vio(completion);
1585 
1586 	assert_data_vio_in_logical_zone(data_vio);
1587 
1588 	if (!data_vio->first_reference_operation_complete) {
1589 		/* Rendezvous, we're first */
1590 		data_vio->first_reference_operation_complete = true;
1591 		return;
1592 	}
1593 
1594 	completion = &data_vio->vio.completion;
1595 	vdo_set_completion_result(completion, data_vio->decrement_completion.result);
1596 	if (completion->result != VDO_SUCCESS) {
1597 		handle_data_vio_error(completion);
1598 		return;
1599 	}
1600 
1601 	completion->error_handler = handle_data_vio_error;
1602 	if (data_vio->hash_lock != NULL)
1603 		set_data_vio_hash_zone_callback(data_vio, vdo_continue_hash_lock);
1604 	else
1605 		completion->callback = complete_data_vio;
1606 
1607 	data_vio->last_async_operation = VIO_ASYNC_OP_PUT_MAPPED_BLOCK;
1608 	vdo_put_mapped_block(data_vio);
1609 }
1610 
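/**
 * decrement_reference_count() - Remove a reference from the data_vio's old mapping.
 *
 * Runs in the physical zone of the old mapping using the embedded decrement_completion, then
 * rejoins update_block_map() in the logical zone.
 */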
1611 static void decrement_reference_count(struct vdo_completion *completion)
1612 {
1613 	struct data_vio *data_vio = container_of(completion, struct data_vio,
1614 						 decrement_completion);
1615 
1616 	assert_data_vio_in_mapped_zone(data_vio);
1617 
1618 	vdo_set_completion_callback(completion, update_block_map,
1619 				    data_vio->logical.zone->thread_id);
1620 	completion->error_handler = update_block_map;
1621 	vdo_modify_reference_count(completion, &data_vio->decrement_updater);
1622 }
1623 
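/**
 * increment_reference_count() - Add a reference to the data_vio's new mapping.
 *
 * Runs in the physical zone of the new mapping, downgrading the allocation's write lock first if
 * required, then rejoins update_block_map() in the logical zone.
 */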
1624 static void increment_reference_count(struct vdo_completion *completion)
1625 {
1626 	struct data_vio *data_vio = as_data_vio(completion);
1627 
1628 	assert_data_vio_in_new_mapped_zone(data_vio);
1629 
1630 	if (data_vio->downgrade_allocation_lock) {
1631 		/*
1632 		 * Now that the data has been written, it's safe to deduplicate against the
1633 		 * block. Downgrade the allocation lock to a read lock so it can be used later by
1634 		 * the hash lock. This is done here since it needs to happen sometime before we
1635 		 * return to the hash zone, and we are currently on the correct thread. For
1636 		 * compressed blocks, the downgrade will have already been done.
1637 		 */
1638 		vdo_downgrade_pbn_write_lock(data_vio->allocation.lock, false);
1639 	}
1640 
1641 	set_data_vio_logical_callback(data_vio, update_block_map);
1642 	completion->error_handler = update_block_map;
1643 	vdo_modify_reference_count(completion, &data_vio->increment_updater);
1644 }
1645 
1646 /** journal_remapping() - Add a recovery journal entry for a data remapping. */
1647 static void journal_remapping(struct vdo_completion *completion)
1648 {
1649 	struct data_vio *data_vio = as_data_vio(completion);
1650 
1651 	assert_data_vio_in_journal_zone(data_vio);
1652 
1653 	data_vio->decrement_updater.operation = VDO_JOURNAL_DATA_REMAPPING;
1654 	data_vio->decrement_updater.zpbn = data_vio->mapped;
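	/*
	 * If either reference count operation is unnecessary (because the corresponding mapping
	 * is the zero block), pre-mark that half of the rendezvous as complete so that
	 * update_block_map() will not wait for it.
	 */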
1655 	if (data_vio->new_mapped.pbn == VDO_ZERO_BLOCK) {
1656 		data_vio->first_reference_operation_complete = true;
1657 		if (data_vio->mapped.pbn == VDO_ZERO_BLOCK)
1658 			set_data_vio_logical_callback(data_vio, update_block_map);
1659 	} else {
1660 		set_data_vio_new_mapped_zone_callback(data_vio,
1661 						      increment_reference_count);
1662 	}
1663 
1664 	if (data_vio->mapped.pbn == VDO_ZERO_BLOCK) {
1665 		data_vio->first_reference_operation_complete = true;
1666 	} else {
1667 		vdo_set_completion_callback(&data_vio->decrement_completion,
1668 					    decrement_reference_count,
1669 					    data_vio->mapped.zone->thread_id);
1670 	}
1671 
1672 	data_vio->last_async_operation = VIO_ASYNC_OP_JOURNAL_REMAPPING;
1673 	vdo_add_recovery_journal_entry(completion->vdo->recovery_journal, data_vio);
1674 }
1675 
1676 /**
1677  * read_old_block_mapping() - Get the previous PBN/LBN mapping of an in-progress write.
1678  *
1679  * Gets the previous PBN mapped to this LBN from the block map, so as to make an appropriate
1680  * journal entry referencing the removal of this LBN->PBN mapping.
1681  */
1682 static void read_old_block_mapping(struct vdo_completion *completion)
1683 {
1684 	struct data_vio *data_vio = as_data_vio(completion);
1685 
1686 	assert_data_vio_in_logical_zone(data_vio);
1687 
1688 	data_vio->last_async_operation = VIO_ASYNC_OP_GET_MAPPED_BLOCK_FOR_WRITE;
1689 	set_data_vio_journal_callback(data_vio, journal_remapping);
1690 	vdo_get_mapped_block(data_vio);
1691 }
1692 
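/**
 * update_metadata_for_data_vio_write() - Begin the metadata updates for a write.
 * @data_vio: The data_vio to update.
 * @lock: The pbn_lock on the data_vio's new mapping, if any.
 *
 * Sets up the reference count increment for the new mapping and starts the journal, reference
 * count, and block map update sequence by reading the old block mapping.
 */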
1693 void update_metadata_for_data_vio_write(struct data_vio *data_vio, struct pbn_lock *lock)
1694 {
1695 	data_vio->increment_updater = (struct reference_updater) {
1696 		.operation = VDO_JOURNAL_DATA_REMAPPING,
1697 		.increment = true,
1698 		.zpbn = data_vio->new_mapped,
1699 		.lock = lock,
1700 	};
1701 
1702 	launch_data_vio_logical_callback(data_vio, read_old_block_mapping);
1703 }
1704 
1705 /**
1706  * pack_compressed_data() - Attempt to pack the compressed data_vio into a block.
1707  *
1708  * This is the callback registered in launch_compress_data_vio().
1709  */
1710 static void pack_compressed_data(struct vdo_completion *completion)
1711 {
1712 	struct data_vio *data_vio = as_data_vio(completion);
1713 
1714 	assert_data_vio_in_packer_zone(data_vio);
1715 
1716 	if (!vdo_get_compressing(vdo_from_data_vio(data_vio)) ||
1717 	    get_data_vio_compression_status(data_vio).may_not_compress) {
1718 		write_data_vio(data_vio);
1719 		return;
1720 	}
1721 
1722 	data_vio->last_async_operation = VIO_ASYNC_OP_ATTEMPT_PACKING;
1723 	vdo_attempt_packing(data_vio);
1724 }
1725 
1726 /**
1727  * compress_data_vio() - Do the actual work of compressing the data on a CPU queue.
1728  *
1729  * This callback is registered in launch_compress_data_vio().
1730  */
1731 static void compress_data_vio(struct vdo_completion *completion)
1732 {
1733 	struct data_vio *data_vio = as_data_vio(completion);
1734 	int size;
1735 
1736 	assert_data_vio_on_cpu_thread(data_vio);
1737 
1738 	/*
1739 	 * By putting the compressed data at the start of the compressed block data field, we won't
1740 	 * need to copy it if this data_vio becomes a compressed write agent.
1741 	 */
1742 	size = LZ4_compress_default(data_vio->vio.data,
1743 				    data_vio->compression.block->data, VDO_BLOCK_SIZE,
1744 				    VDO_MAX_COMPRESSED_FRAGMENT_SIZE,
1745 				    (char *) vdo_get_work_queue_private_data());
1746 	if ((size > 0) && (size < VDO_COMPRESSED_BLOCK_DATA_SIZE)) {
1747 		data_vio->compression.size = size;
1748 		launch_data_vio_packer_callback(data_vio, pack_compressed_data);
1749 		return;
1750 	}
1751 
1752 	write_data_vio(data_vio);
1753 }
1754 
1755 /**
1756  * launch_compress_data_vio() - Continue a write by attempting to compress the data.
1757  *
1758  * This is a re-entry point to vio_write used by hash locks.
1759  */
1760 void launch_compress_data_vio(struct data_vio *data_vio)
1761 {
1762 	VDO_ASSERT_LOG_ONLY(!data_vio->is_duplicate, "compressing a non-duplicate block");
1763 	VDO_ASSERT_LOG_ONLY(data_vio->hash_lock != NULL,
1764 			    "data_vio to compress has a hash_lock");
1765 	VDO_ASSERT_LOG_ONLY(data_vio_has_allocation(data_vio),
1766 			    "data_vio to compress has an allocation");
1767 
1768 	/*
1769 	 * There are 4 reasons why a data_vio which has reached this point will not be eligible for
1770 	 * compression:
1771 	 *
1772 	 * 1) Since data_vios can block indefinitely in the packer, it would be bad to do so if the
1773 	 * write request also requests FUA.
1774 	 *
1775 	 * 2) A data_vio should not be compressed when compression is disabled for the vdo.
1776 	 *
1777 	 * 3) A data_vio could be doing a partial write on behalf of a larger discard which has not
1778 	 * yet been acknowledged and hence blocking in the packer would be bad.
1779 	 *
1780 	 * 4) Some other data_vio may be waiting on this data_vio in which case blocking in the
1781 	 * packer would also be bad.
1782 	 */
1783 	if (data_vio->fua ||
1784 	    !vdo_get_compressing(vdo_from_data_vio(data_vio)) ||
1785 	    ((data_vio->user_bio != NULL) && (bio_op(data_vio->user_bio) == REQ_OP_DISCARD)) ||
1786 	    (advance_data_vio_compression_stage(data_vio).stage != DATA_VIO_COMPRESSING)) {
1787 		write_data_vio(data_vio);
1788 		return;
1789 	}
1790 
1791 	data_vio->last_async_operation = VIO_ASYNC_OP_COMPRESS_DATA_VIO;
1792 	launch_data_vio_cpu_callback(data_vio, compress_data_vio,
1793 				     CPU_Q_COMPRESS_BLOCK_PRIORITY);
1794 }
1795 
1796 /**
1797  * hash_data_vio() - Hash the data in a data_vio and set the hash zone (which also flags the record
1798  *		     name as set).
1799  *
1800  * This callback is registered in prepare_for_dedupe().
1801  */
1802 static void hash_data_vio(struct vdo_completion *completion)
1803 {
1804 	struct data_vio *data_vio = as_data_vio(completion);
1805 
1806 	assert_data_vio_on_cpu_thread(data_vio);
1807 	VDO_ASSERT_LOG_ONLY(!data_vio->is_zero, "zero blocks should not be hashed");
1808 
1809 	murmurhash3_128(data_vio->vio.data, VDO_BLOCK_SIZE, 0x62ea60be,
1810 			&data_vio->record_name);
1811 
1812 	data_vio->hash_zone = vdo_select_hash_zone(vdo_from_data_vio(data_vio)->hash_zones,
1813 						   &data_vio->record_name);
1814 	data_vio->last_async_operation = VIO_ASYNC_OP_ACQUIRE_VDO_HASH_LOCK;
1815 	launch_data_vio_hash_zone_callback(data_vio, vdo_acquire_hash_lock);
1816 }
1817 
1818 /** prepare_for_dedupe() - Prepare for the dedupe path after attempting to get an allocation. */
1819 static void prepare_for_dedupe(struct data_vio *data_vio)
1820 {
1821 	/* We don't care what thread we are on. */
1822 	VDO_ASSERT_LOG_ONLY(!data_vio->is_zero, "must not prepare to dedupe zero blocks");
1823 
1824 	/*
1825 	 * Before we can dedupe, we need to know the record name, so the first
1826 	 * step is to hash the block data.
1827 	 */
1828 	data_vio->last_async_operation = VIO_ASYNC_OP_HASH_DATA_VIO;
1829 	launch_data_vio_cpu_callback(data_vio, hash_data_vio, CPU_Q_HASH_BLOCK_PRIORITY);
1830 }
1831 
1832 /**
1833  * write_bio_finished() - This is the bio_end_io function registered in write_data_vio() to be called
1834  *			  when a data_vio's write to the underlying storage has completed.
1835  */
1836 static void write_bio_finished(struct bio *bio)
1837 {
1838 	struct data_vio *data_vio = vio_as_data_vio((struct vio *) bio->bi_private);
1839 
1840 	vdo_count_completed_bios(bio);
1841 	vdo_set_completion_result(&data_vio->vio.completion,
1842 				  blk_status_to_errno(bio->bi_status));
1843 	data_vio->downgrade_allocation_lock = true;
1844 	update_metadata_for_data_vio_write(data_vio, data_vio->allocation.lock);
1845 }
1846 
1847 /** write_data_vio() - Write a data block to storage without compression. */
1848 void write_data_vio(struct data_vio *data_vio)
1849 {
1850 	struct data_vio_compression_status status, new_status;
1851 	int result;
1852 
1853 	if (!data_vio_has_allocation(data_vio)) {
1854 		/*
1855 		 * There was no space to write this block and we failed to deduplicate or compress
1856 		 * it.
1857 		 */
1858 		continue_data_vio_with_error(data_vio, VDO_NO_SPACE);
1859 		return;
1860 	}
1861 
1862 	new_status = (struct data_vio_compression_status) {
1863 		.stage = DATA_VIO_POST_PACKER,
1864 		.may_not_compress = true,
1865 	};
1866 
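	/*
	 * Ensure this data_vio can no longer be sent to the packer: atomically mark it as
	 * may_not_compress unless it has already passed through the packer.
	 */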
1867 	do {
1868 		status = get_data_vio_compression_status(data_vio);
1869 	} while ((status.stage != DATA_VIO_POST_PACKER) &&
1870 		 !set_data_vio_compression_status(data_vio, status, new_status));
1871 
1872 	/* Write the data from the data block buffer. */
1873 	result = vio_reset_bio(&data_vio->vio, data_vio->vio.data,
1874 			       write_bio_finished, REQ_OP_WRITE,
1875 			       data_vio->allocation.pbn);
1876 	if (result != VDO_SUCCESS) {
1877 		continue_data_vio_with_error(data_vio, result);
1878 		return;
1879 	}
1880 
1881 	data_vio->last_async_operation = VIO_ASYNC_OP_WRITE_DATA_VIO;
1882 	vdo_submit_data_vio(data_vio);
1883 }
1884 
1885 /**
1886  * acknowledge_write_callback() - Acknowledge a write to the requestor.
1887  *
1888  * This callback is registered in allocate_block() and continue_data_vio_with_block_map_slot().
1889  */
1890 static void acknowledge_write_callback(struct vdo_completion *completion)
1891 {
1892 	struct data_vio *data_vio = as_data_vio(completion);
1893 	struct vdo *vdo = completion->vdo;
1894 
1895 	VDO_ASSERT_LOG_ONLY((!vdo_uses_bio_ack_queue(vdo) ||
1896 			     (vdo_get_callback_thread_id() == vdo->thread_config.bio_ack_thread)),
1897 			    "%s() called on bio ack queue", __func__);
1898 	VDO_ASSERT_LOG_ONLY(data_vio_has_flush_generation_lock(data_vio),
1899 			    "write VIO to be acknowledged has a flush generation lock");
1900 	acknowledge_data_vio(data_vio);
1901 	if (data_vio->new_mapped.pbn == VDO_ZERO_BLOCK) {
1902 		/* This is a zero write or discard */
1903 		update_metadata_for_data_vio_write(data_vio, NULL);
1904 		return;
1905 	}
1906 
1907 	prepare_for_dedupe(data_vio);
1908 }
1909 
1910 /**
1911  * allocate_block() - Attempt to allocate a block in the current allocation zone.
1912  *
1913  * This callback is registered in continue_data_vio_with_block_map_slot().
1914  */
1915 static void allocate_block(struct vdo_completion *completion)
1916 {
1917 	struct data_vio *data_vio = as_data_vio(completion);
1918 
1919 	assert_data_vio_in_allocated_zone(data_vio);
1920 
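	/*
	 * If no block was immediately available, the allocation code has re-dispatched the
	 * data_vio; it will re-enter this callback or its error handler when that resolves.
	 */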
1921 	if (!vdo_allocate_block_in_zone(data_vio))
1922 		return;
1923 
1924 	completion->error_handler = handle_data_vio_error;
1925 	WRITE_ONCE(data_vio->allocation_succeeded, true);
1926 	data_vio->new_mapped = (struct zoned_pbn) {
1927 		.zone = data_vio->allocation.zone,
1928 		.pbn = data_vio->allocation.pbn,
1929 		.state = VDO_MAPPING_STATE_UNCOMPRESSED,
1930 	};
1931 
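	/*
	 * A FUA write cannot be acknowledged until its data is on stable storage, and a discard
	 * spanning further blocks cannot be acknowledged until its final block, so in those cases
	 * skip the early acknowledgment and go straight to the dedupe path.
	 */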
1932 	if (data_vio->fua ||
1933 	    data_vio->remaining_discard > (u32) (VDO_BLOCK_SIZE - data_vio->offset)) {
1934 		prepare_for_dedupe(data_vio);
1935 		return;
1936 	}
1937 
1938 	data_vio->last_async_operation = VIO_ASYNC_OP_ACKNOWLEDGE_WRITE;
1939 	launch_data_vio_on_bio_ack_queue(data_vio, acknowledge_write_callback);
1940 }
1941 
1942 /**
1943  * handle_allocation_error() - Handle an error attempting to allocate a block.
1944  *
1945  * This error handler is registered in continue_data_vio_with_block_map_slot().
1946  */
1947 static void handle_allocation_error(struct vdo_completion *completion)
1948 {
1949 	struct data_vio *data_vio = as_data_vio(completion);
1950 
1951 	if (completion->result == VDO_NO_SPACE) {
1952 		/* We failed to get an allocation, but we can try to dedupe. */
1953 		vdo_reset_completion(completion);
1954 		completion->error_handler = handle_data_vio_error;
1955 		prepare_for_dedupe(data_vio);
1956 		return;
1957 	}
1958 
1959 	/* We got a "real" error, not just a failure to allocate, so fail the request. */
1960 	handle_data_vio_error(completion);
1961 }
1962 
1963 static int assert_is_discard(struct data_vio *data_vio)
1964 {
1965 	int result = VDO_ASSERT(data_vio->is_discard,
1966 				"data_vio with no block map page is a discard");
1967 
1968 	return ((result == VDO_SUCCESS) ? result : VDO_READ_ONLY);
1969 }
1970 
1971 /**
1972  * continue_data_vio_with_block_map_slot() - Read the data_vio's mapping from the block map.
1973  *
1974  * This callback is registered in launch_read_data_vio().
1975  */
1976 void continue_data_vio_with_block_map_slot(struct vdo_completion *completion)
1977 {
1978 	struct data_vio *data_vio = as_data_vio(completion);
1979 
1980 	assert_data_vio_in_logical_zone(data_vio);
1981 	if (data_vio->read) {
1982 		set_data_vio_logical_callback(data_vio, read_block);
1983 		data_vio->last_async_operation = VIO_ASYNC_OP_GET_MAPPED_BLOCK_FOR_READ;
1984 		vdo_get_mapped_block(data_vio);
1985 		return;
1986 	}
1987 
1988 	vdo_acquire_flush_generation_lock(data_vio);
1989 
1990 	if (data_vio->tree_lock.tree_slots[0].block_map_slot.pbn == VDO_ZERO_BLOCK) {
1991 		/*
1992 		 * This is a discard for a block on a block map page which has not been allocated, so
1993 		 * there's nothing more we need to do.
1994 		 */
1995 		completion->callback = complete_data_vio;
1996 		continue_data_vio_with_error(data_vio, assert_is_discard(data_vio));
1997 		return;
1998 	}
1999 
2000 	/*
2001 	 * We need an allocation if this is neither a full-block discard nor a
2002 	 * full-block zero write.
2003 	 */
2004 	if (!data_vio->is_zero && (!data_vio->is_discard || data_vio->is_partial)) {
2005 		data_vio_allocate_data_block(data_vio, VIO_WRITE_LOCK, allocate_block,
2006 					     handle_allocation_error);
2007 		return;
2008 	}
2009 
2010 	/*
2011 	 * We don't need to write any data, so skip allocation and just update the block map and
2012 	 * reference counts (via the journal).
2013 	 */
2014 	data_vio->new_mapped.pbn = VDO_ZERO_BLOCK;
2015 	if (data_vio->is_zero)
2016 		data_vio->new_mapped.state = VDO_MAPPING_STATE_UNCOMPRESSED;
2017 
2018 	if (data_vio->remaining_discard > (u32) (VDO_BLOCK_SIZE - data_vio->offset)) {
2019 		/* This is not the final block of a discard so we can't acknowledge it yet. */
2020 		update_metadata_for_data_vio_write(data_vio, NULL);
2021 		return;
2022 	}
2023 
2024 	data_vio->last_async_operation = VIO_ASYNC_OP_ACKNOWLEDGE_WRITE;
2025 	launch_data_vio_on_bio_ack_queue(data_vio, acknowledge_write_callback);
2026 }
2027