1 // SPDX-License-Identifier: GPL-2.0-only 2 /* 3 * Copyright 2023 Red Hat 4 */ 5 6 /* 7 * This file contains the main entry points for normal operations on a vdo as well as functions for 8 * constructing and destroying vdo instances (in memory). 9 */ 10 11 /** 12 * DOC: 13 * 14 * A read_only_notifier has a single completion which is used to perform read-only notifications; 15 * however, vdo_enter_read_only_mode() may be called from any thread. A pair of fields, protected 16 * by a spinlock, is used to control the read-only mode entry process. The first field holds the 17 * read-only error. The second is the state field, which may hold any of the four special values 18 * enumerated here. 19 * 20 * When vdo_enter_read_only_mode() is called from some vdo thread, if the read_only_error field 21 * already contains an error (i.e. its value is not VDO_SUCCESS), then some other error has already 22 * initiated the read-only process, and nothing more is done. Otherwise, the new error is stored in 23 * the read_only_error field, and the state field is consulted. If the state is MAY_NOTIFY, it is 24 * set to NOTIFYING, and the notification process begins. If the state is MAY_NOT_NOTIFY, then 25 * notifications are currently disallowed, generally due to the vdo being suspended. In this case, 26 * nothing more will be done until the vdo is resumed, at which point the notification will be 27 * performed. In any other case, the vdo is already read-only, and there is nothing more to do. 28 */ 29 30 #include "vdo.h" 31 32 #include <linux/completion.h> 33 #include <linux/device-mapper.h> 34 #include <linux/kernel.h> 35 #include <linux/lz4.h> 36 #include <linux/module.h> 37 #include <linux/mutex.h> 38 #include <linux/spinlock.h> 39 #include <linux/types.h> 40 41 #include "logger.h" 42 #include "memory-alloc.h" 43 #include "permassert.h" 44 #include "string-utils.h" 45 46 #include "block-map.h" 47 #include "completion.h" 48 #include "data-vio.h" 49 #include "dedupe.h" 50 #include "encodings.h" 51 #include "funnel-workqueue.h" 52 #include "io-submitter.h" 53 #include "logical-zone.h" 54 #include "packer.h" 55 #include "physical-zone.h" 56 #include "recovery-journal.h" 57 #include "slab-depot.h" 58 #include "statistics.h" 59 #include "status-codes.h" 60 #include "vio.h" 61 62 #define PARANOID_THREAD_CONSISTENCY_CHECKS 0 63 64 struct sync_completion { 65 struct vdo_completion vdo_completion; 66 struct completion completion; 67 }; 68 69 /* A linked list is adequate for the small number of entries we expect. */ 70 struct device_registry { 71 struct list_head links; 72 /* TODO: Convert to rcu per kernel recommendation. */ 73 rwlock_t lock; 74 }; 75 76 static struct device_registry registry; 77 78 /** 79 * vdo_initialize_device_registry_once() - Initialize the necessary structures for the device 80 * registry. 81 */ 82 void vdo_initialize_device_registry_once(void) 83 { 84 INIT_LIST_HEAD(&registry.links); 85 rwlock_init(&registry.lock); 86 } 87 88 /** vdo_is_equal() - Implements vdo_filter_fn. */ 89 static bool vdo_is_equal(struct vdo *vdo, const void *context) 90 { 91 return (vdo == context); 92 } 93 94 /** 95 * filter_vdos_locked() - Find a vdo in the registry if it exists there. 96 * @filter: The filter function to apply to devices. 97 * @context: A bit of context to provide the filter. 98 * 99 * Context: Must be called holding the lock. 100 * 101 * Return: The vdo object found, if any.
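*
* A minimal usage sketch (candidate is a hypothetical vdo pointer; the caller must
* already hold registry.lock, as vdo_find_matching() does when wrapping this helper):
*
*	struct vdo *found = filter_vdos_locked(vdo_is_equal, candidate);
*	bool registered = (found == candidate);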
102 */ 103 static struct vdo * __must_check filter_vdos_locked(vdo_filter_fn filter, 104 const void *context) 105 { 106 struct vdo *vdo; 107 108 list_for_each_entry(vdo, &registry.links, registration) { 109 if (filter(vdo, context)) 110 return vdo; 111 } 112 113 return NULL; 114 } 115 116 /** 117 * vdo_find_matching() - Find and return the first (if any) vdo matching a given filter function. 118 * @filter: The filter function to apply to vdos. 119 * @context: A bit of context to provide the filter. 120 */ 121 struct vdo *vdo_find_matching(vdo_filter_fn filter, const void *context) 122 { 123 struct vdo *vdo; 124 125 read_lock(&registry.lock); 126 vdo = filter_vdos_locked(filter, context); 127 read_unlock(&registry.lock); 128 129 return vdo; 130 } 131 132 static void start_vdo_request_queue(void *ptr) 133 { 134 struct vdo_thread *thread = vdo_get_work_queue_owner(vdo_get_current_work_queue()); 135 136 vdo_register_allocating_thread(&thread->allocating_thread, 137 &thread->vdo->allocations_allowed); 138 } 139 140 static void finish_vdo_request_queue(void *ptr) 141 { 142 vdo_unregister_allocating_thread(); 143 } 144 145 #ifdef MODULE 146 #define MODULE_NAME THIS_MODULE->name 147 #else 148 #define MODULE_NAME "dm-vdo" 149 #endif /* MODULE */ 150 151 static const struct vdo_work_queue_type default_queue_type = { 152 .start = start_vdo_request_queue, 153 .finish = finish_vdo_request_queue, 154 .max_priority = VDO_DEFAULT_Q_MAX_PRIORITY, 155 .default_priority = VDO_DEFAULT_Q_COMPLETION_PRIORITY, 156 }; 157 158 static const struct vdo_work_queue_type bio_ack_q_type = { 159 .start = NULL, 160 .finish = NULL, 161 .max_priority = BIO_ACK_Q_MAX_PRIORITY, 162 .default_priority = BIO_ACK_Q_ACK_PRIORITY, 163 }; 164 165 static const struct vdo_work_queue_type cpu_q_type = { 166 .start = NULL, 167 .finish = NULL, 168 .max_priority = CPU_Q_MAX_PRIORITY, 169 .default_priority = CPU_Q_MAX_PRIORITY, 170 }; 171 172 static void uninitialize_thread_config(struct thread_config *config) 173 { 174 vdo_free(vdo_forget(config->logical_threads)); 175 vdo_free(vdo_forget(config->physical_threads)); 176 vdo_free(vdo_forget(config->hash_zone_threads)); 177 vdo_free(vdo_forget(config->bio_threads)); 178 memset(config, 0, sizeof(struct thread_config)); 179 } 180 181 static void assign_thread_ids(struct thread_config *config, 182 thread_id_t thread_ids[], zone_count_t count) 183 { 184 zone_count_t zone; 185 186 for (zone = 0; zone < count; zone++) 187 thread_ids[zone] = config->thread_count++; 188 } 189 190 /** 191 * initialize_thread_config() - Initialize the thread mapping 192 * 193 * If the logical, physical, and hash zone counts are all 0, a single thread will be shared by all 194 * three plus the packer and recovery journal. Otherwise, there must be at least one of each type, 195 * and each will have its own thread, as will the packer and recovery journal. 196 * 197 * Return: VDO_SUCCESS or an error.
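*
* For example, with one logical, one physical, and one hash zone and no bio-ack
* threads, the code below assigns ids in order: admin and journal share id 0, packer
* gets 1, logical 2, physical 3, hash 4, dedupe 5, cpu 6, and the bio queues take the
* remaining ids.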
198 */ 199 static int __must_check initialize_thread_config(struct thread_count_config counts, 200 struct thread_config *config) 201 { 202 int result; 203 bool single = ((counts.logical_zones + counts.physical_zones + counts.hash_zones) == 0); 204 205 config->bio_thread_count = counts.bio_threads; 206 if (single) { 207 config->logical_zone_count = 1; 208 config->physical_zone_count = 1; 209 config->hash_zone_count = 1; 210 } else { 211 config->logical_zone_count = counts.logical_zones; 212 config->physical_zone_count = counts.physical_zones; 213 config->hash_zone_count = counts.hash_zones; 214 } 215 216 result = vdo_allocate(config->logical_zone_count, thread_id_t, 217 "logical thread array", &config->logical_threads); 218 if (result != VDO_SUCCESS) { 219 uninitialize_thread_config(config); 220 return result; 221 } 222 223 result = vdo_allocate(config->physical_zone_count, thread_id_t, 224 "physical thread array", &config->physical_threads); 225 if (result != VDO_SUCCESS) { 226 uninitialize_thread_config(config); 227 return result; 228 } 229 230 result = vdo_allocate(config->hash_zone_count, thread_id_t, 231 "hash thread array", &config->hash_zone_threads); 232 if (result != VDO_SUCCESS) { 233 uninitialize_thread_config(config); 234 return result; 235 } 236 237 result = vdo_allocate(config->bio_thread_count, thread_id_t, 238 "bio thread array", &config->bio_threads); 239 if (result != VDO_SUCCESS) { 240 uninitialize_thread_config(config); 241 return result; 242 } 243 244 if (single) { 245 config->logical_threads[0] = config->thread_count; 246 config->physical_threads[0] = config->thread_count; 247 config->hash_zone_threads[0] = config->thread_count++; 248 } else { 249 config->admin_thread = config->thread_count; 250 config->journal_thread = config->thread_count++; 251 config->packer_thread = config->thread_count++; 252 assign_thread_ids(config, config->logical_threads, counts.logical_zones); 253 assign_thread_ids(config, config->physical_threads, counts.physical_zones); 254 assign_thread_ids(config, config->hash_zone_threads, counts.hash_zones); 255 } 256 257 config->dedupe_thread = config->thread_count++; 258 config->bio_ack_thread = 259 ((counts.bio_ack_threads > 0) ? config->thread_count++ : VDO_INVALID_THREAD_ID); 260 config->cpu_thread = config->thread_count++; 261 assign_thread_ids(config, config->bio_threads, counts.bio_threads); 262 return VDO_SUCCESS; 263 } 264 265 /** 266 * read_geometry_block() - Synchronously read the geometry block from a vdo's underlying block 267 * device. 268 * @vdo: The vdo whose geometry is to be read. 269 * 270 * Return: VDO_SUCCESS or an error code. 271 */ 272 static int __must_check read_geometry_block(struct vdo *vdo) 273 { 274 struct vio *vio; 275 char *block; 276 int result; 277 278 result = vdo_allocate(VDO_BLOCK_SIZE, u8, __func__, &block); 279 if (result != VDO_SUCCESS) 280 return result; 281 282 result = create_metadata_vio(vdo, VIO_TYPE_GEOMETRY, VIO_PRIORITY_HIGH, NULL, 283 block, &vio); 284 if (result != VDO_SUCCESS) { 285 vdo_free(block); 286 return result; 287 } 288 289 /* 290 * This is only safe because, having not already loaded the geometry, the vdo's geometry's 291 * bio_offset field is 0, so the fact that vio_reset_bio() will subtract that offset from 292 * the supplied pbn is not a problem. 
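* In other words, the read below is issued at the absolute physical block
* VDO_GEOMETRY_BLOCK_LOCATION on the backing device.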
293 */ 294 result = vio_reset_bio(vio, block, NULL, REQ_OP_READ, 295 VDO_GEOMETRY_BLOCK_LOCATION); 296 if (result != VDO_SUCCESS) { 297 free_vio(vdo_forget(vio)); 298 vdo_free(block); 299 return result; 300 } 301 302 bio_set_dev(vio->bio, vdo_get_backing_device(vdo)); 303 submit_bio_wait(vio->bio); 304 result = blk_status_to_errno(vio->bio->bi_status); 305 free_vio(vdo_forget(vio)); 306 if (result != 0) { 307 uds_log_error_strerror(result, "synchronous read failed"); 308 vdo_free(block); 309 return -EIO; 310 } 311 312 result = vdo_parse_geometry_block((u8 *) block, &vdo->geometry); 313 vdo_free(block); 314 return result; 315 } 316 317 static bool get_zone_thread_name(const thread_id_t thread_ids[], zone_count_t count, 318 thread_id_t id, const char *prefix, 319 char *buffer, size_t buffer_length) 320 { 321 if (id >= thread_ids[0]) { 322 thread_id_t index = id - thread_ids[0]; 323 324 if (index < count) { 325 snprintf(buffer, buffer_length, "%s%d", prefix, index); 326 return true; 327 } 328 } 329 330 return false; 331 } 332 333 /** 334 * get_thread_name() - Format the name of the worker thread desired to support a given work queue. 335 * @thread_config: The thread configuration. 336 * @thread_id: The thread id. 337 * @buffer: Where to put the formatted name. 338 * @buffer_length: Size of the output buffer. 339 * 340 * The physical layer may add a prefix identifying the product; the output from this function 341 * should just identify the thread. 342 */ 343 static void get_thread_name(const struct thread_config *thread_config, 344 thread_id_t thread_id, char *buffer, size_t buffer_length) 345 { 346 if (thread_id == thread_config->journal_thread) { 347 if (thread_config->packer_thread == thread_id) { 348 /* 349 * This is the "single thread" config where one thread is used for the 350 * journal, packer, logical, physical, and hash zones. In that case, it is 351 * known as the "request queue." 352 */ 353 snprintf(buffer, buffer_length, "reqQ"); 354 return; 355 } 356 357 snprintf(buffer, buffer_length, "journalQ"); 358 return; 359 } else if (thread_id == thread_config->admin_thread) { 360 /* Theoretically this could be different from the journal thread. */ 361 snprintf(buffer, buffer_length, "adminQ"); 362 return; 363 } else if (thread_id == thread_config->packer_thread) { 364 snprintf(buffer, buffer_length, "packerQ"); 365 return; 366 } else if (thread_id == thread_config->dedupe_thread) { 367 snprintf(buffer, buffer_length, "dedupeQ"); 368 return; 369 } else if (thread_id == thread_config->bio_ack_thread) { 370 snprintf(buffer, buffer_length, "ackQ"); 371 return; 372 } else if (thread_id == thread_config->cpu_thread) { 373 snprintf(buffer, buffer_length, "cpuQ"); 374 return; 375 } 376 377 if (get_zone_thread_name(thread_config->logical_threads, 378 thread_config->logical_zone_count, 379 thread_id, "logQ", buffer, buffer_length)) 380 return; 381 382 if (get_zone_thread_name(thread_config->physical_threads, 383 thread_config->physical_zone_count, 384 thread_id, "physQ", buffer, buffer_length)) 385 return; 386 387 if (get_zone_thread_name(thread_config->hash_zone_threads, 388 thread_config->hash_zone_count, 389 thread_id, "hashQ", buffer, buffer_length)) 390 return; 391 392 if (get_zone_thread_name(thread_config->bio_threads, 393 thread_config->bio_thread_count, 394 thread_id, "bioQ", buffer, buffer_length)) 395 return; 396 397 /* Some sort of misconfiguration? 
*/ 398 snprintf(buffer, buffer_length, "reqQ%d", thread_id); 399 } 400 401 /** 402 * vdo_make_thread() - Construct a single vdo work_queue and its associated thread (or threads for 403 * round-robin queues). 404 * @vdo: The vdo which owns the thread. 405 * @thread_id: The id of the thread to create (as determined by the thread_config). 406 * @type: The description of the work queue for this thread. 407 * @queue_count: The number of actual threads/queues contained in the "thread". 408 * @contexts: An array of queue_count contexts, one for each individual queue; may be NULL. 409 * 410 * Each "thread" constructed by this method is represented by a unique thread id in the thread 411 * config, and completions can be enqueued to the queue and run on the threads comprising this 412 * entity. 413 * 414 * Return: VDO_SUCCESS or an error. 415 */ 416 int vdo_make_thread(struct vdo *vdo, thread_id_t thread_id, 417 const struct vdo_work_queue_type *type, 418 unsigned int queue_count, void *contexts[]) 419 { 420 struct vdo_thread *thread = &vdo->threads[thread_id]; 421 char queue_name[MAX_VDO_WORK_QUEUE_NAME_LEN]; 422 423 if (type == NULL) 424 type = &default_queue_type; 425 426 if (thread->queue != NULL) { 427 return VDO_ASSERT(vdo_work_queue_type_is(thread->queue, type), 428 "already constructed vdo thread %u is of the correct type", 429 thread_id); 430 } 431 432 thread->vdo = vdo; 433 thread->thread_id = thread_id; 434 get_thread_name(&vdo->thread_config, thread_id, queue_name, sizeof(queue_name)); 435 return vdo_make_work_queue(vdo->thread_name_prefix, queue_name, thread, 436 type, queue_count, contexts, &thread->queue); 437 } 438 439 /** 440 * register_vdo() - Register a VDO; it must not already be registered. 441 * @vdo: The vdo to register. 442 * 443 * Return: VDO_SUCCESS or an error. 444 */ 445 static int register_vdo(struct vdo *vdo) 446 { 447 int result; 448 449 write_lock(&registry.lock); 450 result = VDO_ASSERT(filter_vdos_locked(vdo_is_equal, vdo) == NULL, 451 "VDO not already registered"); 452 if (result == VDO_SUCCESS) { 453 INIT_LIST_HEAD(&vdo->registration); 454 list_add_tail(&vdo->registration, &registry.links); 455 } 456 write_unlock(&registry.lock); 457 458 return result; 459 } 460 461 /** 462 * initialize_vdo() - Do the portion of initializing a vdo which will clean up after itself on 463 * error.
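* (If this step does fail, vdo_make() also passes the partially initialized vdo to
* vdo_destroy(), which tolerates missing components.)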
464 * @vdo: The vdo being initialized 465 * @config: The configuration of the vdo 466 * @instance: The instance number of the vdo 467 * @reason: The buffer to hold the failure reason on error 468 */ 469 static int initialize_vdo(struct vdo *vdo, struct device_config *config, 470 unsigned int instance, char **reason) 471 { 472 int result; 473 zone_count_t i; 474 475 vdo->device_config = config; 476 vdo->starting_sector_offset = config->owning_target->begin; 477 vdo->instance = instance; 478 vdo->allocations_allowed = true; 479 vdo_set_admin_state_code(&vdo->admin.state, VDO_ADMIN_STATE_NEW); 480 INIT_LIST_HEAD(&vdo->device_config_list); 481 vdo_initialize_completion(&vdo->admin.completion, vdo, VDO_ADMIN_COMPLETION); 482 init_completion(&vdo->admin.callback_sync); 483 mutex_init(&vdo->stats_mutex); 484 result = read_geometry_block(vdo); 485 if (result != VDO_SUCCESS) { 486 *reason = "Could not load geometry block"; 487 return result; 488 } 489 490 result = initialize_thread_config(config->thread_counts, &vdo->thread_config); 491 if (result != VDO_SUCCESS) { 492 *reason = "Cannot create thread configuration"; 493 return result; 494 } 495 496 uds_log_info("zones: %d logical, %d physical, %d hash; total threads: %d", 497 config->thread_counts.logical_zones, 498 config->thread_counts.physical_zones, 499 config->thread_counts.hash_zones, vdo->thread_config.thread_count); 500 501 /* Compression context storage */ 502 result = vdo_allocate(config->thread_counts.cpu_threads, char *, "LZ4 context", 503 &vdo->compression_context); 504 if (result != VDO_SUCCESS) { 505 *reason = "cannot allocate LZ4 context"; 506 return result; 507 } 508 509 for (i = 0; i < config->thread_counts.cpu_threads; i++) { 510 result = vdo_allocate(LZ4_MEM_COMPRESS, char, "LZ4 context", 511 &vdo->compression_context[i]); 512 if (result != VDO_SUCCESS) { 513 *reason = "cannot allocate LZ4 context"; 514 return result; 515 } 516 } 517 518 result = register_vdo(vdo); 519 if (result != VDO_SUCCESS) { 520 *reason = "Cannot add VDO to device registry"; 521 return result; 522 } 523 524 vdo_set_admin_state_code(&vdo->admin.state, VDO_ADMIN_STATE_INITIALIZED); 525 return result; 526 } 527 528 /** 529 * vdo_make() - Allocate and initialize a vdo. 530 * @instance: Device instantiation counter. 531 * @config: The device configuration. 532 * @reason: The reason for any failure during this call. 533 * @vdo_ptr: A pointer to hold the created vdo. 534 * 535 * Return: VDO_SUCCESS or an error. 536 */ 537 int vdo_make(unsigned int instance, struct device_config *config, char **reason, 538 struct vdo **vdo_ptr) 539 { 540 int result; 541 struct vdo *vdo; 542 543 /* Initialize with a generic failure reason to prevent returning garbage. */ 544 *reason = "Unspecified error"; 545 546 result = vdo_allocate(1, struct vdo, __func__, &vdo); 547 if (result != VDO_SUCCESS) { 548 *reason = "Cannot allocate VDO"; 549 return result; 550 } 551 552 result = initialize_vdo(vdo, config, instance, reason); 553 if (result != VDO_SUCCESS) { 554 vdo_destroy(vdo); 555 return result; 556 } 557 558 /* From here on, the caller will clean up if there is an error. 
*/ 559 *vdo_ptr = vdo; 560 561 snprintf(vdo->thread_name_prefix, sizeof(vdo->thread_name_prefix), 562 "%s%u", MODULE_NAME, instance); 563 BUG_ON(vdo->thread_name_prefix[0] == '\0'); 564 result = vdo_allocate(vdo->thread_config.thread_count, 565 struct vdo_thread, __func__, &vdo->threads); 566 if (result != VDO_SUCCESS) { 567 *reason = "Cannot allocate thread structures"; 568 return result; 569 } 570 571 result = vdo_make_thread(vdo, vdo->thread_config.admin_thread, 572 &default_queue_type, 1, NULL); 573 if (result != VDO_SUCCESS) { 574 *reason = "Cannot make admin thread"; 575 return result; 576 } 577 578 result = vdo_make_flusher(vdo); 579 if (result != VDO_SUCCESS) { 580 *reason = "Cannot make flusher zones"; 581 return result; 582 } 583 584 result = vdo_make_packer(vdo, DEFAULT_PACKER_BINS, &vdo->packer); 585 if (result != VDO_SUCCESS) { 586 *reason = "Cannot make packer zones"; 587 return result; 588 } 589 590 BUG_ON(vdo->device_config->logical_block_size <= 0); 591 BUG_ON(vdo->device_config->owned_device == NULL); 592 result = make_data_vio_pool(vdo, MAXIMUM_VDO_USER_VIOS, 593 MAXIMUM_VDO_USER_VIOS * 3 / 4, 594 &vdo->data_vio_pool); 595 if (result != VDO_SUCCESS) { 596 *reason = "Cannot allocate data_vio pool"; 597 return result; 598 } 599 600 result = vdo_make_io_submitter(config->thread_counts.bio_threads, 601 config->thread_counts.bio_rotation_interval, 602 get_data_vio_pool_request_limit(vdo->data_vio_pool), 603 vdo, &vdo->io_submitter); 604 if (result != VDO_SUCCESS) { 605 *reason = "bio submission initialization failed"; 606 return result; 607 } 608 609 if (vdo_uses_bio_ack_queue(vdo)) { 610 result = vdo_make_thread(vdo, vdo->thread_config.bio_ack_thread, 611 &bio_ack_q_type, 612 config->thread_counts.bio_ack_threads, NULL); 613 if (result != VDO_SUCCESS) { 614 *reason = "bio ack queue initialization failed"; 615 return result; 616 } 617 } 618 619 result = vdo_make_thread(vdo, vdo->thread_config.cpu_thread, &cpu_q_type, 620 config->thread_counts.cpu_threads, 621 (void **) vdo->compression_context); 622 if (result != VDO_SUCCESS) { 623 *reason = "CPU queue initialization failed"; 624 return result; 625 } 626 627 return VDO_SUCCESS; 628 } 629 630 static void finish_vdo(struct vdo *vdo) 631 { 632 int i; 633 634 if (vdo->threads == NULL) 635 return; 636 637 vdo_cleanup_io_submitter(vdo->io_submitter); 638 vdo_finish_dedupe_index(vdo->hash_zones); 639 640 for (i = 0; i < vdo->thread_config.thread_count; i++) 641 vdo_finish_work_queue(vdo->threads[i].queue); 642 } 643 644 /** 645 * free_listeners() - Free the list of read-only listeners associated with a thread. 646 * @thread: The thread holding the list to free. 647 */ 648 static void free_listeners(struct vdo_thread *thread) 649 { 650 struct read_only_listener *listener, *next; 651 652 for (listener = vdo_forget(thread->listeners); listener != NULL; listener = next) { 653 next = vdo_forget(listener->next); 654 vdo_free(listener); 655 } 656 } 657 658 static void uninitialize_super_block(struct vdo_super_block *super_block) 659 { 660 free_vio_components(&super_block->vio); 661 vdo_free(super_block->buffer); 662 } 663 664 /** 665 * unregister_vdo() - Remove a vdo from the device registry. 666 * @vdo: The vdo to remove. 667 */ 668 static void unregister_vdo(struct vdo *vdo) 669 { 670 write_lock(&registry.lock); 671 if (filter_vdos_locked(vdo_is_equal, vdo) == vdo) 672 list_del_init(&vdo->registration); 673 674 write_unlock(&registry.lock); 675 } 676 677 /** 678 * vdo_destroy() - Destroy a vdo instance.
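* All component structures are freed; a partially constructed vdo (for example, from a
* failed vdo_make()) may be passed here safely.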
679 * @vdo: The vdo to destroy (may be NULL). 680 */ 681 void vdo_destroy(struct vdo *vdo) 682 { 683 unsigned int i; 684 685 if (vdo == NULL) 686 return; 687 688 /* A running VDO should never be destroyed without suspending first. */ 689 BUG_ON(vdo_get_admin_state(vdo)->normal); 690 691 vdo->allocations_allowed = true; 692 693 finish_vdo(vdo); 694 unregister_vdo(vdo); 695 free_data_vio_pool(vdo->data_vio_pool); 696 vdo_free_io_submitter(vdo_forget(vdo->io_submitter)); 697 vdo_free_flusher(vdo_forget(vdo->flusher)); 698 vdo_free_packer(vdo_forget(vdo->packer)); 699 vdo_free_recovery_journal(vdo_forget(vdo->recovery_journal)); 700 vdo_free_slab_depot(vdo_forget(vdo->depot)); 701 vdo_uninitialize_layout(&vdo->layout); 702 vdo_uninitialize_layout(&vdo->next_layout); 703 if (vdo->partition_copier) 704 dm_kcopyd_client_destroy(vdo_forget(vdo->partition_copier)); 705 uninitialize_super_block(&vdo->super_block); 706 vdo_free_block_map(vdo_forget(vdo->block_map)); 707 vdo_free_hash_zones(vdo_forget(vdo->hash_zones)); 708 vdo_free_physical_zones(vdo_forget(vdo->physical_zones)); 709 vdo_free_logical_zones(vdo_forget(vdo->logical_zones)); 710 711 if (vdo->threads != NULL) { 712 for (i = 0; i < vdo->thread_config.thread_count; i++) { 713 free_listeners(&vdo->threads[i]); 714 vdo_free_work_queue(vdo_forget(vdo->threads[i].queue)); 715 } 716 vdo_free(vdo_forget(vdo->threads)); 717 } 718 719 uninitialize_thread_config(&vdo->thread_config); 720 721 if (vdo->compression_context != NULL) { 722 for (i = 0; i < vdo->device_config->thread_counts.cpu_threads; i++) 723 vdo_free(vdo_forget(vdo->compression_context[i])); 724 725 vdo_free(vdo_forget(vdo->compression_context)); 726 } 727 vdo_free(vdo); 728 } 729 730 static int initialize_super_block(struct vdo *vdo, struct vdo_super_block *super_block) 731 { 732 int result; 733 734 result = vdo_allocate(VDO_BLOCK_SIZE, char, "encoded super block", 735 (char **) &vdo->super_block.buffer); 736 if (result != VDO_SUCCESS) 737 return result; 738 739 return allocate_vio_components(vdo, VIO_TYPE_SUPER_BLOCK, 740 VIO_PRIORITY_METADATA, NULL, 1, 741 (char *) super_block->buffer, 742 &vdo->super_block.vio); 743 } 744 745 /** 746 * finish_reading_super_block() - Continue after loading the super block. 747 * @completion: The super block vio. 748 * 749 * This callback is registered in vdo_load_super_block(). 750 */ 751 static void finish_reading_super_block(struct vdo_completion *completion) 752 { 753 struct vdo_super_block *super_block = 754 container_of(as_vio(completion), struct vdo_super_block, vio); 755 756 vdo_continue_completion(vdo_forget(completion->parent), 757 vdo_decode_super_block(super_block->buffer)); 758 } 759 760 /** 761 * handle_super_block_read_error() - Handle an error reading the super block. 762 * @completion: The super block vio. 763 * 764 * This error handler is registered in vdo_load_super_block(). 765 */ 766 static void handle_super_block_read_error(struct vdo_completion *completion) 767 { 768 vio_record_metadata_io_error(as_vio(completion)); 769 finish_reading_super_block(completion); 770 } 771 772 static void read_super_block_endio(struct bio *bio) 773 { 774 struct vio *vio = bio->bi_private; 775 struct vdo_completion *parent = vio->completion.parent; 776 777 continue_vio_after_io(vio, finish_reading_super_block, 778 parent->callback_thread_id); 779 } 780 781 /** 782 * vdo_load_super_block() - Allocate a super block and read its contents from storage. 783 * @vdo: The vdo containing the super block on disk. 
784 * @parent: The completion to notify after loading the super block. 785 */ 786 void vdo_load_super_block(struct vdo *vdo, struct vdo_completion *parent) 787 { 788 int result; 789 790 result = initialize_super_block(vdo, &vdo->super_block); 791 if (result != VDO_SUCCESS) { 792 vdo_continue_completion(parent, result); 793 return; 794 } 795 796 vdo->super_block.vio.completion.parent = parent; 797 vdo_submit_metadata_vio(&vdo->super_block.vio, 798 vdo_get_data_region_start(vdo->geometry), 799 read_super_block_endio, 800 handle_super_block_read_error, 801 REQ_OP_READ); 802 } 803 804 /** 805 * vdo_get_backing_device() - Get the block device object underlying a vdo. 806 * @vdo: The vdo. 807 * 808 * Return: The vdo's current block device. 809 */ 810 struct block_device *vdo_get_backing_device(const struct vdo *vdo) 811 { 812 return vdo->device_config->owned_device->bdev; 813 } 814 815 /** 816 * vdo_get_device_name() - Get the device name associated with the vdo target. 817 * @target: The target device interface. 818 * 819 * Return: The block device name. 820 */ 821 const char *vdo_get_device_name(const struct dm_target *target) 822 { 823 return dm_device_name(dm_table_get_md(target->table)); 824 } 825 826 /** 827 * vdo_synchronous_flush() - Issue a flush request and wait for it to complete. 828 * @vdo: The vdo. 829 * 830 * Return: VDO_SUCCESS or an error. 831 */ 832 int vdo_synchronous_flush(struct vdo *vdo) 833 { 834 int result; 835 struct bio bio; 836 837 bio_init(&bio, vdo_get_backing_device(vdo), NULL, 0, 838 REQ_OP_WRITE | REQ_PREFLUSH); 839 submit_bio_wait(&bio); 840 result = blk_status_to_errno(bio.bi_status); 841 842 atomic64_inc(&vdo->stats.flush_out); 843 if (result != 0) { 844 uds_log_error_strerror(result, "synchronous flush failed"); 845 result = -EIO; 846 } 847 848 bio_uninit(&bio); 849 return result; 850 } 851 852 /** 853 * vdo_get_state() - Get the current state of the vdo. 854 * @vdo: The vdo. 855 856 * Context: This method may be called from any thread. 857 * 858 * Return: The current state of the vdo. 859 */ 860 enum vdo_state vdo_get_state(const struct vdo *vdo) 861 { 862 enum vdo_state state = atomic_read(&vdo->state); 863 864 /* pairs with barriers where state field is changed */ 865 smp_rmb(); 866 return state; 867 } 868 869 /** 870 * vdo_set_state() - Set the current state of the vdo. 871 * @vdo: The vdo whose state is to be set. 872 * @state: The new state of the vdo. 873 * 874 * Context: This method may be called from any thread. 875 */ 876 void vdo_set_state(struct vdo *vdo, enum vdo_state state) 877 { 878 /* pairs with barrier in vdo_get_state */ 879 smp_wmb(); 880 atomic_set(&vdo->state, state); 881 } 882 883 /** 884 * vdo_get_admin_state() - Get the admin state of the vdo. 885 * @vdo: The vdo. 886 * 887 * Return: The code for the vdo's current admin state. 888 */ 889 const struct admin_state_code *vdo_get_admin_state(const struct vdo *vdo) 890 { 891 return vdo_get_admin_state_code(&vdo->admin.state); 892 } 893 894 /** 895 * record_vdo() - Record the state of the VDO for encoding in the super block. 896 */ 897 static void record_vdo(struct vdo *vdo) 898 { 899 /* This is for backwards compatibility. 
*/ 900 vdo->states.unused = vdo->geometry.unused; 901 vdo->states.vdo.state = vdo_get_state(vdo); 902 vdo->states.block_map = vdo_record_block_map(vdo->block_map); 903 vdo->states.recovery_journal = vdo_record_recovery_journal(vdo->recovery_journal); 904 vdo->states.slab_depot = vdo_record_slab_depot(vdo->depot); 905 vdo->states.layout = vdo->layout; 906 } 907 908 /** 909 * continue_super_block_parent() - Continue the parent of a super block save operation. 910 * @completion: The super block vio. 911 * 912 * This callback is registered in vdo_save_components(). 913 */ 914 static void continue_super_block_parent(struct vdo_completion *completion) 915 { 916 vdo_continue_completion(vdo_forget(completion->parent), completion->result); 917 } 918 919 /** 920 * handle_save_error() - Log a super block save error. 921 * @completion: The super block vio. 922 * 923 * This error handler is registered in vdo_save_components(). 924 */ 925 static void handle_save_error(struct vdo_completion *completion) 926 { 927 struct vdo_super_block *super_block = 928 container_of(as_vio(completion), struct vdo_super_block, vio); 929 930 vio_record_metadata_io_error(&super_block->vio); 931 uds_log_error_strerror(completion->result, "super block save failed"); 932 /* 933 * Mark the super block as unwritable so that we won't attempt to write it again. This 934 * avoids the case where a growth attempt fails writing the super block with the new size, 935 * but the subsequent attempt to write out the read-only state succeeds. In this case, 936 * writes which happened just before the suspend would not be visible if the VDO is 937 * restarted without rebuilding, but, after a read-only rebuild, the effects of those 938 * writes would reappear. 939 */ 940 super_block->unwritable = true; 941 completion->callback(completion); 942 } 943 944 static void super_block_write_endio(struct bio *bio) 945 { 946 struct vio *vio = bio->bi_private; 947 struct vdo_completion *parent = vio->completion.parent; 948 949 continue_vio_after_io(vio, continue_super_block_parent, 950 parent->callback_thread_id); 951 } 952 953 /** 954 * vdo_save_components() - Encode the vdo and save the super block asynchronously. 955 * @vdo: The vdo whose state is being saved. 956 * @parent: The completion to notify when the save is complete. 957 */ 958 void vdo_save_components(struct vdo *vdo, struct vdo_completion *parent) 959 { 960 struct vdo_super_block *super_block = &vdo->super_block; 961 962 if (super_block->unwritable) { 963 vdo_continue_completion(parent, VDO_READ_ONLY); 964 return; 965 } 966 967 if (super_block->vio.completion.parent != NULL) { 968 vdo_continue_completion(parent, VDO_COMPONENT_BUSY); 969 return; 970 } 971 972 record_vdo(vdo); 973 974 vdo_encode_super_block(super_block->buffer, &vdo->states); 975 super_block->vio.completion.parent = parent; 976 super_block->vio.completion.callback_thread_id = parent->callback_thread_id; 977 vdo_submit_metadata_vio(&super_block->vio, 978 vdo_get_data_region_start(vdo->geometry), 979 super_block_write_endio, handle_save_error, 980 REQ_OP_WRITE | REQ_PREFLUSH | REQ_FUA); 981 } 982 983 /** 984 * vdo_register_read_only_listener() - Register a listener to be notified when the VDO goes 985 * read-only. 986 * @vdo: The vdo to register with. 987 * @listener: The object to notify. 988 * @notification: The function to call to send the notification. 989 * @thread_id: The id of the thread on which to send the notification. 990 * 991 * Return: VDO_SUCCESS or an error. 
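*
* A minimal registration sketch (my_object, my_notify, and the saw_read_only field are
* hypothetical; the notification function runs on @thread_id and acknowledges @parent,
* here by finishing it):
*
*	static void my_notify(void *listener, struct vdo_completion *parent)
*	{
*		struct my_object *object = listener;
*
*		object->saw_read_only = true;
*		vdo_finish_completion(parent);
*	}
*
*	result = vdo_register_read_only_listener(vdo, object, my_notify, thread_id);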
992 */ 993 int vdo_register_read_only_listener(struct vdo *vdo, void *listener, 994 vdo_read_only_notification_fn notification, 995 thread_id_t thread_id) 996 { 997 struct vdo_thread *thread = &vdo->threads[thread_id]; 998 struct read_only_listener *read_only_listener; 999 int result; 1000 1001 result = VDO_ASSERT(thread_id != vdo->thread_config.dedupe_thread, 1002 "read only listener not registered on dedupe thread"); 1003 if (result != VDO_SUCCESS) 1004 return result; 1005 1006 result = vdo_allocate(1, struct read_only_listener, __func__, 1007 &read_only_listener); 1008 if (result != VDO_SUCCESS) 1009 return result; 1010 1011 *read_only_listener = (struct read_only_listener) { 1012 .listener = listener, 1013 .notify = notification, 1014 .next = thread->listeners, 1015 }; 1016 1017 thread->listeners = read_only_listener; 1018 return VDO_SUCCESS; 1019 } 1020 1021 /** 1022 * notify_vdo_of_read_only_mode() - Notify a vdo that it is going read-only. 1023 * @listener: The vdo. 1024 * @parent: The completion to notify in order to acknowledge the notification. 1025 * 1026 * This will save the read-only state to the super block. 1027 * 1028 * Implements vdo_read_only_notification_fn. 1029 */ 1030 static void notify_vdo_of_read_only_mode(void *listener, struct vdo_completion *parent) 1031 { 1032 struct vdo *vdo = listener; 1033 1034 if (vdo_in_read_only_mode(vdo)) 1035 vdo_finish_completion(parent); 1036 1037 vdo_set_state(vdo, VDO_READ_ONLY_MODE); 1038 vdo_save_components(vdo, parent); 1039 } 1040 1041 /** 1042 * vdo_enable_read_only_entry() - Enable a vdo to enter read-only mode on errors. 1043 * @vdo: The vdo to enable. 1044 * 1045 * Return: VDO_SUCCESS or an error. 1046 */ 1047 int vdo_enable_read_only_entry(struct vdo *vdo) 1048 { 1049 thread_id_t id; 1050 bool is_read_only = vdo_in_read_only_mode(vdo); 1051 struct read_only_notifier *notifier = &vdo->read_only_notifier; 1052 1053 if (is_read_only) { 1054 notifier->read_only_error = VDO_READ_ONLY; 1055 notifier->state = NOTIFIED; 1056 } else { 1057 notifier->state = MAY_NOT_NOTIFY; 1058 } 1059 1060 spin_lock_init(&notifier->lock); 1061 vdo_initialize_completion(&notifier->completion, vdo, 1062 VDO_READ_ONLY_MODE_COMPLETION); 1063 1064 for (id = 0; id < vdo->thread_config.thread_count; id++) 1065 vdo->threads[id].is_read_only = is_read_only; 1066 1067 return vdo_register_read_only_listener(vdo, vdo, notify_vdo_of_read_only_mode, 1068 vdo->thread_config.admin_thread); 1069 } 1070 1071 /** 1072 * vdo_wait_until_not_entering_read_only_mode() - Wait until no read-only notifications are in 1073 * progress and prevent any subsequent 1074 * notifications. 1075 * @parent: The completion to notify when no threads are entering read-only mode. 1076 * 1077 * Notifications may be re-enabled by calling vdo_allow_read_only_mode_entry(). 1078 */ 1079 void vdo_wait_until_not_entering_read_only_mode(struct vdo_completion *parent) 1080 { 1081 struct vdo *vdo = parent->vdo; 1082 struct read_only_notifier *notifier = &vdo->read_only_notifier; 1083 1084 vdo_assert_on_admin_thread(vdo, __func__); 1085 1086 if (notifier->waiter != NULL) { 1087 vdo_continue_completion(parent, VDO_COMPONENT_BUSY); 1088 return; 1089 } 1090 1091 spin_lock(&notifier->lock); 1092 if (notifier->state == NOTIFYING) 1093 notifier->waiter = parent; 1094 else if (notifier->state == MAY_NOTIFY) 1095 notifier->state = MAY_NOT_NOTIFY; 1096 spin_unlock(&notifier->lock); 1097 1098 if (notifier->waiter == NULL) { 1099 /* 1100 * A notification was not in progress, and now they are 1101 * disallowed.
1102 */ 1103 vdo_launch_completion(parent); 1104 return; 1105 } 1106 } 1107 1108 /** 1109 * as_notifier() - Convert a generic vdo_completion to a read_only_notifier. 1110 * @completion: The completion to convert. 1111 * 1112 * Return: The completion as a read_only_notifier. 1113 */ 1114 static inline struct read_only_notifier *as_notifier(struct vdo_completion *completion) 1115 { 1116 vdo_assert_completion_type(completion, VDO_READ_ONLY_MODE_COMPLETION); 1117 return container_of(completion, struct read_only_notifier, completion); 1118 } 1119 1120 /** 1121 * finish_entering_read_only_mode() - Complete the process of entering read only mode. 1122 * @completion: The read-only mode completion. 1123 */ 1124 static void finish_entering_read_only_mode(struct vdo_completion *completion) 1125 { 1126 struct read_only_notifier *notifier = as_notifier(completion); 1127 1128 vdo_assert_on_admin_thread(completion->vdo, __func__); 1129 1130 spin_lock(¬ifier->lock); 1131 notifier->state = NOTIFIED; 1132 spin_unlock(¬ifier->lock); 1133 1134 if (notifier->waiter != NULL) 1135 vdo_continue_completion(vdo_forget(notifier->waiter), 1136 completion->result); 1137 } 1138 1139 /** 1140 * make_thread_read_only() - Inform each thread that the VDO is in read-only mode. 1141 * @completion: The read-only mode completion. 1142 */ 1143 static void make_thread_read_only(struct vdo_completion *completion) 1144 { 1145 struct vdo *vdo = completion->vdo; 1146 thread_id_t thread_id = completion->callback_thread_id; 1147 struct read_only_notifier *notifier = as_notifier(completion); 1148 struct read_only_listener *listener = completion->parent; 1149 1150 if (listener == NULL) { 1151 /* This is the first call on this thread */ 1152 struct vdo_thread *thread = &vdo->threads[thread_id]; 1153 1154 thread->is_read_only = true; 1155 listener = thread->listeners; 1156 if (thread_id == 0) 1157 uds_log_error_strerror(READ_ONCE(notifier->read_only_error), 1158 "Unrecoverable error, entering read-only mode"); 1159 } else { 1160 /* We've just finished notifying a listener */ 1161 listener = listener->next; 1162 } 1163 1164 if (listener != NULL) { 1165 /* We have a listener to notify */ 1166 vdo_prepare_completion(completion, make_thread_read_only, 1167 make_thread_read_only, thread_id, 1168 listener); 1169 listener->notify(listener->listener, completion); 1170 return; 1171 } 1172 1173 /* We're done with this thread */ 1174 if (++thread_id == vdo->thread_config.dedupe_thread) { 1175 /* 1176 * We don't want to notify the dedupe thread since it may be 1177 * blocked rebuilding the index. 1178 */ 1179 thread_id++; 1180 } 1181 1182 if (thread_id >= vdo->thread_config.thread_count) { 1183 /* There are no more threads */ 1184 vdo_prepare_completion(completion, finish_entering_read_only_mode, 1185 finish_entering_read_only_mode, 1186 vdo->thread_config.admin_thread, NULL); 1187 } else { 1188 vdo_prepare_completion(completion, make_thread_read_only, 1189 make_thread_read_only, thread_id, NULL); 1190 } 1191 1192 vdo_launch_completion(completion); 1193 } 1194 1195 /** 1196 * vdo_allow_read_only_mode_entry() - Allow the notifier to put the VDO into read-only mode, 1197 * reversing the effects of 1198 * vdo_wait_until_not_entering_read_only_mode(). 1199 * @parent: The object to notify once the operation is complete. 1200 * 1201 * If some thread tried to put the vdo into read-only mode while notifications were disallowed, it 1202 * will be done when this method is called. 
If that happens, the parent will not be notified until 1203 * the vdo has actually entered read-only mode and attempted to save the super block. 1204 * 1205 * Context: This method may only be called from the admin thread. 1206 */ 1207 void vdo_allow_read_only_mode_entry(struct vdo_completion *parent) 1208 { 1209 struct vdo *vdo = parent->vdo; 1210 struct read_only_notifier *notifier = &vdo->read_only_notifier; 1211 1212 vdo_assert_on_admin_thread(vdo, __func__); 1213 1214 if (notifier->waiter != NULL) { 1215 vdo_continue_completion(parent, VDO_COMPONENT_BUSY); 1216 return; 1217 } 1218 1219 spin_lock(&notifier->lock); 1220 if (notifier->state == MAY_NOT_NOTIFY) { 1221 if (notifier->read_only_error == VDO_SUCCESS) { 1222 notifier->state = MAY_NOTIFY; 1223 } else { 1224 notifier->state = NOTIFYING; 1225 notifier->waiter = parent; 1226 } 1227 } 1228 spin_unlock(&notifier->lock); 1229 1230 if (notifier->waiter == NULL) { 1231 /* We're done */ 1232 vdo_launch_completion(parent); 1233 return; 1234 } 1235 1236 /* Do the pending notification. */ 1237 make_thread_read_only(&notifier->completion); 1238 } 1239 1240 /** 1241 * vdo_enter_read_only_mode() - Put a VDO into read-only mode and save the read-only state in the 1242 * super block. 1243 * @vdo: The vdo. 1244 * @error_code: The error which caused the VDO to enter read-only mode. 1245 * 1246 * This method is a no-op if the VDO is already read-only. 1247 */ 1248 void vdo_enter_read_only_mode(struct vdo *vdo, int error_code) 1249 { 1250 bool notify = false; 1251 thread_id_t thread_id = vdo_get_callback_thread_id(); 1252 struct read_only_notifier *notifier = &vdo->read_only_notifier; 1253 struct vdo_thread *thread; 1254 1255 if (thread_id != VDO_INVALID_THREAD_ID) { 1256 thread = &vdo->threads[thread_id]; 1257 if (thread->is_read_only) { 1258 /* This thread has already gone read-only. */ 1259 return; 1260 } 1261 1262 /* Record for this thread that the VDO is read-only. */ 1263 thread->is_read_only = true; 1264 } 1265 1266 spin_lock(&notifier->lock); 1267 if (notifier->read_only_error == VDO_SUCCESS) { 1268 WRITE_ONCE(notifier->read_only_error, error_code); 1269 if (notifier->state == MAY_NOTIFY) { 1270 notifier->state = NOTIFYING; 1271 notify = true; 1272 } 1273 } 1274 spin_unlock(&notifier->lock); 1275 1276 if (!notify) { 1277 /* The notifier is already aware of a read-only error */ 1278 return; 1279 } 1280 1281 /* Initiate a notification starting on the lowest numbered thread. */ 1282 vdo_launch_completion_callback(&notifier->completion, make_thread_read_only, 0); 1283 } 1284 1285 /** 1286 * vdo_is_read_only() - Check whether the VDO is read-only. 1287 * @vdo: The vdo. 1288 * 1289 * Return: true if the vdo is read-only. 1290 * 1291 * This method may be called from any thread, as opposed to examining the VDO's state field which 1292 * is only safe to check from the admin thread. 1293 */ 1294 bool vdo_is_read_only(struct vdo *vdo) 1295 { 1296 return vdo->threads[vdo_get_callback_thread_id()].is_read_only; 1297 } 1298 1299 /** 1300 * vdo_in_read_only_mode() - Check whether a vdo is in read-only mode. 1301 * @vdo: The vdo to query. 1302 * 1303 * Return: true if the vdo is in read-only mode. 1304 */ 1305 bool vdo_in_read_only_mode(const struct vdo *vdo) 1306 { 1307 return (vdo_get_state(vdo) == VDO_READ_ONLY_MODE); 1308 } 1309 1310 /** 1311 * vdo_in_recovery_mode() - Check whether the vdo is in recovery mode. 1312 * @vdo: The vdo to query. 1313 * 1314 * Return: true if the vdo is in recovery mode.
1315 */ 1316 bool vdo_in_recovery_mode(const struct vdo *vdo) 1317 { 1318 return (vdo_get_state(vdo) == VDO_RECOVERING); 1319 } 1320 1321 /** 1322 * vdo_enter_recovery_mode() - Put the vdo into recovery mode. 1323 * @vdo: The vdo. 1324 */ 1325 void vdo_enter_recovery_mode(struct vdo *vdo) 1326 { 1327 vdo_assert_on_admin_thread(vdo, __func__); 1328 1329 if (vdo_in_read_only_mode(vdo)) 1330 return; 1331 1332 uds_log_info("Entering recovery mode"); 1333 vdo_set_state(vdo, VDO_RECOVERING); 1334 } 1335 1336 /** 1337 * complete_synchronous_action() - Signal the waiting thread that a synchronous action is complete. 1338 * @completion: The sync completion. 1339 */ 1340 static void complete_synchronous_action(struct vdo_completion *completion) 1341 { 1342 vdo_assert_completion_type(completion, VDO_SYNC_COMPLETION); 1343 complete(&(container_of(completion, struct sync_completion, 1344 vdo_completion)->completion)); 1345 } 1346 1347 /** 1348 * perform_synchronous_action() - Launch an action on a VDO thread and wait for it to complete. 1349 * @vdo: The vdo. 1350 * @action: The callback to launch. 1351 * @thread_id: The thread on which to run the action. 1352 * @parent: The parent of the sync completion (may be NULL). 1353 */ 1354 static int perform_synchronous_action(struct vdo *vdo, vdo_action_fn action, 1355 thread_id_t thread_id, void *parent) 1356 { 1357 struct sync_completion sync; 1358 1359 vdo_initialize_completion(&sync.vdo_completion, vdo, VDO_SYNC_COMPLETION); 1360 init_completion(&sync.completion); 1361 sync.vdo_completion.parent = parent; 1362 vdo_launch_completion_callback(&sync.vdo_completion, action, thread_id); 1363 wait_for_completion(&sync.completion); 1364 return sync.vdo_completion.result; 1365 } 1366 1367 /** 1368 * set_compression_callback() - Callback to turn compression on or off. 1369 * @completion: The completion. 1370 */ 1371 static void set_compression_callback(struct vdo_completion *completion) 1372 { 1373 struct vdo *vdo = completion->vdo; 1374 bool *enable = completion->parent; 1375 bool was_enabled = vdo_get_compressing(vdo); 1376 1377 if (*enable != was_enabled) { 1378 WRITE_ONCE(vdo->compressing, *enable); 1379 if (was_enabled) { 1380 /* Signal the packer to flush since compression has been disabled. */ 1381 vdo_flush_packer(vdo->packer); 1382 } 1383 } 1384 1385 uds_log_info("compression is %s", (*enable ? "enabled" : "disabled")); 1386 *enable = was_enabled; 1387 complete_synchronous_action(completion); 1388 } 1389 1390 /** 1391 * vdo_set_compressing() - Turn compression on or off. 1392 * @vdo: The vdo. 1393 * @enable: Whether to enable or disable compression. 1394 * 1395 * Return: Whether compression was previously on or off. 1396 */ 1397 bool vdo_set_compressing(struct vdo *vdo, bool enable) 1398 { 1399 perform_synchronous_action(vdo, set_compression_callback, 1400 vdo->thread_config.packer_thread, 1401 &enable); 1402 return enable; 1403 } 1404 1405 /** 1406 * vdo_get_compressing() - Get whether compression is enabled in a vdo. 1407 * @vdo: The vdo. 1408 * 1409 * Return: State of compression. 
1410 */ 1411 bool vdo_get_compressing(struct vdo *vdo) 1412 { 1413 return READ_ONCE(vdo->compressing); 1414 } 1415 1416 static size_t get_block_map_cache_size(const struct vdo *vdo) 1417 { 1418 return ((size_t) vdo->device_config->cache_size) * VDO_BLOCK_SIZE; 1419 } 1420 1421 static struct error_statistics __must_check get_vdo_error_statistics(const struct vdo *vdo) 1422 { 1423 /* 1424 * The error counts can be incremented from arbitrary threads and so must be incremented 1425 * atomically, but they are just statistics with no semantics that could rely on memory 1426 * order, so unfenced reads are sufficient. 1427 */ 1428 const struct atomic_statistics *atoms = &vdo->stats; 1429 1430 return (struct error_statistics) { 1431 .invalid_advice_pbn_count = atomic64_read(&atoms->invalid_advice_pbn_count), 1432 .no_space_error_count = atomic64_read(&atoms->no_space_error_count), 1433 .read_only_error_count = atomic64_read(&atoms->read_only_error_count), 1434 }; 1435 } 1436 1437 static void copy_bio_stat(struct bio_stats *b, const struct atomic_bio_stats *a) 1438 { 1439 b->read = atomic64_read(&a->read); 1440 b->write = atomic64_read(&a->write); 1441 b->discard = atomic64_read(&a->discard); 1442 b->flush = atomic64_read(&a->flush); 1443 b->empty_flush = atomic64_read(&a->empty_flush); 1444 b->fua = atomic64_read(&a->fua); 1445 } 1446 1447 static struct bio_stats subtract_bio_stats(struct bio_stats minuend, 1448 struct bio_stats subtrahend) 1449 { 1450 return (struct bio_stats) { 1451 .read = minuend.read - subtrahend.read, 1452 .write = minuend.write - subtrahend.write, 1453 .discard = minuend.discard - subtrahend.discard, 1454 .flush = minuend.flush - subtrahend.flush, 1455 .empty_flush = minuend.empty_flush - subtrahend.empty_flush, 1456 .fua = minuend.fua - subtrahend.fua, 1457 }; 1458 } 1459 1460 /** 1461 * vdo_get_physical_blocks_allocated() - Get the number of physical blocks in use by user data. 1462 * @vdo: The vdo. 1463 * 1464 * Return: The number of blocks allocated for user data. 1465 */ 1466 static block_count_t __must_check vdo_get_physical_blocks_allocated(const struct vdo *vdo) 1467 { 1468 return (vdo_get_slab_depot_allocated_blocks(vdo->depot) - 1469 vdo_get_journal_block_map_data_blocks_used(vdo->recovery_journal)); 1470 } 1471 1472 /** 1473 * vdo_get_physical_blocks_overhead() - Get the number of physical blocks used by vdo metadata. 1474 * @vdo: The vdo. 1475 * 1476 * Return: The number of overhead blocks. 1477 */ 1478 static block_count_t __must_check vdo_get_physical_blocks_overhead(const struct vdo *vdo) 1479 { 1480 /* 1481 * config.physical_blocks is mutated during resize and is in a packed structure, 1482 * but resize runs on admin thread. 1483 * TODO: Verify that this is always safe. 1484 */ 1485 return (vdo->states.vdo.config.physical_blocks - 1486 vdo_get_slab_depot_data_blocks(vdo->depot) + 1487 vdo_get_journal_block_map_data_blocks_used(vdo->recovery_journal)); 1488 } 1489 1490 static const char *vdo_describe_state(enum vdo_state state) 1491 { 1492 /* These strings should all fit in the 15 chars of VDOStatistics.mode. */ 1493 switch (state) { 1494 case VDO_RECOVERING: 1495 return "recovering"; 1496 1497 case VDO_READ_ONLY_MODE: 1498 return "read-only"; 1499 1500 default: 1501 return "normal"; 1502 } 1503 } 1504 1505 /** 1506 * get_vdo_statistics() - Populate a vdo_statistics structure on the admin thread. 1507 * @vdo: The vdo. 1508 * @stats: The statistics structure to populate. 
1509 */ 1510 static void get_vdo_statistics(const struct vdo *vdo, struct vdo_statistics *stats) 1511 { 1512 struct recovery_journal *journal = vdo->recovery_journal; 1513 enum vdo_state state = vdo_get_state(vdo); 1514 1515 vdo_assert_on_admin_thread(vdo, __func__); 1516 1517 /* start with a clean slate */ 1518 memset(stats, 0, sizeof(struct vdo_statistics)); 1519 1520 /* 1521 * These are immutable properties of the vdo object, so it is safe to query them from any 1522 * thread. 1523 */ 1524 stats->version = STATISTICS_VERSION; 1525 stats->logical_blocks = vdo->states.vdo.config.logical_blocks; 1526 /* 1527 * config.physical_blocks is mutated during resize and is in a packed structure, but resize 1528 * runs on the admin thread. 1529 * TODO: verify that this is always safe 1530 */ 1531 stats->physical_blocks = vdo->states.vdo.config.physical_blocks; 1532 stats->block_size = VDO_BLOCK_SIZE; 1533 stats->complete_recoveries = vdo->states.vdo.complete_recoveries; 1534 stats->read_only_recoveries = vdo->states.vdo.read_only_recoveries; 1535 stats->block_map_cache_size = get_block_map_cache_size(vdo); 1536 1537 /* The callees are responsible for thread-safety. */ 1538 stats->data_blocks_used = vdo_get_physical_blocks_allocated(vdo); 1539 stats->overhead_blocks_used = vdo_get_physical_blocks_overhead(vdo); 1540 stats->logical_blocks_used = vdo_get_recovery_journal_logical_blocks_used(journal); 1541 vdo_get_slab_depot_statistics(vdo->depot, stats); 1542 stats->journal = vdo_get_recovery_journal_statistics(journal); 1543 stats->packer = vdo_get_packer_statistics(vdo->packer); 1544 stats->block_map = vdo_get_block_map_statistics(vdo->block_map); 1545 vdo_get_dedupe_statistics(vdo->hash_zones, stats); 1546 stats->errors = get_vdo_error_statistics(vdo); 1547 stats->in_recovery_mode = (state == VDO_RECOVERING); 1548 snprintf(stats->mode, sizeof(stats->mode), "%s", vdo_describe_state(state)); 1549 1550 stats->instance = vdo->instance; 1551 stats->current_vios_in_progress = get_data_vio_pool_active_requests(vdo->data_vio_pool); 1552 stats->max_vios = get_data_vio_pool_maximum_requests(vdo->data_vio_pool); 1553 1554 stats->flush_out = atomic64_read(&vdo->stats.flush_out); 1555 stats->logical_block_size = vdo->device_config->logical_block_size; 1556 copy_bio_stat(&stats->bios_in, &vdo->stats.bios_in); 1557 copy_bio_stat(&stats->bios_in_partial, &vdo->stats.bios_in_partial); 1558 copy_bio_stat(&stats->bios_out, &vdo->stats.bios_out); 1559 copy_bio_stat(&stats->bios_meta, &vdo->stats.bios_meta); 1560 copy_bio_stat(&stats->bios_journal, &vdo->stats.bios_journal); 1561 copy_bio_stat(&stats->bios_page_cache, &vdo->stats.bios_page_cache); 1562 copy_bio_stat(&stats->bios_out_completed, &vdo->stats.bios_out_completed); 1563 copy_bio_stat(&stats->bios_meta_completed, &vdo->stats.bios_meta_completed); 1564 copy_bio_stat(&stats->bios_journal_completed, 1565 &vdo->stats.bios_journal_completed); 1566 copy_bio_stat(&stats->bios_page_cache_completed, 1567 &vdo->stats.bios_page_cache_completed); 1568 copy_bio_stat(&stats->bios_acknowledged, &vdo->stats.bios_acknowledged); 1569 copy_bio_stat(&stats->bios_acknowledged_partial, &vdo->stats.bios_acknowledged_partial); 1570 stats->bios_in_progress = 1571 subtract_bio_stats(stats->bios_in, stats->bios_acknowledged); 1572 vdo_get_memory_stats(&stats->memory_usage.bytes_used, 1573 &stats->memory_usage.peak_bytes_used); 1574 } 1575 1576 /** 1577 * vdo_fetch_statistics_callback() - Action to populate a vdo_statistics 1578 * structure on the admin thread. 
1579 * @completion: The completion. 1580 * 1581 * This callback is registered in vdo_fetch_statistics(). 1582 */ 1583 static void vdo_fetch_statistics_callback(struct vdo_completion *completion) 1584 { 1585 get_vdo_statistics(completion->vdo, completion->parent); 1586 complete_synchronous_action(completion); 1587 } 1588 1589 /** 1590 * vdo_fetch_statistics() - Fetch statistics on the correct thread. 1591 * @vdo: The vdo. 1592 * @stats: The vdo statistics are returned here. 1593 */ 1594 void vdo_fetch_statistics(struct vdo *vdo, struct vdo_statistics *stats) 1595 { 1596 perform_synchronous_action(vdo, vdo_fetch_statistics_callback, 1597 vdo->thread_config.admin_thread, stats); 1598 } 1599 1600 /** 1601 * vdo_get_callback_thread_id() - Get the id of the callback thread on which a completion is 1602 * currently running. 1603 * 1604 * Return: The current thread ID, or -1 if no such thread. 1605 */ 1606 thread_id_t vdo_get_callback_thread_id(void) 1607 { 1608 struct vdo_work_queue *queue = vdo_get_current_work_queue(); 1609 struct vdo_thread *thread; 1610 thread_id_t thread_id; 1611 1612 if (queue == NULL) 1613 return VDO_INVALID_THREAD_ID; 1614 1615 thread = vdo_get_work_queue_owner(queue); 1616 thread_id = thread->thread_id; 1617 1618 if (PARANOID_THREAD_CONSISTENCY_CHECKS) { 1619 BUG_ON(thread_id >= thread->vdo->thread_config.thread_count); 1620 BUG_ON(thread != &thread->vdo->threads[thread_id]); 1621 } 1622 1623 return thread_id; 1624 } 1625 1626 /** 1627 * vdo_dump_status() - Dump status information about a vdo to the log for debugging. 1628 * @vdo: The vdo to dump. 1629 */ 1630 void vdo_dump_status(const struct vdo *vdo) 1631 { 1632 zone_count_t zone; 1633 1634 vdo_dump_flusher(vdo->flusher); 1635 vdo_dump_recovery_journal_statistics(vdo->recovery_journal); 1636 vdo_dump_packer(vdo->packer); 1637 vdo_dump_slab_depot(vdo->depot); 1638 1639 for (zone = 0; zone < vdo->thread_config.logical_zone_count; zone++) 1640 vdo_dump_logical_zone(&vdo->logical_zones->zones[zone]); 1641 1642 for (zone = 0; zone < vdo->thread_config.physical_zone_count; zone++) 1643 vdo_dump_physical_zone(&vdo->physical_zones->zones[zone]); 1644 1645 vdo_dump_hash_zones(vdo->hash_zones); 1646 } 1647 1648 /** 1649 * vdo_assert_on_admin_thread() - Assert that we are running on the admin thread. 1650 * @vdo: The vdo. 1651 * @name: The name of the function which should be running on the admin thread (for logging). 1652 */ 1653 void vdo_assert_on_admin_thread(const struct vdo *vdo, const char *name) 1654 { 1655 VDO_ASSERT_LOG_ONLY((vdo_get_callback_thread_id() == vdo->thread_config.admin_thread), 1656 "%s called on admin thread", name); 1657 } 1658 1659 /** 1660 * vdo_assert_on_logical_zone_thread() - Assert that this function was called on the specified 1661 * logical zone thread. 1662 * @vdo: The vdo. 1663 * @logical_zone: The number of the logical zone. 1664 * @name: The name of the calling function. 1665 */ 1666 void vdo_assert_on_logical_zone_thread(const struct vdo *vdo, zone_count_t logical_zone, 1667 const char *name) 1668 { 1669 VDO_ASSERT_LOG_ONLY((vdo_get_callback_thread_id() == 1670 vdo->thread_config.logical_threads[logical_zone]), 1671 "%s called on logical thread", name); 1672 } 1673 1674 /** 1675 * vdo_assert_on_physical_zone_thread() - Assert that this function was called on the specified 1676 * physical zone thread. 1677 * @vdo: The vdo. 1678 * @physical_zone: The number of the physical zone. 1679 * @name: The name of the calling function. 
1680 */ 1681 void vdo_assert_on_physical_zone_thread(const struct vdo *vdo, 1682 zone_count_t physical_zone, const char *name) 1683 { 1684 VDO_ASSERT_LOG_ONLY((vdo_get_callback_thread_id() == 1685 vdo->thread_config.physical_threads[physical_zone]), 1686 "%s called on physical thread", name); 1687 } 1688 1689 /** 1690 * vdo_get_physical_zone() - Get the physical zone responsible for a given physical block number. 1691 * @vdo: The vdo containing the physical zones. 1692 * @pbn: The PBN of the data block. 1693 * @zone_ptr: A pointer to return the physical zone. 1694 * 1695 * Gets the physical zone responsible for a given physical block number of a data block in this vdo 1696 * instance, or of the zero block (for which a NULL zone is returned). For any other block number 1697 * that is not in the range of valid data block numbers in any slab, an error will be returned. 1698 * This function is safe to call on invalid block numbers; it will not put the vdo into read-only 1699 * mode. 1700 * 1701 * Return: VDO_SUCCESS or VDO_OUT_OF_RANGE if the block number is invalid or an error code for any 1702 * other failure. 1703 */ 1704 int vdo_get_physical_zone(const struct vdo *vdo, physical_block_number_t pbn, 1705 struct physical_zone **zone_ptr) 1706 { 1707 struct vdo_slab *slab; 1708 int result; 1709 1710 if (pbn == VDO_ZERO_BLOCK) { 1711 *zone_ptr = NULL; 1712 return VDO_SUCCESS; 1713 } 1714 1715 /* 1716 * Used because it does a more restrictive bounds check than vdo_get_slab(), and done first 1717 * because it won't trigger read-only mode on an invalid PBN. 1718 */ 1719 if (!vdo_is_physical_data_block(vdo->depot, pbn)) 1720 return VDO_OUT_OF_RANGE; 1721 1722 /* With the PBN already checked, we should always succeed in finding a slab. */ 1723 slab = vdo_get_slab(vdo->depot, pbn); 1724 result = VDO_ASSERT(slab != NULL, "vdo_get_slab must succeed on all valid PBNs"); 1725 if (result != VDO_SUCCESS) 1726 return result; 1727 1728 *zone_ptr = &vdo->physical_zones->zones[slab->allocator->zone_number]; 1729 return VDO_SUCCESS; 1730 } 1731