/*
 * Copyright (C) 2002 Sistina Software (UK) Limited.
 * Copyright (C) 2006 Red Hat GmbH
 *
 * This file is released under the GPL.
 *
 * Kcopyd provides a simple interface for copying an area of one
 * block-device to one or more other block-devices, with an asynchronous
 * completion notification.
 */

#include <linux/types.h>
#include <linux/atomic.h>
#include <linux/blkdev.h>
#include <linux/fs.h>
#include <linux/init.h>
#include <linux/list.h>
#include <linux/mempool.h>
#include <linux/module.h>
#include <linux/pagemap.h>
#include <linux/slab.h>
#include <linux/vmalloc.h>
#include <linux/workqueue.h>
#include <linux/mutex.h>
#include <linux/delay.h>
#include <linux/device-mapper.h>
#include <linux/dm-kcopyd.h>

#include "dm-core.h"

#define SPLIT_COUNT	8
#define MIN_JOBS	8

#define DEFAULT_SUB_JOB_SIZE_KB 512
#define MAX_SUB_JOB_SIZE_KB     1024

static unsigned kcopyd_subjob_size_kb = DEFAULT_SUB_JOB_SIZE_KB;

module_param(kcopyd_subjob_size_kb, uint, S_IRUGO | S_IWUSR);
MODULE_PARM_DESC(kcopyd_subjob_size_kb, "Sub-job size for dm-kcopyd clients");

static unsigned dm_get_kcopyd_subjob_size(void)
{
	unsigned sub_job_size_kb;

	sub_job_size_kb = __dm_get_module_param(&kcopyd_subjob_size_kb,
						DEFAULT_SUB_JOB_SIZE_KB,
						MAX_SUB_JOB_SIZE_KB);

	return sub_job_size_kb << 1;
}

/*-----------------------------------------------------------------
 * Each kcopyd client has its own little pool of preallocated
 * pages for kcopyd io.
 *---------------------------------------------------------------*/
struct dm_kcopyd_client {
	struct page_list *pages;
	unsigned nr_reserved_pages;
	unsigned nr_free_pages;
	unsigned sub_job_size;

	struct dm_io_client *io_client;

	wait_queue_head_t destroyq;

	mempool_t job_pool;

	struct workqueue_struct *kcopyd_wq;
	struct work_struct kcopyd_work;

	struct dm_kcopyd_throttle *throttle;

	atomic_t nr_jobs;

	/*
	 * We maintain four lists of jobs:
	 *
	 * i)   jobs waiting for pages
	 * ii)  jobs that have pages, and are waiting for the io to be issued.
	 * iii) jobs that don't need to do any IO and just run a callback
	 * iv)  jobs that have completed.
	 *
	 * All four of these are protected by job_lock.
	 */
	spinlock_t job_lock;
	struct list_head callback_jobs;
	struct list_head complete_jobs;
	struct list_head io_jobs;
	struct list_head pages_jobs;
};

static struct page_list zero_page_list;

static DEFINE_SPINLOCK(throttle_spinlock);

/*
 * IO/IDLE accounting slowly decays after (1 << ACCOUNT_INTERVAL_SHIFT) period.
 * When total_period >= (1 << ACCOUNT_INTERVAL_SHIFT) the counters are divided
 * by 2.
 */
#define ACCOUNT_INTERVAL_SHIFT	SHIFT_HZ

/*
 * Sleep this number of milliseconds.
 *
 * The value was decided experimentally.
 * Smaller values seem to cause an increased copy rate above the limit.
 * The reason for this is unknown but possibly due to jiffies rounding errors
 * or read/write cache inside the disk.
 */
#define SLEEP_MSEC	100

/*
 * Maximum number of sleep events. There is a theoretical livelock if more
 * kcopyd clients do work simultaneously; this limit avoids it.
 */
#define MAX_SLEEPS	10
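
/*
 * Illustrative sketch (not built as part of this file): a client that wants
 * the throttling implemented below typically declares a dm_kcopyd_throttle
 * exposed as a module parameter and hands it to dm_kcopyd_client_create().
 * The parameter name "example_copy_throttle" and the helper below are
 * assumptions for illustration only; DECLARE_DM_KCOPYD_THROTTLE_WITH_MODULE_PARM
 * comes from linux/dm-kcopyd.h and defines the dm_kcopyd_throttle variable.
 */
#if 0
DECLARE_DM_KCOPYD_THROTTLE_WITH_MODULE_PARM(example_copy_throttle,
	"A percentage of time allocated for copying");

static struct dm_kcopyd_client *example_throttled_client_create(void)
{
	/* Pass NULL instead of &dm_kcopyd_throttle to disable throttling. */
	return dm_kcopyd_client_create(&dm_kcopyd_throttle);
}
#endif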

static void io_job_start(struct dm_kcopyd_throttle *t)
{
	unsigned throttle, now, difference;
	int slept = 0, skew;

	if (unlikely(!t))
		return;

try_again:
	spin_lock_irq(&throttle_spinlock);

	throttle = READ_ONCE(t->throttle);

	if (likely(throttle >= 100))
		goto skip_limit;

	now = jiffies;
	difference = now - t->last_jiffies;
	t->last_jiffies = now;
	if (t->num_io_jobs)
		t->io_period += difference;
	t->total_period += difference;

	/*
	 * Maintain sane values if we got a temporary overflow.
	 */
	if (unlikely(t->io_period > t->total_period))
		t->io_period = t->total_period;

	if (unlikely(t->total_period >= (1 << ACCOUNT_INTERVAL_SHIFT))) {
		int shift = fls(t->total_period >> ACCOUNT_INTERVAL_SHIFT);
		t->total_period >>= shift;
		t->io_period >>= shift;
	}

	skew = t->io_period - throttle * t->total_period / 100;

	if (unlikely(skew > 0) && slept < MAX_SLEEPS) {
		slept++;
		spin_unlock_irq(&throttle_spinlock);
		msleep(SLEEP_MSEC);
		goto try_again;
	}

skip_limit:
	t->num_io_jobs++;

	spin_unlock_irq(&throttle_spinlock);
}

static void io_job_finish(struct dm_kcopyd_throttle *t)
{
	unsigned long flags;

	if (unlikely(!t))
		return;

	spin_lock_irqsave(&throttle_spinlock, flags);

	t->num_io_jobs--;

	if (likely(READ_ONCE(t->throttle) >= 100))
		goto skip_limit;

	if (!t->num_io_jobs) {
		unsigned now, difference;

		now = jiffies;
		difference = now - t->last_jiffies;
		t->last_jiffies = now;

		t->io_period += difference;
		t->total_period += difference;

		/*
		 * Maintain sane values if we got a temporary overflow.
		 */
		if (unlikely(t->io_period > t->total_period))
			t->io_period = t->total_period;
	}

skip_limit:
	spin_unlock_irqrestore(&throttle_spinlock, flags);
}


static void wake(struct dm_kcopyd_client *kc)
{
	queue_work(kc->kcopyd_wq, &kc->kcopyd_work);
}

/*
 * Obtain one page for the use of kcopyd.
 */
static struct page_list *alloc_pl(gfp_t gfp)
{
	struct page_list *pl;

	pl = kmalloc(sizeof(*pl), gfp);
	if (!pl)
		return NULL;

	pl->page = alloc_page(gfp);
	if (!pl->page) {
		kfree(pl);
		return NULL;
	}

	return pl;
}

static void free_pl(struct page_list *pl)
{
	__free_page(pl->page);
	kfree(pl);
}

/*
 * Add the provided pages to a client's free page list, releasing
 * back to the system any beyond the reserved_pages limit.
 */
static void kcopyd_put_pages(struct dm_kcopyd_client *kc, struct page_list *pl)
{
	struct page_list *next;

	do {
		next = pl->next;

		if (kc->nr_free_pages >= kc->nr_reserved_pages)
			free_pl(pl);
		else {
			pl->next = kc->pages;
			kc->pages = pl;
			kc->nr_free_pages++;
		}

		pl = next;
	} while (pl);
}

static int kcopyd_get_pages(struct dm_kcopyd_client *kc,
			    unsigned int nr, struct page_list **pages)
{
	struct page_list *pl;

	*pages = NULL;

	do {
		pl = alloc_pl(__GFP_NOWARN | __GFP_NORETRY | __GFP_KSWAPD_RECLAIM);
		if (unlikely(!pl)) {
			/* Use reserved pages */
			pl = kc->pages;
			if (unlikely(!pl))
				goto out_of_memory;
			kc->pages = pl->next;
			kc->nr_free_pages--;
		}
		pl->next = *pages;
		*pages = pl;
	} while (--nr);

	return 0;

out_of_memory:
	if (*pages)
		kcopyd_put_pages(kc, *pages);
	return -ENOMEM;
}

/*
 * These three functions resize the page pool.
 */
static void drop_pages(struct page_list *pl)
{
	struct page_list *next;

	while (pl) {
		next = pl->next;
		free_pl(pl);
		pl = next;
	}
}

/*
 * Allocate and reserve nr_pages for the use of a specific client.
 */
static int client_reserve_pages(struct dm_kcopyd_client *kc, unsigned nr_pages)
{
	unsigned i;
	struct page_list *pl = NULL, *next;

	for (i = 0; i < nr_pages; i++) {
		next = alloc_pl(GFP_KERNEL);
		if (!next) {
			if (pl)
				drop_pages(pl);
			return -ENOMEM;
		}
		next->next = pl;
		pl = next;
	}

	kc->nr_reserved_pages += nr_pages;
	kcopyd_put_pages(kc, pl);

	return 0;
}

static void client_free_pages(struct dm_kcopyd_client *kc)
{
	BUG_ON(kc->nr_free_pages != kc->nr_reserved_pages);
	drop_pages(kc->pages);
	kc->pages = NULL;
	kc->nr_free_pages = kc->nr_reserved_pages = 0;
}

/*-----------------------------------------------------------------
 * kcopyd_jobs need to be allocated by the *clients* of kcopyd;
 * for this reason we use a mempool to prevent the client from
 * ever having to do io (which could cause a deadlock).
 *---------------------------------------------------------------*/
struct kcopyd_job {
	struct dm_kcopyd_client *kc;
	struct list_head list;
	unsigned long flags;

	/*
	 * Error state of the job.
	 */
	int read_err;
	unsigned long write_err;

	/*
	 * Either READ or WRITE
	 */
	int rw;
	struct dm_io_region source;

	/*
	 * The destinations for the transfer.
	 */
	unsigned int num_dests;
	struct dm_io_region dests[DM_KCOPYD_MAX_REGIONS];

	struct page_list *pages;

	/*
	 * Set this to ensure you are notified when the job has
	 * completed. 'context' is for callback to use.
	 */
	dm_kcopyd_notify_fn fn;
	void *context;

	/*
	 * These fields are only used if the job has been split
	 * into more manageable parts.
	 */
	struct mutex lock;
	atomic_t sub_jobs;
	sector_t progress;
	sector_t write_offset;

	struct kcopyd_job *master_job;
};

static struct kmem_cache *_job_cache;

int __init dm_kcopyd_init(void)
{
	_job_cache = kmem_cache_create("kcopyd_job",
				sizeof(struct kcopyd_job) * (SPLIT_COUNT + 1),
				__alignof__(struct kcopyd_job), 0, NULL);
	if (!_job_cache)
		return -ENOMEM;

	zero_page_list.next = &zero_page_list;
	zero_page_list.page = ZERO_PAGE(0);

	return 0;
}

void dm_kcopyd_exit(void)
{
	kmem_cache_destroy(_job_cache);
	_job_cache = NULL;
}

/*
 * Functions to push and pop a job onto a given job list.
 */
static struct kcopyd_job *pop_io_job(struct list_head *jobs,
				     struct dm_kcopyd_client *kc)
{
	struct kcopyd_job *job;

	/*
	 * For I/O jobs, pop any read, any write without sequential write
	 * constraint and sequential writes that are at the right position.
	 */
	list_for_each_entry(job, jobs, list) {
		if (job->rw == READ || !test_bit(DM_KCOPYD_WRITE_SEQ, &job->flags)) {
			list_del(&job->list);
			return job;
		}

		if (job->write_offset == job->master_job->write_offset) {
			job->master_job->write_offset += job->source.count;
			list_del(&job->list);
			return job;
		}
	}

	return NULL;
}

static struct kcopyd_job *pop(struct list_head *jobs,
			      struct dm_kcopyd_client *kc)
{
	struct kcopyd_job *job = NULL;
	unsigned long flags;

	spin_lock_irqsave(&kc->job_lock, flags);

	if (!list_empty(jobs)) {
		if (jobs == &kc->io_jobs)
			job = pop_io_job(jobs, kc);
		else {
			job = list_entry(jobs->next, struct kcopyd_job, list);
			list_del(&job->list);
		}
	}
	spin_unlock_irqrestore(&kc->job_lock, flags);

	return job;
}

static void push(struct list_head *jobs, struct kcopyd_job *job)
{
	unsigned long flags;
	struct dm_kcopyd_client *kc = job->kc;

	spin_lock_irqsave(&kc->job_lock, flags);
	list_add_tail(&job->list, jobs);
	spin_unlock_irqrestore(&kc->job_lock, flags);
}


static void push_head(struct list_head *jobs, struct kcopyd_job *job)
{
	unsigned long flags;
	struct dm_kcopyd_client *kc = job->kc;

	spin_lock_irqsave(&kc->job_lock, flags);
	list_add(&job->list, jobs);
	spin_unlock_irqrestore(&kc->job_lock, flags);
}

/*
 * These three functions process 1 item from the corresponding
 * job list.
 *
 * They return:
 * < 0: error
 *   0: success
 * > 0: can't process yet.
 */
static int run_complete_job(struct kcopyd_job *job)
{
	void *context = job->context;
	int read_err = job->read_err;
	unsigned long write_err = job->write_err;
	dm_kcopyd_notify_fn fn = job->fn;
	struct dm_kcopyd_client *kc = job->kc;

	if (job->pages && job->pages != &zero_page_list)
		kcopyd_put_pages(kc, job->pages);
	/*
	 * If this is the master job, the sub jobs have already
	 * completed so we can free everything.
	 */
	if (job->master_job == job) {
		mutex_destroy(&job->lock);
		mempool_free(job, &kc->job_pool);
	}
	fn(read_err, write_err, context);

	if (atomic_dec_and_test(&kc->nr_jobs))
		wake_up(&kc->destroyq);

	cond_resched();

	return 0;
}

static void complete_io(unsigned long error, void *context)
{
	struct kcopyd_job *job = (struct kcopyd_job *) context;
	struct dm_kcopyd_client *kc = job->kc;

	io_job_finish(kc->throttle);

	if (error) {
		if (op_is_write(job->rw))
			job->write_err |= error;
		else
			job->read_err = 1;

		if (!test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) {
			push(&kc->complete_jobs, job);
			wake(kc);
			return;
		}
	}

	if (op_is_write(job->rw))
		push(&kc->complete_jobs, job);

	else {
		job->rw = WRITE;
		push(&kc->io_jobs, job);
	}

	wake(kc);
}

/*
 * Issue the read or write io for a particular job.
 */
static int run_io_job(struct kcopyd_job *job)
{
	int r;
	struct dm_io_request io_req = {
		.bi_op = job->rw,
		.bi_op_flags = 0,
		.mem.type = DM_IO_PAGE_LIST,
		.mem.ptr.pl = job->pages,
		.mem.offset = 0,
		.notify.fn = complete_io,
		.notify.context = job,
		.client = job->kc->io_client,
	};

	/*
	 * If we need to write sequentially and some reads or writes failed,
	 * no point in continuing.
	 */
	if (test_bit(DM_KCOPYD_WRITE_SEQ, &job->flags) &&
	    job->master_job->write_err) {
		job->write_err = job->master_job->write_err;
		return -EIO;
	}

	io_job_start(job->kc->throttle);

	if (job->rw == READ)
		r = dm_io(&io_req, 1, &job->source, NULL);
	else
		r = dm_io(&io_req, job->num_dests, job->dests, NULL);

	return r;
}

static int run_pages_job(struct kcopyd_job *job)
{
	int r;
	unsigned nr_pages = dm_div_up(job->dests[0].count, PAGE_SIZE >> 9);

	r = kcopyd_get_pages(job->kc, nr_pages, &job->pages);
	if (!r) {
		/* this job is ready for io */
		push(&job->kc->io_jobs, job);
		return 0;
	}

	if (r == -ENOMEM)
		/* can't complete now */
		return 1;

	return r;
}

/*
 * Run through a list for as long as possible. Returns the count
 * of successful jobs.
 */
static int process_jobs(struct list_head *jobs, struct dm_kcopyd_client *kc,
			int (*fn) (struct kcopyd_job *))
{
	struct kcopyd_job *job;
	int r, count = 0;

	while ((job = pop(jobs, kc))) {

		r = fn(job);

		if (r < 0) {
			/* error this rogue job */
			if (op_is_write(job->rw))
				job->write_err = (unsigned long) -1L;
			else
				job->read_err = 1;
			push(&kc->complete_jobs, job);
			wake(kc);
			break;
		}

		if (r > 0) {
			/*
			 * We couldn't service this job ATM, so
			 * push this job back onto the list.
			 */
			push_head(jobs, job);
			break;
		}

		count++;
	}

	return count;
}

/*
 * kcopyd does this every time it's woken up.
 */
static void do_work(struct work_struct *work)
{
	struct dm_kcopyd_client *kc = container_of(work,
					struct dm_kcopyd_client, kcopyd_work);
	struct blk_plug plug;
	unsigned long flags;

	/*
	 * The order that these are called is *very* important.
	 * Complete jobs can free some pages for pages jobs.
	 * Pages jobs when successful will jump onto the io jobs
	 * list.  io jobs call wake when they complete and it all
	 * starts again.
	 */
	spin_lock_irqsave(&kc->job_lock, flags);
	list_splice_tail_init(&kc->callback_jobs, &kc->complete_jobs);
	spin_unlock_irqrestore(&kc->job_lock, flags);

	blk_start_plug(&plug);
	process_jobs(&kc->complete_jobs, kc, run_complete_job);
	process_jobs(&kc->pages_jobs, kc, run_pages_job);
	process_jobs(&kc->io_jobs, kc, run_io_job);
	blk_finish_plug(&plug);
}

/*
 * If we are copying a small region we just dispatch a single job
 * to do the copy, otherwise the io has to be split up into many
 * jobs.
 */
static void dispatch_job(struct kcopyd_job *job)
{
	struct dm_kcopyd_client *kc = job->kc;
	atomic_inc(&kc->nr_jobs);
	if (unlikely(!job->source.count))
		push(&kc->callback_jobs, job);
	else if (job->pages == &zero_page_list)
		push(&kc->io_jobs, job);
	else
		push(&kc->pages_jobs, job);
	wake(kc);
}

static void segment_complete(int read_err, unsigned long write_err,
			     void *context)
{
	/* FIXME: tidy this function */
	sector_t progress = 0;
	sector_t count = 0;
	struct kcopyd_job *sub_job = (struct kcopyd_job *) context;
	struct kcopyd_job *job = sub_job->master_job;
	struct dm_kcopyd_client *kc = job->kc;

	mutex_lock(&job->lock);

	/* update the error */
	if (read_err)
		job->read_err = 1;

	if (write_err)
		job->write_err |= write_err;

	/*
	 * Only dispatch more work if there hasn't been an error.
	 */
	if ((!job->read_err && !job->write_err) ||
	    test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) {
		/* get the next chunk of work */
		progress = job->progress;
		count = job->source.count - progress;
		if (count) {
			if (count > kc->sub_job_size)
				count = kc->sub_job_size;

			job->progress += count;
		}
	}
	mutex_unlock(&job->lock);

	if (count) {
		int i;

		*sub_job = *job;
		sub_job->write_offset = progress;
		sub_job->source.sector += progress;
		sub_job->source.count = count;

		for (i = 0; i < job->num_dests; i++) {
			sub_job->dests[i].sector += progress;
			sub_job->dests[i].count = count;
		}

		sub_job->fn = segment_complete;
		sub_job->context = sub_job;
		dispatch_job(sub_job);

	} else if (atomic_dec_and_test(&job->sub_jobs)) {

		/*
		 * Queue the completion callback to the kcopyd thread.
		 *
		 * Some callers assume that all the completions are called
		 * from a single thread and don't race with each other.
		 *
		 * We must not call the callback directly here because this
		 * code may not be executing in the thread.
		 */
		push(&kc->complete_jobs, job);
		wake(kc);
	}
}

/*
 * Create some sub jobs to share the work between them.
 */
static void split_job(struct kcopyd_job *master_job)
{
	int i;

	atomic_inc(&master_job->kc->nr_jobs);

	atomic_set(&master_job->sub_jobs, SPLIT_COUNT);
	for (i = 0; i < SPLIT_COUNT; i++) {
		master_job[i + 1].master_job = master_job;
		segment_complete(0, 0u, &master_job[i + 1]);
	}
}

void dm_kcopyd_copy(struct dm_kcopyd_client *kc, struct dm_io_region *from,
		    unsigned int num_dests, struct dm_io_region *dests,
		    unsigned int flags, dm_kcopyd_notify_fn fn, void *context)
{
	struct kcopyd_job *job;
	int i;

	/*
	 * Allocate an array of jobs consisting of one master job
	 * followed by SPLIT_COUNT sub jobs.
	 */
	job = mempool_alloc(&kc->job_pool, GFP_NOIO);
	mutex_init(&job->lock);

	/*
	 * set up for the read.
	 */
	job->kc = kc;
	job->flags = flags;
	job->read_err = 0;
	job->write_err = 0;

	job->num_dests = num_dests;
	memcpy(&job->dests, dests, sizeof(*dests) * num_dests);

	/*
	 * If one of the destinations is a host-managed zoned block device,
	 * we need to write sequentially. If one of the destinations is a
	 * host-aware device, then leave it to the caller to choose what to do.
	 */
	if (!test_bit(DM_KCOPYD_WRITE_SEQ, &job->flags)) {
		for (i = 0; i < job->num_dests; i++) {
			if (bdev_zoned_model(dests[i].bdev) == BLK_ZONED_HM) {
				set_bit(DM_KCOPYD_WRITE_SEQ, &job->flags);
				break;
			}
		}
	}

	/*
	 * If we need to write sequentially, errors cannot be ignored.
	 */
	if (test_bit(DM_KCOPYD_WRITE_SEQ, &job->flags) &&
	    test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags))
		clear_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags);

	if (from) {
		job->source = *from;
		job->pages = NULL;
		job->rw = READ;
	} else {
		memset(&job->source, 0, sizeof job->source);
		job->source.count = job->dests[0].count;
		job->pages = &zero_page_list;

		/*
		 * Use WRITE ZEROES to optimize zeroing if all dests support it.
		 */
		job->rw = REQ_OP_WRITE_ZEROES;
		for (i = 0; i < job->num_dests; i++)
			if (!bdev_write_zeroes_sectors(job->dests[i].bdev)) {
				job->rw = WRITE;
				break;
			}
	}

	job->fn = fn;
	job->context = context;
	job->master_job = job;
	job->write_offset = 0;

	if (job->source.count <= kc->sub_job_size)
		dispatch_job(job);
	else {
		job->progress = 0;
		split_job(job);
	}
}
EXPORT_SYMBOL(dm_kcopyd_copy);

void dm_kcopyd_zero(struct dm_kcopyd_client *kc,
		    unsigned num_dests, struct dm_io_region *dests,
		    unsigned flags, dm_kcopyd_notify_fn fn, void *context)
{
	dm_kcopyd_copy(kc, NULL, num_dests, dests, flags, fn, context);
}
EXPORT_SYMBOL(dm_kcopyd_zero);

void *dm_kcopyd_prepare_callback(struct dm_kcopyd_client *kc,
				 dm_kcopyd_notify_fn fn, void *context)
{
	struct kcopyd_job *job;

	job = mempool_alloc(&kc->job_pool, GFP_NOIO);

	memset(job, 0, sizeof(struct kcopyd_job));
	job->kc = kc;
	job->fn = fn;
	job->context = context;
	job->master_job = job;

	atomic_inc(&kc->nr_jobs);

	return job;
}
EXPORT_SYMBOL(dm_kcopyd_prepare_callback);

void dm_kcopyd_do_callback(void *j, int read_err, unsigned long write_err)
{
	struct kcopyd_job *job = j;
	struct dm_kcopyd_client *kc = job->kc;

	job->read_err = read_err;
	job->write_err = write_err;

	push(&kc->callback_jobs, job);
	wake(kc);
}
EXPORT_SYMBOL(dm_kcopyd_do_callback);
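
/*
 * Illustrative sketch (not built as part of this file):
 * dm_kcopyd_prepare_callback() and dm_kcopyd_do_callback() let a client queue
 * an arbitrary callback so that it runs from the kcopyd workqueue, serialized
 * with copy completions. The names example_done() and example_defer() are
 * assumptions for illustration only.
 */
#if 0
static void example_done(int read_err, unsigned long write_err, void *context)
{
	/* Runs from the kcopyd workqueue, just like a copy completion. */
}

static void example_defer(struct dm_kcopyd_client *kc, void *context)
{
	void *job = dm_kcopyd_prepare_callback(kc, example_done, context);

	/* ... other work; example_done() is only queued once this is called: */
	dm_kcopyd_do_callback(job, 0 /* read_err */, 0 /* write_err */);
}
#endif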

/*
 * Cancels a kcopyd job, e.g. someone might be deactivating a
 * mirror.
 */
#if 0
int kcopyd_cancel(struct kcopyd_job *job, int block)
{
	/* FIXME: finish */
	return -1;
}
#endif  /*  0  */

/*-----------------------------------------------------------------
 * Client setup
 *---------------------------------------------------------------*/
struct dm_kcopyd_client *dm_kcopyd_client_create(struct dm_kcopyd_throttle *throttle)
{
	int r;
	unsigned reserve_pages;
	struct dm_kcopyd_client *kc;

	kc = kzalloc(sizeof(*kc), GFP_KERNEL);
	if (!kc)
		return ERR_PTR(-ENOMEM);

	spin_lock_init(&kc->job_lock);
	INIT_LIST_HEAD(&kc->callback_jobs);
	INIT_LIST_HEAD(&kc->complete_jobs);
	INIT_LIST_HEAD(&kc->io_jobs);
	INIT_LIST_HEAD(&kc->pages_jobs);
	kc->throttle = throttle;

	r = mempool_init_slab_pool(&kc->job_pool, MIN_JOBS, _job_cache);
	if (r)
		goto bad_slab;

	INIT_WORK(&kc->kcopyd_work, do_work);
	kc->kcopyd_wq = alloc_workqueue("kcopyd", WQ_MEM_RECLAIM, 0);
	if (!kc->kcopyd_wq) {
		r = -ENOMEM;
		goto bad_workqueue;
	}

	kc->sub_job_size = dm_get_kcopyd_subjob_size();
	reserve_pages = DIV_ROUND_UP(kc->sub_job_size << SECTOR_SHIFT, PAGE_SIZE);

	kc->pages = NULL;
	kc->nr_reserved_pages = kc->nr_free_pages = 0;
	r = client_reserve_pages(kc, reserve_pages);
	if (r)
		goto bad_client_pages;

	kc->io_client = dm_io_client_create();
	if (IS_ERR(kc->io_client)) {
		r = PTR_ERR(kc->io_client);
		goto bad_io_client;
	}

	init_waitqueue_head(&kc->destroyq);
	atomic_set(&kc->nr_jobs, 0);

	return kc;

bad_io_client:
	client_free_pages(kc);
bad_client_pages:
	destroy_workqueue(kc->kcopyd_wq);
bad_workqueue:
	mempool_exit(&kc->job_pool);
bad_slab:
	kfree(kc);

	return ERR_PTR(r);
}
EXPORT_SYMBOL(dm_kcopyd_client_create);

void dm_kcopyd_client_destroy(struct dm_kcopyd_client *kc)
{
	/* Wait for completion of all jobs submitted by this client. */
	wait_event(kc->destroyq, !atomic_read(&kc->nr_jobs));

	BUG_ON(!list_empty(&kc->callback_jobs));
	BUG_ON(!list_empty(&kc->complete_jobs));
	BUG_ON(!list_empty(&kc->io_jobs));
	BUG_ON(!list_empty(&kc->pages_jobs));
	destroy_workqueue(kc->kcopyd_wq);
	dm_io_client_destroy(kc->io_client);
	client_free_pages(kc);
	mempool_exit(&kc->job_pool);
	kfree(kc);
}
EXPORT_SYMBOL(dm_kcopyd_client_destroy);
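
/*
 * Illustrative sketch (not built as part of this file): minimal synchronous
 * use of the client interface above. example_copy_region() and its completion
 * handler are assumptions for illustration only; a real caller would normally
 * be a device-mapper target that creates the client once and keeps it for its
 * whole lifetime, and <linux/completion.h> is assumed to be available.
 */
#if 0
static void example_copy_done(int read_err, unsigned long write_err,
			      void *context)
{
	/* Called from the kcopyd workqueue once every destination is written. */
	complete((struct completion *) context);
}

static int example_copy_region(struct block_device *src_bdev,
			       struct block_device *dst_bdev,
			       sector_t sector, sector_t count)
{
	struct dm_kcopyd_client *kc;
	struct dm_io_region from, to;
	DECLARE_COMPLETION_ONSTACK(done);

	kc = dm_kcopyd_client_create(NULL);	/* NULL: no throttling */
	if (IS_ERR(kc))
		return PTR_ERR(kc);

	from.bdev = src_bdev;
	from.sector = sector;
	from.count = count;

	to.bdev = dst_bdev;
	to.sector = sector;
	to.count = count;

	dm_kcopyd_copy(kc, &from, 1, &to, 0, example_copy_done, &done);
	wait_for_completion(&done);

	dm_kcopyd_client_destroy(kc);
	return 0;
}
#endif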