// SPDX-License-Identifier: GPL-2.0-only
/*
 * Copyright 2023 Red Hat
 */

#include "vio.h"

#include <linux/bio.h>
#include <linux/blkdev.h>
#include <linux/kernel.h>
#include <linux/ratelimit.h>

#include "logger.h"
#include "memory-alloc.h"
#include "permassert.h"

#include "constants.h"
#include "io-submitter.h"
#include "vdo.h"

/* A vio_pool is a collection of preallocated vios. */
struct vio_pool {
	/* The number of objects managed by the pool */
	size_t size;
	/* The list of objects which are available */
	struct list_head available;
	/* The queue of requestors waiting for objects from the pool */
	struct vdo_wait_queue waiting;
	/* The number of objects currently in use */
	size_t busy_count;
	/* The list of objects which are in use */
	struct list_head busy;
	/* The ID of the thread on which this pool may be used */
	thread_id_t thread_id;
	/* The buffer backing the pool's vios */
	char *buffer;
	/* The pool entries */
	struct pooled_vio vios[];
};

physical_block_number_t pbn_from_vio_bio(struct bio *bio)
{
	struct vio *vio = bio->bi_private;
	struct vdo *vdo = vio->completion.vdo;
	physical_block_number_t pbn = bio->bi_iter.bi_sector / VDO_SECTORS_PER_BLOCK;

	return ((pbn == VDO_GEOMETRY_BLOCK_LOCATION) ? pbn : pbn + vdo->geometry.bio_offset);
}
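
/*
 * Worked example (not from the original source): with 512-byte sectors and 4k
 * blocks, VDO_SECTORS_PER_BLOCK is 8, so a bio whose bi_sector is 8192 maps to
 * pbn 1024. Unless that pbn addresses the geometry block, the geometry's
 * bio_offset is then added to translate from the bio's device-relative view to
 * VDO's physical block space.
 */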

static int create_multi_block_bio(block_count_t size, struct bio **bio_ptr)
{
	struct bio *bio = NULL;
	int result;

	result = vdo_allocate_extended(struct bio, size + 1, struct bio_vec,
				       "bio", &bio);
	if (result != VDO_SUCCESS)
		return result;

	*bio_ptr = bio;
	return VDO_SUCCESS;
}

int vdo_create_bio(struct bio **bio_ptr)
{
	return create_multi_block_bio(1, bio_ptr);
}

void vdo_free_bio(struct bio *bio)
{
	if (bio == NULL)
		return;

	bio_uninit(bio);
	vdo_free(vdo_forget(bio));
}

int allocate_vio_components(struct vdo *vdo, enum vio_type vio_type,
			    enum vio_priority priority, void *parent,
			    unsigned int block_count, char *data, struct vio *vio)
{
	struct bio *bio;
	int result;

	result = VDO_ASSERT(block_count <= MAX_BLOCKS_PER_VIO,
			    "block count %u does not exceed maximum %u", block_count,
			    MAX_BLOCKS_PER_VIO);
	if (result != VDO_SUCCESS)
		return result;

	result = VDO_ASSERT(((vio_type != VIO_TYPE_UNINITIALIZED) && (vio_type != VIO_TYPE_DATA)),
			    "%d is a metadata type", vio_type);
	if (result != VDO_SUCCESS)
		return result;

	result = create_multi_block_bio(block_count, &bio);
	if (result != VDO_SUCCESS)
		return result;

	initialize_vio(vio, bio, block_count, vio_type, priority, vdo);
	vio->completion.parent = parent;
	vio->data = data;
	return VDO_SUCCESS;
}
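
/*
 * Illustrative sketch (not part of the driver): allocate_vio_components() is
 * intended for a vio embedded in a larger, caller-owned structure. The struct
 * and function names below are hypothetical, and the type and priority values
 * are only examples of enum vio_type and enum vio_priority members:
 *
 *	struct example_reader {
 *		struct vio vio;
 *		char *buffer;
 *	};
 *
 *	static int example_reader_init(struct vdo *vdo, struct example_reader *reader)
 *	{
 *		int result = vdo_allocate(VDO_BLOCK_SIZE, char, __func__,
 *					  &reader->buffer);
 *
 *		if (result != VDO_SUCCESS)
 *			return result;
 *
 *		return allocate_vio_components(vdo, VIO_TYPE_BLOCK_MAP,
 *					       VIO_PRIORITY_METADATA, reader, 1,
 *					       reader->buffer, &reader->vio);
 *	}
 */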

/**
 * create_multi_block_metadata_vio() - Create a vio.
 * @vdo: The vdo on which the vio will operate.
 * @vio_type: The type of vio to create.
 * @priority: The relative priority to assign to the vio.
 * @parent: The parent of the vio.
 * @block_count: The size of the vio in blocks.
 * @data: The buffer.
 * @vio_ptr: A pointer to hold the new vio.
 *
 * Return: VDO_SUCCESS or an error.
 */
int create_multi_block_metadata_vio(struct vdo *vdo, enum vio_type vio_type,
				    enum vio_priority priority, void *parent,
				    unsigned int block_count, char *data,
				    struct vio **vio_ptr)
{
	struct vio *vio;
	int result;

	BUILD_BUG_ON(sizeof(struct vio) > 256);

	/*
	 * Metadata vios should use direct allocation and not use the buffer pool, which is
	 * reserved for submissions from the linux block layer.
	 */
	result = vdo_allocate(1, struct vio, __func__, &vio);
	if (result != VDO_SUCCESS) {
		vdo_log_error("metadata vio allocation failure %d", result);
		return result;
	}

	result = allocate_vio_components(vdo, vio_type, priority, parent, block_count,
					 data, vio);
	if (result != VDO_SUCCESS) {
		vdo_free(vio);
		return result;
	}

	*vio_ptr = vio;
	return VDO_SUCCESS;
}
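
/*
 * Usage sketch (illustrative only; "buffer", "parent", and the chosen type and
 * priority are hypothetical): create a standalone single-block metadata vio
 * over a caller-owned buffer, and release it later with free_vio().
 *
 *	struct vio *vio;
 *	int result = create_multi_block_metadata_vio(vdo, VIO_TYPE_RECOVERY_JOURNAL,
 *						     VIO_PRIORITY_METADATA, parent,
 *						     1, buffer, &vio);
 *
 *	if (result != VDO_SUCCESS)
 *		return result;
 *	...
 *	free_vio(vdo_forget(vio));
 */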

/**
 * free_vio_components() - Free the components of a vio embedded in a larger structure.
 * @vio: The vio to destroy
 */
void free_vio_components(struct vio *vio)
{
	if (vio == NULL)
		return;

	BUG_ON(is_data_vio(vio));
	vdo_free_bio(vdo_forget(vio->bio));
}

/**
 * free_vio() - Destroy a vio.
 * @vio: The vio to destroy.
 */
void free_vio(struct vio *vio)
{
	free_vio_components(vio);
	vdo_free(vio);
}

/* Set bio properties for a VDO read or write. */
void vdo_set_bio_properties(struct bio *bio, struct vio *vio, bio_end_io_t callback,
			    blk_opf_t bi_opf, physical_block_number_t pbn)
{
	struct vdo *vdo = vio->completion.vdo;
	struct device_config *config = vdo->device_config;

	pbn -= vdo->geometry.bio_offset;
	vio->bio_zone = ((pbn / config->thread_counts.bio_rotation_interval) %
			 config->thread_counts.bio_threads);

	bio->bi_private = vio;
	bio->bi_end_io = callback;
	bio->bi_opf = bi_opf;
	bio->bi_iter.bi_sector = pbn * VDO_SECTORS_PER_BLOCK;
}
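
/*
 * Worked example of the zone assignment above (values are hypothetical): with
 * bio_rotation_interval = 64 and bio_threads = 4, pbns 0-63 land in bio zone 0,
 * 64-127 in zone 1, 128-191 in zone 2, 192-255 in zone 3, and 256-319 wrap back
 * to zone 0. Runs of consecutive blocks therefore stay on one bio thread while
 * the overall load still rotates across all of them.
 */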

/*
 * Prepares the bio to perform IO with the specified buffer. May only be used on a VDO-allocated
 * bio, as it assumes the bio wraps a 4k-multiple buffer that is 4k aligned, but there does not
 * have to be a vio associated with the bio.
 */
int vio_reset_bio(struct vio *vio, char *data, bio_end_io_t callback,
		  blk_opf_t bi_opf, physical_block_number_t pbn)
{
	return vio_reset_bio_with_size(vio, data, vio->block_count * VDO_BLOCK_SIZE,
				       callback, bi_opf, pbn);
}

int vio_reset_bio_with_size(struct vio *vio, char *data, int size, bio_end_io_t callback,
			    blk_opf_t bi_opf, physical_block_number_t pbn)
{
	int bvec_count, offset, i;
	struct bio *bio = vio->bio;
	int vio_size = vio->block_count * VDO_BLOCK_SIZE;
	int remaining;

	bio_reset(bio, bio->bi_bdev, bi_opf);
	vdo_set_bio_properties(bio, vio, callback, bi_opf, pbn);
	if (data == NULL)
		return VDO_SUCCESS;

	bio->bi_ioprio = 0;
	bio->bi_io_vec = bio->bi_inline_vecs;
	bio->bi_max_vecs = vio->block_count + 1;
	if (VDO_ASSERT(size <= vio_size, "specified size %d is not greater than allocated %d",
		       size, vio_size) != VDO_SUCCESS)
		size = vio_size;
	vio->io_size = size;
	offset = offset_in_page(data);
	bvec_count = DIV_ROUND_UP(offset + size, PAGE_SIZE);
	remaining = size;

	for (i = 0; (i < bvec_count) && (remaining > 0); i++) {
		struct page *page;
		int bytes_added;
		int bytes = PAGE_SIZE - offset;

		if (bytes > remaining)
			bytes = remaining;

		page = is_vmalloc_addr(data) ? vmalloc_to_page(data) : virt_to_page(data);
		bytes_added = bio_add_page(bio, page, bytes, offset);

		if (bytes_added != bytes) {
			return vdo_log_error_strerror(VDO_BIO_CREATION_FAILED,
						      "Could only add %i bytes to bio",
						      bytes_added);
		}

		data += bytes;
		remaining -= bytes;
		offset = 0;
	}

	return VDO_SUCCESS;
}
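
/*
 * Illustrative sketch (the endio name is hypothetical): repoint an already
 * allocated vio's bio at its own data buffer for a single read at pbn. On
 * success the caller would then hand the bio to VDO's I/O submitter; an error
 * means the buffer could not be attached to the bio.
 *
 *	result = vio_reset_bio(vio, vio->data, example_read_endio,
 *			       REQ_OP_READ, pbn);
 *	if (result != VDO_SUCCESS)
 *		return result;
 */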

/**
 * update_vio_error_stats() - Update per-vio error stats and log the error.
 * @vio: The vio which got an error.
 * @format: The format of the message to log (a printf style format).
 */
void update_vio_error_stats(struct vio *vio, const char *format, ...)
{
	static DEFINE_RATELIMIT_STATE(error_limiter, DEFAULT_RATELIMIT_INTERVAL,
				      DEFAULT_RATELIMIT_BURST);
	va_list args;
	int priority;
	struct vdo *vdo = vio->completion.vdo;

	switch (vio->completion.result) {
	case VDO_READ_ONLY:
		atomic64_inc(&vdo->stats.read_only_error_count);
		return;

	case VDO_NO_SPACE:
		atomic64_inc(&vdo->stats.no_space_error_count);
		priority = VDO_LOG_DEBUG;
		break;

	default:
		priority = VDO_LOG_ERR;
	}

	if (!__ratelimit(&error_limiter))
		return;

	va_start(args, format);
	vdo_vlog_strerror(priority, vio->completion.result, VDO_LOGGING_MODULE_NAME,
			  format, args);
	va_end(args);
}

void vio_record_metadata_io_error(struct vio *vio)
{
	const char *description;
	physical_block_number_t pbn = pbn_from_vio_bio(vio->bio);

	if (bio_op(vio->bio) == REQ_OP_READ) {
		description = "read";
	} else if ((vio->bio->bi_opf & REQ_PREFLUSH) == REQ_PREFLUSH) {
		description = (((vio->bio->bi_opf & REQ_FUA) == REQ_FUA) ?
			       "write+preflush+fua" :
			       "write+preflush");
	} else if ((vio->bio->bi_opf & REQ_FUA) == REQ_FUA) {
		description = "write+fua";
	} else {
		description = "write";
	}

	update_vio_error_stats(vio,
			       "Completing %s vio of type %u for physical block %llu with error",
			       description, vio->type, (unsigned long long) pbn);
}

/**
 * make_vio_pool() - Create a new vio pool.
 * @vdo: The vdo.
 * @pool_size: The number of vios in the pool.
 * @block_count: The number of 4k blocks per vio.
 * @thread_id: The ID of the thread using this pool.
 * @vio_type: The type of vios in the pool.
 * @priority: The priority with which vios from the pool should be enqueued.
 * @context: The context that each entry will have.
 * @pool_ptr: The resulting pool.
 *
 * Return: A success or error code.
 */
int make_vio_pool(struct vdo *vdo, size_t pool_size, size_t block_count, thread_id_t thread_id,
		  enum vio_type vio_type, enum vio_priority priority, void *context,
		  struct vio_pool **pool_ptr)
{
	struct vio_pool *pool;
	char *ptr;
	int result;
	size_t per_vio_size = VDO_BLOCK_SIZE * block_count;

	result = vdo_allocate_extended(struct vio_pool, pool_size, struct pooled_vio,
				       __func__, &pool);
	if (result != VDO_SUCCESS)
		return result;

	pool->thread_id = thread_id;
	INIT_LIST_HEAD(&pool->available);
	INIT_LIST_HEAD(&pool->busy);

	result = vdo_allocate(pool_size * per_vio_size, char,
			      "VIO pool buffer", &pool->buffer);
	if (result != VDO_SUCCESS) {
		free_vio_pool(pool);
		return result;
	}

	ptr = pool->buffer;
	for (pool->size = 0; pool->size < pool_size; pool->size++, ptr += per_vio_size) {
		struct pooled_vio *pooled = &pool->vios[pool->size];

		result = allocate_vio_components(vdo, vio_type, priority, NULL, block_count, ptr,
						 &pooled->vio);
		if (result != VDO_SUCCESS) {
			free_vio_pool(pool);
			return result;
		}

		pooled->context = context;
		pooled->pool = pool;
		list_add_tail(&pooled->pool_entry, &pool->available);
	}

	*pool_ptr = pool;
	return VDO_SUCCESS;
}
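
/*
 * Illustrative sketch (the pool size, context, and "zone" are hypothetical):
 * build a pool of sixteen single-block block-map vios owned by one zone
 * thread, with the zone itself as the per-entry context.
 *
 *	struct vio_pool *pool;
 *	int result = make_vio_pool(vdo, 16, 1, zone->thread_id,
 *				   VIO_TYPE_BLOCK_MAP, VIO_PRIORITY_METADATA,
 *				   zone, &pool);
 *
 *	if (result != VDO_SUCCESS)
 *		return result;
 */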

/**
 * free_vio_pool() - Destroy a vio pool.
 * @pool: The pool to free.
 */
void free_vio_pool(struct vio_pool *pool)
{
	struct pooled_vio *pooled, *tmp;

	if (pool == NULL)
		return;

	/* Remove all available vios from the object pool. */
	VDO_ASSERT_LOG_ONLY(!vdo_waitq_has_waiters(&pool->waiting),
			    "VIO pool must not have any waiters when being freed");
	VDO_ASSERT_LOG_ONLY((pool->busy_count == 0),
			    "VIO pool must not have %zu busy entries when being freed",
			    pool->busy_count);
	VDO_ASSERT_LOG_ONLY(list_empty(&pool->busy),
			    "VIO pool must not have busy entries when being freed");

	list_for_each_entry_safe(pooled, tmp, &pool->available, pool_entry) {
		list_del(&pooled->pool_entry);
		free_vio_components(&pooled->vio);
		pool->size--;
	}

	VDO_ASSERT_LOG_ONLY(pool->size == 0,
			    "VIO pool must not have missing entries when being freed");

	vdo_free(vdo_forget(pool->buffer));
	vdo_free(pool);
}

/**
 * is_vio_pool_busy() - Check whether a vio pool has outstanding entries.
 * @pool: The pool to check.
 *
 * Return: true if the pool is busy.
 */
bool is_vio_pool_busy(struct vio_pool *pool)
{
	return (pool->busy_count != 0);
}

/**
 * acquire_vio_from_pool() - Acquire a vio and buffer from the pool (asynchronous).
 * @pool: The vio pool.
 * @waiter: Object that is requesting a vio.
 */
void acquire_vio_from_pool(struct vio_pool *pool, struct vdo_waiter *waiter)
{
	struct pooled_vio *pooled;

	VDO_ASSERT_LOG_ONLY((pool->thread_id == vdo_get_callback_thread_id()),
			    "acquire from active vio_pool called from correct thread");

	if (list_empty(&pool->available)) {
		vdo_waitq_enqueue_waiter(&pool->waiting, waiter);
		return;
	}

	pooled = list_first_entry(&pool->available, struct pooled_vio, pool_entry);
	pool->busy_count++;
	list_move_tail(&pooled->pool_entry, &pool->busy);
	(*waiter->callback)(waiter, pooled);
}
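
/*
 * Illustrative waiter sketch (all names below are hypothetical): acquisition is
 * asynchronous, so the requestor supplies a vdo_waiter whose callback receives
 * the pooled vio as its context, either immediately or once another holder
 * returns one. The callback runs on the pool's thread, and the entry is
 * eventually handed back with return_vio_to_pool().
 *
 *	static void example_vio_ready(struct vdo_waiter *waiter, void *context)
 *	{
 *		struct pooled_vio *pooled = context;
 *		struct example_user *user =
 *			container_of(waiter, struct example_user, waiter);
 *
 *		user->vio = &pooled->vio;
 *		(issue I/O, then call return_vio_to_pool(pooled) when finished)
 *	}
 *
 *	example->waiter.callback = example_vio_ready;
 *	acquire_vio_from_pool(pool, &example->waiter);
 */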

/**
 * return_vio_to_pool() - Return a vio to its pool.
 * @vio: The pooled vio to return.
 */
void return_vio_to_pool(struct pooled_vio *vio)
{
	struct vio_pool *pool = vio->pool;

	VDO_ASSERT_LOG_ONLY((pool->thread_id == vdo_get_callback_thread_id()),
			    "vio pool entry returned on same thread as it was acquired");

	vio->vio.completion.error_handler = NULL;
	vio->vio.completion.parent = NULL;
	if (vdo_waitq_has_waiters(&pool->waiting)) {
		vdo_waitq_notify_next_waiter(&pool->waiting, NULL, vio);
		return;
	}

	list_move_tail(&vio->pool_entry, &pool->available);
	--pool->busy_count;
}

/*
 * Various counting functions for statistics.
 * These are used for bios coming into VDO, as well as bios generated by VDO.
 */
void vdo_count_bios(struct atomic_bio_stats *bio_stats, struct bio *bio)
{
	if (((bio->bi_opf & REQ_PREFLUSH) != 0) && (bio->bi_iter.bi_size == 0)) {
		atomic64_inc(&bio_stats->empty_flush);
		atomic64_inc(&bio_stats->flush);
		return;
	}

	switch (bio_op(bio)) {
	case REQ_OP_WRITE:
		atomic64_inc(&bio_stats->write);
		break;
	case REQ_OP_READ:
		atomic64_inc(&bio_stats->read);
		break;
	case REQ_OP_DISCARD:
		atomic64_inc(&bio_stats->discard);
		break;
		/*
		 * All other operations are filtered out in dmvdo.c, or not created by VDO, so
		 * shouldn't exist.
		 */
	default:
		VDO_ASSERT_LOG_ONLY(0, "Bio operation %d not a write, read, discard, or empty flush",
				    bio_op(bio));
	}

	if ((bio->bi_opf & REQ_PREFLUSH) != 0)
		atomic64_inc(&bio_stats->flush);
	if (bio->bi_opf & REQ_FUA)
		atomic64_inc(&bio_stats->fua);
}

static void count_all_bios_completed(struct vio *vio, struct bio *bio)
{
	struct atomic_statistics *stats = &vio->completion.vdo->stats;

	if (is_data_vio(vio)) {
		vdo_count_bios(&stats->bios_out_completed, bio);
		return;
	}

	vdo_count_bios(&stats->bios_meta_completed, bio);
	if (vio->type == VIO_TYPE_RECOVERY_JOURNAL)
		vdo_count_bios(&stats->bios_journal_completed, bio);
	else if (vio->type == VIO_TYPE_BLOCK_MAP)
		vdo_count_bios(&stats->bios_page_cache_completed, bio);
}

void vdo_count_completed_bios(struct bio *bio)
{
	struct vio *vio = (struct vio *) bio->bi_private;

	atomic64_inc(&vio->completion.vdo->stats.bios_completed);
	count_all_bios_completed(vio, bio);
}
514