1 /* 2 * Copyright (C) 2002 Sistina Software (UK) Limited. 3 * Copyright (C) 2006 Red Hat GmbH 4 * 5 * This file is released under the GPL. 6 * 7 * Kcopyd provides a simple interface for copying an area of one 8 * block-device to one or more other block-devices, with an asynchronous 9 * completion notification. 10 */ 11 12 #include <linux/types.h> 13 #include <linux/atomic.h> 14 #include <linux/blkdev.h> 15 #include <linux/fs.h> 16 #include <linux/init.h> 17 #include <linux/list.h> 18 #include <linux/mempool.h> 19 #include <linux/module.h> 20 #include <linux/pagemap.h> 21 #include <linux/slab.h> 22 #include <linux/vmalloc.h> 23 #include <linux/workqueue.h> 24 #include <linux/mutex.h> 25 #include <linux/delay.h> 26 #include <linux/device-mapper.h> 27 #include <linux/dm-kcopyd.h> 28 29 #include "dm-core.h" 30 31 #define SPLIT_COUNT 8 32 #define MIN_JOBS 8 33 34 #define DEFAULT_SUB_JOB_SIZE_KB 512 35 #define MAX_SUB_JOB_SIZE_KB 1024 36 37 static unsigned kcopyd_subjob_size_kb = DEFAULT_SUB_JOB_SIZE_KB; 38 39 module_param(kcopyd_subjob_size_kb, uint, S_IRUGO | S_IWUSR); 40 MODULE_PARM_DESC(kcopyd_subjob_size_kb, "Sub-job size for dm-kcopyd clients"); 41 42 static unsigned dm_get_kcopyd_subjob_size(void) 43 { 44 unsigned sub_job_size_kb; 45 46 sub_job_size_kb = __dm_get_module_param(&kcopyd_subjob_size_kb, 47 DEFAULT_SUB_JOB_SIZE_KB, 48 MAX_SUB_JOB_SIZE_KB); 49 50 return sub_job_size_kb << 1; 51 } 52 53 /*----------------------------------------------------------------- 54 * Each kcopyd client has its own little pool of preallocated 55 * pages for kcopyd io. 56 *---------------------------------------------------------------*/ 57 struct dm_kcopyd_client { 58 struct page_list *pages; 59 unsigned nr_reserved_pages; 60 unsigned nr_free_pages; 61 unsigned sub_job_size; 62 63 struct dm_io_client *io_client; 64 65 wait_queue_head_t destroyq; 66 67 mempool_t job_pool; 68 69 struct workqueue_struct *kcopyd_wq; 70 struct work_struct kcopyd_work; 71 72 struct dm_kcopyd_throttle *throttle; 73 74 atomic_t nr_jobs; 75 76 /* 77 * We maintain four lists of jobs: 78 * 79 * i) jobs waiting for pages 80 * ii) jobs that have pages, and are waiting for the io to be issued. 81 * iii) jobs that don't need to do any IO and just run a callback 82 * iv) jobs that have completed. 83 * 84 * All four of these are protected by job_lock. 85 */ 86 spinlock_t job_lock; 87 struct list_head callback_jobs; 88 struct list_head complete_jobs; 89 struct list_head io_jobs; 90 struct list_head pages_jobs; 91 }; 92 93 static struct page_list zero_page_list; 94 95 static DEFINE_SPINLOCK(throttle_spinlock); 96 97 /* 98 * IO/IDLE accounting slowly decays after (1 << ACCOUNT_INTERVAL_SHIFT) period. 99 * When total_period >= (1 << ACCOUNT_INTERVAL_SHIFT) the counters are divided 100 * by 2. 101 */ 102 #define ACCOUNT_INTERVAL_SHIFT SHIFT_HZ 103 104 /* 105 * Sleep this number of milliseconds. 106 * 107 * The value was decided experimentally. 108 * Smaller values seem to cause an increased copy rate above the limit. 109 * The reason for this is unknown but possibly due to jiffies rounding errors 110 * or read/write cache inside the disk. 111 */ 112 #define SLEEP_MSEC 100 113 114 /* 115 * Maximum number of sleep events. There is a theoretical livelock if more 116 * kcopyd clients do work simultaneously which this limit avoids. 117 */ 118 #define MAX_SLEEPS 10 119 120 static void io_job_start(struct dm_kcopyd_throttle *t) 121 { 122 unsigned throttle, now, difference; 123 int slept = 0, skew; 124 125 if (unlikely(!t)) 126 return; 127 128 try_again: 129 spin_lock_irq(&throttle_spinlock); 130 131 throttle = READ_ONCE(t->throttle); 132 133 if (likely(throttle >= 100)) 134 goto skip_limit; 135 136 now = jiffies; 137 difference = now - t->last_jiffies; 138 t->last_jiffies = now; 139 if (t->num_io_jobs) 140 t->io_period += difference; 141 t->total_period += difference; 142 143 /* 144 * Maintain sane values if we got a temporary overflow. 145 */ 146 if (unlikely(t->io_period > t->total_period)) 147 t->io_period = t->total_period; 148 149 if (unlikely(t->total_period >= (1 << ACCOUNT_INTERVAL_SHIFT))) { 150 int shift = fls(t->total_period >> ACCOUNT_INTERVAL_SHIFT); 151 t->total_period >>= shift; 152 t->io_period >>= shift; 153 } 154 155 skew = t->io_period - throttle * t->total_period / 100; 156 157 if (unlikely(skew > 0) && slept < MAX_SLEEPS) { 158 slept++; 159 spin_unlock_irq(&throttle_spinlock); 160 msleep(SLEEP_MSEC); 161 goto try_again; 162 } 163 164 skip_limit: 165 t->num_io_jobs++; 166 167 spin_unlock_irq(&throttle_spinlock); 168 } 169 170 static void io_job_finish(struct dm_kcopyd_throttle *t) 171 { 172 unsigned long flags; 173 174 if (unlikely(!t)) 175 return; 176 177 spin_lock_irqsave(&throttle_spinlock, flags); 178 179 t->num_io_jobs--; 180 181 if (likely(READ_ONCE(t->throttle) >= 100)) 182 goto skip_limit; 183 184 if (!t->num_io_jobs) { 185 unsigned now, difference; 186 187 now = jiffies; 188 difference = now - t->last_jiffies; 189 t->last_jiffies = now; 190 191 t->io_period += difference; 192 t->total_period += difference; 193 194 /* 195 * Maintain sane values if we got a temporary overflow. 196 */ 197 if (unlikely(t->io_period > t->total_period)) 198 t->io_period = t->total_period; 199 } 200 201 skip_limit: 202 spin_unlock_irqrestore(&throttle_spinlock, flags); 203 } 204 205 206 static void wake(struct dm_kcopyd_client *kc) 207 { 208 queue_work(kc->kcopyd_wq, &kc->kcopyd_work); 209 } 210 211 /* 212 * Obtain one page for the use of kcopyd. 213 */ 214 static struct page_list *alloc_pl(gfp_t gfp) 215 { 216 struct page_list *pl; 217 218 pl = kmalloc(sizeof(*pl), gfp); 219 if (!pl) 220 return NULL; 221 222 pl->page = alloc_page(gfp); 223 if (!pl->page) { 224 kfree(pl); 225 return NULL; 226 } 227 228 return pl; 229 } 230 231 static void free_pl(struct page_list *pl) 232 { 233 __free_page(pl->page); 234 kfree(pl); 235 } 236 237 /* 238 * Add the provided pages to a client's free page list, releasing 239 * back to the system any beyond the reserved_pages limit. 240 */ 241 static void kcopyd_put_pages(struct dm_kcopyd_client *kc, struct page_list *pl) 242 { 243 struct page_list *next; 244 245 do { 246 next = pl->next; 247 248 if (kc->nr_free_pages >= kc->nr_reserved_pages) 249 free_pl(pl); 250 else { 251 pl->next = kc->pages; 252 kc->pages = pl; 253 kc->nr_free_pages++; 254 } 255 256 pl = next; 257 } while (pl); 258 } 259 260 static int kcopyd_get_pages(struct dm_kcopyd_client *kc, 261 unsigned int nr, struct page_list **pages) 262 { 263 struct page_list *pl; 264 265 *pages = NULL; 266 267 do { 268 pl = alloc_pl(__GFP_NOWARN | __GFP_NORETRY | __GFP_KSWAPD_RECLAIM); 269 if (unlikely(!pl)) { 270 /* Use reserved pages */ 271 pl = kc->pages; 272 if (unlikely(!pl)) 273 goto out_of_memory; 274 kc->pages = pl->next; 275 kc->nr_free_pages--; 276 } 277 pl->next = *pages; 278 *pages = pl; 279 } while (--nr); 280 281 return 0; 282 283 out_of_memory: 284 if (*pages) 285 kcopyd_put_pages(kc, *pages); 286 return -ENOMEM; 287 } 288 289 /* 290 * These three functions resize the page pool. 291 */ 292 static void drop_pages(struct page_list *pl) 293 { 294 struct page_list *next; 295 296 while (pl) { 297 next = pl->next; 298 free_pl(pl); 299 pl = next; 300 } 301 } 302 303 /* 304 * Allocate and reserve nr_pages for the use of a specific client. 305 */ 306 static int client_reserve_pages(struct dm_kcopyd_client *kc, unsigned nr_pages) 307 { 308 unsigned i; 309 struct page_list *pl = NULL, *next; 310 311 for (i = 0; i < nr_pages; i++) { 312 next = alloc_pl(GFP_KERNEL); 313 if (!next) { 314 if (pl) 315 drop_pages(pl); 316 return -ENOMEM; 317 } 318 next->next = pl; 319 pl = next; 320 } 321 322 kc->nr_reserved_pages += nr_pages; 323 kcopyd_put_pages(kc, pl); 324 325 return 0; 326 } 327 328 static void client_free_pages(struct dm_kcopyd_client *kc) 329 { 330 BUG_ON(kc->nr_free_pages != kc->nr_reserved_pages); 331 drop_pages(kc->pages); 332 kc->pages = NULL; 333 kc->nr_free_pages = kc->nr_reserved_pages = 0; 334 } 335 336 /*----------------------------------------------------------------- 337 * kcopyd_jobs need to be allocated by the *clients* of kcopyd, 338 * for this reason we use a mempool to prevent the client from 339 * ever having to do io (which could cause a deadlock). 340 *---------------------------------------------------------------*/ 341 struct kcopyd_job { 342 struct dm_kcopyd_client *kc; 343 struct list_head list; 344 unsigned flags; 345 346 /* 347 * Error state of the job. 348 */ 349 int read_err; 350 unsigned long write_err; 351 352 /* 353 * Either READ or WRITE 354 */ 355 int rw; 356 struct dm_io_region source; 357 358 /* 359 * The destinations for the transfer. 360 */ 361 unsigned int num_dests; 362 struct dm_io_region dests[DM_KCOPYD_MAX_REGIONS]; 363 364 struct page_list *pages; 365 366 /* 367 * Set this to ensure you are notified when the job has 368 * completed. 'context' is for callback to use. 369 */ 370 dm_kcopyd_notify_fn fn; 371 void *context; 372 373 /* 374 * These fields are only used if the job has been split 375 * into more manageable parts. 376 */ 377 struct mutex lock; 378 atomic_t sub_jobs; 379 sector_t progress; 380 sector_t write_offset; 381 382 struct kcopyd_job *master_job; 383 }; 384 385 static struct kmem_cache *_job_cache; 386 387 int __init dm_kcopyd_init(void) 388 { 389 _job_cache = kmem_cache_create("kcopyd_job", 390 sizeof(struct kcopyd_job) * (SPLIT_COUNT + 1), 391 __alignof__(struct kcopyd_job), 0, NULL); 392 if (!_job_cache) 393 return -ENOMEM; 394 395 zero_page_list.next = &zero_page_list; 396 zero_page_list.page = ZERO_PAGE(0); 397 398 return 0; 399 } 400 401 void dm_kcopyd_exit(void) 402 { 403 kmem_cache_destroy(_job_cache); 404 _job_cache = NULL; 405 } 406 407 /* 408 * Functions to push and pop a job onto the head of a given job 409 * list. 410 */ 411 static struct kcopyd_job *pop_io_job(struct list_head *jobs, 412 struct dm_kcopyd_client *kc) 413 { 414 struct kcopyd_job *job; 415 416 /* 417 * For I/O jobs, pop any read, any write without sequential write 418 * constraint and sequential writes that are at the right position. 419 */ 420 list_for_each_entry(job, jobs, list) { 421 if (job->rw == READ || !(job->flags & BIT(DM_KCOPYD_WRITE_SEQ))) { 422 list_del(&job->list); 423 return job; 424 } 425 426 if (job->write_offset == job->master_job->write_offset) { 427 job->master_job->write_offset += job->source.count; 428 list_del(&job->list); 429 return job; 430 } 431 } 432 433 return NULL; 434 } 435 436 static struct kcopyd_job *pop(struct list_head *jobs, 437 struct dm_kcopyd_client *kc) 438 { 439 struct kcopyd_job *job = NULL; 440 441 spin_lock_irq(&kc->job_lock); 442 443 if (!list_empty(jobs)) { 444 if (jobs == &kc->io_jobs) 445 job = pop_io_job(jobs, kc); 446 else { 447 job = list_entry(jobs->next, struct kcopyd_job, list); 448 list_del(&job->list); 449 } 450 } 451 spin_unlock_irq(&kc->job_lock); 452 453 return job; 454 } 455 456 static void push(struct list_head *jobs, struct kcopyd_job *job) 457 { 458 unsigned long flags; 459 struct dm_kcopyd_client *kc = job->kc; 460 461 spin_lock_irqsave(&kc->job_lock, flags); 462 list_add_tail(&job->list, jobs); 463 spin_unlock_irqrestore(&kc->job_lock, flags); 464 } 465 466 467 static void push_head(struct list_head *jobs, struct kcopyd_job *job) 468 { 469 struct dm_kcopyd_client *kc = job->kc; 470 471 spin_lock_irq(&kc->job_lock); 472 list_add(&job->list, jobs); 473 spin_unlock_irq(&kc->job_lock); 474 } 475 476 /* 477 * These three functions process 1 item from the corresponding 478 * job list. 479 * 480 * They return: 481 * < 0: error 482 * 0: success 483 * > 0: can't process yet. 484 */ 485 static int run_complete_job(struct kcopyd_job *job) 486 { 487 void *context = job->context; 488 int read_err = job->read_err; 489 unsigned long write_err = job->write_err; 490 dm_kcopyd_notify_fn fn = job->fn; 491 struct dm_kcopyd_client *kc = job->kc; 492 493 if (job->pages && job->pages != &zero_page_list) 494 kcopyd_put_pages(kc, job->pages); 495 /* 496 * If this is the master job, the sub jobs have already 497 * completed so we can free everything. 498 */ 499 if (job->master_job == job) { 500 mutex_destroy(&job->lock); 501 mempool_free(job, &kc->job_pool); 502 } 503 fn(read_err, write_err, context); 504 505 if (atomic_dec_and_test(&kc->nr_jobs)) 506 wake_up(&kc->destroyq); 507 508 cond_resched(); 509 510 return 0; 511 } 512 513 static void complete_io(unsigned long error, void *context) 514 { 515 struct kcopyd_job *job = (struct kcopyd_job *) context; 516 struct dm_kcopyd_client *kc = job->kc; 517 518 io_job_finish(kc->throttle); 519 520 if (error) { 521 if (op_is_write(job->rw)) 522 job->write_err |= error; 523 else 524 job->read_err = 1; 525 526 if (!(job->flags & BIT(DM_KCOPYD_IGNORE_ERROR))) { 527 push(&kc->complete_jobs, job); 528 wake(kc); 529 return; 530 } 531 } 532 533 if (op_is_write(job->rw)) 534 push(&kc->complete_jobs, job); 535 536 else { 537 job->rw = WRITE; 538 push(&kc->io_jobs, job); 539 } 540 541 wake(kc); 542 } 543 544 /* 545 * Request io on as many buffer heads as we can currently get for 546 * a particular job. 547 */ 548 static int run_io_job(struct kcopyd_job *job) 549 { 550 int r; 551 struct dm_io_request io_req = { 552 .bi_op = job->rw, 553 .bi_op_flags = 0, 554 .mem.type = DM_IO_PAGE_LIST, 555 .mem.ptr.pl = job->pages, 556 .mem.offset = 0, 557 .notify.fn = complete_io, 558 .notify.context = job, 559 .client = job->kc->io_client, 560 }; 561 562 /* 563 * If we need to write sequentially and some reads or writes failed, 564 * no point in continuing. 565 */ 566 if (job->flags & BIT(DM_KCOPYD_WRITE_SEQ) && 567 job->master_job->write_err) { 568 job->write_err = job->master_job->write_err; 569 return -EIO; 570 } 571 572 io_job_start(job->kc->throttle); 573 574 if (job->rw == READ) 575 r = dm_io(&io_req, 1, &job->source, NULL); 576 else 577 r = dm_io(&io_req, job->num_dests, job->dests, NULL); 578 579 return r; 580 } 581 582 static int run_pages_job(struct kcopyd_job *job) 583 { 584 int r; 585 unsigned nr_pages = dm_div_up(job->dests[0].count, PAGE_SIZE >> 9); 586 587 r = kcopyd_get_pages(job->kc, nr_pages, &job->pages); 588 if (!r) { 589 /* this job is ready for io */ 590 push(&job->kc->io_jobs, job); 591 return 0; 592 } 593 594 if (r == -ENOMEM) 595 /* can't complete now */ 596 return 1; 597 598 return r; 599 } 600 601 /* 602 * Run through a list for as long as possible. Returns the count 603 * of successful jobs. 604 */ 605 static int process_jobs(struct list_head *jobs, struct dm_kcopyd_client *kc, 606 int (*fn) (struct kcopyd_job *)) 607 { 608 struct kcopyd_job *job; 609 int r, count = 0; 610 611 while ((job = pop(jobs, kc))) { 612 613 r = fn(job); 614 615 if (r < 0) { 616 /* error this rogue job */ 617 if (op_is_write(job->rw)) 618 job->write_err = (unsigned long) -1L; 619 else 620 job->read_err = 1; 621 push(&kc->complete_jobs, job); 622 wake(kc); 623 break; 624 } 625 626 if (r > 0) { 627 /* 628 * We couldn't service this job ATM, so 629 * push this job back onto the list. 630 */ 631 push_head(jobs, job); 632 break; 633 } 634 635 count++; 636 } 637 638 return count; 639 } 640 641 /* 642 * kcopyd does this every time it's woken up. 643 */ 644 static void do_work(struct work_struct *work) 645 { 646 struct dm_kcopyd_client *kc = container_of(work, 647 struct dm_kcopyd_client, kcopyd_work); 648 struct blk_plug plug; 649 650 /* 651 * The order that these are called is *very* important. 652 * complete jobs can free some pages for pages jobs. 653 * Pages jobs when successful will jump onto the io jobs 654 * list. io jobs call wake when they complete and it all 655 * starts again. 656 */ 657 spin_lock_irq(&kc->job_lock); 658 list_splice_tail_init(&kc->callback_jobs, &kc->complete_jobs); 659 spin_unlock_irq(&kc->job_lock); 660 661 blk_start_plug(&plug); 662 process_jobs(&kc->complete_jobs, kc, run_complete_job); 663 process_jobs(&kc->pages_jobs, kc, run_pages_job); 664 process_jobs(&kc->io_jobs, kc, run_io_job); 665 blk_finish_plug(&plug); 666 } 667 668 /* 669 * If we are copying a small region we just dispatch a single job 670 * to do the copy, otherwise the io has to be split up into many 671 * jobs. 672 */ 673 static void dispatch_job(struct kcopyd_job *job) 674 { 675 struct dm_kcopyd_client *kc = job->kc; 676 atomic_inc(&kc->nr_jobs); 677 if (unlikely(!job->source.count)) 678 push(&kc->callback_jobs, job); 679 else if (job->pages == &zero_page_list) 680 push(&kc->io_jobs, job); 681 else 682 push(&kc->pages_jobs, job); 683 wake(kc); 684 } 685 686 static void segment_complete(int read_err, unsigned long write_err, 687 void *context) 688 { 689 /* FIXME: tidy this function */ 690 sector_t progress = 0; 691 sector_t count = 0; 692 struct kcopyd_job *sub_job = (struct kcopyd_job *) context; 693 struct kcopyd_job *job = sub_job->master_job; 694 struct dm_kcopyd_client *kc = job->kc; 695 696 mutex_lock(&job->lock); 697 698 /* update the error */ 699 if (read_err) 700 job->read_err = 1; 701 702 if (write_err) 703 job->write_err |= write_err; 704 705 /* 706 * Only dispatch more work if there hasn't been an error. 707 */ 708 if ((!job->read_err && !job->write_err) || 709 job->flags & BIT(DM_KCOPYD_IGNORE_ERROR)) { 710 /* get the next chunk of work */ 711 progress = job->progress; 712 count = job->source.count - progress; 713 if (count) { 714 if (count > kc->sub_job_size) 715 count = kc->sub_job_size; 716 717 job->progress += count; 718 } 719 } 720 mutex_unlock(&job->lock); 721 722 if (count) { 723 int i; 724 725 *sub_job = *job; 726 sub_job->write_offset = progress; 727 sub_job->source.sector += progress; 728 sub_job->source.count = count; 729 730 for (i = 0; i < job->num_dests; i++) { 731 sub_job->dests[i].sector += progress; 732 sub_job->dests[i].count = count; 733 } 734 735 sub_job->fn = segment_complete; 736 sub_job->context = sub_job; 737 dispatch_job(sub_job); 738 739 } else if (atomic_dec_and_test(&job->sub_jobs)) { 740 741 /* 742 * Queue the completion callback to the kcopyd thread. 743 * 744 * Some callers assume that all the completions are called 745 * from a single thread and don't race with each other. 746 * 747 * We must not call the callback directly here because this 748 * code may not be executing in the thread. 749 */ 750 push(&kc->complete_jobs, job); 751 wake(kc); 752 } 753 } 754 755 /* 756 * Create some sub jobs to share the work between them. 757 */ 758 static void split_job(struct kcopyd_job *master_job) 759 { 760 int i; 761 762 atomic_inc(&master_job->kc->nr_jobs); 763 764 atomic_set(&master_job->sub_jobs, SPLIT_COUNT); 765 for (i = 0; i < SPLIT_COUNT; i++) { 766 master_job[i + 1].master_job = master_job; 767 segment_complete(0, 0u, &master_job[i + 1]); 768 } 769 } 770 771 void dm_kcopyd_copy(struct dm_kcopyd_client *kc, struct dm_io_region *from, 772 unsigned int num_dests, struct dm_io_region *dests, 773 unsigned int flags, dm_kcopyd_notify_fn fn, void *context) 774 { 775 struct kcopyd_job *job; 776 int i; 777 778 /* 779 * Allocate an array of jobs consisting of one master job 780 * followed by SPLIT_COUNT sub jobs. 781 */ 782 job = mempool_alloc(&kc->job_pool, GFP_NOIO); 783 mutex_init(&job->lock); 784 785 /* 786 * set up for the read. 787 */ 788 job->kc = kc; 789 job->flags = flags; 790 job->read_err = 0; 791 job->write_err = 0; 792 793 job->num_dests = num_dests; 794 memcpy(&job->dests, dests, sizeof(*dests) * num_dests); 795 796 /* 797 * If one of the destination is a host-managed zoned block device, 798 * we need to write sequentially. If one of the destination is a 799 * host-aware device, then leave it to the caller to choose what to do. 800 */ 801 if (!(job->flags & BIT(DM_KCOPYD_WRITE_SEQ))) { 802 for (i = 0; i < job->num_dests; i++) { 803 if (bdev_zoned_model(dests[i].bdev) == BLK_ZONED_HM) { 804 job->flags |= BIT(DM_KCOPYD_WRITE_SEQ); 805 break; 806 } 807 } 808 } 809 810 /* 811 * If we need to write sequentially, errors cannot be ignored. 812 */ 813 if (job->flags & BIT(DM_KCOPYD_WRITE_SEQ) && 814 job->flags & BIT(DM_KCOPYD_IGNORE_ERROR)) 815 job->flags &= ~BIT(DM_KCOPYD_IGNORE_ERROR); 816 817 if (from) { 818 job->source = *from; 819 job->pages = NULL; 820 job->rw = READ; 821 } else { 822 memset(&job->source, 0, sizeof job->source); 823 job->source.count = job->dests[0].count; 824 job->pages = &zero_page_list; 825 826 /* 827 * Use WRITE ZEROES to optimize zeroing if all dests support it. 828 */ 829 job->rw = REQ_OP_WRITE_ZEROES; 830 for (i = 0; i < job->num_dests; i++) 831 if (!bdev_write_zeroes_sectors(job->dests[i].bdev)) { 832 job->rw = WRITE; 833 break; 834 } 835 } 836 837 job->fn = fn; 838 job->context = context; 839 job->master_job = job; 840 job->write_offset = 0; 841 842 if (job->source.count <= kc->sub_job_size) 843 dispatch_job(job); 844 else { 845 job->progress = 0; 846 split_job(job); 847 } 848 } 849 EXPORT_SYMBOL(dm_kcopyd_copy); 850 851 void dm_kcopyd_zero(struct dm_kcopyd_client *kc, 852 unsigned num_dests, struct dm_io_region *dests, 853 unsigned flags, dm_kcopyd_notify_fn fn, void *context) 854 { 855 dm_kcopyd_copy(kc, NULL, num_dests, dests, flags, fn, context); 856 } 857 EXPORT_SYMBOL(dm_kcopyd_zero); 858 859 void *dm_kcopyd_prepare_callback(struct dm_kcopyd_client *kc, 860 dm_kcopyd_notify_fn fn, void *context) 861 { 862 struct kcopyd_job *job; 863 864 job = mempool_alloc(&kc->job_pool, GFP_NOIO); 865 866 memset(job, 0, sizeof(struct kcopyd_job)); 867 job->kc = kc; 868 job->fn = fn; 869 job->context = context; 870 job->master_job = job; 871 872 atomic_inc(&kc->nr_jobs); 873 874 return job; 875 } 876 EXPORT_SYMBOL(dm_kcopyd_prepare_callback); 877 878 void dm_kcopyd_do_callback(void *j, int read_err, unsigned long write_err) 879 { 880 struct kcopyd_job *job = j; 881 struct dm_kcopyd_client *kc = job->kc; 882 883 job->read_err = read_err; 884 job->write_err = write_err; 885 886 push(&kc->callback_jobs, job); 887 wake(kc); 888 } 889 EXPORT_SYMBOL(dm_kcopyd_do_callback); 890 891 /* 892 * Cancels a kcopyd job, eg. someone might be deactivating a 893 * mirror. 894 */ 895 #if 0 896 int kcopyd_cancel(struct kcopyd_job *job, int block) 897 { 898 /* FIXME: finish */ 899 return -1; 900 } 901 #endif /* 0 */ 902 903 /*----------------------------------------------------------------- 904 * Client setup 905 *---------------------------------------------------------------*/ 906 struct dm_kcopyd_client *dm_kcopyd_client_create(struct dm_kcopyd_throttle *throttle) 907 { 908 int r; 909 unsigned reserve_pages; 910 struct dm_kcopyd_client *kc; 911 912 kc = kzalloc(sizeof(*kc), GFP_KERNEL); 913 if (!kc) 914 return ERR_PTR(-ENOMEM); 915 916 spin_lock_init(&kc->job_lock); 917 INIT_LIST_HEAD(&kc->callback_jobs); 918 INIT_LIST_HEAD(&kc->complete_jobs); 919 INIT_LIST_HEAD(&kc->io_jobs); 920 INIT_LIST_HEAD(&kc->pages_jobs); 921 kc->throttle = throttle; 922 923 r = mempool_init_slab_pool(&kc->job_pool, MIN_JOBS, _job_cache); 924 if (r) 925 goto bad_slab; 926 927 INIT_WORK(&kc->kcopyd_work, do_work); 928 kc->kcopyd_wq = alloc_workqueue("kcopyd", WQ_MEM_RECLAIM, 0); 929 if (!kc->kcopyd_wq) { 930 r = -ENOMEM; 931 goto bad_workqueue; 932 } 933 934 kc->sub_job_size = dm_get_kcopyd_subjob_size(); 935 reserve_pages = DIV_ROUND_UP(kc->sub_job_size << SECTOR_SHIFT, PAGE_SIZE); 936 937 kc->pages = NULL; 938 kc->nr_reserved_pages = kc->nr_free_pages = 0; 939 r = client_reserve_pages(kc, reserve_pages); 940 if (r) 941 goto bad_client_pages; 942 943 kc->io_client = dm_io_client_create(); 944 if (IS_ERR(kc->io_client)) { 945 r = PTR_ERR(kc->io_client); 946 goto bad_io_client; 947 } 948 949 init_waitqueue_head(&kc->destroyq); 950 atomic_set(&kc->nr_jobs, 0); 951 952 return kc; 953 954 bad_io_client: 955 client_free_pages(kc); 956 bad_client_pages: 957 destroy_workqueue(kc->kcopyd_wq); 958 bad_workqueue: 959 mempool_exit(&kc->job_pool); 960 bad_slab: 961 kfree(kc); 962 963 return ERR_PTR(r); 964 } 965 EXPORT_SYMBOL(dm_kcopyd_client_create); 966 967 void dm_kcopyd_client_destroy(struct dm_kcopyd_client *kc) 968 { 969 /* Wait for completion of all jobs submitted by this client. */ 970 wait_event(kc->destroyq, !atomic_read(&kc->nr_jobs)); 971 972 BUG_ON(!list_empty(&kc->callback_jobs)); 973 BUG_ON(!list_empty(&kc->complete_jobs)); 974 BUG_ON(!list_empty(&kc->io_jobs)); 975 BUG_ON(!list_empty(&kc->pages_jobs)); 976 destroy_workqueue(kc->kcopyd_wq); 977 dm_io_client_destroy(kc->io_client); 978 client_free_pages(kc); 979 mempool_exit(&kc->job_pool); 980 kfree(kc); 981 } 982 EXPORT_SYMBOL(dm_kcopyd_client_destroy); 983 984 void dm_kcopyd_client_flush(struct dm_kcopyd_client *kc) 985 { 986 flush_workqueue(kc->kcopyd_wq); 987 } 988 EXPORT_SYMBOL(dm_kcopyd_client_flush); 989