1 // SPDX-License-Identifier: GPL-2.0-only
2 /*
3 * Copyright 2023 Red Hat
4 */
5
6 #include "data-vio.h"
7
8 #include <linux/atomic.h>
9 #include <linux/bio.h>
10 #include <linux/blkdev.h>
11 #include <linux/delay.h>
12 #include <linux/device-mapper.h>
13 #include <linux/jiffies.h>
14 #include <linux/kernel.h>
15 #include <linux/list.h>
16 #include <linux/lz4.h>
17 #include <linux/minmax.h>
18 #include <linux/sched.h>
19 #include <linux/spinlock.h>
20 #include <linux/string.h>
21 #include <linux/wait.h>
22
23 #include "logger.h"
24 #include "memory-alloc.h"
25 #include "murmurhash3.h"
26 #include "permassert.h"
27
28 #include "block-map.h"
29 #include "dump.h"
30 #include "encodings.h"
31 #include "int-map.h"
32 #include "io-submitter.h"
33 #include "logical-zone.h"
34 #include "packer.h"
35 #include "recovery-journal.h"
36 #include "slab-depot.h"
37 #include "status-codes.h"
38 #include "types.h"
39 #include "vdo.h"
40 #include "vio.h"
41 #include "wait-queue.h"
42
43 /**
44 * DOC: Bio flags.
45 *
46 * For certain flags set on user bios, if the user bio has not yet been acknowledged, setting those
47 * flags on our own bio(s) for that request may help underlying layers better fulfill the user
48 * bio's needs. This constant contains the aggregate of those flags; VDO strips all the other
49 * flags, as they convey incorrect information.
50 *
 * These flags are always irrelevant if we have already finished the user bio, as they are only
 * hints about IO importance. If VDO has already finished the user bio, any remaining IO it does
 * for that request no longer needs those hints.
54 *
55 * Note that bio.c contains the complete list of flags we believe may be set; the following list
56 * explains the action taken with each of those flags VDO could receive:
57 *
58 * * REQ_SYNC: Passed down if the user bio is not yet completed, since it indicates the user bio
59 * completion is required for further work to be done by the issuer.
60 * * REQ_META: Passed down if the user bio is not yet completed, since it may mean the lower layer
61 * treats it as more urgent, similar to REQ_SYNC.
62 * * REQ_PRIO: Passed down if the user bio is not yet completed, since it indicates the user bio is
63 * important.
64 * * REQ_NOMERGE: Set only if the incoming bio was split; irrelevant to VDO IO.
65 * * REQ_IDLE: Set if the incoming bio had more IO quickly following; VDO's IO pattern doesn't
66 * match incoming IO, so this flag is incorrect for it.
67 * * REQ_FUA: Handled separately, and irrelevant to VDO IO otherwise.
68 * * REQ_RAHEAD: Passed down, as, for reads, it indicates trivial importance.
69 * * REQ_BACKGROUND: Not passed down, as VIOs are a limited resource and VDO needs them recycled
70 * ASAP to service heavy load, which is the only place where REQ_BACKGROUND might aid in load
71 * prioritization.
72 */
73 static blk_opf_t PASSTHROUGH_FLAGS = (REQ_PRIO | REQ_META | REQ_SYNC | REQ_RAHEAD);
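
/*
 * Illustrative sketch of how the mask above is applied (see read_block() below for the real
 * use): when VDO issues its own read on behalf of a not-yet-acknowledged user bio, only the
 * passthrough flags are preserved:
 *
 *   blk_opf_t opf = (data_vio->user_bio->bi_opf & PASSTHROUGH_FLAGS) | REQ_OP_READ;
 */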
74
75 /**
76 * DOC:
77 *
78 * The data_vio_pool maintains the pool of data_vios which a vdo uses to service incoming bios. For
79 * correctness, and in order to avoid potentially expensive or blocking memory allocations during
80 * normal operation, the number of concurrently active data_vios is capped. Furthermore, in order
81 * to avoid starvation of reads and writes, at most 75% of the data_vios may be used for
82 * discards. The data_vio_pool is responsible for enforcing these limits. Threads submitting bios
 * for which a data_vio or discard permit is not available will block until the necessary
84 * resources are available. The pool is also responsible for distributing resources to blocked
85 * threads and waking them. Finally, the pool attempts to batch the work of recycling data_vios by
86 * performing the work of actually assigning resources to blocked threads or placing data_vios back
87 * into the pool on a single cpu at a time.
88 *
89 * The pool contains two "limiters", one for tracking data_vios and one for tracking discard
90 * permits. The limiters also provide safe cross-thread access to pool statistics without the need
91 * to take the pool's lock. When a thread submits a bio to a vdo device, it will first attempt to
92 * get a discard permit if it is a discard, and then to get a data_vio. If the necessary resources
93 * are available, the incoming bio will be assigned to the acquired data_vio, and it will be
94 * launched. However, if either of these are unavailable, the arrival time of the bio is recorded
95 * in the bio's bi_private field, the bio and its submitter are both queued on the appropriate
96 * limiter and the submitting thread will then put itself to sleep. (note that this mechanism will
97 * break if jiffies are only 32 bits.)
98 *
99 * Whenever a data_vio has completed processing for the bio it was servicing, release_data_vio()
100 * will be called on it. This function will add the data_vio to a funnel queue, and then check the
101 * state of the pool. If the pool is not currently processing released data_vios, the pool's
102 * completion will be enqueued on a cpu queue. This obviates the need for the releasing threads to
103 * hold the pool's lock, and also batches release work while avoiding starvation of the cpu
104 * threads.
105 *
106 * Whenever the pool's completion is run on a cpu thread, it calls process_release_callback() which
107 * processes a batch of returned data_vios (currently at most 32) from the pool's funnel queue. For
108 * each data_vio, it first checks whether that data_vio was processing a discard. If so, and there
109 * is a blocked bio waiting for a discard permit, that permit is notionally transferred to the
110 * eldest discard waiter, and that waiter is moved to the end of the list of discard bios waiting
111 * for a data_vio. If there are no discard waiters, the discard permit is returned to the pool.
 * Next, the data_vio is assigned to the oldest blocked bio which either has a discard permit or
 * doesn't need one, and is relaunched. If no such bio exists, the data_vio is returned to the
114 * pool. Finally, if any waiting bios were launched, the threads which blocked trying to submit
115 * them are awakened.
116 */
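
/*
 * A condensed sketch of that lifecycle (all of the functions named here are defined below):
 *
 *   vdo_launch_bio()            acquire a discard permit and/or a data_vio, or block in
 *                               wait_permit() until process_release_callback() wakes us
 *   launch_bio()                copy the bio's parameters into the data_vio
 *   launch_data_vio()           reset the data_vio and start it at attempt_logical_block_lock()
 *   complete_data_vio()         clean up locks; finish_cleanup() then puts the data_vio on the
 *                               pool's funnel queue and calls schedule_releases()
 *   process_release_callback()  reuse or return the resources and wake blocked submitters
 */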
117
118 #define DATA_VIO_RELEASE_BATCH_SIZE 128
119
120 static const unsigned int VDO_SECTORS_PER_BLOCK_MASK = VDO_SECTORS_PER_BLOCK - 1;
121 static const u32 COMPRESSION_STATUS_MASK = 0xff;
122 static const u32 MAY_NOT_COMPRESS_MASK = 0x80000000;
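
/*
 * For reference: with the masks above, the packed compression status keeps the stage in bits
 * 0-7 and the may_not_compress flag in bit 31, so pack_status() turns, for example,
 * { .stage = DATA_VIO_PACKING, .may_not_compress = true } into (DATA_VIO_PACKING | 0x80000000).
 */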
123
124 struct limiter;
125 typedef void (*assigner_fn)(struct limiter *limiter);
126
127 /* Bookkeeping structure for a single type of resource. */
128 struct limiter {
129 /* The data_vio_pool to which this limiter belongs */
130 struct data_vio_pool *pool;
131 /* The maximum number of data_vios available */
132 data_vio_count_t limit;
133 /* The number of resources in use */
134 data_vio_count_t busy;
135 /* The maximum number of resources ever simultaneously in use */
136 data_vio_count_t max_busy;
137 /* The number of resources to release */
138 data_vio_count_t release_count;
139 /* The number of waiters to wake */
140 data_vio_count_t wake_count;
141 /* The list of waiting bios which are known to process_release_callback() */
142 struct bio_list waiters;
143 /* The list of waiting bios which are not yet known to process_release_callback() */
144 struct bio_list new_waiters;
145 /* The list of waiters which have their permits */
146 struct bio_list *permitted_waiters;
147 /* The function for assigning a resource to a waiter */
148 assigner_fn assigner;
149 /* The queue of blocked threads */
150 wait_queue_head_t blocked_threads;
151 /* The arrival time of the eldest waiter */
152 u64 arrival;
153 };
154
155 /*
156 * A data_vio_pool is a collection of preallocated data_vios which may be acquired from any thread,
157 * and are released in batches.
158 */
159 struct data_vio_pool {
160 /* Completion for scheduling releases */
161 struct vdo_completion completion;
162 /* The administrative state of the pool */
163 struct admin_state state;
164 /* Lock protecting the pool */
165 spinlock_t lock;
166 /* The main limiter controlling the total data_vios in the pool. */
167 struct limiter limiter;
168 /* The limiter controlling data_vios for discard */
169 struct limiter discard_limiter;
170 /* The list of bios which have discard permits but still need a data_vio */
171 struct bio_list permitted_discards;
172 /* The list of available data_vios */
173 struct list_head available;
174 /* The queue of data_vios waiting to be returned to the pool */
175 struct funnel_queue *queue;
176 /* Whether the pool is processing, or scheduled to process releases */
177 atomic_t processing;
178 /* The data vios in the pool */
179 struct data_vio data_vios[];
180 };
181
182 static const char * const ASYNC_OPERATION_NAMES[] = {
183 "launch",
184 "acknowledge_write",
185 "acquire_hash_lock",
186 "attempt_logical_block_lock",
187 "lock_duplicate_pbn",
188 "check_for_duplication",
189 "cleanup",
190 "compress_data_vio",
191 "find_block_map_slot",
192 "get_mapped_block_for_read",
193 "get_mapped_block_for_write",
194 "hash_data_vio",
195 "journal_remapping",
196 "vdo_attempt_packing",
197 "put_mapped_block",
198 "read_data_vio",
199 "update_dedupe_index",
200 "update_reference_counts",
201 "verify_duplication",
202 "write_data_vio",
203 };
204
205 /* The steps taken cleaning up a VIO, in the order they are performed. */
206 enum data_vio_cleanup_stage {
207 VIO_CLEANUP_START,
208 VIO_RELEASE_HASH_LOCK = VIO_CLEANUP_START,
209 VIO_RELEASE_ALLOCATED,
210 VIO_RELEASE_RECOVERY_LOCKS,
211 VIO_RELEASE_LOGICAL,
212 VIO_CLEANUP_DONE
213 };
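
/*
 * perform_cleanup_stage() below dispatches these stages in this order, falling through to the
 * next stage whenever the current one has nothing to release.
 */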
214
215 static inline struct data_vio_pool * __must_check
as_data_vio_pool(struct vdo_completion *completion)
217 {
218 vdo_assert_completion_type(completion, VDO_DATA_VIO_POOL_COMPLETION);
219 return container_of(completion, struct data_vio_pool, completion);
220 }
221
static inline u64 get_arrival_time(struct bio *bio)
223 {
224 return (u64) bio->bi_private;
225 }
226
227 /**
228 * check_for_drain_complete_locked() - Check whether a data_vio_pool has no outstanding data_vios
229 * or waiters while holding the pool's lock.
230 * @pool: The data_vio pool.
231 */
static bool check_for_drain_complete_locked(struct data_vio_pool *pool)
233 {
234 if (pool->limiter.busy > 0)
235 return false;
236
237 VDO_ASSERT_LOG_ONLY((pool->discard_limiter.busy == 0),
238 "no outstanding discard permits");
239
240 return (bio_list_empty(&pool->limiter.new_waiters) &&
241 bio_list_empty(&pool->discard_limiter.new_waiters));
242 }
243
static void initialize_lbn_lock(struct data_vio *data_vio, logical_block_number_t lbn)
245 {
246 struct vdo *vdo = vdo_from_data_vio(data_vio);
247 zone_count_t zone_number;
248 struct lbn_lock *lock = &data_vio->logical;
249
250 lock->lbn = lbn;
251 lock->locked = false;
252 vdo_waitq_init(&lock->waiters);
253 zone_number = vdo_compute_logical_zone(data_vio);
254 lock->zone = &vdo->logical_zones->zones[zone_number];
255 }
256
static void launch_locked_request(struct data_vio *data_vio)
258 {
259 data_vio->logical.locked = true;
260 if (data_vio->write) {
261 struct vdo *vdo = vdo_from_data_vio(data_vio);
262
263 if (vdo_is_read_only(vdo)) {
264 continue_data_vio_with_error(data_vio, VDO_READ_ONLY);
265 return;
266 }
267 }
268
269 data_vio->last_async_operation = VIO_ASYNC_OP_FIND_BLOCK_MAP_SLOT;
270 vdo_find_block_map_slot(data_vio);
271 }
272
static void acknowledge_data_vio(struct data_vio *data_vio)
274 {
275 struct vdo *vdo = vdo_from_data_vio(data_vio);
276 struct bio *bio = data_vio->user_bio;
277 int error = vdo_status_to_errno(data_vio->vio.completion.result);
278
279 if (bio == NULL)
280 return;
281
282 VDO_ASSERT_LOG_ONLY((data_vio->remaining_discard <=
283 (u32) (VDO_BLOCK_SIZE - data_vio->offset)),
284 "data_vio to acknowledge is not an incomplete discard");
285
286 data_vio->user_bio = NULL;
287 vdo_count_bios(&vdo->stats.bios_acknowledged, bio);
288 if (data_vio->is_partial)
289 vdo_count_bios(&vdo->stats.bios_acknowledged_partial, bio);
290
291 bio->bi_status = errno_to_blk_status(error);
292 bio_endio(bio);
293 }
294
static void copy_to_bio(struct bio *bio, char *data_ptr)
296 {
297 struct bio_vec biovec;
298 struct bvec_iter iter;
299
300 bio_for_each_segment(biovec, bio, iter) {
301 memcpy_to_bvec(&biovec, data_ptr);
302 data_ptr += biovec.bv_len;
303 }
304 }
305
struct data_vio_compression_status get_data_vio_compression_status(struct data_vio *data_vio)
307 {
308 u32 packed = atomic_read(&data_vio->compression.status);
309
310 /* pairs with cmpxchg in set_data_vio_compression_status */
311 smp_rmb();
312 return (struct data_vio_compression_status) {
313 .stage = packed & COMPRESSION_STATUS_MASK,
314 .may_not_compress = ((packed & MAY_NOT_COMPRESS_MASK) != 0),
315 };
316 }
317
318 /**
319 * pack_status() - Convert a data_vio_compression_status into a u32 which may be stored
320 * atomically.
321 * @status: The state to convert.
322 *
323 * Return: The compression state packed into a u32.
324 */
static u32 __must_check pack_status(struct data_vio_compression_status status)
326 {
327 return status.stage | (status.may_not_compress ? MAY_NOT_COMPRESS_MASK : 0);
328 }
329
330 /**
331 * set_data_vio_compression_status() - Set the compression status of a data_vio.
332 * @data_vio: The data_vio to change.
333 * @status: The expected current status of the data_vio.
334 * @new_status: The status to set.
335 *
336 * Return: true if the new status was set, false if the data_vio's compression status did not
337 * match the expected state, and so was left unchanged.
338 */
339 static bool __must_check
set_data_vio_compression_status(struct data_vio *data_vio,
341 struct data_vio_compression_status status,
342 struct data_vio_compression_status new_status)
343 {
344 u32 actual;
345 u32 expected = pack_status(status);
346 u32 replacement = pack_status(new_status);
347
348 /*
	 * Extra barriers because this was originally developed using a CAS operation that implicitly
350 * had them.
351 */
352 smp_mb__before_atomic();
353 actual = atomic_cmpxchg(&data_vio->compression.status, expected, replacement);
354 /* same as before_atomic */
355 smp_mb__after_atomic();
356 return (expected == actual);
357 }
358
struct data_vio_compression_status advance_data_vio_compression_stage(struct data_vio *data_vio)
360 {
361 for (;;) {
362 struct data_vio_compression_status status =
363 get_data_vio_compression_status(data_vio);
364 struct data_vio_compression_status new_status = status;
365
366 if (status.stage == DATA_VIO_POST_PACKER) {
367 /* We're already in the last stage. */
368 return status;
369 }
370
371 if (status.may_not_compress) {
372 /*
			 * Compression has been disallowed for this VIO, so skip the rest of the
374 * path and go to the end.
375 */
376 new_status.stage = DATA_VIO_POST_PACKER;
377 } else {
378 /* Go to the next state. */
379 new_status.stage++;
380 }
381
382 if (set_data_vio_compression_status(data_vio, status, new_status))
383 return new_status;
384
385 /* Another thread changed the status out from under us so try again. */
386 }
387 }
388
389 /**
390 * cancel_data_vio_compression() - Prevent this data_vio from being compressed or packed.
391 * @data_vio: The data_vio.
392 *
393 * Return: true if the data_vio is in the packer and the caller was the first caller to cancel it.
394 */
bool cancel_data_vio_compression(struct data_vio *data_vio)
396 {
397 struct data_vio_compression_status status, new_status;
398
399 for (;;) {
400 status = get_data_vio_compression_status(data_vio);
401 if (status.may_not_compress || (status.stage == DATA_VIO_POST_PACKER)) {
402 /* This data_vio is already set up to not block in the packer. */
403 break;
404 }
405
406 new_status.stage = status.stage;
407 new_status.may_not_compress = true;
408
409 if (set_data_vio_compression_status(data_vio, status, new_status))
410 break;
411 }
412
413 return ((status.stage == DATA_VIO_PACKING) && !status.may_not_compress);
414 }
415
416 /**
417 * attempt_logical_block_lock() - Attempt to acquire the lock on a logical block.
418 * @completion: The data_vio for an external data request as a completion.
419 *
420 * This is the start of the path for all external requests. It is registered in launch_data_vio().
421 */
static void attempt_logical_block_lock(struct vdo_completion *completion)
423 {
424 struct data_vio *data_vio = as_data_vio(completion);
425 struct lbn_lock *lock = &data_vio->logical;
426 struct vdo *vdo = vdo_from_data_vio(data_vio);
427 struct data_vio *lock_holder;
428 int result;
429
430 assert_data_vio_in_logical_zone(data_vio);
431
432 if (data_vio->logical.lbn >= vdo->states.vdo.config.logical_blocks) {
433 continue_data_vio_with_error(data_vio, VDO_OUT_OF_RANGE);
434 return;
435 }
436
437 result = vdo_int_map_put(lock->zone->lbn_operations, lock->lbn,
438 data_vio, false, (void **) &lock_holder);
439 if (result != VDO_SUCCESS) {
440 continue_data_vio_with_error(data_vio, result);
441 return;
442 }
443
444 if (lock_holder == NULL) {
445 /* We got the lock */
446 launch_locked_request(data_vio);
447 return;
448 }
449
450 result = VDO_ASSERT(lock_holder->logical.locked, "logical block lock held");
451 if (result != VDO_SUCCESS) {
452 continue_data_vio_with_error(data_vio, result);
453 return;
454 }
455
456 /*
457 * If the new request is a pure read request (not read-modify-write) and the lock_holder is
458 * writing and has received an allocation, service the read request immediately by copying
459 * data from the lock_holder to avoid having to flush the write out of the packer just to
460 * prevent the read from waiting indefinitely. If the lock_holder does not yet have an
461 * allocation, prevent it from blocking in the packer and wait on it. This is necessary in
462 * order to prevent returning data that may not have actually been written.
463 */
464 if (!data_vio->write && READ_ONCE(lock_holder->allocation_succeeded)) {
465 copy_to_bio(data_vio->user_bio, lock_holder->vio.data + data_vio->offset);
466 acknowledge_data_vio(data_vio);
467 complete_data_vio(completion);
468 return;
469 }
470
471 data_vio->last_async_operation = VIO_ASYNC_OP_ATTEMPT_LOGICAL_BLOCK_LOCK;
472 vdo_waitq_enqueue_waiter(&lock_holder->logical.waiters, &data_vio->waiter);
473
474 /*
475 * Prevent writes and read-modify-writes from blocking indefinitely on lock holders in the
476 * packer.
477 */
478 if (lock_holder->write && cancel_data_vio_compression(lock_holder)) {
479 data_vio->compression.lock_holder = lock_holder;
480 launch_data_vio_packer_callback(data_vio,
481 vdo_remove_lock_holder_from_packer);
482 }
483 }
484
485 /**
486 * launch_data_vio() - (Re)initialize a data_vio to have a new logical block number, keeping the
 * same parent and other state, and send it on its way.
488 * @data_vio: The data_vio to launch.
489 * @lbn: The logical block number.
490 */
static void launch_data_vio(struct data_vio *data_vio, logical_block_number_t lbn)
492 {
493 struct vdo_completion *completion = &data_vio->vio.completion;
494
495 /*
496 * Clearing the tree lock must happen before initializing the LBN lock, which also adds
497 * information to the tree lock.
498 */
499 memset(&data_vio->tree_lock, 0, sizeof(data_vio->tree_lock));
500 initialize_lbn_lock(data_vio, lbn);
501 INIT_LIST_HEAD(&data_vio->hash_lock_entry);
502 INIT_LIST_HEAD(&data_vio->write_entry);
503
504 memset(&data_vio->allocation, 0, sizeof(data_vio->allocation));
505
506 data_vio->is_duplicate = false;
507
508 memset(&data_vio->record_name, 0, sizeof(data_vio->record_name));
509 memset(&data_vio->duplicate, 0, sizeof(data_vio->duplicate));
510 vdo_reset_completion(&data_vio->decrement_completion);
511 vdo_reset_completion(completion);
512 completion->error_handler = handle_data_vio_error;
513 set_data_vio_logical_callback(data_vio, attempt_logical_block_lock);
514 vdo_enqueue_completion(completion, VDO_DEFAULT_Q_MAP_BIO_PRIORITY);
515 }
516
static void copy_from_bio(struct bio *bio, char *data_ptr)
518 {
519 struct bio_vec biovec;
520 struct bvec_iter iter;
521
522 bio_for_each_segment(biovec, bio, iter) {
523 memcpy_from_bvec(data_ptr, &biovec);
524 data_ptr += biovec.bv_len;
525 }
526 }
527
static void launch_bio(struct vdo *vdo, struct data_vio *data_vio, struct bio *bio)
529 {
530 logical_block_number_t lbn;
531 /*
532 * Zero out the fields which don't need to be preserved (i.e. which are not pointers to
533 * separately allocated objects).
534 */
535 memset(data_vio, 0, offsetof(struct data_vio, vio));
536 memset(&data_vio->compression, 0, offsetof(struct compression_state, block));
537
538 data_vio->user_bio = bio;
539 data_vio->offset = to_bytes(bio->bi_iter.bi_sector & VDO_SECTORS_PER_BLOCK_MASK);
540 data_vio->is_partial = (bio->bi_iter.bi_size < VDO_BLOCK_SIZE) || (data_vio->offset != 0);
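	/*
	 * For example (assuming 512-byte sectors in a 4K block): a 512-byte bio starting at
	 * sector 11 has offset = 3 * 512 = 1536 within its block and is therefore partial.
	 */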
541
542 /*
543 * Discards behave very differently than other requests when coming in from device-mapper.
544 * We have to be able to handle any size discards and various sector offsets within a
545 * block.
546 */
547 if (bio_op(bio) == REQ_OP_DISCARD) {
548 data_vio->remaining_discard = bio->bi_iter.bi_size;
549 data_vio->write = true;
550 data_vio->is_discard = true;
551 if (data_vio->is_partial) {
552 vdo_count_bios(&vdo->stats.bios_in_partial, bio);
553 data_vio->read = true;
554 }
555 } else if (data_vio->is_partial) {
556 vdo_count_bios(&vdo->stats.bios_in_partial, bio);
557 data_vio->read = true;
558 if (bio_data_dir(bio) == WRITE)
559 data_vio->write = true;
560 } else if (bio_data_dir(bio) == READ) {
561 data_vio->read = true;
562 } else {
563 /*
564 * Copy the bio data to a char array so that we can continue to use the data after
565 * we acknowledge the bio.
566 */
567 copy_from_bio(bio, data_vio->vio.data);
568 data_vio->is_zero = mem_is_zero(data_vio->vio.data, VDO_BLOCK_SIZE);
569 data_vio->write = true;
570 }
571
572 if (data_vio->user_bio->bi_opf & REQ_FUA)
573 data_vio->fua = true;
574
575 lbn = (bio->bi_iter.bi_sector - vdo->starting_sector_offset) / VDO_SECTORS_PER_BLOCK;
576 launch_data_vio(data_vio, lbn);
577 }
578
static void assign_data_vio(struct limiter *limiter, struct data_vio *data_vio)
580 {
581 struct bio *bio = bio_list_pop(limiter->permitted_waiters);
582
583 launch_bio(limiter->pool->completion.vdo, data_vio, bio);
584 limiter->wake_count++;
585
586 bio = bio_list_peek(limiter->permitted_waiters);
587 limiter->arrival = ((bio == NULL) ? U64_MAX : get_arrival_time(bio));
588 }
589
static void assign_discard_permit(struct limiter *limiter)
591 {
592 struct bio *bio = bio_list_pop(&limiter->waiters);
593
594 if (limiter->arrival == U64_MAX)
595 limiter->arrival = get_arrival_time(bio);
596
597 bio_list_add(limiter->permitted_waiters, bio);
598 }
599
static void get_waiters(struct limiter *limiter)
601 {
602 bio_list_merge_init(&limiter->waiters, &limiter->new_waiters);
603 }
604
static inline struct data_vio *get_available_data_vio(struct data_vio_pool *pool)
606 {
607 struct data_vio *data_vio =
608 list_first_entry(&pool->available, struct data_vio, pool_entry);
609
610 list_del_init(&data_vio->pool_entry);
611 return data_vio;
612 }
613
static void assign_data_vio_to_waiter(struct limiter *limiter)
615 {
616 assign_data_vio(limiter, get_available_data_vio(limiter->pool));
617 }
618
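/**
 * update_limiter() - Hand newly released resources, and any remaining headroom, to waiters and
 *                    update the busy count.
 * @limiter: The limiter to update.
 *
 * Illustrative arithmetic (the numbers are hypothetical): with limit = 16, busy = 10,
 * release_count = 3 and five waiters, the first loop below hands the three released resources
 * to three waiters, the second loop serves two more waiters from the six previously available
 * slots, and busy ends up at 12.
 */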
static void update_limiter(struct limiter *limiter)
620 {
621 struct bio_list *waiters = &limiter->waiters;
622 data_vio_count_t available = limiter->limit - limiter->busy;
623
624 VDO_ASSERT_LOG_ONLY((limiter->release_count <= limiter->busy),
625 "Release count %u is not more than busy count %u",
626 limiter->release_count, limiter->busy);
627
628 get_waiters(limiter);
629 for (; (limiter->release_count > 0) && !bio_list_empty(waiters); limiter->release_count--)
630 limiter->assigner(limiter);
631
632 if (limiter->release_count > 0) {
633 WRITE_ONCE(limiter->busy, limiter->busy - limiter->release_count);
634 limiter->release_count = 0;
635 return;
636 }
637
638 for (; (available > 0) && !bio_list_empty(waiters); available--)
639 limiter->assigner(limiter);
640
641 WRITE_ONCE(limiter->busy, limiter->limit - available);
642 if (limiter->max_busy < limiter->busy)
643 WRITE_ONCE(limiter->max_busy, limiter->busy);
644 }
645
646 /**
647 * schedule_releases() - Ensure that release processing is scheduled.
648 * @pool: The data_vio pool.
649 *
650 * If this call switches the state to processing, enqueue. Otherwise, some other thread has already
651 * done so.
652 */
static void schedule_releases(struct data_vio_pool *pool)
654 {
655 /* Pairs with the barrier in process_release_callback(). */
656 smp_mb__before_atomic();
657 if (atomic_cmpxchg(&pool->processing, false, true))
658 return;
659
660 pool->completion.requeue = true;
661 vdo_launch_completion_with_priority(&pool->completion,
662 CPU_Q_COMPLETE_VIO_PRIORITY);
663 }
664
static void reuse_or_release_resources(struct data_vio_pool *pool,
666 struct data_vio *data_vio,
667 struct list_head *returned)
668 {
669 if (data_vio->remaining_discard > 0) {
670 if (bio_list_empty(&pool->discard_limiter.waiters)) {
671 /* Return the data_vio's discard permit. */
672 pool->discard_limiter.release_count++;
673 } else {
674 assign_discard_permit(&pool->discard_limiter);
675 }
676 }
677
678 if (pool->limiter.arrival < pool->discard_limiter.arrival) {
679 assign_data_vio(&pool->limiter, data_vio);
680 } else if (pool->discard_limiter.arrival < U64_MAX) {
681 assign_data_vio(&pool->discard_limiter, data_vio);
682 } else {
683 list_add(&data_vio->pool_entry, returned);
684 pool->limiter.release_count++;
685 }
686 }
687
688 /**
689 * process_release_callback() - Process a batch of data_vio releases.
690 * @completion: The pool with data_vios to release.
691 */
static void process_release_callback(struct vdo_completion *completion)
693 {
694 struct data_vio_pool *pool = as_data_vio_pool(completion);
695 bool reschedule;
696 bool drained;
697 data_vio_count_t processed;
698 data_vio_count_t to_wake;
699 data_vio_count_t discards_to_wake;
700 LIST_HEAD(returned);
701
702 spin_lock(&pool->lock);
703 get_waiters(&pool->discard_limiter);
704 get_waiters(&pool->limiter);
705 spin_unlock(&pool->lock);
706
707 if (pool->limiter.arrival == U64_MAX) {
708 struct bio *bio = bio_list_peek(&pool->limiter.waiters);
709
710 if (bio != NULL)
711 pool->limiter.arrival = get_arrival_time(bio);
712 }
713
714 for (processed = 0; processed < DATA_VIO_RELEASE_BATCH_SIZE; processed++) {
715 struct data_vio *data_vio;
716 struct funnel_queue_entry *entry = vdo_funnel_queue_poll(pool->queue);
717
718 if (entry == NULL)
719 break;
720
721 data_vio = as_data_vio(container_of(entry, struct vdo_completion,
722 work_queue_entry_link));
723 acknowledge_data_vio(data_vio);
724 reuse_or_release_resources(pool, data_vio, &returned);
725 }
726
727 spin_lock(&pool->lock);
728 /*
729 * There is a race where waiters could be added while we are in the unlocked section above.
730 * Those waiters could not see the resources we are now about to release, so we assign
731 * those resources now as we have no guarantee of being rescheduled. This is handled in
732 * update_limiter().
733 */
734 update_limiter(&pool->discard_limiter);
735 list_splice(&returned, &pool->available);
736 update_limiter(&pool->limiter);
737 to_wake = pool->limiter.wake_count;
738 pool->limiter.wake_count = 0;
739 discards_to_wake = pool->discard_limiter.wake_count;
740 pool->discard_limiter.wake_count = 0;
741
742 atomic_set(&pool->processing, false);
743 /* Pairs with the barrier in schedule_releases(). */
744 smp_mb();
745
746 reschedule = !vdo_is_funnel_queue_empty(pool->queue);
747 drained = (!reschedule &&
748 vdo_is_state_draining(&pool->state) &&
749 check_for_drain_complete_locked(pool));
750 spin_unlock(&pool->lock);
751
752 if (to_wake > 0)
753 wake_up_nr(&pool->limiter.blocked_threads, to_wake);
754
755 if (discards_to_wake > 0)
756 wake_up_nr(&pool->discard_limiter.blocked_threads, discards_to_wake);
757
758 if (reschedule)
759 schedule_releases(pool);
760 else if (drained)
761 vdo_finish_draining(&pool->state);
762 }
763
static void initialize_limiter(struct limiter *limiter, struct data_vio_pool *pool,
765 assigner_fn assigner, data_vio_count_t limit)
766 {
767 limiter->pool = pool;
768 limiter->assigner = assigner;
769 limiter->limit = limit;
770 limiter->arrival = U64_MAX;
771 init_waitqueue_head(&limiter->blocked_threads);
772 }
773
774 /**
775 * initialize_data_vio() - Allocate the components of a data_vio.
776 * @data_vio: The data_vio to initialize.
777 * @vdo: The vdo containing the data_vio.
778 *
779 * The caller is responsible for cleaning up the data_vio on error.
780 *
781 * Return: VDO_SUCCESS or an error.
782 */
static int initialize_data_vio(struct data_vio *data_vio, struct vdo *vdo)
784 {
785 struct bio *bio;
786 int result;
787
788 BUILD_BUG_ON(VDO_BLOCK_SIZE > PAGE_SIZE);
789 result = vdo_allocate_memory(VDO_BLOCK_SIZE, 0, "data_vio data",
790 &data_vio->vio.data);
791 if (result != VDO_SUCCESS)
792 return vdo_log_error_strerror(result,
793 "data_vio data allocation failure");
794
795 result = vdo_allocate_memory(VDO_BLOCK_SIZE, 0, "compressed block",
796 &data_vio->compression.block);
797 if (result != VDO_SUCCESS) {
798 return vdo_log_error_strerror(result,
799 "data_vio compressed block allocation failure");
800 }
801
802 result = vdo_allocate_memory(VDO_BLOCK_SIZE, 0, "vio scratch",
803 &data_vio->scratch_block);
804 if (result != VDO_SUCCESS)
805 return vdo_log_error_strerror(result,
806 "data_vio scratch allocation failure");
807
808 result = vdo_create_bio(&bio);
809 if (result != VDO_SUCCESS)
810 return vdo_log_error_strerror(result,
811 "data_vio data bio allocation failure");
812
813 vdo_initialize_completion(&data_vio->decrement_completion, vdo,
814 VDO_DECREMENT_COMPLETION);
815 initialize_vio(&data_vio->vio, bio, 1, VIO_TYPE_DATA, VIO_PRIORITY_DATA, vdo);
816
817 return VDO_SUCCESS;
818 }
819
static void destroy_data_vio(struct data_vio *data_vio)
821 {
822 if (data_vio == NULL)
823 return;
824
825 vdo_free_bio(vdo_forget(data_vio->vio.bio));
826 vdo_free(vdo_forget(data_vio->vio.data));
827 vdo_free(vdo_forget(data_vio->compression.block));
828 vdo_free(vdo_forget(data_vio->scratch_block));
829 }
830
831 /**
832 * make_data_vio_pool() - Initialize a data_vio pool.
833 * @vdo: The vdo to which the pool will belong.
834 * @pool_size: The number of data_vios in the pool.
835 * @discard_limit: The maximum number of data_vios which may be used for discards.
836 * @pool_ptr: A pointer to hold the newly allocated pool.
837 */
int make_data_vio_pool(struct vdo *vdo, data_vio_count_t pool_size,
839 data_vio_count_t discard_limit, struct data_vio_pool **pool_ptr)
840 {
841 int result;
842 struct data_vio_pool *pool;
843 data_vio_count_t i;
844
845 result = vdo_allocate_extended(struct data_vio_pool, pool_size, struct data_vio,
846 __func__, &pool);
847 if (result != VDO_SUCCESS)
848 return result;
849
850 VDO_ASSERT_LOG_ONLY((discard_limit <= pool_size),
851 "discard limit does not exceed pool size");
852 initialize_limiter(&pool->discard_limiter, pool, assign_discard_permit,
853 discard_limit);
854 pool->discard_limiter.permitted_waiters = &pool->permitted_discards;
855 initialize_limiter(&pool->limiter, pool, assign_data_vio_to_waiter, pool_size);
856 pool->limiter.permitted_waiters = &pool->limiter.waiters;
857 INIT_LIST_HEAD(&pool->available);
858 spin_lock_init(&pool->lock);
859 vdo_set_admin_state_code(&pool->state, VDO_ADMIN_STATE_NORMAL_OPERATION);
860 vdo_initialize_completion(&pool->completion, vdo, VDO_DATA_VIO_POOL_COMPLETION);
861 vdo_prepare_completion(&pool->completion, process_release_callback,
862 process_release_callback, vdo->thread_config.cpu_thread,
863 NULL);
864
865 result = vdo_make_funnel_queue(&pool->queue);
866 if (result != VDO_SUCCESS) {
867 free_data_vio_pool(vdo_forget(pool));
868 return result;
869 }
870
871 for (i = 0; i < pool_size; i++) {
872 struct data_vio *data_vio = &pool->data_vios[i];
873
874 result = initialize_data_vio(data_vio, vdo);
875 if (result != VDO_SUCCESS) {
876 destroy_data_vio(data_vio);
877 free_data_vio_pool(pool);
878 return result;
879 }
880
881 list_add(&data_vio->pool_entry, &pool->available);
882 }
883
884 *pool_ptr = pool;
885 return VDO_SUCCESS;
886 }
887
888 /**
889 * free_data_vio_pool() - Free a data_vio_pool and the data_vios in it.
890 * @pool: The data_vio pool to free.
891 *
892 * All data_vios must be returned to the pool before calling this function.
893 */
void free_data_vio_pool(struct data_vio_pool *pool)
895 {
896 struct data_vio *data_vio, *tmp;
897
898 if (pool == NULL)
899 return;
900
901 /*
902 * Pairs with the barrier in process_release_callback(). Possibly not needed since it
903 * caters to an enqueue vs. free race.
904 */
905 smp_mb();
906 BUG_ON(atomic_read(&pool->processing));
907
908 spin_lock(&pool->lock);
909 VDO_ASSERT_LOG_ONLY((pool->limiter.busy == 0),
910 "data_vio pool must not have %u busy entries when being freed",
911 pool->limiter.busy);
912 VDO_ASSERT_LOG_ONLY((bio_list_empty(&pool->limiter.waiters) &&
913 bio_list_empty(&pool->limiter.new_waiters)),
914 "data_vio pool must not have threads waiting to read or write when being freed");
915 VDO_ASSERT_LOG_ONLY((bio_list_empty(&pool->discard_limiter.waiters) &&
916 bio_list_empty(&pool->discard_limiter.new_waiters)),
917 "data_vio pool must not have threads waiting to discard when being freed");
918 spin_unlock(&pool->lock);
919
920 list_for_each_entry_safe(data_vio, tmp, &pool->available, pool_entry) {
921 list_del_init(&data_vio->pool_entry);
922 destroy_data_vio(data_vio);
923 }
924
925 vdo_free_funnel_queue(vdo_forget(pool->queue));
926 vdo_free(pool);
927 }
928
static bool acquire_permit(struct limiter *limiter)
930 {
931 if (limiter->busy >= limiter->limit)
932 return false;
933
934 WRITE_ONCE(limiter->busy, limiter->busy + 1);
935 if (limiter->max_busy < limiter->busy)
936 WRITE_ONCE(limiter->max_busy, limiter->busy);
937 return true;
938 }
939
static void wait_permit(struct limiter *limiter, struct bio *bio)
941 __releases(&limiter->pool->lock)
942 {
943 DEFINE_WAIT(wait);
944
945 bio_list_add(&limiter->new_waiters, bio);
946 prepare_to_wait_exclusive(&limiter->blocked_threads, &wait,
947 TASK_UNINTERRUPTIBLE);
948 spin_unlock(&limiter->pool->lock);
949 io_schedule();
950 finish_wait(&limiter->blocked_threads, &wait);
951 }
952
953 /**
954 * vdo_launch_bio() - Acquire a data_vio from the pool, assign the bio to it, and launch it.
955 * @pool: The data_vio pool.
956 * @bio: The bio to launch.
957 *
958 * This will block if data_vios or discard permits are not available.
959 */
void vdo_launch_bio(struct data_vio_pool *pool, struct bio *bio)
961 {
962 struct data_vio *data_vio;
963
964 VDO_ASSERT_LOG_ONLY(!vdo_is_state_quiescent(&pool->state),
965 "data_vio_pool not quiescent on acquire");
966
967 bio->bi_private = (void *) jiffies;
968 spin_lock(&pool->lock);
969 if ((bio_op(bio) == REQ_OP_DISCARD) &&
970 !acquire_permit(&pool->discard_limiter)) {
971 wait_permit(&pool->discard_limiter, bio);
972 return;
973 }
974
975 if (!acquire_permit(&pool->limiter)) {
976 wait_permit(&pool->limiter, bio);
977 return;
978 }
979
980 data_vio = get_available_data_vio(pool);
981 spin_unlock(&pool->lock);
982 launch_bio(pool->completion.vdo, data_vio, bio);
983 }
984
985 /* Implements vdo_admin_initiator_fn. */
static void initiate_drain(struct admin_state *state)
987 {
988 bool drained;
989 struct data_vio_pool *pool = container_of(state, struct data_vio_pool, state);
990
991 spin_lock(&pool->lock);
992 drained = check_for_drain_complete_locked(pool);
993 spin_unlock(&pool->lock);
994
995 if (drained)
996 vdo_finish_draining(state);
997 }
998
static void assert_on_vdo_cpu_thread(const struct vdo *vdo, const char *name)
1000 {
1001 VDO_ASSERT_LOG_ONLY((vdo_get_callback_thread_id() == vdo->thread_config.cpu_thread),
1002 "%s called on cpu thread", name);
1003 }
1004
1005 /**
1006 * drain_data_vio_pool() - Wait asynchronously for all data_vios to be returned to the pool.
1007 * @pool: The data_vio pool.
1008 * @completion: The completion to notify when the pool has drained.
1009 */
void drain_data_vio_pool(struct data_vio_pool *pool, struct vdo_completion *completion)
1011 {
1012 assert_on_vdo_cpu_thread(completion->vdo, __func__);
1013 vdo_start_draining(&pool->state, VDO_ADMIN_STATE_SUSPENDING, completion,
1014 initiate_drain);
1015 }
1016
1017 /**
1018 * resume_data_vio_pool() - Resume a data_vio pool.
1019 * @pool: The data_vio pool.
1020 * @completion: The completion to notify when the pool has resumed.
1021 */
void resume_data_vio_pool(struct data_vio_pool *pool, struct vdo_completion *completion)
1023 {
1024 assert_on_vdo_cpu_thread(completion->vdo, __func__);
1025 vdo_continue_completion(completion, vdo_resume_if_quiescent(&pool->state));
1026 }
1027
static void dump_limiter(const char *name, struct limiter *limiter)
1029 {
1030 vdo_log_info("%s: %u of %u busy (max %u), %s", name, limiter->busy,
1031 limiter->limit, limiter->max_busy,
1032 ((bio_list_empty(&limiter->waiters) &&
1033 bio_list_empty(&limiter->new_waiters)) ?
1034 "no waiters" : "has waiters"));
1035 }
1036
1037 /**
1038 * dump_data_vio_pool() - Dump a data_vio pool to the log.
1039 * @pool: The data_vio pool.
1040 * @dump_vios: Whether to dump the details of each busy data_vio as well.
1041 */
void dump_data_vio_pool(struct data_vio_pool *pool, bool dump_vios)
1043 {
1044 /*
1045 * In order that syslog can empty its buffer, sleep after 35 elements for 4ms (till the
1046 * second clock tick). These numbers were picked based on experiments with lab machines.
1047 */
1048 static const int ELEMENTS_PER_BATCH = 35;
1049 static const int SLEEP_FOR_SYSLOG = 4000;
1050
1051 if (pool == NULL)
1052 return;
1053
1054 spin_lock(&pool->lock);
1055 dump_limiter("data_vios", &pool->limiter);
1056 dump_limiter("discard permits", &pool->discard_limiter);
1057 if (dump_vios) {
1058 int i;
1059 int dumped = 0;
1060
1061 for (i = 0; i < pool->limiter.limit; i++) {
1062 struct data_vio *data_vio = &pool->data_vios[i];
1063
1064 if (!list_empty(&data_vio->pool_entry))
1065 continue;
1066
1067 dump_data_vio(data_vio);
1068 if (++dumped >= ELEMENTS_PER_BATCH) {
1069 spin_unlock(&pool->lock);
1070 dumped = 0;
1071 fsleep(SLEEP_FOR_SYSLOG);
1072 spin_lock(&pool->lock);
1073 }
1074 }
1075 }
1076
1077 spin_unlock(&pool->lock);
1078 }
1079
data_vio_count_t get_data_vio_pool_active_requests(struct data_vio_pool *pool)
1081 {
1082 return READ_ONCE(pool->limiter.busy);
1083 }
1084
data_vio_count_t get_data_vio_pool_request_limit(struct data_vio_pool *pool)
1086 {
1087 return READ_ONCE(pool->limiter.limit);
1088 }
1089
data_vio_count_t get_data_vio_pool_maximum_requests(struct data_vio_pool *pool)
1091 {
1092 return READ_ONCE(pool->limiter.max_busy);
1093 }
1094
static void update_data_vio_error_stats(struct data_vio *data_vio)
1096 {
1097 u8 index = 0;
1098 static const char * const operations[] = {
1099 [0] = "empty",
1100 [1] = "read",
1101 [2] = "write",
1102 [3] = "read-modify-write",
1103 [5] = "read+fua",
1104 [6] = "write+fua",
1105 [7] = "read-modify-write+fua",
1106 };
1107
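	/*
	 * The index encodes the operation: read adds 1, write adds 2, and fua adds 4; a fua
	 * write, for example, maps to operations[6], "write+fua".
	 */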
1108 if (data_vio->read)
1109 index = 1;
1110
1111 if (data_vio->write)
1112 index += 2;
1113
1114 if (data_vio->fua)
1115 index += 4;
1116
1117 update_vio_error_stats(&data_vio->vio,
1118 "Completing %s vio for LBN %llu with error after %s",
1119 operations[index],
1120 (unsigned long long) data_vio->logical.lbn,
1121 get_data_vio_operation_name(data_vio));
1122 }
1123
1124 static void perform_cleanup_stage(struct data_vio *data_vio,
1125 enum data_vio_cleanup_stage stage);
1126
1127 /**
1128 * release_allocated_lock() - Release the PBN lock and/or the reference on the allocated block at
1129 * the end of processing a data_vio.
1130 * @completion: The data_vio holding the lock.
1131 */
static void release_allocated_lock(struct vdo_completion *completion)
1133 {
1134 struct data_vio *data_vio = as_data_vio(completion);
1135
1136 assert_data_vio_in_allocated_zone(data_vio);
1137 release_data_vio_allocation_lock(data_vio, false);
1138 perform_cleanup_stage(data_vio, VIO_RELEASE_RECOVERY_LOCKS);
1139 }
1140
1141 /** release_lock() - Release an uncontended LBN lock. */
static void release_lock(struct data_vio *data_vio, struct lbn_lock *lock)
1143 {
1144 struct int_map *lock_map = lock->zone->lbn_operations;
1145 struct data_vio *lock_holder;
1146
1147 if (!lock->locked) {
1148 /* The lock is not locked, so it had better not be registered in the lock map. */
1149 struct data_vio *lock_holder = vdo_int_map_get(lock_map, lock->lbn);
1150
1151 VDO_ASSERT_LOG_ONLY((data_vio != lock_holder),
1152 "no logical block lock held for block %llu",
1153 (unsigned long long) lock->lbn);
1154 return;
1155 }
1156
1157 /* Release the lock by removing the lock from the map. */
1158 lock_holder = vdo_int_map_remove(lock_map, lock->lbn);
1159 VDO_ASSERT_LOG_ONLY((data_vio == lock_holder),
1160 "logical block lock mismatch for block %llu",
1161 (unsigned long long) lock->lbn);
1162 lock->locked = false;
1163 }
1164
1165 /** transfer_lock() - Transfer a contended LBN lock to the eldest waiter. */
static void transfer_lock(struct data_vio *data_vio, struct lbn_lock *lock)
1167 {
1168 struct data_vio *lock_holder, *next_lock_holder;
1169 int result;
1170
1171 VDO_ASSERT_LOG_ONLY(lock->locked, "lbn_lock with waiters is not locked");
1172
1173 /* Another data_vio is waiting for the lock, transfer it in a single lock map operation. */
1174 next_lock_holder =
1175 vdo_waiter_as_data_vio(vdo_waitq_dequeue_waiter(&lock->waiters));
1176
1177 /* Transfer the remaining lock waiters to the next lock holder. */
1178 vdo_waitq_transfer_all_waiters(&lock->waiters,
1179 &next_lock_holder->logical.waiters);
1180
1181 result = vdo_int_map_put(lock->zone->lbn_operations, lock->lbn,
1182 next_lock_holder, true, (void **) &lock_holder);
1183 if (result != VDO_SUCCESS) {
1184 continue_data_vio_with_error(next_lock_holder, result);
1185 return;
1186 }
1187
1188 VDO_ASSERT_LOG_ONLY((lock_holder == data_vio),
1189 "logical block lock mismatch for block %llu",
1190 (unsigned long long) lock->lbn);
1191 lock->locked = false;
1192
1193 /*
1194 * If there are still waiters, other data_vios must be trying to get the lock we just
1195 * transferred. We must ensure that the new lock holder doesn't block in the packer.
1196 */
1197 if (vdo_waitq_has_waiters(&next_lock_holder->logical.waiters))
1198 cancel_data_vio_compression(next_lock_holder);
1199
1200 /*
1201 * Avoid stack overflow on lock transfer.
1202 * FIXME: this is only an issue in the 1 thread config.
1203 */
1204 next_lock_holder->vio.completion.requeue = true;
1205 launch_locked_request(next_lock_holder);
1206 }
1207
1208 /**
1209 * release_logical_lock() - Release the logical block lock and flush generation lock at the end of
1210 * processing a data_vio.
1211 * @completion: The data_vio holding the lock.
1212 */
static void release_logical_lock(struct vdo_completion *completion)
1214 {
1215 struct data_vio *data_vio = as_data_vio(completion);
1216 struct lbn_lock *lock = &data_vio->logical;
1217
1218 assert_data_vio_in_logical_zone(data_vio);
1219
1220 if (vdo_waitq_has_waiters(&lock->waiters))
1221 transfer_lock(data_vio, lock);
1222 else
1223 release_lock(data_vio, lock);
1224
1225 vdo_release_flush_generation_lock(data_vio);
1226 perform_cleanup_stage(data_vio, VIO_CLEANUP_DONE);
1227 }
1228
1229 /** clean_hash_lock() - Release the hash lock at the end of processing a data_vio. */
static void clean_hash_lock(struct vdo_completion *completion)
1231 {
1232 struct data_vio *data_vio = as_data_vio(completion);
1233
1234 assert_data_vio_in_hash_zone(data_vio);
1235 if (completion->result != VDO_SUCCESS) {
1236 vdo_clean_failed_hash_lock(data_vio);
1237 return;
1238 }
1239
1240 vdo_release_hash_lock(data_vio);
1241 perform_cleanup_stage(data_vio, VIO_RELEASE_LOGICAL);
1242 }
1243
1244 /**
1245 * finish_cleanup() - Make some assertions about a data_vio which has finished cleaning up.
1246 * @data_vio: The data_vio.
1247 *
1248 * If it is part of a multi-block discard, starts on the next block, otherwise, returns it to the
1249 * pool.
1250 */
static void finish_cleanup(struct data_vio *data_vio)
1252 {
1253 struct vdo_completion *completion = &data_vio->vio.completion;
1254 u32 discard_size = min_t(u32, data_vio->remaining_discard,
1255 VDO_BLOCK_SIZE - data_vio->offset);
1256
1257 VDO_ASSERT_LOG_ONLY(data_vio->allocation.lock == NULL,
1258 "complete data_vio has no allocation lock");
1259 VDO_ASSERT_LOG_ONLY(data_vio->hash_lock == NULL,
1260 "complete data_vio has no hash lock");
1261 if ((data_vio->remaining_discard <= discard_size) ||
1262 (completion->result != VDO_SUCCESS)) {
1263 struct data_vio_pool *pool = completion->vdo->data_vio_pool;
1264
1265 vdo_funnel_queue_put(pool->queue, &completion->work_queue_entry_link);
1266 schedule_releases(pool);
1267 return;
1268 }
1269
1270 data_vio->remaining_discard -= discard_size;
1271 data_vio->is_partial = (data_vio->remaining_discard < VDO_BLOCK_SIZE);
1272 data_vio->read = data_vio->is_partial;
1273 data_vio->offset = 0;
1274 completion->requeue = true;
1275 data_vio->first_reference_operation_complete = false;
1276 launch_data_vio(data_vio, data_vio->logical.lbn + 1);
1277 }
1278
1279 /** perform_cleanup_stage() - Perform the next step in the process of cleaning up a data_vio. */
static void perform_cleanup_stage(struct data_vio *data_vio,
1281 enum data_vio_cleanup_stage stage)
1282 {
1283 struct vdo *vdo = vdo_from_data_vio(data_vio);
1284
1285 switch (stage) {
1286 case VIO_RELEASE_HASH_LOCK:
1287 if (data_vio->hash_lock != NULL) {
1288 launch_data_vio_hash_zone_callback(data_vio, clean_hash_lock);
1289 return;
1290 }
1291 fallthrough;
1292
1293 case VIO_RELEASE_ALLOCATED:
1294 if (data_vio_has_allocation(data_vio)) {
1295 launch_data_vio_allocated_zone_callback(data_vio,
1296 release_allocated_lock);
1297 return;
1298 }
1299 fallthrough;
1300
1301 case VIO_RELEASE_RECOVERY_LOCKS:
1302 if ((data_vio->recovery_sequence_number > 0) &&
1303 (READ_ONCE(vdo->read_only_notifier.read_only_error) == VDO_SUCCESS) &&
1304 (data_vio->vio.completion.result != VDO_READ_ONLY))
1305 vdo_log_warning("VDO not read-only when cleaning data_vio with RJ lock");
1306 fallthrough;
1307
1308 case VIO_RELEASE_LOGICAL:
1309 launch_data_vio_logical_callback(data_vio, release_logical_lock);
1310 return;
1311
1312 default:
1313 finish_cleanup(data_vio);
1314 }
1315 }
1316
void complete_data_vio(struct vdo_completion *completion)
1318 {
1319 struct data_vio *data_vio = as_data_vio(completion);
1320
1321 completion->error_handler = NULL;
1322 data_vio->last_async_operation = VIO_ASYNC_OP_CLEANUP;
1323 perform_cleanup_stage(data_vio,
1324 (data_vio->write ? VIO_CLEANUP_START : VIO_RELEASE_LOGICAL));
1325 }
1326
static void enter_read_only_mode(struct vdo_completion *completion)
1328 {
1329 if (vdo_is_read_only(completion->vdo))
1330 return;
1331
1332 if (completion->result != VDO_READ_ONLY) {
1333 struct data_vio *data_vio = as_data_vio(completion);
1334
1335 vdo_log_error_strerror(completion->result,
1336 "Preparing to enter read-only mode: data_vio for LBN %llu (becoming mapped to %llu, previously mapped to %llu, allocated %llu) is completing with a fatal error after operation %s",
1337 (unsigned long long) data_vio->logical.lbn,
1338 (unsigned long long) data_vio->new_mapped.pbn,
1339 (unsigned long long) data_vio->mapped.pbn,
1340 (unsigned long long) data_vio->allocation.pbn,
1341 get_data_vio_operation_name(data_vio));
1342 }
1343
1344 vdo_enter_read_only_mode(completion->vdo, completion->result);
1345 }
1346
void handle_data_vio_error(struct vdo_completion *completion)
1348 {
1349 struct data_vio *data_vio = as_data_vio(completion);
1350
1351 if ((completion->result == VDO_READ_ONLY) || (data_vio->user_bio == NULL))
1352 enter_read_only_mode(completion);
1353
1354 update_data_vio_error_stats(data_vio);
1355 complete_data_vio(completion);
1356 }
1357
1358 /**
1359 * get_data_vio_operation_name() - Get the name of the last asynchronous operation performed on a
1360 * data_vio.
1361 * @data_vio: The data_vio.
1362 */
const char *get_data_vio_operation_name(struct data_vio *data_vio)
1364 {
1365 BUILD_BUG_ON((MAX_VIO_ASYNC_OPERATION_NUMBER - MIN_VIO_ASYNC_OPERATION_NUMBER) !=
1366 ARRAY_SIZE(ASYNC_OPERATION_NAMES));
1367
1368 return ((data_vio->last_async_operation < MAX_VIO_ASYNC_OPERATION_NUMBER) ?
1369 ASYNC_OPERATION_NAMES[data_vio->last_async_operation] :
1370 "unknown async operation");
1371 }
1372
1373 /**
1374 * data_vio_allocate_data_block() - Allocate a data block.
1375 * @data_vio: The data_vio.
1376 * @write_lock_type: The type of write lock to obtain on the block.
1377 * @callback: The callback which will attempt an allocation in the current zone and continue if it
1378 * succeeds.
1379 * @error_handler: The handler for errors while allocating.
1380 */
void data_vio_allocate_data_block(struct data_vio *data_vio,
1382 enum pbn_lock_type write_lock_type,
1383 vdo_action_fn callback, vdo_action_fn error_handler)
1384 {
1385 struct allocation *allocation = &data_vio->allocation;
1386
1387 VDO_ASSERT_LOG_ONLY((allocation->pbn == VDO_ZERO_BLOCK),
1388 "data_vio does not have an allocation");
1389 allocation->write_lock_type = write_lock_type;
1390 allocation->zone = vdo_get_next_allocation_zone(data_vio->logical.zone);
1391 allocation->first_allocation_zone = allocation->zone->zone_number;
1392
1393 data_vio->vio.completion.error_handler = error_handler;
1394 launch_data_vio_allocated_zone_callback(data_vio, callback);
1395 }
1396
1397 /**
1398 * release_data_vio_allocation_lock() - Release the PBN lock on a data_vio's allocated block.
1399 * @data_vio: The data_vio.
1400 * @reset: If true, the allocation will be reset (i.e. any allocated pbn will be forgotten).
1401 *
1402 * If the reference to the locked block is still provisional, it will be released as well.
1403 */
void release_data_vio_allocation_lock(struct data_vio *data_vio, bool reset)
1405 {
1406 struct allocation *allocation = &data_vio->allocation;
1407 physical_block_number_t locked_pbn = allocation->pbn;
1408
1409 assert_data_vio_in_allocated_zone(data_vio);
1410
1411 if (reset || vdo_pbn_lock_has_provisional_reference(allocation->lock))
1412 allocation->pbn = VDO_ZERO_BLOCK;
1413
1414 vdo_release_physical_zone_pbn_lock(allocation->zone, locked_pbn,
1415 vdo_forget(allocation->lock));
1416 }
1417
1418 /**
1419 * uncompress_data_vio() - Uncompress the data a data_vio has just read.
1420 * @data_vio: The data_vio.
1421 * @mapping_state: The mapping state indicating which fragment to decompress.
1422 * @buffer: The buffer to receive the uncompressed data.
1423 */
int uncompress_data_vio(struct data_vio *data_vio,
1425 enum block_mapping_state mapping_state, char *buffer)
1426 {
1427 int size;
1428 u16 fragment_offset, fragment_size;
1429 struct compressed_block *block = data_vio->compression.block;
1430 int result = vdo_get_compressed_block_fragment(mapping_state, block,
1431 &fragment_offset, &fragment_size);
1432
1433 if (result != VDO_SUCCESS) {
1434 vdo_log_debug("%s: compressed fragment error %d", __func__, result);
1435 return result;
1436 }
1437
1438 size = LZ4_decompress_safe((block->data + fragment_offset), buffer,
1439 fragment_size, VDO_BLOCK_SIZE);
1440 if (size != VDO_BLOCK_SIZE) {
1441 vdo_log_debug("%s: lz4 error", __func__);
1442 return VDO_INVALID_FRAGMENT;
1443 }
1444
1445 return VDO_SUCCESS;
1446 }
1447
1448 /**
1449 * modify_for_partial_write() - Do the modify-write part of a read-modify-write cycle.
1450 * @completion: The data_vio which has just finished its read.
1451 *
1452 * This callback is registered in read_block().
1453 */
static void modify_for_partial_write(struct vdo_completion *completion)
1455 {
1456 struct data_vio *data_vio = as_data_vio(completion);
1457 char *data = data_vio->vio.data;
1458 struct bio *bio = data_vio->user_bio;
1459
1460 assert_data_vio_on_cpu_thread(data_vio);
1461
1462 if (bio_op(bio) == REQ_OP_DISCARD) {
1463 memset(data + data_vio->offset, '\0', min_t(u32,
1464 data_vio->remaining_discard,
1465 VDO_BLOCK_SIZE - data_vio->offset));
1466 } else {
1467 copy_from_bio(bio, data + data_vio->offset);
1468 }
1469
1470 data_vio->is_zero = mem_is_zero(data, VDO_BLOCK_SIZE);
1471 data_vio->read = false;
1472 launch_data_vio_logical_callback(data_vio,
1473 continue_data_vio_with_block_map_slot);
1474 }
1475
static void complete_read(struct vdo_completion *completion)
1477 {
1478 struct data_vio *data_vio = as_data_vio(completion);
1479 char *data = data_vio->vio.data;
1480 bool compressed = vdo_is_state_compressed(data_vio->mapped.state);
1481
1482 assert_data_vio_on_cpu_thread(data_vio);
1483
1484 if (compressed) {
1485 int result = uncompress_data_vio(data_vio, data_vio->mapped.state, data);
1486
1487 if (result != VDO_SUCCESS) {
1488 continue_data_vio_with_error(data_vio, result);
1489 return;
1490 }
1491 }
1492
1493 if (data_vio->write) {
1494 modify_for_partial_write(completion);
1495 return;
1496 }
1497
1498 if (compressed || data_vio->is_partial)
1499 copy_to_bio(data_vio->user_bio, data + data_vio->offset);
1500
1501 acknowledge_data_vio(data_vio);
1502 complete_data_vio(completion);
1503 }
1504
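/**
 * read_endio() - The bio_end_io function for a data_vio's read I/O, registered in read_block().
 * @bio: The bio which has completed.
 *
 * Counts the completed bio and either continues the data_vio with an error or launches
 * complete_read() on a CPU thread.
 */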
static void read_endio(struct bio *bio)
{
        struct data_vio *data_vio = vio_as_data_vio(bio->bi_private);
        int result = blk_status_to_errno(bio->bi_status);

        vdo_count_completed_bios(bio);
        if (result != VDO_SUCCESS) {
                continue_data_vio_with_error(data_vio, result);
                return;
        }

        launch_data_vio_cpu_callback(data_vio, complete_read,
                                     CPU_Q_COMPLETE_READ_PRIORITY);
}

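/**
 * complete_zero_read() - Handle a read of the zero block (an unmapped logical block) by zeroing
 *                        the relevant data, then continuing as a normal read completion.
 * @completion: The data_vio doing the read.
 *
 * This callback is registered in read_block().
 */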
static void complete_zero_read(struct vdo_completion *completion)
{
        struct data_vio *data_vio = as_data_vio(completion);

        assert_data_vio_on_cpu_thread(data_vio);

        if (data_vio->is_partial) {
                memset(data_vio->vio.data, 0, VDO_BLOCK_SIZE);
                if (data_vio->write) {
                        modify_for_partial_write(completion);
                        return;
                }
        } else {
                zero_fill_bio(data_vio->user_bio);
        }

        complete_read(completion);
}

/**
 * read_block() - Read a block asynchronously.
 * @completion: The data_vio doing the read.
 *
 * This is the callback registered in continue_data_vio_with_block_map_slot().
 */
static void read_block(struct vdo_completion *completion)
{
        struct data_vio *data_vio = as_data_vio(completion);
        struct vio *vio = as_vio(completion);
        int result = VDO_SUCCESS;

        if (data_vio->mapped.pbn == VDO_ZERO_BLOCK) {
                launch_data_vio_cpu_callback(data_vio, complete_zero_read,
                                             CPU_Q_COMPLETE_VIO_PRIORITY);
                return;
        }

        data_vio->last_async_operation = VIO_ASYNC_OP_READ_DATA_VIO;
        if (vdo_is_state_compressed(data_vio->mapped.state)) {
                result = vio_reset_bio(vio, (char *) data_vio->compression.block,
                                       read_endio, REQ_OP_READ, data_vio->mapped.pbn);
        } else {
                blk_opf_t opf = ((data_vio->user_bio->bi_opf & PASSTHROUGH_FLAGS) | REQ_OP_READ);

                if (data_vio->is_partial) {
                        result = vio_reset_bio(vio, vio->data, read_endio, opf,
                                               data_vio->mapped.pbn);
                } else {
                        /* A full 4k read. Use the incoming bio to avoid having to copy the data. */
                        bio_reset(vio->bio, vio->bio->bi_bdev, opf);
                        bio_init_clone(data_vio->user_bio->bi_bdev, vio->bio,
                                       data_vio->user_bio, GFP_KERNEL);

                        /* Copy over the original bio iovec and opflags. */
                        vdo_set_bio_properties(vio->bio, vio, read_endio, opf,
                                               data_vio->mapped.pbn);
                }
        }

        if (result != VDO_SUCCESS) {
                continue_data_vio_with_error(data_vio, result);
                return;
        }

        vdo_submit_data_vio(data_vio);
}

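/**
 * reference_count_update_completion_as_data_vio() - Convert a completion used for a reference
 *                                                   count update back into its data_vio.
 * @completion: Either the data_vio's own completion or its embedded decrement_completion.
 */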
static inline struct data_vio *
reference_count_update_completion_as_data_vio(struct vdo_completion *completion)
{
        if (completion->type == VIO_COMPLETION)
                return as_data_vio(completion);

        return container_of(completion, struct data_vio, decrement_completion);
}

/**
 * update_block_map() - Rendezvous of the data_vio and decrement completions after each has
 *                      made its reference updates. Handle any error from either, or proceed
 *                      to updating the block map.
 * @completion: The completion of the write in progress.
 */
static void update_block_map(struct vdo_completion *completion)
{
        struct data_vio *data_vio = reference_count_update_completion_as_data_vio(completion);

        assert_data_vio_in_logical_zone(data_vio);

        if (!data_vio->first_reference_operation_complete) {
                /* Rendezvous, we're first */
                data_vio->first_reference_operation_complete = true;
                return;
        }

        completion = &data_vio->vio.completion;
        vdo_set_completion_result(completion, data_vio->decrement_completion.result);
        if (completion->result != VDO_SUCCESS) {
                handle_data_vio_error(completion);
                return;
        }

        completion->error_handler = handle_data_vio_error;
        if (data_vio->hash_lock != NULL)
                set_data_vio_hash_zone_callback(data_vio, vdo_continue_hash_lock);
        else
                completion->callback = complete_data_vio;

        data_vio->last_async_operation = VIO_ASYNC_OP_PUT_MAPPED_BLOCK;
        vdo_put_mapped_block(data_vio);
}

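/**
 * decrement_reference_count() - Apply the reference count decrement for the old mapping, then
 *                               rendezvous with update_block_map() in the logical zone.
 * @completion: The data_vio's embedded decrement_completion.
 *
 * This callback is registered in journal_remapping().
 */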
static void decrement_reference_count(struct vdo_completion *completion)
{
        struct data_vio *data_vio = container_of(completion, struct data_vio,
                                                 decrement_completion);

        assert_data_vio_in_mapped_zone(data_vio);

        vdo_set_completion_callback(completion, update_block_map,
                                    data_vio->logical.zone->thread_id);
        completion->error_handler = update_block_map;
        vdo_modify_reference_count(completion, &data_vio->decrement_updater);
}

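/**
 * increment_reference_count() - Apply the reference count increment for the new mapping,
 *                               downgrading the allocation lock first if needed, then
 *                               rendezvous with update_block_map() in the logical zone.
 * @completion: The data_vio as a completion.
 *
 * This callback is registered in journal_remapping().
 */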
static void increment_reference_count(struct vdo_completion *completion)
{
        struct data_vio *data_vio = as_data_vio(completion);

        assert_data_vio_in_new_mapped_zone(data_vio);

        if (data_vio->downgrade_allocation_lock) {
                /*
                 * Now that the data has been written, it's safe to deduplicate against the
                 * block. Downgrade the allocation lock to a read lock so it can be used later
                 * by the hash lock. This is done here since it needs to happen sometime before
                 * we return to the hash zone, and we are currently on the correct thread. For
                 * compressed blocks, the downgrade will have already been done.
                 */
                vdo_downgrade_pbn_write_lock(data_vio->allocation.lock, false);
        }

        set_data_vio_logical_callback(data_vio, update_block_map);
        completion->error_handler = update_block_map;
        vdo_modify_reference_count(completion, &data_vio->increment_updater);
}

/** journal_remapping() - Add a recovery journal entry for a data remapping. */
static void journal_remapping(struct vdo_completion *completion)
{
        struct data_vio *data_vio = as_data_vio(completion);

        assert_data_vio_in_journal_zone(data_vio);

        data_vio->decrement_updater.operation = VDO_JOURNAL_DATA_REMAPPING;
        data_vio->decrement_updater.zpbn = data_vio->mapped;
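        /*
         * A new mapping to the zero block needs no reference count increment, and an old
         * mapping of the zero block needs no decrement. In either case, mark that reference
         * operation as already complete so that update_block_map() does not wait for it.
         */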
        if (data_vio->new_mapped.pbn == VDO_ZERO_BLOCK) {
                data_vio->first_reference_operation_complete = true;
                if (data_vio->mapped.pbn == VDO_ZERO_BLOCK)
                        set_data_vio_logical_callback(data_vio, update_block_map);
        } else {
                set_data_vio_new_mapped_zone_callback(data_vio,
                                                      increment_reference_count);
        }

        if (data_vio->mapped.pbn == VDO_ZERO_BLOCK) {
                data_vio->first_reference_operation_complete = true;
        } else {
                vdo_set_completion_callback(&data_vio->decrement_completion,
                                            decrement_reference_count,
                                            data_vio->mapped.zone->thread_id);
        }

        data_vio->last_async_operation = VIO_ASYNC_OP_JOURNAL_REMAPPING;
        vdo_add_recovery_journal_entry(completion->vdo->recovery_journal, data_vio);
}

/**
 * read_old_block_mapping() - Get the previous PBN/LBN mapping of an in-progress write.
 * @completion: The data_vio doing the read.
 *
 * Gets the previous PBN mapped to this LBN from the block map, so as to make an appropriate
 * journal entry referencing the removal of this LBN->PBN mapping.
 */
static void read_old_block_mapping(struct vdo_completion *completion)
{
        struct data_vio *data_vio = as_data_vio(completion);

        assert_data_vio_in_logical_zone(data_vio);

        data_vio->last_async_operation = VIO_ASYNC_OP_GET_MAPPED_BLOCK_FOR_WRITE;
        set_data_vio_journal_callback(data_vio, journal_remapping);
        vdo_get_mapped_block(data_vio);
}

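/**
 * update_metadata_for_data_vio_write() - Begin the metadata updates for a data_vio write by
 *                                        recording the reference count increment to be made for
 *                                        the new mapping and then reading the old block mapping.
 * @data_vio: The data_vio whose metadata is to be updated.
 * @lock: The PBN lock covering the new mapping, if any.
 */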
void update_metadata_for_data_vio_write(struct data_vio *data_vio, struct pbn_lock *lock)
{
        data_vio->increment_updater = (struct reference_updater) {
                .operation = VDO_JOURNAL_DATA_REMAPPING,
                .increment = true,
                .zpbn = data_vio->new_mapped,
                .lock = lock,
        };

        launch_data_vio_logical_callback(data_vio, read_old_block_mapping);
}

/**
 * pack_compressed_data() - Attempt to pack the compressed data_vio into a block.
 * @completion: The data_vio.
 *
 * This is the callback registered in compress_data_vio().
 */
static void pack_compressed_data(struct vdo_completion *completion)
{
        struct data_vio *data_vio = as_data_vio(completion);

        assert_data_vio_in_packer_zone(data_vio);

        if (!vdo_get_compressing(vdo_from_data_vio(data_vio)) ||
            get_data_vio_compression_status(data_vio).may_not_compress) {
                write_data_vio(data_vio);
                return;
        }

        data_vio->last_async_operation = VIO_ASYNC_OP_ATTEMPT_PACKING;
        vdo_attempt_packing(data_vio);
}

/**
 * compress_data_vio() - Do the actual work of compressing the data on a CPU queue.
 * @completion: The data_vio.
 *
 * This callback is registered in launch_compress_data_vio().
 */
static void compress_data_vio(struct vdo_completion *completion)
{
        struct data_vio *data_vio = as_data_vio(completion);
        int size;

        assert_data_vio_on_cpu_thread(data_vio);

        /*
         * By putting the compressed data at the start of the compressed block data field, we
         * won't need to copy it if this data_vio becomes a compressed write agent.
         */
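        /* The work queue's private data supplies the scratch memory which LZ4 requires. */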
        size = LZ4_compress_default(data_vio->vio.data,
                                    data_vio->compression.block->data, VDO_BLOCK_SIZE,
                                    VDO_MAX_COMPRESSED_FRAGMENT_SIZE,
                                    (char *) vdo_get_work_queue_private_data());
        if ((size > 0) && (size < VDO_COMPRESSED_BLOCK_DATA_SIZE)) {
                data_vio->compression.size = size;
                launch_data_vio_packer_callback(data_vio, pack_compressed_data);
                return;
        }

        write_data_vio(data_vio);
}

/**
 * launch_compress_data_vio() - Continue a write by attempting to compress the data.
 * @data_vio: The data_vio.
 *
 * This is a re-entry point to vio_write used by hash locks.
 */
void launch_compress_data_vio(struct data_vio *data_vio)
{
        VDO_ASSERT_LOG_ONLY(!data_vio->is_duplicate, "compressing a non-duplicate block");
        VDO_ASSERT_LOG_ONLY(data_vio->hash_lock != NULL,
                            "data_vio to compress has a hash_lock");
        VDO_ASSERT_LOG_ONLY(data_vio_has_allocation(data_vio),
                            "data_vio to compress has an allocation");

        /*
         * There are 4 reasons why a data_vio which has reached this point will not be eligible
         * for compression:
         *
         * 1) Since data_vios can block indefinitely in the packer, it would be bad to do so if
         * the write request also requests FUA.
         *
         * 2) A data_vio should not be compressed when compression is disabled for the vdo.
         *
         * 3) A data_vio could be doing a partial write on behalf of a larger discard which has
         * not yet been acknowledged and hence blocking in the packer would be bad.
         *
         * 4) Some other data_vio may be waiting on this data_vio in which case blocking in the
         * packer would also be bad.
         */
        if (data_vio->fua ||
            !vdo_get_compressing(vdo_from_data_vio(data_vio)) ||
            ((data_vio->user_bio != NULL) && (bio_op(data_vio->user_bio) == REQ_OP_DISCARD)) ||
            (advance_data_vio_compression_stage(data_vio).stage != DATA_VIO_COMPRESSING)) {
                write_data_vio(data_vio);
                return;
        }

        data_vio->last_async_operation = VIO_ASYNC_OP_COMPRESS_DATA_VIO;
        launch_data_vio_cpu_callback(data_vio, compress_data_vio,
                                     CPU_Q_COMPRESS_BLOCK_PRIORITY);
}

/**
 * hash_data_vio() - Hash the data in a data_vio and set the hash zone (which also flags the
 *                   record name as set).
 * @completion: The data_vio.
 *
 * This callback is registered in prepare_for_dedupe().
 */
static void hash_data_vio(struct vdo_completion *completion)
{
        struct data_vio *data_vio = as_data_vio(completion);

        assert_data_vio_on_cpu_thread(data_vio);
        VDO_ASSERT_LOG_ONLY(!data_vio->is_zero, "zero blocks should not be hashed");

        murmurhash3_128(data_vio->vio.data, VDO_BLOCK_SIZE, 0x62ea60be,
                        &data_vio->record_name);

        data_vio->hash_zone = vdo_select_hash_zone(vdo_from_data_vio(data_vio)->hash_zones,
                                                   &data_vio->record_name);
        data_vio->last_async_operation = VIO_ASYNC_OP_ACQUIRE_VDO_HASH_LOCK;
        launch_data_vio_hash_zone_callback(data_vio, vdo_acquire_hash_lock);
}

/** prepare_for_dedupe() - Prepare for the dedupe path after attempting to get an allocation. */
static void prepare_for_dedupe(struct data_vio *data_vio)
{
        /* We don't care what thread we are on. */
        VDO_ASSERT_LOG_ONLY(!data_vio->is_zero, "must not prepare to dedupe zero blocks");

        /*
         * Before we can dedupe, we need to know the record name, so the first step is to hash
         * the block data.
         */
        data_vio->last_async_operation = VIO_ASYNC_OP_HASH_DATA_VIO;
        launch_data_vio_cpu_callback(data_vio, hash_data_vio, CPU_Q_HASH_BLOCK_PRIORITY);
}

/**
 * write_bio_finished() - The bio_end_io function registered in write_data_vio() to be called
 *                        when a data_vio's write to the underlying storage has completed.
 * @bio: The bio which has completed.
 */
static void write_bio_finished(struct bio *bio)
{
        struct data_vio *data_vio = vio_as_data_vio((struct vio *) bio->bi_private);

        vdo_count_completed_bios(bio);
        vdo_set_completion_result(&data_vio->vio.completion,
                                  blk_status_to_errno(bio->bi_status));
        data_vio->downgrade_allocation_lock = true;
        update_metadata_for_data_vio_write(data_vio, data_vio->allocation.lock);
}

/** write_data_vio() - Write a data block to storage without compression. */
void write_data_vio(struct data_vio *data_vio)
{
        struct data_vio_compression_status status, new_status;
        int result;

        if (!data_vio_has_allocation(data_vio)) {
                /*
                 * There was no space to write this block and we failed to deduplicate or
                 * compress it.
                 */
                continue_data_vio_with_error(data_vio, VDO_NO_SPACE);
                return;
        }

        new_status = (struct data_vio_compression_status) {
                .stage = DATA_VIO_POST_PACKER,
                .may_not_compress = true,
        };

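        /*
         * Atomically advance the compression status to DATA_VIO_POST_PACKER so that this
         * data_vio can no longer be moved into a compressed block, retrying if the status is
         * changed concurrently.
         */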
        do {
                status = get_data_vio_compression_status(data_vio);
        } while ((status.stage != DATA_VIO_POST_PACKER) &&
                 !set_data_vio_compression_status(data_vio, status, new_status));

        /* Write the data from the data block buffer. */
        result = vio_reset_bio(&data_vio->vio, data_vio->vio.data,
                               write_bio_finished, REQ_OP_WRITE,
                               data_vio->allocation.pbn);
        if (result != VDO_SUCCESS) {
                continue_data_vio_with_error(data_vio, result);
                return;
        }

        data_vio->last_async_operation = VIO_ASYNC_OP_WRITE_DATA_VIO;
        vdo_submit_data_vio(data_vio);
}

/**
 * acknowledge_write_callback() - Acknowledge a write to the requestor.
 * @completion: The data_vio.
 *
 * This callback is registered in allocate_block() and continue_data_vio_with_block_map_slot().
 */
static void acknowledge_write_callback(struct vdo_completion *completion)
{
        struct data_vio *data_vio = as_data_vio(completion);
        struct vdo *vdo = completion->vdo;

        VDO_ASSERT_LOG_ONLY((!vdo_uses_bio_ack_queue(vdo) ||
                             (vdo_get_callback_thread_id() == vdo->thread_config.bio_ack_thread)),
                            "%s() called on bio ack queue", __func__);
        VDO_ASSERT_LOG_ONLY(data_vio_has_flush_generation_lock(data_vio),
                            "write VIO to be acknowledged has a flush generation lock");
        acknowledge_data_vio(data_vio);
        if (data_vio->new_mapped.pbn == VDO_ZERO_BLOCK) {
                /* This is a zero write or discard */
                update_metadata_for_data_vio_write(data_vio, NULL);
                return;
        }

        prepare_for_dedupe(data_vio);
}

/**
 * allocate_block() - Attempt to allocate a block in the current allocation zone.
 * @completion: The data_vio.
 *
 * This callback is registered in continue_data_vio_with_block_map_slot().
 */
static void allocate_block(struct vdo_completion *completion)
{
        struct data_vio *data_vio = as_data_vio(completion);

        assert_data_vio_in_allocated_zone(data_vio);

        if (!vdo_allocate_block_in_zone(data_vio))
                return;

        completion->error_handler = handle_data_vio_error;
        WRITE_ONCE(data_vio->allocation_succeeded, true);
        data_vio->new_mapped = (struct zoned_pbn) {
                .zone = data_vio->allocation.zone,
                .pbn = data_vio->allocation.pbn,
                .state = VDO_MAPPING_STATE_UNCOMPRESSED,
        };

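        /*
         * A FUA write, or a write for part of a discard which extends beyond this block, can
         * not be acknowledged yet, so skip the early acknowledgment and go straight to the
         * dedupe path.
         */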
        if (data_vio->fua ||
            data_vio->remaining_discard > (u32) (VDO_BLOCK_SIZE - data_vio->offset)) {
                prepare_for_dedupe(data_vio);
                return;
        }

        data_vio->last_async_operation = VIO_ASYNC_OP_ACKNOWLEDGE_WRITE;
        launch_data_vio_on_bio_ack_queue(data_vio, acknowledge_write_callback);
}

/**
 * handle_allocation_error() - Handle an error attempting to allocate a block.
 * @completion: The data_vio.
 *
 * This error handler is registered in continue_data_vio_with_block_map_slot().
 */
static void handle_allocation_error(struct vdo_completion *completion)
{
        struct data_vio *data_vio = as_data_vio(completion);

        if (completion->result == VDO_NO_SPACE) {
                /* We failed to get an allocation, but we can try to dedupe. */
                vdo_reset_completion(completion);
                completion->error_handler = handle_data_vio_error;
                prepare_for_dedupe(data_vio);
                return;
        }

        /* We got a "real" error, not just a failure to allocate, so fail the request. */
        handle_data_vio_error(completion);
}

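/**
 * assert_is_discard() - Assert that a data_vio whose block map page is unallocated is a discard.
 * @data_vio: The data_vio to check.
 *
 * Return: VDO_SUCCESS if the data_vio is a discard, VDO_READ_ONLY otherwise.
 */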
static int assert_is_discard(struct data_vio *data_vio)
{
        int result = VDO_ASSERT(data_vio->is_discard,
                                "data_vio with no block map page is a discard");

        return ((result == VDO_SUCCESS) ? result : VDO_READ_ONLY);
}

/**
 * continue_data_vio_with_block_map_slot() - Read the data_vio's mapping from the block map.
 * @completion: The data_vio to continue.
 *
 * This callback is registered in launch_read_data_vio().
 */
void continue_data_vio_with_block_map_slot(struct vdo_completion *completion)
{
        struct data_vio *data_vio = as_data_vio(completion);

        assert_data_vio_in_logical_zone(data_vio);
        if (data_vio->read) {
                set_data_vio_logical_callback(data_vio, read_block);
                data_vio->last_async_operation = VIO_ASYNC_OP_GET_MAPPED_BLOCK_FOR_READ;
                vdo_get_mapped_block(data_vio);
                return;
        }

        vdo_acquire_flush_generation_lock(data_vio);

        if (data_vio->tree_lock.tree_slots[0].block_map_slot.pbn == VDO_ZERO_BLOCK) {
                /*
                 * This is a discard for a block on a block map page which has not been
                 * allocated, so there's nothing more we need to do.
                 */
                completion->callback = complete_data_vio;
                continue_data_vio_with_error(data_vio, assert_is_discard(data_vio));
                return;
        }

        /*
         * We need an allocation if this is neither a full-block discard nor a full-block zero
         * write.
         */
        if (!data_vio->is_zero && (!data_vio->is_discard || data_vio->is_partial)) {
                data_vio_allocate_data_block(data_vio, VIO_WRITE_LOCK, allocate_block,
                                             handle_allocation_error);
                return;
        }

        /*
         * We don't need to write any data, so skip allocation and just update the block map and
         * reference counts (via the journal).
         */
        data_vio->new_mapped.pbn = VDO_ZERO_BLOCK;
        if (data_vio->is_zero)
                data_vio->new_mapped.state = VDO_MAPPING_STATE_UNCOMPRESSED;

        if (data_vio->remaining_discard > (u32) (VDO_BLOCK_SIZE - data_vio->offset)) {
                /* This is not the final block of a discard so we can't acknowledge it yet. */
                update_metadata_for_data_vio_write(data_vio, NULL);
                return;
        }

        data_vio->last_async_operation = VIO_ASYNC_OP_ACKNOWLEDGE_WRITE;
        launch_data_vio_on_bio_ack_queue(data_vio, acknowledge_write_callback);
}
