// SPDX-License-Identifier: GPL-2.0-only
/*
 * Copyright 2023 Red Hat
 */

#include "vio.h"

#include <linux/bio.h>
#include <linux/blkdev.h>
#include <linux/kernel.h>
#include <linux/ratelimit.h>

#include "logger.h"
#include "memory-alloc.h"
#include "permassert.h"

#include "constants.h"
#include "io-submitter.h"
#include "vdo.h"

/* A vio_pool is a collection of preallocated vios. */
struct vio_pool {
	/* The number of objects managed by the pool */
	size_t size;
	/* The list of objects which are available */
	struct list_head available;
	/* The queue of requestors waiting for objects from the pool */
	struct vdo_wait_queue waiting;
	/* The number of objects currently in use */
	size_t busy_count;
	/* The list of objects which are in use */
	struct list_head busy;
	/* The ID of the thread on which this pool may be used */
	thread_id_t thread_id;
	/* The buffer backing the pool's vios */
	char *buffer;
	/* The pool entries */
	struct pooled_vio vios[];
};

physical_block_number_t pbn_from_vio_bio(struct bio *bio)
{
	struct vio *vio = bio->bi_private;
	struct vdo *vdo = vio->completion.vdo;
	physical_block_number_t pbn = bio->bi_iter.bi_sector / VDO_SECTORS_PER_BLOCK;

	return ((pbn == VDO_GEOMETRY_BLOCK_LOCATION) ? pbn : pbn + vdo->geometry.bio_offset);
}

static int create_multi_block_bio(block_count_t size, struct bio **bio_ptr)
{
	struct bio *bio = NULL;
	int result;

	result = vdo_allocate_extended(struct bio, size + 1, struct bio_vec,
				       "bio", &bio);
	if (result != VDO_SUCCESS)
		return result;

	*bio_ptr = bio;
	return VDO_SUCCESS;
}

int vdo_create_bio(struct bio **bio_ptr)
{
	return create_multi_block_bio(1, bio_ptr);
}

void vdo_free_bio(struct bio *bio)
{
	if (bio == NULL)
		return;

	bio_uninit(bio);
	vdo_free(vdo_forget(bio));
}

int allocate_vio_components(struct vdo *vdo, enum vio_type vio_type,
			    enum vio_priority priority, void *parent,
			    unsigned int block_count, char *data, struct vio *vio)
{
	struct bio *bio;
	int result;

	result = VDO_ASSERT(block_count <= MAX_BLOCKS_PER_VIO,
			    "block count %u does not exceed maximum %u", block_count,
			    MAX_BLOCKS_PER_VIO);
	if (result != VDO_SUCCESS)
		return result;

	result = VDO_ASSERT(((vio_type != VIO_TYPE_UNINITIALIZED) && (vio_type != VIO_TYPE_DATA)),
			    "%d is a metadata type", vio_type);
	if (result != VDO_SUCCESS)
		return result;

	result = create_multi_block_bio(block_count, &bio);
	if (result != VDO_SUCCESS)
		return result;

	initialize_vio(vio, bio, block_count, vio_type, priority, vdo);
	vio->completion.parent = parent;
	vio->data = data;
	return VDO_SUCCESS;
}
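
/*
 * Illustrative sketch only, not used by the driver: a metadata structure that
 * embeds a vio would typically pair allocate_vio_components() here with
 * free_vio_components() below. The structure and function names are
 * hypothetical, and VIO_PRIORITY_METADATA is assumed to come from vio.h.
 *
 *	struct example_header_reader {
 *		struct vio vio;
 *		char *buffer;
 *	};
 *
 *	static int example_setup(struct vdo *vdo, struct example_header_reader *reader)
 *	{
 *		// One 4k block, backed by the reader's preallocated buffer.
 *		return allocate_vio_components(vdo, VIO_TYPE_BLOCK_MAP,
 *					       VIO_PRIORITY_METADATA, reader, 1,
 *					       reader->buffer, &reader->vio);
 *	}
 *
 *	static void example_teardown(struct example_header_reader *reader)
 *	{
 *		free_vio_components(&reader->vio);
 *	}
 */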

/**
 * create_multi_block_metadata_vio() - Create a metadata vio.
 * @vdo: The vdo on which the vio will operate.
 * @vio_type: The type of vio to create.
 * @priority: The relative priority to assign to the vio.
 * @parent: The parent of the vio.
 * @block_count: The size of the vio in blocks.
 * @data: The buffer.
 * @vio_ptr: A pointer to hold the new vio.
 *
 * Return: VDO_SUCCESS or an error.
 */
int create_multi_block_metadata_vio(struct vdo *vdo, enum vio_type vio_type,
				    enum vio_priority priority, void *parent,
				    unsigned int block_count, char *data,
				    struct vio **vio_ptr)
{
	struct vio *vio;
	int result;

	BUILD_BUG_ON(sizeof(struct vio) > 256);

	/*
	 * Metadata vios should use direct allocation and not use the buffer pool, which is
	 * reserved for submissions from the linux block layer.
	 */
	result = vdo_allocate(1, struct vio, __func__, &vio);
	if (result != VDO_SUCCESS) {
		vdo_log_error("metadata vio allocation failure %d", result);
		return result;
	}

	result = allocate_vio_components(vdo, vio_type, priority, parent, block_count,
					 data, vio);
	if (result != VDO_SUCCESS) {
		vdo_free(vio);
		return result;
	}

	*vio_ptr = vio;
	return VDO_SUCCESS;
}

/**
 * free_vio_components() - Free the components of a vio embedded in a larger structure.
 * @vio: The vio to destroy.
 */
void free_vio_components(struct vio *vio)
{
	if (vio == NULL)
		return;

	BUG_ON(is_data_vio(vio));
	vdo_free_bio(vdo_forget(vio->bio));
}

/**
 * free_vio() - Destroy a vio.
 * @vio: The vio to destroy.
 */
void free_vio(struct vio *vio)
{
	free_vio_components(vio);
	vdo_free(vio);
}

/* Set bio properties for a VDO read or write. */
void vdo_set_bio_properties(struct bio *bio, struct vio *vio, bio_end_io_t callback,
			    blk_opf_t bi_opf, physical_block_number_t pbn)
{
	struct vdo *vdo = vio->completion.vdo;
	struct device_config *config = vdo->device_config;

	pbn -= vdo->geometry.bio_offset;
	vio->bio_zone = ((pbn / config->thread_counts.bio_rotation_interval) %
			 config->thread_counts.bio_threads);

	bio->bi_private = vio;
	bio->bi_end_io = callback;
	bio->bi_opf = bi_opf;
	bio->bi_iter.bi_sector = pbn * VDO_SECTORS_PER_BLOCK;
}
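
/*
 * Worked example of the bio zone rotation above, using hypothetical thread
 * counts rather than values from any real configuration: with
 * bio_rotation_interval == 64 and bio_threads == 4, offset-adjusted pbns
 * 0-63 land in zone 0, 64-127 in zone 1, 128-191 in zone 2, 192-255 in
 * zone 3, and 256-319 wrap back around to zone 0.
 */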

/*
 * Prepares the bio to perform IO with the specified buffer. May only be used on a VDO-allocated
 * bio, as it assumes the bio wraps a 4k-multiple buffer that is 4k aligned, but there does not
 * have to be a vio associated with the bio.
 */
int vio_reset_bio(struct vio *vio, char *data, bio_end_io_t callback,
		  blk_opf_t bi_opf, physical_block_number_t pbn)
{
	return vio_reset_bio_with_size(vio, data, vio->block_count * VDO_BLOCK_SIZE,
				       callback, bi_opf, pbn);
}

int vio_reset_bio_with_size(struct vio *vio, char *data, int size, bio_end_io_t callback,
			    blk_opf_t bi_opf, physical_block_number_t pbn)
{
	int bvec_count, offset, i;
	struct bio *bio = vio->bio;
	int vio_size = vio->block_count * VDO_BLOCK_SIZE;
	int remaining;

	bio_reset(bio, bio->bi_bdev, bi_opf);
	vdo_set_bio_properties(bio, vio, callback, bi_opf, pbn);
	if (data == NULL)
		return VDO_SUCCESS;

	bio->bi_ioprio = 0;
	bio->bi_io_vec = bio->bi_inline_vecs;
	bio->bi_max_vecs = vio->block_count + 1;
	if (VDO_ASSERT(size <= vio_size, "specified size %d is not greater than allocated %d",
		       size, vio_size) != VDO_SUCCESS)
		size = vio_size;
	vio->io_size = size;
	offset = offset_in_page(data);
	bvec_count = DIV_ROUND_UP(offset + size, PAGE_SIZE);
	remaining = size;

	for (i = 0; (i < bvec_count) && (remaining > 0); i++) {
		struct page *page;
		int bytes_added;
		int bytes = PAGE_SIZE - offset;

		if (bytes > remaining)
			bytes = remaining;

		page = is_vmalloc_addr(data) ? vmalloc_to_page(data) : virt_to_page(data);
		bytes_added = bio_add_page(bio, page, bytes, offset);

		if (bytes_added != bytes) {
			return vdo_log_error_strerror(VDO_BIO_CREATION_FAILED,
						      "Could only add %i bytes to bio",
						      bytes_added);
		}

		data += bytes;
		remaining -= bytes;
		offset = 0;
	}

	return VDO_SUCCESS;
}

/**
 * update_vio_error_stats() - Update per-vio error stats and log the error.
 * @vio: The vio which got an error.
 * @format: The format of the message to log (a printf style format).
 */
void update_vio_error_stats(struct vio *vio, const char *format, ...)
{
	static DEFINE_RATELIMIT_STATE(error_limiter, DEFAULT_RATELIMIT_INTERVAL,
				      DEFAULT_RATELIMIT_BURST);
	va_list args;
	int priority;
	struct vdo *vdo = vio->completion.vdo;

	switch (vio->completion.result) {
	case VDO_READ_ONLY:
		atomic64_inc(&vdo->stats.read_only_error_count);
		return;

	case VDO_NO_SPACE:
		atomic64_inc(&vdo->stats.no_space_error_count);
		priority = VDO_LOG_DEBUG;
		break;

	default:
		priority = VDO_LOG_ERR;
	}

	if (!__ratelimit(&error_limiter))
		return;

	va_start(args, format);
	vdo_vlog_strerror(priority, vio->completion.result, VDO_LOGGING_MODULE_NAME,
			  format, args);
	va_end(args);
}

void vio_record_metadata_io_error(struct vio *vio)
{
	const char *description;
	physical_block_number_t pbn = pbn_from_vio_bio(vio->bio);

	if (bio_op(vio->bio) == REQ_OP_READ) {
		description = "read";
	} else if ((vio->bio->bi_opf & REQ_PREFLUSH) == REQ_PREFLUSH) {
		description = (((vio->bio->bi_opf & REQ_FUA) == REQ_FUA) ?
			       "write+preflush+fua" :
			       "write+preflush");
	} else if ((vio->bio->bi_opf & REQ_FUA) == REQ_FUA) {
		description = "write+fua";
	} else {
		description = "write";
	}

	update_vio_error_stats(vio,
			       "Completing %s vio of type %u for physical block %llu with error",
			       description, vio->type, (unsigned long long) pbn);
}
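
/*
 * Illustrative sketch of the vio pool lifecycle defined below; the client
 * structure and callbacks are hypothetical, but the pool calls and the waiter
 * callback signature match how this file invokes them. All pool operations
 * must run on the pool's thread.
 *
 *	struct example_client {
 *		struct vdo_waiter waiter;
 *		struct vio_pool *pool;
 *	};
 *
 *	static void example_use_pooled_vio(struct vdo_waiter *waiter, void *context)
 *	{
 *		struct pooled_vio *pooled = context;
 *
 *		// ... issue metadata I/O with &pooled->vio, then eventually:
 *		return_vio_to_pool(pooled);
 *	}
 *
 *	static void example_request_vio(struct example_client *client)
 *	{
 *		client->waiter.callback = example_use_pooled_vio;
 *		acquire_vio_from_pool(client->pool, &client->waiter);
 *	}
 */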

/**
 * make_vio_pool() - Create a new vio pool.
 * @vdo: The vdo.
 * @pool_size: The number of vios in the pool.
 * @block_count: The number of 4k blocks per vio.
 * @thread_id: The ID of the thread using this pool.
 * @vio_type: The type of vios in the pool.
 * @priority: The priority with which vios from the pool should be enqueued.
 * @context: The context that each entry will have.
 * @pool_ptr: The resulting pool.
 *
 * Return: A success or error code.
 */
int make_vio_pool(struct vdo *vdo, size_t pool_size, size_t block_count, thread_id_t thread_id,
		  enum vio_type vio_type, enum vio_priority priority, void *context,
		  struct vio_pool **pool_ptr)
{
	struct vio_pool *pool;
	char *ptr;
	int result;
	size_t per_vio_size = VDO_BLOCK_SIZE * block_count;

	result = vdo_allocate_extended(struct vio_pool, pool_size, struct pooled_vio,
				       __func__, &pool);
	if (result != VDO_SUCCESS)
		return result;

	pool->thread_id = thread_id;
	INIT_LIST_HEAD(&pool->available);
	INIT_LIST_HEAD(&pool->busy);

	result = vdo_allocate(pool_size * per_vio_size, char,
			      "VIO pool buffer", &pool->buffer);
	if (result != VDO_SUCCESS) {
		free_vio_pool(pool);
		return result;
	}

	ptr = pool->buffer;
	for (pool->size = 0; pool->size < pool_size; pool->size++, ptr += per_vio_size) {
		struct pooled_vio *pooled = &pool->vios[pool->size];

		result = allocate_vio_components(vdo, vio_type, priority, NULL, block_count, ptr,
						 &pooled->vio);
		if (result != VDO_SUCCESS) {
			free_vio_pool(pool);
			return result;
		}

		pooled->context = context;
		pooled->pool = pool;
		list_add_tail(&pooled->pool_entry, &pool->available);
	}

	*pool_ptr = pool;
	return VDO_SUCCESS;
}

/**
 * free_vio_pool() - Destroy a vio pool.
 * @pool: The pool to free.
 */
void free_vio_pool(struct vio_pool *pool)
{
	struct pooled_vio *pooled, *tmp;

	if (pool == NULL)
		return;

	/* Remove all available vios from the object pool. */
	VDO_ASSERT_LOG_ONLY(!vdo_waitq_has_waiters(&pool->waiting),
			    "VIO pool must not have any waiters when being freed");
	VDO_ASSERT_LOG_ONLY((pool->busy_count == 0),
			    "VIO pool must not have %zu busy entries when being freed",
			    pool->busy_count);
	VDO_ASSERT_LOG_ONLY(list_empty(&pool->busy),
			    "VIO pool must not have busy entries when being freed");

	list_for_each_entry_safe(pooled, tmp, &pool->available, pool_entry) {
		list_del(&pooled->pool_entry);
		free_vio_components(&pooled->vio);
		pool->size--;
	}

	VDO_ASSERT_LOG_ONLY(pool->size == 0,
			    "VIO pool must not have missing entries when being freed");

	vdo_free(vdo_forget(pool->buffer));
	vdo_free(pool);
}

/**
 * is_vio_pool_busy() - Check whether a vio pool has outstanding entries.
 * @pool: The pool to check.
 *
 * Return: true if the pool is busy.
 */
bool is_vio_pool_busy(struct vio_pool *pool)
{
	return (pool->busy_count != 0);
}

/**
 * acquire_vio_from_pool() - Acquire a vio and buffer from the pool (asynchronous).
 * @pool: The vio pool.
 * @waiter: Object that is requesting a vio.
 */
void acquire_vio_from_pool(struct vio_pool *pool, struct vdo_waiter *waiter)
{
	struct pooled_vio *pooled;

	VDO_ASSERT_LOG_ONLY((pool->thread_id == vdo_get_callback_thread_id()),
			    "acquire from active vio_pool called from correct thread");

	if (list_empty(&pool->available)) {
		vdo_waitq_enqueue_waiter(&pool->waiting, waiter);
		return;
	}

	pooled = list_first_entry(&pool->available, struct pooled_vio, pool_entry);
	pool->busy_count++;
	list_move_tail(&pooled->pool_entry, &pool->busy);
	(*waiter->callback)(waiter, pooled);
}

/**
 * return_vio_to_pool() - Return a vio to its pool.
 * @vio: The pooled vio to return.
 */
void return_vio_to_pool(struct pooled_vio *vio)
{
	struct vio_pool *pool = vio->pool;

	VDO_ASSERT_LOG_ONLY((pool->thread_id == vdo_get_callback_thread_id()),
			    "vio pool entry returned on same thread as it was acquired");

	vio->vio.completion.error_handler = NULL;
	vio->vio.completion.parent = NULL;
	if (vdo_waitq_has_waiters(&pool->waiting)) {
		vdo_waitq_notify_next_waiter(&pool->waiting, NULL, vio);
		return;
	}

	list_move_tail(&vio->pool_entry, &pool->available);
	--pool->busy_count;
}

/*
 * Various counting functions for statistics.
 * These are used for bios coming into VDO, as well as bios generated by VDO.
 */
void vdo_count_bios(struct atomic_bio_stats *bio_stats, struct bio *bio)
{
	if (((bio->bi_opf & REQ_PREFLUSH) != 0) && (bio->bi_iter.bi_size == 0)) {
		atomic64_inc(&bio_stats->empty_flush);
		atomic64_inc(&bio_stats->flush);
		return;
	}

	switch (bio_op(bio)) {
	case REQ_OP_WRITE:
		atomic64_inc(&bio_stats->write);
		break;
	case REQ_OP_READ:
		atomic64_inc(&bio_stats->read);
		break;
	case REQ_OP_DISCARD:
		atomic64_inc(&bio_stats->discard);
		break;
	/*
	 * All other operations are filtered out in dmvdo.c, or not created by VDO, so
	 * shouldn't exist.
	 */
	default:
		VDO_ASSERT_LOG_ONLY(0, "Bio operation %d not a write, read, discard, or empty flush",
				    bio_op(bio));
	}

	if ((bio->bi_opf & REQ_PREFLUSH) != 0)
		atomic64_inc(&bio_stats->flush);
	if (bio->bi_opf & REQ_FUA)
		atomic64_inc(&bio_stats->fua);
}

static void count_all_bios_completed(struct vio *vio, struct bio *bio)
{
	struct atomic_statistics *stats = &vio->completion.vdo->stats;

	if (is_data_vio(vio)) {
		vdo_count_bios(&stats->bios_out_completed, bio);
		return;
	}

	vdo_count_bios(&stats->bios_meta_completed, bio);
	if (vio->type == VIO_TYPE_RECOVERY_JOURNAL)
		vdo_count_bios(&stats->bios_journal_completed, bio);
	else if (vio->type == VIO_TYPE_BLOCK_MAP)
		vdo_count_bios(&stats->bios_page_cache_completed, bio);
}

void vdo_count_completed_bios(struct bio *bio)
{
	struct vio *vio = (struct vio *) bio->bi_private;

	atomic64_inc(&vio->completion.vdo->stats.bios_completed);
	count_all_bios_completed(vio, bio);
}
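
/*
 * Illustrative sketch of where the completion counters above are fed from;
 * the end_io function below is hypothetical and elides the driver's actual
 * completion handling.
 *
 *	static void example_end_io(struct bio *bio)
 *	{
 *		vdo_count_completed_bios(bio);
 *		// ... hand bio->bi_status back to the vio in bio->bi_private
 *	}
 */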