1 // SPDX-License-Identifier: GPL-2.0-only 2 /* 3 * Copyright (C) 2002 Sistina Software (UK) Limited. 4 * Copyright (C) 2006 Red Hat GmbH 5 * 6 * This file is released under the GPL. 7 * 8 * Kcopyd provides a simple interface for copying an area of one 9 * block-device to one or more other block-devices, with an asynchronous 10 * completion notification. 11 */ 12 13 #include <linux/types.h> 14 #include <linux/atomic.h> 15 #include <linux/blkdev.h> 16 #include <linux/fs.h> 17 #include <linux/init.h> 18 #include <linux/list.h> 19 #include <linux/mempool.h> 20 #include <linux/module.h> 21 #include <linux/pagemap.h> 22 #include <linux/slab.h> 23 #include <linux/vmalloc.h> 24 #include <linux/workqueue.h> 25 #include <linux/mutex.h> 26 #include <linux/delay.h> 27 #include <linux/device-mapper.h> 28 #include <linux/dm-kcopyd.h> 29 30 #include "dm-core.h" 31 32 #define SPLIT_COUNT 8 33 #define MIN_JOBS 8 34 35 #define DEFAULT_SUB_JOB_SIZE_KB 512 36 #define MAX_SUB_JOB_SIZE_KB 1024 37 38 static unsigned int kcopyd_subjob_size_kb = DEFAULT_SUB_JOB_SIZE_KB; 39 40 module_param(kcopyd_subjob_size_kb, uint, 0644); 41 MODULE_PARM_DESC(kcopyd_subjob_size_kb, "Sub-job size for dm-kcopyd clients"); 42 43 static unsigned int dm_get_kcopyd_subjob_size(void) 44 { 45 unsigned int sub_job_size_kb; 46 47 sub_job_size_kb = __dm_get_module_param(&kcopyd_subjob_size_kb, 48 DEFAULT_SUB_JOB_SIZE_KB, 49 MAX_SUB_JOB_SIZE_KB); 50 51 return sub_job_size_kb << 1; 52 } 53 54 /* 55 *---------------------------------------------------------------- 56 * Each kcopyd client has its own little pool of preallocated 57 * pages for kcopyd io. 58 *--------------------------------------------------------------- 59 */ 60 struct dm_kcopyd_client { 61 struct page_list *pages; 62 unsigned int nr_reserved_pages; 63 unsigned int nr_free_pages; 64 unsigned int sub_job_size; 65 66 struct dm_io_client *io_client; 67 68 wait_queue_head_t destroyq; 69 70 mempool_t job_pool; 71 72 struct workqueue_struct *kcopyd_wq; 73 struct work_struct kcopyd_work; 74 75 struct dm_kcopyd_throttle *throttle; 76 77 atomic_t nr_jobs; 78 79 /* 80 * We maintain four lists of jobs: 81 * 82 * i) jobs waiting for pages 83 * ii) jobs that have pages, and are waiting for the io to be issued. 84 * iii) jobs that don't need to do any IO and just run a callback 85 * iv) jobs that have completed. 86 * 87 * All four of these are protected by job_lock. 88 */ 89 spinlock_t job_lock; 90 struct list_head callback_jobs; 91 struct list_head complete_jobs; 92 struct list_head io_jobs; 93 struct list_head pages_jobs; 94 }; 95 96 static struct page_list zero_page_list; 97 98 static DEFINE_SPINLOCK(throttle_spinlock); 99 100 /* 101 * IO/IDLE accounting slowly decays after (1 << ACCOUNT_INTERVAL_SHIFT) period. 102 * When total_period >= (1 << ACCOUNT_INTERVAL_SHIFT) the counters are divided 103 * by 2. 104 */ 105 #define ACCOUNT_INTERVAL_SHIFT SHIFT_HZ 106 107 /* 108 * Sleep this number of milliseconds. 109 * 110 * The value was decided experimentally. 111 * Smaller values seem to cause an increased copy rate above the limit. 112 * The reason for this is unknown but possibly due to jiffies rounding errors 113 * or read/write cache inside the disk. 114 */ 115 #define SLEEP_USEC 100000 116 117 /* 118 * Maximum number of sleep events. There is a theoretical livelock if more 119 * kcopyd clients do work simultaneously which this limit avoids. 120 */ 121 #define MAX_SLEEPS 10 122 123 static void io_job_start(struct dm_kcopyd_throttle *t) 124 { 125 unsigned int throttle, now, difference; 126 int slept = 0, skew; 127 128 if (unlikely(!t)) 129 return; 130 131 try_again: 132 spin_lock_irq(&throttle_spinlock); 133 134 throttle = READ_ONCE(t->throttle); 135 136 if (likely(throttle >= 100)) 137 goto skip_limit; 138 139 now = jiffies; 140 difference = now - t->last_jiffies; 141 t->last_jiffies = now; 142 if (t->num_io_jobs) 143 t->io_period += difference; 144 t->total_period += difference; 145 146 /* 147 * Maintain sane values if we got a temporary overflow. 148 */ 149 if (unlikely(t->io_period > t->total_period)) 150 t->io_period = t->total_period; 151 152 if (unlikely(t->total_period >= (1 << ACCOUNT_INTERVAL_SHIFT))) { 153 int shift = fls(t->total_period >> ACCOUNT_INTERVAL_SHIFT); 154 155 t->total_period >>= shift; 156 t->io_period >>= shift; 157 } 158 159 skew = t->io_period - throttle * t->total_period / 100; 160 161 if (unlikely(skew > 0) && slept < MAX_SLEEPS) { 162 slept++; 163 spin_unlock_irq(&throttle_spinlock); 164 fsleep(SLEEP_USEC); 165 goto try_again; 166 } 167 168 skip_limit: 169 t->num_io_jobs++; 170 171 spin_unlock_irq(&throttle_spinlock); 172 } 173 174 static void io_job_finish(struct dm_kcopyd_throttle *t) 175 { 176 unsigned long flags; 177 178 if (unlikely(!t)) 179 return; 180 181 spin_lock_irqsave(&throttle_spinlock, flags); 182 183 t->num_io_jobs--; 184 185 if (likely(READ_ONCE(t->throttle) >= 100)) 186 goto skip_limit; 187 188 if (!t->num_io_jobs) { 189 unsigned int now, difference; 190 191 now = jiffies; 192 difference = now - t->last_jiffies; 193 t->last_jiffies = now; 194 195 t->io_period += difference; 196 t->total_period += difference; 197 198 /* 199 * Maintain sane values if we got a temporary overflow. 200 */ 201 if (unlikely(t->io_period > t->total_period)) 202 t->io_period = t->total_period; 203 } 204 205 skip_limit: 206 spin_unlock_irqrestore(&throttle_spinlock, flags); 207 } 208 209 210 static void wake(struct dm_kcopyd_client *kc) 211 { 212 queue_work(kc->kcopyd_wq, &kc->kcopyd_work); 213 } 214 215 /* 216 * Obtain one page for the use of kcopyd. 217 */ 218 static struct page_list *alloc_pl(gfp_t gfp) 219 { 220 struct page_list *pl; 221 222 pl = kmalloc(sizeof(*pl), gfp); 223 if (!pl) 224 return NULL; 225 226 pl->page = alloc_page(gfp | __GFP_HIGHMEM); 227 if (!pl->page) { 228 kfree(pl); 229 return NULL; 230 } 231 232 return pl; 233 } 234 235 static void free_pl(struct page_list *pl) 236 { 237 __free_page(pl->page); 238 kfree(pl); 239 } 240 241 /* 242 * Add the provided pages to a client's free page list, releasing 243 * back to the system any beyond the reserved_pages limit. 244 */ 245 static void kcopyd_put_pages(struct dm_kcopyd_client *kc, struct page_list *pl) 246 { 247 struct page_list *next; 248 249 do { 250 next = pl->next; 251 252 if (kc->nr_free_pages >= kc->nr_reserved_pages) 253 free_pl(pl); 254 else { 255 pl->next = kc->pages; 256 kc->pages = pl; 257 kc->nr_free_pages++; 258 } 259 260 pl = next; 261 } while (pl); 262 } 263 264 static int kcopyd_get_pages(struct dm_kcopyd_client *kc, 265 unsigned int nr, struct page_list **pages) 266 { 267 struct page_list *pl; 268 269 *pages = NULL; 270 271 do { 272 pl = alloc_pl(__GFP_NOWARN | __GFP_NORETRY | __GFP_KSWAPD_RECLAIM); 273 if (unlikely(!pl)) { 274 /* Use reserved pages */ 275 pl = kc->pages; 276 if (unlikely(!pl)) 277 goto out_of_memory; 278 kc->pages = pl->next; 279 kc->nr_free_pages--; 280 } 281 pl->next = *pages; 282 *pages = pl; 283 } while (--nr); 284 285 return 0; 286 287 out_of_memory: 288 if (*pages) 289 kcopyd_put_pages(kc, *pages); 290 return -ENOMEM; 291 } 292 293 /* 294 * These three functions resize the page pool. 295 */ 296 static void drop_pages(struct page_list *pl) 297 { 298 struct page_list *next; 299 300 while (pl) { 301 next = pl->next; 302 free_pl(pl); 303 pl = next; 304 } 305 } 306 307 /* 308 * Allocate and reserve nr_pages for the use of a specific client. 309 */ 310 static int client_reserve_pages(struct dm_kcopyd_client *kc, unsigned int nr_pages) 311 { 312 unsigned int i; 313 struct page_list *pl = NULL, *next; 314 315 for (i = 0; i < nr_pages; i++) { 316 next = alloc_pl(GFP_KERNEL); 317 if (!next) { 318 if (pl) 319 drop_pages(pl); 320 return -ENOMEM; 321 } 322 next->next = pl; 323 pl = next; 324 } 325 326 kc->nr_reserved_pages += nr_pages; 327 kcopyd_put_pages(kc, pl); 328 329 return 0; 330 } 331 332 static void client_free_pages(struct dm_kcopyd_client *kc) 333 { 334 BUG_ON(kc->nr_free_pages != kc->nr_reserved_pages); 335 drop_pages(kc->pages); 336 kc->pages = NULL; 337 kc->nr_free_pages = kc->nr_reserved_pages = 0; 338 } 339 340 /* 341 *--------------------------------------------------------------- 342 * kcopyd_jobs need to be allocated by the *clients* of kcopyd, 343 * for this reason we use a mempool to prevent the client from 344 * ever having to do io (which could cause a deadlock). 345 *--------------------------------------------------------------- 346 */ 347 struct kcopyd_job { 348 struct dm_kcopyd_client *kc; 349 struct list_head list; 350 unsigned int flags; 351 352 /* 353 * Error state of the job. 354 */ 355 int read_err; 356 unsigned long write_err; 357 358 /* 359 * REQ_OP_READ, REQ_OP_WRITE or REQ_OP_WRITE_ZEROES. 360 */ 361 enum req_op op; 362 struct dm_io_region source; 363 364 /* 365 * The destinations for the transfer. 366 */ 367 unsigned int num_dests; 368 struct dm_io_region dests[DM_KCOPYD_MAX_REGIONS]; 369 370 struct page_list *pages; 371 372 /* 373 * Set this to ensure you are notified when the job has 374 * completed. 'context' is for callback to use. 375 */ 376 dm_kcopyd_notify_fn fn; 377 void *context; 378 379 /* 380 * These fields are only used if the job has been split 381 * into more manageable parts. 382 */ 383 struct mutex lock; 384 atomic_t sub_jobs; 385 sector_t progress; 386 sector_t write_offset; 387 388 struct kcopyd_job *master_job; 389 }; 390 391 static struct kmem_cache *_job_cache; 392 393 int __init dm_kcopyd_init(void) 394 { 395 _job_cache = kmem_cache_create("kcopyd_job", 396 sizeof(struct kcopyd_job) * (SPLIT_COUNT + 1), 397 __alignof__(struct kcopyd_job), 0, NULL); 398 if (!_job_cache) 399 return -ENOMEM; 400 401 zero_page_list.next = &zero_page_list; 402 zero_page_list.page = ZERO_PAGE(0); 403 404 return 0; 405 } 406 407 void dm_kcopyd_exit(void) 408 { 409 kmem_cache_destroy(_job_cache); 410 _job_cache = NULL; 411 } 412 413 /* 414 * Functions to push and pop a job onto the head of a given job 415 * list. 416 */ 417 static struct kcopyd_job *pop_io_job(struct list_head *jobs, 418 struct dm_kcopyd_client *kc) 419 { 420 struct kcopyd_job *job; 421 422 /* 423 * For I/O jobs, pop any read, any write without sequential write 424 * constraint and sequential writes that are at the right position. 425 */ 426 list_for_each_entry(job, jobs, list) { 427 if (job->op == REQ_OP_READ || 428 !(job->flags & BIT(DM_KCOPYD_WRITE_SEQ))) { 429 list_del(&job->list); 430 return job; 431 } 432 433 if (job->write_offset == job->master_job->write_offset) { 434 job->master_job->write_offset += job->source.count; 435 list_del(&job->list); 436 return job; 437 } 438 } 439 440 return NULL; 441 } 442 443 static struct kcopyd_job *pop(struct list_head *jobs, 444 struct dm_kcopyd_client *kc) 445 { 446 struct kcopyd_job *job = NULL; 447 448 spin_lock_irq(&kc->job_lock); 449 450 if (!list_empty(jobs)) { 451 if (jobs == &kc->io_jobs) 452 job = pop_io_job(jobs, kc); 453 else { 454 job = list_entry(jobs->next, struct kcopyd_job, list); 455 list_del(&job->list); 456 } 457 } 458 spin_unlock_irq(&kc->job_lock); 459 460 return job; 461 } 462 463 static void push(struct list_head *jobs, struct kcopyd_job *job) 464 { 465 unsigned long flags; 466 struct dm_kcopyd_client *kc = job->kc; 467 468 spin_lock_irqsave(&kc->job_lock, flags); 469 list_add_tail(&job->list, jobs); 470 spin_unlock_irqrestore(&kc->job_lock, flags); 471 } 472 473 474 static void push_head(struct list_head *jobs, struct kcopyd_job *job) 475 { 476 struct dm_kcopyd_client *kc = job->kc; 477 478 spin_lock_irq(&kc->job_lock); 479 list_add(&job->list, jobs); 480 spin_unlock_irq(&kc->job_lock); 481 } 482 483 /* 484 * These three functions process 1 item from the corresponding 485 * job list. 486 * 487 * They return: 488 * < 0: error 489 * 0: success 490 * > 0: can't process yet. 491 */ 492 static int run_complete_job(struct kcopyd_job *job) 493 { 494 void *context = job->context; 495 int read_err = job->read_err; 496 unsigned long write_err = job->write_err; 497 dm_kcopyd_notify_fn fn = job->fn; 498 struct dm_kcopyd_client *kc = job->kc; 499 500 if (job->pages && job->pages != &zero_page_list) 501 kcopyd_put_pages(kc, job->pages); 502 /* 503 * If this is the master job, the sub jobs have already 504 * completed so we can free everything. 505 */ 506 if (job->master_job == job) { 507 mutex_destroy(&job->lock); 508 mempool_free(job, &kc->job_pool); 509 } 510 fn(read_err, write_err, context); 511 512 if (atomic_dec_and_test(&kc->nr_jobs)) 513 wake_up(&kc->destroyq); 514 515 cond_resched(); 516 517 return 0; 518 } 519 520 static void complete_io(unsigned long error, void *context) 521 { 522 struct kcopyd_job *job = context; 523 struct dm_kcopyd_client *kc = job->kc; 524 525 io_job_finish(kc->throttle); 526 527 if (error) { 528 if (op_is_write(job->op)) 529 job->write_err |= error; 530 else 531 job->read_err = 1; 532 533 if (!(job->flags & BIT(DM_KCOPYD_IGNORE_ERROR))) { 534 push(&kc->complete_jobs, job); 535 wake(kc); 536 return; 537 } 538 } 539 540 if (op_is_write(job->op)) 541 push(&kc->complete_jobs, job); 542 543 else { 544 job->op = REQ_OP_WRITE; 545 push(&kc->io_jobs, job); 546 } 547 548 wake(kc); 549 } 550 551 /* 552 * Request io on as many buffer heads as we can currently get for 553 * a particular job. 554 */ 555 static int run_io_job(struct kcopyd_job *job) 556 { 557 int r; 558 struct dm_io_request io_req = { 559 .bi_opf = job->op, 560 .mem.type = DM_IO_PAGE_LIST, 561 .mem.ptr.pl = job->pages, 562 .mem.offset = 0, 563 .notify.fn = complete_io, 564 .notify.context = job, 565 .client = job->kc->io_client, 566 }; 567 568 /* 569 * If we need to write sequentially and some reads or writes failed, 570 * no point in continuing. 571 */ 572 if (job->flags & BIT(DM_KCOPYD_WRITE_SEQ) && 573 job->master_job->write_err) { 574 job->write_err = job->master_job->write_err; 575 return -EIO; 576 } 577 578 io_job_start(job->kc->throttle); 579 580 if (job->op == REQ_OP_READ) 581 r = dm_io(&io_req, 1, &job->source, NULL); 582 else 583 r = dm_io(&io_req, job->num_dests, job->dests, NULL); 584 585 return r; 586 } 587 588 static int run_pages_job(struct kcopyd_job *job) 589 { 590 int r; 591 unsigned int nr_pages = dm_div_up(job->dests[0].count, PAGE_SIZE >> 9); 592 593 r = kcopyd_get_pages(job->kc, nr_pages, &job->pages); 594 if (!r) { 595 /* this job is ready for io */ 596 push(&job->kc->io_jobs, job); 597 return 0; 598 } 599 600 if (r == -ENOMEM) 601 /* can't complete now */ 602 return 1; 603 604 return r; 605 } 606 607 /* 608 * Run through a list for as long as possible. Returns the count 609 * of successful jobs. 610 */ 611 static int process_jobs(struct list_head *jobs, struct dm_kcopyd_client *kc, 612 int (*fn)(struct kcopyd_job *)) 613 { 614 struct kcopyd_job *job; 615 int r, count = 0; 616 617 while ((job = pop(jobs, kc))) { 618 619 r = fn(job); 620 621 if (r < 0) { 622 /* error this rogue job */ 623 if (op_is_write(job->op)) 624 job->write_err = (unsigned long) -1L; 625 else 626 job->read_err = 1; 627 push(&kc->complete_jobs, job); 628 wake(kc); 629 break; 630 } 631 632 if (r > 0) { 633 /* 634 * We couldn't service this job ATM, so 635 * push this job back onto the list. 636 */ 637 push_head(jobs, job); 638 break; 639 } 640 641 count++; 642 } 643 644 return count; 645 } 646 647 /* 648 * kcopyd does this every time it's woken up. 649 */ 650 static void do_work(struct work_struct *work) 651 { 652 struct dm_kcopyd_client *kc = container_of(work, 653 struct dm_kcopyd_client, kcopyd_work); 654 struct blk_plug plug; 655 656 /* 657 * The order that these are called is *very* important. 658 * complete jobs can free some pages for pages jobs. 659 * Pages jobs when successful will jump onto the io jobs 660 * list. io jobs call wake when they complete and it all 661 * starts again. 662 */ 663 spin_lock_irq(&kc->job_lock); 664 list_splice_tail_init(&kc->callback_jobs, &kc->complete_jobs); 665 spin_unlock_irq(&kc->job_lock); 666 667 blk_start_plug(&plug); 668 process_jobs(&kc->complete_jobs, kc, run_complete_job); 669 process_jobs(&kc->pages_jobs, kc, run_pages_job); 670 process_jobs(&kc->io_jobs, kc, run_io_job); 671 blk_finish_plug(&plug); 672 } 673 674 /* 675 * If we are copying a small region we just dispatch a single job 676 * to do the copy, otherwise the io has to be split up into many 677 * jobs. 678 */ 679 static void dispatch_job(struct kcopyd_job *job) 680 { 681 struct dm_kcopyd_client *kc = job->kc; 682 683 atomic_inc(&kc->nr_jobs); 684 if (unlikely(!job->source.count)) 685 push(&kc->callback_jobs, job); 686 else if (job->pages == &zero_page_list) 687 push(&kc->io_jobs, job); 688 else 689 push(&kc->pages_jobs, job); 690 wake(kc); 691 } 692 693 static void segment_complete(int read_err, unsigned long write_err, 694 void *context) 695 { 696 /* FIXME: tidy this function */ 697 sector_t progress = 0; 698 sector_t count = 0; 699 struct kcopyd_job *sub_job = context; 700 struct kcopyd_job *job = sub_job->master_job; 701 struct dm_kcopyd_client *kc = job->kc; 702 703 mutex_lock(&job->lock); 704 705 /* update the error */ 706 if (read_err) 707 job->read_err = 1; 708 709 if (write_err) 710 job->write_err |= write_err; 711 712 /* 713 * Only dispatch more work if there hasn't been an error. 714 */ 715 if ((!job->read_err && !job->write_err) || 716 job->flags & BIT(DM_KCOPYD_IGNORE_ERROR)) { 717 /* get the next chunk of work */ 718 progress = job->progress; 719 count = job->source.count - progress; 720 if (count) { 721 if (count > kc->sub_job_size) 722 count = kc->sub_job_size; 723 724 job->progress += count; 725 } 726 } 727 mutex_unlock(&job->lock); 728 729 if (count) { 730 int i; 731 732 *sub_job = *job; 733 sub_job->write_offset = progress; 734 sub_job->source.sector += progress; 735 sub_job->source.count = count; 736 737 for (i = 0; i < job->num_dests; i++) { 738 sub_job->dests[i].sector += progress; 739 sub_job->dests[i].count = count; 740 } 741 742 sub_job->fn = segment_complete; 743 sub_job->context = sub_job; 744 dispatch_job(sub_job); 745 746 } else if (atomic_dec_and_test(&job->sub_jobs)) { 747 748 /* 749 * Queue the completion callback to the kcopyd thread. 750 * 751 * Some callers assume that all the completions are called 752 * from a single thread and don't race with each other. 753 * 754 * We must not call the callback directly here because this 755 * code may not be executing in the thread. 756 */ 757 push(&kc->complete_jobs, job); 758 wake(kc); 759 } 760 } 761 762 /* 763 * Create some sub jobs to share the work between them. 764 */ 765 static void split_job(struct kcopyd_job *master_job) 766 { 767 int i; 768 769 atomic_inc(&master_job->kc->nr_jobs); 770 771 atomic_set(&master_job->sub_jobs, SPLIT_COUNT); 772 for (i = 0; i < SPLIT_COUNT; i++) { 773 master_job[i + 1].master_job = master_job; 774 segment_complete(0, 0u, &master_job[i + 1]); 775 } 776 } 777 778 void dm_kcopyd_copy(struct dm_kcopyd_client *kc, struct dm_io_region *from, 779 unsigned int num_dests, struct dm_io_region *dests, 780 unsigned int flags, dm_kcopyd_notify_fn fn, void *context) 781 { 782 struct kcopyd_job *job; 783 int i; 784 785 /* 786 * Allocate an array of jobs consisting of one master job 787 * followed by SPLIT_COUNT sub jobs. 788 */ 789 job = mempool_alloc(&kc->job_pool, GFP_NOIO); 790 mutex_init(&job->lock); 791 792 /* 793 * set up for the read. 794 */ 795 job->kc = kc; 796 job->flags = flags; 797 job->read_err = 0; 798 job->write_err = 0; 799 800 job->num_dests = num_dests; 801 memcpy(&job->dests, dests, sizeof(*dests) * num_dests); 802 803 /* 804 * If one of the destination is a host-managed zoned block device, 805 * we need to write sequentially. If one of the destination is a 806 * host-aware device, then leave it to the caller to choose what to do. 807 */ 808 if (!(job->flags & BIT(DM_KCOPYD_WRITE_SEQ))) { 809 for (i = 0; i < job->num_dests; i++) { 810 if (bdev_is_zoned(dests[i].bdev)) { 811 job->flags |= BIT(DM_KCOPYD_WRITE_SEQ); 812 break; 813 } 814 } 815 } 816 817 /* 818 * If we need to write sequentially, errors cannot be ignored. 819 */ 820 if (job->flags & BIT(DM_KCOPYD_WRITE_SEQ) && 821 job->flags & BIT(DM_KCOPYD_IGNORE_ERROR)) 822 job->flags &= ~BIT(DM_KCOPYD_IGNORE_ERROR); 823 824 if (from) { 825 job->source = *from; 826 job->pages = NULL; 827 job->op = REQ_OP_READ; 828 } else { 829 memset(&job->source, 0, sizeof(job->source)); 830 job->source.count = job->dests[0].count; 831 job->pages = &zero_page_list; 832 833 /* 834 * Use WRITE ZEROES to optimize zeroing if all dests support it. 835 */ 836 job->op = REQ_OP_WRITE_ZEROES; 837 for (i = 0; i < job->num_dests; i++) 838 if (!bdev_write_zeroes_sectors(job->dests[i].bdev)) { 839 job->op = REQ_OP_WRITE; 840 break; 841 } 842 } 843 844 job->fn = fn; 845 job->context = context; 846 job->master_job = job; 847 job->write_offset = 0; 848 849 if (job->source.count <= kc->sub_job_size) 850 dispatch_job(job); 851 else { 852 job->progress = 0; 853 split_job(job); 854 } 855 } 856 EXPORT_SYMBOL(dm_kcopyd_copy); 857 858 void dm_kcopyd_zero(struct dm_kcopyd_client *kc, 859 unsigned int num_dests, struct dm_io_region *dests, 860 unsigned int flags, dm_kcopyd_notify_fn fn, void *context) 861 { 862 dm_kcopyd_copy(kc, NULL, num_dests, dests, flags, fn, context); 863 } 864 EXPORT_SYMBOL(dm_kcopyd_zero); 865 866 void *dm_kcopyd_prepare_callback(struct dm_kcopyd_client *kc, 867 dm_kcopyd_notify_fn fn, void *context) 868 { 869 struct kcopyd_job *job; 870 871 job = mempool_alloc(&kc->job_pool, GFP_NOIO); 872 873 memset(job, 0, sizeof(struct kcopyd_job)); 874 job->kc = kc; 875 job->fn = fn; 876 job->context = context; 877 job->master_job = job; 878 879 atomic_inc(&kc->nr_jobs); 880 881 return job; 882 } 883 EXPORT_SYMBOL(dm_kcopyd_prepare_callback); 884 885 void dm_kcopyd_do_callback(void *j, int read_err, unsigned long write_err) 886 { 887 struct kcopyd_job *job = j; 888 struct dm_kcopyd_client *kc = job->kc; 889 890 job->read_err = read_err; 891 job->write_err = write_err; 892 893 push(&kc->callback_jobs, job); 894 wake(kc); 895 } 896 EXPORT_SYMBOL(dm_kcopyd_do_callback); 897 898 /* 899 * Cancels a kcopyd job, eg. someone might be deactivating a 900 * mirror. 901 */ 902 #if 0 903 int kcopyd_cancel(struct kcopyd_job *job, int block) 904 { 905 /* FIXME: finish */ 906 return -1; 907 } 908 #endif /* 0 */ 909 910 /* 911 *--------------------------------------------------------------- 912 * Client setup 913 *--------------------------------------------------------------- 914 */ 915 struct dm_kcopyd_client *dm_kcopyd_client_create(struct dm_kcopyd_throttle *throttle) 916 { 917 int r; 918 unsigned int reserve_pages; 919 struct dm_kcopyd_client *kc; 920 921 kc = kzalloc(sizeof(*kc), GFP_KERNEL); 922 if (!kc) 923 return ERR_PTR(-ENOMEM); 924 925 spin_lock_init(&kc->job_lock); 926 INIT_LIST_HEAD(&kc->callback_jobs); 927 INIT_LIST_HEAD(&kc->complete_jobs); 928 INIT_LIST_HEAD(&kc->io_jobs); 929 INIT_LIST_HEAD(&kc->pages_jobs); 930 kc->throttle = throttle; 931 932 r = mempool_init_slab_pool(&kc->job_pool, MIN_JOBS, _job_cache); 933 if (r) 934 goto bad_slab; 935 936 INIT_WORK(&kc->kcopyd_work, do_work); 937 kc->kcopyd_wq = alloc_workqueue("kcopyd", WQ_MEM_RECLAIM, 0); 938 if (!kc->kcopyd_wq) { 939 r = -ENOMEM; 940 goto bad_workqueue; 941 } 942 943 kc->sub_job_size = dm_get_kcopyd_subjob_size(); 944 reserve_pages = DIV_ROUND_UP(kc->sub_job_size << SECTOR_SHIFT, PAGE_SIZE); 945 946 kc->pages = NULL; 947 kc->nr_reserved_pages = kc->nr_free_pages = 0; 948 r = client_reserve_pages(kc, reserve_pages); 949 if (r) 950 goto bad_client_pages; 951 952 kc->io_client = dm_io_client_create(); 953 if (IS_ERR(kc->io_client)) { 954 r = PTR_ERR(kc->io_client); 955 goto bad_io_client; 956 } 957 958 init_waitqueue_head(&kc->destroyq); 959 atomic_set(&kc->nr_jobs, 0); 960 961 return kc; 962 963 bad_io_client: 964 client_free_pages(kc); 965 bad_client_pages: 966 destroy_workqueue(kc->kcopyd_wq); 967 bad_workqueue: 968 mempool_exit(&kc->job_pool); 969 bad_slab: 970 kfree(kc); 971 972 return ERR_PTR(r); 973 } 974 EXPORT_SYMBOL(dm_kcopyd_client_create); 975 976 void dm_kcopyd_client_destroy(struct dm_kcopyd_client *kc) 977 { 978 /* Wait for completion of all jobs submitted by this client. */ 979 wait_event(kc->destroyq, !atomic_read(&kc->nr_jobs)); 980 981 BUG_ON(!list_empty(&kc->callback_jobs)); 982 BUG_ON(!list_empty(&kc->complete_jobs)); 983 BUG_ON(!list_empty(&kc->io_jobs)); 984 BUG_ON(!list_empty(&kc->pages_jobs)); 985 destroy_workqueue(kc->kcopyd_wq); 986 dm_io_client_destroy(kc->io_client); 987 client_free_pages(kc); 988 mempool_exit(&kc->job_pool); 989 kfree(kc); 990 } 991 EXPORT_SYMBOL(dm_kcopyd_client_destroy); 992 993 void dm_kcopyd_client_flush(struct dm_kcopyd_client *kc) 994 { 995 flush_workqueue(kc->kcopyd_wq); 996 } 997 EXPORT_SYMBOL(dm_kcopyd_client_flush); 998