/*
 * Copyright (C) 2002 Sistina Software (UK) Limited.
 * Copyright (C) 2006 Red Hat GmbH
 *
 * This file is released under the GPL.
 *
 * Kcopyd provides a simple interface for copying an area of one
 * block-device to one or more other block-devices, with an asynchronous
 * completion notification.
 */

#include <linux/types.h>
#include <linux/atomic.h>
#include <linux/blkdev.h>
#include <linux/fs.h>
#include <linux/init.h>
#include <linux/list.h>
#include <linux/mempool.h>
#include <linux/module.h>
#include <linux/pagemap.h>
#include <linux/slab.h>
#include <linux/vmalloc.h>
#include <linux/workqueue.h>
#include <linux/mutex.h>
#include <linux/delay.h>
#include <linux/device-mapper.h>
#include <linux/dm-kcopyd.h>

#include "dm-core.h"

#define SUB_JOB_SIZE	128
#define SPLIT_COUNT	8
#define MIN_JOBS	8
#define RESERVE_PAGES	(DIV_ROUND_UP(SUB_JOB_SIZE << SECTOR_SHIFT, PAGE_SIZE))

/*-----------------------------------------------------------------
 * Each kcopyd client has its own little pool of preallocated
 * pages for kcopyd io.
 *---------------------------------------------------------------*/
struct dm_kcopyd_client {
	struct page_list *pages;
	unsigned nr_reserved_pages;
	unsigned nr_free_pages;

	struct dm_io_client *io_client;

	wait_queue_head_t destroyq;

	mempool_t job_pool;

	struct workqueue_struct *kcopyd_wq;
	struct work_struct kcopyd_work;

	struct dm_kcopyd_throttle *throttle;

	atomic_t nr_jobs;

	/*
	 * We maintain three lists of jobs:
	 *
	 * i)   jobs waiting for pages
	 * ii)  jobs that have pages, and are waiting for the io to be issued.
	 * iii) jobs that have completed.
	 *
	 * All three of these are protected by job_lock.
	 */
	spinlock_t job_lock;
	struct list_head complete_jobs;
	struct list_head io_jobs;
	struct list_head pages_jobs;
};

static struct page_list zero_page_list;

static DEFINE_SPINLOCK(throttle_spinlock);

/*
 * IO/IDLE accounting slowly decays after (1 << ACCOUNT_INTERVAL_SHIFT) period.
 * When total_period >= (1 << ACCOUNT_INTERVAL_SHIFT) the counters are divided
 * by 2.
 */
#define ACCOUNT_INTERVAL_SHIFT	SHIFT_HZ

/*
 * Sleep this number of milliseconds.
 *
 * The value was decided experimentally.
 * Smaller values seem to cause an increased copy rate above the limit.
 * The reason for this is unknown but possibly due to jiffies rounding errors
 * or read/write cache inside the disk.
 */
#define SLEEP_MSEC	100

/*
 * Maximum number of sleep events. There is a theoretical livelock if
 * multiple kcopyd clients do work simultaneously; this limit avoids it.
 */
#define MAX_SLEEPS	10

static void io_job_start(struct dm_kcopyd_throttle *t)
{
	unsigned throttle, now, difference;
	int slept = 0, skew;

	if (unlikely(!t))
		return;

try_again:
	spin_lock_irq(&throttle_spinlock);

	throttle = READ_ONCE(t->throttle);

	if (likely(throttle >= 100))
		goto skip_limit;

	now = jiffies;
	difference = now - t->last_jiffies;
	t->last_jiffies = now;
	if (t->num_io_jobs)
		t->io_period += difference;
	t->total_period += difference;

	/*
	 * Maintain sane values if we got a temporary overflow.
	 */
	if (unlikely(t->io_period > t->total_period))
		t->io_period = t->total_period;

	if (unlikely(t->total_period >= (1 << ACCOUNT_INTERVAL_SHIFT))) {
		int shift = fls(t->total_period >> ACCOUNT_INTERVAL_SHIFT);
		t->total_period >>= shift;
		t->io_period >>= shift;
	}

	skew = t->io_period - throttle * t->total_period / 100;

	if (unlikely(skew > 0) && slept < MAX_SLEEPS) {
		slept++;
		spin_unlock_irq(&throttle_spinlock);
		msleep(SLEEP_MSEC);
		goto try_again;
	}

skip_limit:
	t->num_io_jobs++;

	spin_unlock_irq(&throttle_spinlock);
}

static void io_job_finish(struct dm_kcopyd_throttle *t)
{
	unsigned long flags;

	if (unlikely(!t))
		return;

	spin_lock_irqsave(&throttle_spinlock, flags);

	t->num_io_jobs--;

	if (likely(READ_ONCE(t->throttle) >= 100))
		goto skip_limit;

	if (!t->num_io_jobs) {
		unsigned now, difference;

		now = jiffies;
		difference = now - t->last_jiffies;
		t->last_jiffies = now;

		t->io_period += difference;
		t->total_period += difference;

		/*
		 * Maintain sane values if we got a temporary overflow.
		 */
		if (unlikely(t->io_period > t->total_period))
			t->io_period = t->total_period;
	}

skip_limit:
	spin_unlock_irqrestore(&throttle_spinlock, flags);
}


static void wake(struct dm_kcopyd_client *kc)
{
	queue_work(kc->kcopyd_wq, &kc->kcopyd_work);
}

/*
 * Obtain one page for the use of kcopyd.
 */
static struct page_list *alloc_pl(gfp_t gfp)
{
	struct page_list *pl;

	pl = kmalloc(sizeof(*pl), gfp);
	if (!pl)
		return NULL;

	pl->page = alloc_page(gfp);
	if (!pl->page) {
		kfree(pl);
		return NULL;
	}

	return pl;
}

static void free_pl(struct page_list *pl)
{
	__free_page(pl->page);
	kfree(pl);
}

/*
 * Add the provided pages to a client's free page list, releasing
 * back to the system any beyond the reserved_pages limit.
 */
static void kcopyd_put_pages(struct dm_kcopyd_client *kc, struct page_list *pl)
{
	struct page_list *next;

	do {
		next = pl->next;

		if (kc->nr_free_pages >= kc->nr_reserved_pages)
			free_pl(pl);
		else {
			pl->next = kc->pages;
			kc->pages = pl;
			kc->nr_free_pages++;
		}

		pl = next;
	} while (pl);
}

static int kcopyd_get_pages(struct dm_kcopyd_client *kc,
			    unsigned int nr, struct page_list **pages)
{
	struct page_list *pl;

	*pages = NULL;

	do {
		pl = alloc_pl(__GFP_NOWARN | __GFP_NORETRY | __GFP_KSWAPD_RECLAIM);
		if (unlikely(!pl)) {
			/* Use reserved pages */
			pl = kc->pages;
			if (unlikely(!pl))
				goto out_of_memory;
			kc->pages = pl->next;
			kc->nr_free_pages--;
		}
		pl->next = *pages;
		*pages = pl;
	} while (--nr);

	return 0;

out_of_memory:
	if (*pages)
		kcopyd_put_pages(kc, *pages);
	return -ENOMEM;
}

/*
 * These three functions resize the page pool.
 */
static void drop_pages(struct page_list *pl)
{
	struct page_list *next;

	while (pl) {
		next = pl->next;
		free_pl(pl);
		pl = next;
	}
}

/*
 * Allocate and reserve nr_pages for the use of a specific client.
 */
static int client_reserve_pages(struct dm_kcopyd_client *kc, unsigned nr_pages)
{
	unsigned i;
	struct page_list *pl = NULL, *next;

	for (i = 0; i < nr_pages; i++) {
		next = alloc_pl(GFP_KERNEL);
		if (!next) {
			if (pl)
				drop_pages(pl);
			return -ENOMEM;
		}
		next->next = pl;
		pl = next;
	}

	kc->nr_reserved_pages += nr_pages;
	kcopyd_put_pages(kc, pl);

	return 0;
}

static void client_free_pages(struct dm_kcopyd_client *kc)
{
	BUG_ON(kc->nr_free_pages != kc->nr_reserved_pages);
	drop_pages(kc->pages);
	kc->pages = NULL;
	kc->nr_free_pages = kc->nr_reserved_pages = 0;
}

/*-----------------------------------------------------------------
 * kcopyd_jobs need to be allocated by the *clients* of kcopyd,
 * for this reason we use a mempool to prevent the client from
 * ever having to do io (which could cause a deadlock).
 *---------------------------------------------------------------*/
struct kcopyd_job {
	struct dm_kcopyd_client *kc;
	struct list_head list;
	unsigned long flags;

	/*
	 * Error state of the job.
	 */
	int read_err;
	unsigned long write_err;

	/*
	 * READ, WRITE or REQ_OP_WRITE_ZEROES.
	 */
	int rw;
	struct dm_io_region source;

	/*
	 * The destinations for the transfer.
	 */
	unsigned int num_dests;
	struct dm_io_region dests[DM_KCOPYD_MAX_REGIONS];

	struct page_list *pages;

	/*
	 * Set this to ensure you are notified when the job has
	 * completed.  'context' is for callback to use.
	 */
	dm_kcopyd_notify_fn fn;
	void *context;

	/*
	 * These fields are only used if the job has been split
	 * into more manageable parts.
	 */
	struct mutex lock;
	atomic_t sub_jobs;
	sector_t progress;
	sector_t write_offset;

	struct kcopyd_job *master_job;
};

static struct kmem_cache *_job_cache;

int __init dm_kcopyd_init(void)
{
	_job_cache = kmem_cache_create("kcopyd_job",
				sizeof(struct kcopyd_job) * (SPLIT_COUNT + 1),
				__alignof__(struct kcopyd_job), 0, NULL);
	if (!_job_cache)
		return -ENOMEM;

	zero_page_list.next = &zero_page_list;
	zero_page_list.page = ZERO_PAGE(0);

	return 0;
}

void dm_kcopyd_exit(void)
{
	kmem_cache_destroy(_job_cache);
	_job_cache = NULL;
}

/*
 * Functions to push and pop a job on a given job list: push() adds to
 * the tail, push_head() to the head, and pop() takes from the head.
 */
static struct kcopyd_job *pop_io_job(struct list_head *jobs,
				     struct dm_kcopyd_client *kc)
{
	struct kcopyd_job *job;

	/*
	 * For I/O jobs, pop any read, any write without sequential write
	 * constraint and sequential writes that are at the right position.
	 */
	list_for_each_entry(job, jobs, list) {
		if (job->rw == READ || !test_bit(DM_KCOPYD_WRITE_SEQ, &job->flags)) {
			list_del(&job->list);
			return job;
		}

		if (job->write_offset == job->master_job->write_offset) {
			job->master_job->write_offset += job->source.count;
			list_del(&job->list);
			return job;
		}
	}

	return NULL;
}

static struct kcopyd_job *pop(struct list_head *jobs,
			      struct dm_kcopyd_client *kc)
{
	struct kcopyd_job *job = NULL;
	unsigned long flags;

	spin_lock_irqsave(&kc->job_lock, flags);

	if (!list_empty(jobs)) {
		if (jobs == &kc->io_jobs)
			job = pop_io_job(jobs, kc);
		else {
			job = list_entry(jobs->next, struct kcopyd_job, list);
			list_del(&job->list);
		}
	}
	spin_unlock_irqrestore(&kc->job_lock, flags);

	return job;
}

static void push(struct list_head *jobs, struct kcopyd_job *job)
{
	unsigned long flags;
	struct dm_kcopyd_client *kc = job->kc;

	spin_lock_irqsave(&kc->job_lock, flags);
	list_add_tail(&job->list, jobs);
	spin_unlock_irqrestore(&kc->job_lock, flags);
}


static void push_head(struct list_head *jobs, struct kcopyd_job *job)
{
	unsigned long flags;
	struct dm_kcopyd_client *kc = job->kc;

	spin_lock_irqsave(&kc->job_lock, flags);
	list_add(&job->list, jobs);
	spin_unlock_irqrestore(&kc->job_lock, flags);
}

/*
 * These three functions process 1 item from the corresponding
 * job list.
 *
 * They return:
 * < 0: error
 *   0: success
 * > 0: can't process yet.
 */
static int run_complete_job(struct kcopyd_job *job)
{
	void *context = job->context;
	int read_err = job->read_err;
	unsigned long write_err = job->write_err;
	dm_kcopyd_notify_fn fn = job->fn;
	struct dm_kcopyd_client *kc = job->kc;

	if (job->pages && job->pages != &zero_page_list)
		kcopyd_put_pages(kc, job->pages);
	/*
	 * If this is the master job, the sub jobs have already
	 * completed so we can free everything.
	 */
	if (job->master_job == job) {
		mutex_destroy(&job->lock);
		mempool_free(job, &kc->job_pool);
	}
	fn(read_err, write_err, context);

	if (atomic_dec_and_test(&kc->nr_jobs))
		wake_up(&kc->destroyq);

	return 0;
}

static void complete_io(unsigned long error, void *context)
{
	struct kcopyd_job *job = (struct kcopyd_job *) context;
	struct dm_kcopyd_client *kc = job->kc;

	io_job_finish(kc->throttle);

	if (error) {
		if (op_is_write(job->rw))
			job->write_err |= error;
		else
			job->read_err = 1;

		if (!test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) {
			push(&kc->complete_jobs, job);
			wake(kc);
			return;
		}
	}

	if (op_is_write(job->rw))
		push(&kc->complete_jobs, job);

	else {
		job->rw = WRITE;
		push(&kc->io_jobs, job);
	}

	wake(kc);
}

/*
 * Issue the io for a particular job: either the read of the source
 * region or the writes to all destination regions.
 */
static int run_io_job(struct kcopyd_job *job)
{
	int r;
	struct dm_io_request io_req = {
		.bi_op = job->rw,
		.bi_op_flags = 0,
		.mem.type = DM_IO_PAGE_LIST,
		.mem.ptr.pl = job->pages,
		.mem.offset = 0,
		.notify.fn = complete_io,
		.notify.context = job,
		.client = job->kc->io_client,
	};

	/*
	 * If we need to write sequentially and some reads or writes failed,
	 * no point in continuing.
	 */
	if (test_bit(DM_KCOPYD_WRITE_SEQ, &job->flags) &&
	    job->master_job->write_err)
		return -EIO;

	io_job_start(job->kc->throttle);

	if (job->rw == READ)
		r = dm_io(&io_req, 1, &job->source, NULL);
	else
		r = dm_io(&io_req, job->num_dests, job->dests, NULL);

	return r;
}

static int run_pages_job(struct kcopyd_job *job)
{
	int r;
	unsigned nr_pages = dm_div_up(job->dests[0].count, PAGE_SIZE >> 9);

	r = kcopyd_get_pages(job->kc, nr_pages, &job->pages);
	if (!r) {
		/* this job is ready for io */
		push(&job->kc->io_jobs, job);
		return 0;
	}

	if (r == -ENOMEM)
		/* can't complete now */
		return 1;

	return r;
}

/*
 * Run through a list for as long as possible.  Returns the count
 * of successful jobs.
 */
static int process_jobs(struct list_head *jobs, struct dm_kcopyd_client *kc,
			int (*fn) (struct kcopyd_job *))
{
	struct kcopyd_job *job;
	int r, count = 0;

	while ((job = pop(jobs, kc))) {

		r = fn(job);

		if (r < 0) {
			/* error this rogue job */
			if (op_is_write(job->rw))
				job->write_err = (unsigned long) -1L;
			else
				job->read_err = 1;
			push(&kc->complete_jobs, job);
			break;
		}

		if (r > 0) {
			/*
			 * We couldn't service this job ATM, so
			 * push this job back onto the list.
			 */
			push_head(jobs, job);
			break;
		}

		count++;
	}

	return count;
}

/*
 * kcopyd does this every time it's woken up.
 */
static void do_work(struct work_struct *work)
{
	struct dm_kcopyd_client *kc = container_of(work,
					struct dm_kcopyd_client, kcopyd_work);
	struct blk_plug plug;

	/*
	 * The order that these are called is *very* important.
	 * complete jobs can free some pages for pages jobs.
	 * Pages jobs when successful will jump onto the io jobs
	 * list.  io jobs call wake when they complete and it all
	 * starts again.
	 */
	blk_start_plug(&plug);
	process_jobs(&kc->complete_jobs, kc, run_complete_job);
	process_jobs(&kc->pages_jobs, kc, run_pages_job);
	process_jobs(&kc->io_jobs, kc, run_io_job);
	blk_finish_plug(&plug);
}

/*
 * If we are copying a small region we just dispatch a single job
 * to do the copy, otherwise the io has to be split up into many
 * jobs.
 */
static void dispatch_job(struct kcopyd_job *job)
{
	struct dm_kcopyd_client *kc = job->kc;
	atomic_inc(&kc->nr_jobs);
	if (unlikely(!job->source.count))
		push(&kc->complete_jobs, job);
	else if (job->pages == &zero_page_list)
		push(&kc->io_jobs, job);
	else
		push(&kc->pages_jobs, job);
	wake(kc);
}

static void segment_complete(int read_err, unsigned long write_err,
			     void *context)
{
	/* FIXME: tidy this function */
	sector_t progress = 0;
	sector_t count = 0;
	struct kcopyd_job *sub_job = (struct kcopyd_job *) context;
	struct kcopyd_job *job = sub_job->master_job;
	struct dm_kcopyd_client *kc = job->kc;

	mutex_lock(&job->lock);

	/* update the error */
	if (read_err)
		job->read_err = 1;

	if (write_err)
		job->write_err |= write_err;

	/*
	 * Only dispatch more work if there hasn't been an error.
	 */
	if ((!job->read_err && !job->write_err) ||
	    test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) {
		/* get the next chunk of work */
		progress = job->progress;
		count = job->source.count - progress;
		if (count) {
			if (count > SUB_JOB_SIZE)
				count = SUB_JOB_SIZE;

			job->progress += count;
		}
	}
	mutex_unlock(&job->lock);

	if (count) {
		int i;

		*sub_job = *job;
		sub_job->write_offset = progress;
		sub_job->source.sector += progress;
		sub_job->source.count = count;

		for (i = 0; i < job->num_dests; i++) {
			sub_job->dests[i].sector += progress;
			sub_job->dests[i].count = count;
		}

		sub_job->fn = segment_complete;
		sub_job->context = sub_job;
		dispatch_job(sub_job);

	} else if (atomic_dec_and_test(&job->sub_jobs)) {

		/*
		 * Queue the completion callback to the kcopyd thread.
		 *
		 * Some callers assume that all the completions are called
		 * from a single thread and don't race with each other.
		 *
		 * We must not call the callback directly here because this
		 * code may not be executing in the thread.
		 */
		push(&kc->complete_jobs, job);
		wake(kc);
	}
}

/*
 * Create some sub jobs to share the work between them.
 */
static void split_job(struct kcopyd_job *master_job)
{
	int i;

	atomic_inc(&master_job->kc->nr_jobs);

	atomic_set(&master_job->sub_jobs, SPLIT_COUNT);
	for (i = 0; i < SPLIT_COUNT; i++) {
		master_job[i + 1].master_job = master_job;
		segment_complete(0, 0u, &master_job[i + 1]);
	}
}

int dm_kcopyd_copy(struct dm_kcopyd_client *kc, struct dm_io_region *from,
		   unsigned int num_dests, struct dm_io_region *dests,
		   unsigned int flags, dm_kcopyd_notify_fn fn, void *context)
{
	struct kcopyd_job *job;
	int i;

	/*
	 * Allocate an array of jobs consisting of one master job
	 * followed by SPLIT_COUNT sub jobs.
	 */
	job = mempool_alloc(&kc->job_pool, GFP_NOIO);
	mutex_init(&job->lock);

	/*
	 * set up for the read.
	 */
	job->kc = kc;
	job->flags = flags;
	job->read_err = 0;
	job->write_err = 0;

	job->num_dests = num_dests;
	memcpy(&job->dests, dests, sizeof(*dests) * num_dests);

	/*
	 * If one of the destinations is a host-managed zoned block device,
	 * we need to write sequentially. If one of the destinations is a
	 * host-aware device, then leave it to the caller to choose what to do.
	 */
	if (!test_bit(DM_KCOPYD_WRITE_SEQ, &job->flags)) {
		for (i = 0; i < job->num_dests; i++) {
			if (bdev_zoned_model(dests[i].bdev) == BLK_ZONED_HM) {
				set_bit(DM_KCOPYD_WRITE_SEQ, &job->flags);
				break;
			}
		}
	}

	/*
	 * If we need to write sequentially, errors cannot be ignored.
	 */
	if (test_bit(DM_KCOPYD_WRITE_SEQ, &job->flags) &&
	    test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags))
		clear_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags);

	if (from) {
		job->source = *from;
		job->pages = NULL;
		job->rw = READ;
	} else {
		memset(&job->source, 0, sizeof job->source);
		job->source.count = job->dests[0].count;
		job->pages = &zero_page_list;

		/*
		 * Use WRITE ZEROES to optimize zeroing if all dests support it.
		 */
		job->rw = REQ_OP_WRITE_ZEROES;
		for (i = 0; i < job->num_dests; i++)
			if (!bdev_write_zeroes_sectors(job->dests[i].bdev)) {
				job->rw = WRITE;
				break;
			}
	}

	job->fn = fn;
	job->context = context;
	job->master_job = job;
	job->write_offset = 0;

	if (job->source.count <= SUB_JOB_SIZE)
		dispatch_job(job);
	else {
		job->progress = 0;
		split_job(job);
	}

	return 0;
}
EXPORT_SYMBOL(dm_kcopyd_copy);

int dm_kcopyd_zero(struct dm_kcopyd_client *kc,
		   unsigned num_dests, struct dm_io_region *dests,
		   unsigned flags, dm_kcopyd_notify_fn fn, void *context)
{
	return dm_kcopyd_copy(kc, NULL, num_dests, dests, flags, fn, context);
}
EXPORT_SYMBOL(dm_kcopyd_zero);

void *dm_kcopyd_prepare_callback(struct dm_kcopyd_client *kc,
				 dm_kcopyd_notify_fn fn, void *context)
{
	struct kcopyd_job *job;

	job = mempool_alloc(&kc->job_pool, GFP_NOIO);

	memset(job, 0, sizeof(struct kcopyd_job));
	job->kc = kc;
	job->fn = fn;
	job->context = context;
	job->master_job = job;

	atomic_inc(&kc->nr_jobs);

	return job;
}
EXPORT_SYMBOL(dm_kcopyd_prepare_callback);

void dm_kcopyd_do_callback(void *j, int read_err, unsigned long write_err)
{
	struct kcopyd_job *job = j;
	struct dm_kcopyd_client *kc = job->kc;

	job->read_err = read_err;
	job->write_err = write_err;

	push(&kc->complete_jobs, job);
	wake(kc);
}
EXPORT_SYMBOL(dm_kcopyd_do_callback);

/*
 * Cancels a kcopyd job, eg. someone might be deactivating a
 * mirror.
 */
#if 0
int kcopyd_cancel(struct kcopyd_job *job, int block)
{
	/* FIXME: finish */
	return -1;
}
#endif /* 0 */

/*-----------------------------------------------------------------
 * Client setup
 *---------------------------------------------------------------*/
struct dm_kcopyd_client *dm_kcopyd_client_create(struct dm_kcopyd_throttle *throttle)
{
	int r;
	struct dm_kcopyd_client *kc;

	kc = kzalloc(sizeof(*kc), GFP_KERNEL);
	if (!kc)
		return ERR_PTR(-ENOMEM);

	spin_lock_init(&kc->job_lock);
	INIT_LIST_HEAD(&kc->complete_jobs);
	INIT_LIST_HEAD(&kc->io_jobs);
	INIT_LIST_HEAD(&kc->pages_jobs);
	kc->throttle = throttle;

	r = mempool_init_slab_pool(&kc->job_pool, MIN_JOBS, _job_cache);
	if (r)
		goto bad_slab;

	INIT_WORK(&kc->kcopyd_work, do_work);
	kc->kcopyd_wq = alloc_workqueue("kcopyd", WQ_MEM_RECLAIM, 0);
	if (!kc->kcopyd_wq) {
		r = -ENOMEM;
		goto bad_workqueue;
	}

	kc->pages = NULL;
	kc->nr_reserved_pages = kc->nr_free_pages = 0;
	r = client_reserve_pages(kc, RESERVE_PAGES);
	if (r)
		goto bad_client_pages;

	kc->io_client = dm_io_client_create();
	if (IS_ERR(kc->io_client)) {
		r = PTR_ERR(kc->io_client);
		goto bad_io_client;
	}

	init_waitqueue_head(&kc->destroyq);
	atomic_set(&kc->nr_jobs, 0);

	return kc;

bad_io_client:
	client_free_pages(kc);
bad_client_pages:
	destroy_workqueue(kc->kcopyd_wq);
bad_workqueue:
	mempool_exit(&kc->job_pool);
bad_slab:
	kfree(kc);

	return ERR_PTR(r);
}
EXPORT_SYMBOL(dm_kcopyd_client_create);

void dm_kcopyd_client_destroy(struct dm_kcopyd_client *kc)
{
	/* Wait for completion of all jobs submitted by this client. */
	wait_event(kc->destroyq, !atomic_read(&kc->nr_jobs));

	BUG_ON(!list_empty(&kc->complete_jobs));
	BUG_ON(!list_empty(&kc->io_jobs));
	BUG_ON(!list_empty(&kc->pages_jobs));
	destroy_workqueue(kc->kcopyd_wq);
	dm_io_client_destroy(kc->io_client);
	client_free_pages(kc);
	mempool_exit(&kc->job_pool);
	kfree(kc);
}
EXPORT_SYMBOL(dm_kcopyd_client_destroy);
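
/*
 * Minimal usage sketch (not built, following the #if 0 convention used
 * for kcopyd_cancel above): how a device-mapper target might drive the
 * exported interface — create a client, submit one copy and wait for
 * the asynchronous completion, then tear the client down.  The helper
 * names (example_copy, example_copy_done), the block devices and the
 * region layout are hypothetical and not part of this file; a real
 * caller would also need <linux/completion.h> for the wait shown here.
 */
#if 0
static void example_copy_done(int read_err, unsigned long write_err,
			      void *context)
{
	/* Runs once, from the kcopyd workqueue, when the copy finishes. */
	if (read_err || write_err)
		pr_err("example kcopyd copy failed\n");
	complete((struct completion *)context);
}

static int example_copy(struct block_device *src_bdev,
			struct block_device *dst_bdev,
			sector_t sector, sector_t count)
{
	struct dm_kcopyd_client *kc;
	struct dm_io_region from, to;
	struct completion done;

	/* NULL throttle: no bandwidth limiting. */
	kc = dm_kcopyd_client_create(NULL);
	if (IS_ERR(kc))
		return PTR_ERR(kc);

	from.bdev = src_bdev;
	from.sector = sector;
	from.count = count;

	to.bdev = dst_bdev;
	to.sector = sector;
	to.count = count;

	init_completion(&done);
	/* One destination, no flags; completion is reported via callback. */
	dm_kcopyd_copy(kc, &from, 1, &to, 0, example_copy_done, &done);
	wait_for_completion(&done);

	dm_kcopyd_client_destroy(kc);
	return 0;
}
#endif /* 0 */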