1 /* 2 * Copyright (C) 2002 Sistina Software (UK) Limited. 3 * Copyright (C) 2006 Red Hat GmbH 4 * 5 * This file is released under the GPL. 6 * 7 * Kcopyd provides a simple interface for copying an area of one 8 * block-device to one or more other block-devices, with an asynchronous 9 * completion notification. 10 */ 11 12 #include <linux/types.h> 13 #include <linux/atomic.h> 14 #include <linux/blkdev.h> 15 #include <linux/fs.h> 16 #include <linux/init.h> 17 #include <linux/list.h> 18 #include <linux/mempool.h> 19 #include <linux/module.h> 20 #include <linux/pagemap.h> 21 #include <linux/slab.h> 22 #include <linux/vmalloc.h> 23 #include <linux/workqueue.h> 24 #include <linux/mutex.h> 25 #include <linux/delay.h> 26 #include <linux/device-mapper.h> 27 #include <linux/dm-kcopyd.h> 28 29 #include "dm-core.h" 30 31 #define SUB_JOB_SIZE 128 32 #define SPLIT_COUNT 8 33 #define MIN_JOBS 8 34 #define RESERVE_PAGES (DIV_ROUND_UP(SUB_JOB_SIZE << SECTOR_SHIFT, PAGE_SIZE)) 35 36 /*----------------------------------------------------------------- 37 * Each kcopyd client has its own little pool of preallocated 38 * pages for kcopyd io. 39 *---------------------------------------------------------------*/ 40 struct dm_kcopyd_client { 41 struct page_list *pages; 42 unsigned nr_reserved_pages; 43 unsigned nr_free_pages; 44 45 struct dm_io_client *io_client; 46 47 wait_queue_head_t destroyq; 48 atomic_t nr_jobs; 49 50 mempool_t *job_pool; 51 52 struct workqueue_struct *kcopyd_wq; 53 struct work_struct kcopyd_work; 54 55 struct dm_kcopyd_throttle *throttle; 56 57 /* 58 * We maintain three lists of jobs: 59 * 60 * i) jobs waiting for pages 61 * ii) jobs that have pages, and are waiting for the io to be issued. 62 * iii) jobs that have completed. 63 * 64 * All three of these are protected by job_lock. 65 */ 66 spinlock_t job_lock; 67 struct list_head complete_jobs; 68 struct list_head io_jobs; 69 struct list_head pages_jobs; 70 }; 71 72 static struct page_list zero_page_list; 73 74 static DEFINE_SPINLOCK(throttle_spinlock); 75 76 /* 77 * IO/IDLE accounting slowly decays after (1 << ACCOUNT_INTERVAL_SHIFT) period. 78 * When total_period >= (1 << ACCOUNT_INTERVAL_SHIFT) the counters are divided 79 * by 2. 80 */ 81 #define ACCOUNT_INTERVAL_SHIFT SHIFT_HZ 82 83 /* 84 * Sleep this number of milliseconds. 85 * 86 * The value was decided experimentally. 87 * Smaller values seem to cause an increased copy rate above the limit. 88 * The reason for this is unknown but possibly due to jiffies rounding errors 89 * or read/write cache inside the disk. 90 */ 91 #define SLEEP_MSEC 100 92 93 /* 94 * Maximum number of sleep events. There is a theoretical livelock if more 95 * kcopyd clients do work simultaneously which this limit avoids. 96 */ 97 #define MAX_SLEEPS 10 98 99 static void io_job_start(struct dm_kcopyd_throttle *t) 100 { 101 unsigned throttle, now, difference; 102 int slept = 0, skew; 103 104 if (unlikely(!t)) 105 return; 106 107 try_again: 108 spin_lock_irq(&throttle_spinlock); 109 110 throttle = ACCESS_ONCE(t->throttle); 111 112 if (likely(throttle >= 100)) 113 goto skip_limit; 114 115 now = jiffies; 116 difference = now - t->last_jiffies; 117 t->last_jiffies = now; 118 if (t->num_io_jobs) 119 t->io_period += difference; 120 t->total_period += difference; 121 122 /* 123 * Maintain sane values if we got a temporary overflow. 124 */ 125 if (unlikely(t->io_period > t->total_period)) 126 t->io_period = t->total_period; 127 128 if (unlikely(t->total_period >= (1 << ACCOUNT_INTERVAL_SHIFT))) { 129 int shift = fls(t->total_period >> ACCOUNT_INTERVAL_SHIFT); 130 t->total_period >>= shift; 131 t->io_period >>= shift; 132 } 133 134 skew = t->io_period - throttle * t->total_period / 100; 135 136 if (unlikely(skew > 0) && slept < MAX_SLEEPS) { 137 slept++; 138 spin_unlock_irq(&throttle_spinlock); 139 msleep(SLEEP_MSEC); 140 goto try_again; 141 } 142 143 skip_limit: 144 t->num_io_jobs++; 145 146 spin_unlock_irq(&throttle_spinlock); 147 } 148 149 static void io_job_finish(struct dm_kcopyd_throttle *t) 150 { 151 unsigned long flags; 152 153 if (unlikely(!t)) 154 return; 155 156 spin_lock_irqsave(&throttle_spinlock, flags); 157 158 t->num_io_jobs--; 159 160 if (likely(ACCESS_ONCE(t->throttle) >= 100)) 161 goto skip_limit; 162 163 if (!t->num_io_jobs) { 164 unsigned now, difference; 165 166 now = jiffies; 167 difference = now - t->last_jiffies; 168 t->last_jiffies = now; 169 170 t->io_period += difference; 171 t->total_period += difference; 172 173 /* 174 * Maintain sane values if we got a temporary overflow. 175 */ 176 if (unlikely(t->io_period > t->total_period)) 177 t->io_period = t->total_period; 178 } 179 180 skip_limit: 181 spin_unlock_irqrestore(&throttle_spinlock, flags); 182 } 183 184 185 static void wake(struct dm_kcopyd_client *kc) 186 { 187 queue_work(kc->kcopyd_wq, &kc->kcopyd_work); 188 } 189 190 /* 191 * Obtain one page for the use of kcopyd. 192 */ 193 static struct page_list *alloc_pl(gfp_t gfp) 194 { 195 struct page_list *pl; 196 197 pl = kmalloc(sizeof(*pl), gfp); 198 if (!pl) 199 return NULL; 200 201 pl->page = alloc_page(gfp); 202 if (!pl->page) { 203 kfree(pl); 204 return NULL; 205 } 206 207 return pl; 208 } 209 210 static void free_pl(struct page_list *pl) 211 { 212 __free_page(pl->page); 213 kfree(pl); 214 } 215 216 /* 217 * Add the provided pages to a client's free page list, releasing 218 * back to the system any beyond the reserved_pages limit. 219 */ 220 static void kcopyd_put_pages(struct dm_kcopyd_client *kc, struct page_list *pl) 221 { 222 struct page_list *next; 223 224 do { 225 next = pl->next; 226 227 if (kc->nr_free_pages >= kc->nr_reserved_pages) 228 free_pl(pl); 229 else { 230 pl->next = kc->pages; 231 kc->pages = pl; 232 kc->nr_free_pages++; 233 } 234 235 pl = next; 236 } while (pl); 237 } 238 239 static int kcopyd_get_pages(struct dm_kcopyd_client *kc, 240 unsigned int nr, struct page_list **pages) 241 { 242 struct page_list *pl; 243 244 *pages = NULL; 245 246 do { 247 pl = alloc_pl(__GFP_NOWARN | __GFP_NORETRY | __GFP_KSWAPD_RECLAIM); 248 if (unlikely(!pl)) { 249 /* Use reserved pages */ 250 pl = kc->pages; 251 if (unlikely(!pl)) 252 goto out_of_memory; 253 kc->pages = pl->next; 254 kc->nr_free_pages--; 255 } 256 pl->next = *pages; 257 *pages = pl; 258 } while (--nr); 259 260 return 0; 261 262 out_of_memory: 263 if (*pages) 264 kcopyd_put_pages(kc, *pages); 265 return -ENOMEM; 266 } 267 268 /* 269 * These three functions resize the page pool. 270 */ 271 static void drop_pages(struct page_list *pl) 272 { 273 struct page_list *next; 274 275 while (pl) { 276 next = pl->next; 277 free_pl(pl); 278 pl = next; 279 } 280 } 281 282 /* 283 * Allocate and reserve nr_pages for the use of a specific client. 284 */ 285 static int client_reserve_pages(struct dm_kcopyd_client *kc, unsigned nr_pages) 286 { 287 unsigned i; 288 struct page_list *pl = NULL, *next; 289 290 for (i = 0; i < nr_pages; i++) { 291 next = alloc_pl(GFP_KERNEL); 292 if (!next) { 293 if (pl) 294 drop_pages(pl); 295 return -ENOMEM; 296 } 297 next->next = pl; 298 pl = next; 299 } 300 301 kc->nr_reserved_pages += nr_pages; 302 kcopyd_put_pages(kc, pl); 303 304 return 0; 305 } 306 307 static void client_free_pages(struct dm_kcopyd_client *kc) 308 { 309 BUG_ON(kc->nr_free_pages != kc->nr_reserved_pages); 310 drop_pages(kc->pages); 311 kc->pages = NULL; 312 kc->nr_free_pages = kc->nr_reserved_pages = 0; 313 } 314 315 /*----------------------------------------------------------------- 316 * kcopyd_jobs need to be allocated by the *clients* of kcopyd, 317 * for this reason we use a mempool to prevent the client from 318 * ever having to do io (which could cause a deadlock). 319 *---------------------------------------------------------------*/ 320 struct kcopyd_job { 321 struct dm_kcopyd_client *kc; 322 struct list_head list; 323 unsigned long flags; 324 325 /* 326 * Error state of the job. 327 */ 328 int read_err; 329 unsigned long write_err; 330 331 /* 332 * Either READ or WRITE 333 */ 334 int rw; 335 struct dm_io_region source; 336 337 /* 338 * The destinations for the transfer. 339 */ 340 unsigned int num_dests; 341 struct dm_io_region dests[DM_KCOPYD_MAX_REGIONS]; 342 343 struct page_list *pages; 344 345 /* 346 * Set this to ensure you are notified when the job has 347 * completed. 'context' is for callback to use. 348 */ 349 dm_kcopyd_notify_fn fn; 350 void *context; 351 352 /* 353 * These fields are only used if the job has been split 354 * into more manageable parts. 355 */ 356 struct mutex lock; 357 atomic_t sub_jobs; 358 sector_t progress; 359 360 struct kcopyd_job *master_job; 361 }; 362 363 static struct kmem_cache *_job_cache; 364 365 int __init dm_kcopyd_init(void) 366 { 367 _job_cache = kmem_cache_create("kcopyd_job", 368 sizeof(struct kcopyd_job) * (SPLIT_COUNT + 1), 369 __alignof__(struct kcopyd_job), 0, NULL); 370 if (!_job_cache) 371 return -ENOMEM; 372 373 zero_page_list.next = &zero_page_list; 374 zero_page_list.page = ZERO_PAGE(0); 375 376 return 0; 377 } 378 379 void dm_kcopyd_exit(void) 380 { 381 kmem_cache_destroy(_job_cache); 382 _job_cache = NULL; 383 } 384 385 /* 386 * Functions to push and pop a job onto the head of a given job 387 * list. 388 */ 389 static struct kcopyd_job *pop(struct list_head *jobs, 390 struct dm_kcopyd_client *kc) 391 { 392 struct kcopyd_job *job = NULL; 393 unsigned long flags; 394 395 spin_lock_irqsave(&kc->job_lock, flags); 396 397 if (!list_empty(jobs)) { 398 job = list_entry(jobs->next, struct kcopyd_job, list); 399 list_del(&job->list); 400 } 401 spin_unlock_irqrestore(&kc->job_lock, flags); 402 403 return job; 404 } 405 406 static void push(struct list_head *jobs, struct kcopyd_job *job) 407 { 408 unsigned long flags; 409 struct dm_kcopyd_client *kc = job->kc; 410 411 spin_lock_irqsave(&kc->job_lock, flags); 412 list_add_tail(&job->list, jobs); 413 spin_unlock_irqrestore(&kc->job_lock, flags); 414 } 415 416 417 static void push_head(struct list_head *jobs, struct kcopyd_job *job) 418 { 419 unsigned long flags; 420 struct dm_kcopyd_client *kc = job->kc; 421 422 spin_lock_irqsave(&kc->job_lock, flags); 423 list_add(&job->list, jobs); 424 spin_unlock_irqrestore(&kc->job_lock, flags); 425 } 426 427 /* 428 * These three functions process 1 item from the corresponding 429 * job list. 430 * 431 * They return: 432 * < 0: error 433 * 0: success 434 * > 0: can't process yet. 435 */ 436 static int run_complete_job(struct kcopyd_job *job) 437 { 438 void *context = job->context; 439 int read_err = job->read_err; 440 unsigned long write_err = job->write_err; 441 dm_kcopyd_notify_fn fn = job->fn; 442 struct dm_kcopyd_client *kc = job->kc; 443 444 if (job->pages && job->pages != &zero_page_list) 445 kcopyd_put_pages(kc, job->pages); 446 /* 447 * If this is the master job, the sub jobs have already 448 * completed so we can free everything. 449 */ 450 if (job->master_job == job) 451 mempool_free(job, kc->job_pool); 452 fn(read_err, write_err, context); 453 454 if (atomic_dec_and_test(&kc->nr_jobs)) 455 wake_up(&kc->destroyq); 456 457 return 0; 458 } 459 460 static void complete_io(unsigned long error, void *context) 461 { 462 struct kcopyd_job *job = (struct kcopyd_job *) context; 463 struct dm_kcopyd_client *kc = job->kc; 464 465 io_job_finish(kc->throttle); 466 467 if (error) { 468 if (op_is_write(job->rw)) 469 job->write_err |= error; 470 else 471 job->read_err = 1; 472 473 if (!test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) { 474 push(&kc->complete_jobs, job); 475 wake(kc); 476 return; 477 } 478 } 479 480 if (op_is_write(job->rw)) 481 push(&kc->complete_jobs, job); 482 483 else { 484 job->rw = WRITE; 485 push(&kc->io_jobs, job); 486 } 487 488 wake(kc); 489 } 490 491 /* 492 * Request io on as many buffer heads as we can currently get for 493 * a particular job. 494 */ 495 static int run_io_job(struct kcopyd_job *job) 496 { 497 int r; 498 struct dm_io_request io_req = { 499 .bi_op = job->rw, 500 .bi_op_flags = 0, 501 .mem.type = DM_IO_PAGE_LIST, 502 .mem.ptr.pl = job->pages, 503 .mem.offset = 0, 504 .notify.fn = complete_io, 505 .notify.context = job, 506 .client = job->kc->io_client, 507 }; 508 509 io_job_start(job->kc->throttle); 510 511 if (job->rw == READ) 512 r = dm_io(&io_req, 1, &job->source, NULL); 513 else 514 r = dm_io(&io_req, job->num_dests, job->dests, NULL); 515 516 return r; 517 } 518 519 static int run_pages_job(struct kcopyd_job *job) 520 { 521 int r; 522 unsigned nr_pages = dm_div_up(job->dests[0].count, PAGE_SIZE >> 9); 523 524 r = kcopyd_get_pages(job->kc, nr_pages, &job->pages); 525 if (!r) { 526 /* this job is ready for io */ 527 push(&job->kc->io_jobs, job); 528 return 0; 529 } 530 531 if (r == -ENOMEM) 532 /* can't complete now */ 533 return 1; 534 535 return r; 536 } 537 538 /* 539 * Run through a list for as long as possible. Returns the count 540 * of successful jobs. 541 */ 542 static int process_jobs(struct list_head *jobs, struct dm_kcopyd_client *kc, 543 int (*fn) (struct kcopyd_job *)) 544 { 545 struct kcopyd_job *job; 546 int r, count = 0; 547 548 while ((job = pop(jobs, kc))) { 549 550 r = fn(job); 551 552 if (r < 0) { 553 /* error this rogue job */ 554 if (op_is_write(job->rw)) 555 job->write_err = (unsigned long) -1L; 556 else 557 job->read_err = 1; 558 push(&kc->complete_jobs, job); 559 break; 560 } 561 562 if (r > 0) { 563 /* 564 * We couldn't service this job ATM, so 565 * push this job back onto the list. 566 */ 567 push_head(jobs, job); 568 break; 569 } 570 571 count++; 572 } 573 574 return count; 575 } 576 577 /* 578 * kcopyd does this every time it's woken up. 579 */ 580 static void do_work(struct work_struct *work) 581 { 582 struct dm_kcopyd_client *kc = container_of(work, 583 struct dm_kcopyd_client, kcopyd_work); 584 struct blk_plug plug; 585 586 /* 587 * The order that these are called is *very* important. 588 * complete jobs can free some pages for pages jobs. 589 * Pages jobs when successful will jump onto the io jobs 590 * list. io jobs call wake when they complete and it all 591 * starts again. 592 */ 593 blk_start_plug(&plug); 594 process_jobs(&kc->complete_jobs, kc, run_complete_job); 595 process_jobs(&kc->pages_jobs, kc, run_pages_job); 596 process_jobs(&kc->io_jobs, kc, run_io_job); 597 blk_finish_plug(&plug); 598 } 599 600 /* 601 * If we are copying a small region we just dispatch a single job 602 * to do the copy, otherwise the io has to be split up into many 603 * jobs. 604 */ 605 static void dispatch_job(struct kcopyd_job *job) 606 { 607 struct dm_kcopyd_client *kc = job->kc; 608 atomic_inc(&kc->nr_jobs); 609 if (unlikely(!job->source.count)) 610 push(&kc->complete_jobs, job); 611 else if (job->pages == &zero_page_list) 612 push(&kc->io_jobs, job); 613 else 614 push(&kc->pages_jobs, job); 615 wake(kc); 616 } 617 618 static void segment_complete(int read_err, unsigned long write_err, 619 void *context) 620 { 621 /* FIXME: tidy this function */ 622 sector_t progress = 0; 623 sector_t count = 0; 624 struct kcopyd_job *sub_job = (struct kcopyd_job *) context; 625 struct kcopyd_job *job = sub_job->master_job; 626 struct dm_kcopyd_client *kc = job->kc; 627 628 mutex_lock(&job->lock); 629 630 /* update the error */ 631 if (read_err) 632 job->read_err = 1; 633 634 if (write_err) 635 job->write_err |= write_err; 636 637 /* 638 * Only dispatch more work if there hasn't been an error. 639 */ 640 if ((!job->read_err && !job->write_err) || 641 test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) { 642 /* get the next chunk of work */ 643 progress = job->progress; 644 count = job->source.count - progress; 645 if (count) { 646 if (count > SUB_JOB_SIZE) 647 count = SUB_JOB_SIZE; 648 649 job->progress += count; 650 } 651 } 652 mutex_unlock(&job->lock); 653 654 if (count) { 655 int i; 656 657 *sub_job = *job; 658 sub_job->source.sector += progress; 659 sub_job->source.count = count; 660 661 for (i = 0; i < job->num_dests; i++) { 662 sub_job->dests[i].sector += progress; 663 sub_job->dests[i].count = count; 664 } 665 666 sub_job->fn = segment_complete; 667 sub_job->context = sub_job; 668 dispatch_job(sub_job); 669 670 } else if (atomic_dec_and_test(&job->sub_jobs)) { 671 672 /* 673 * Queue the completion callback to the kcopyd thread. 674 * 675 * Some callers assume that all the completions are called 676 * from a single thread and don't race with each other. 677 * 678 * We must not call the callback directly here because this 679 * code may not be executing in the thread. 680 */ 681 push(&kc->complete_jobs, job); 682 wake(kc); 683 } 684 } 685 686 /* 687 * Create some sub jobs to share the work between them. 688 */ 689 static void split_job(struct kcopyd_job *master_job) 690 { 691 int i; 692 693 atomic_inc(&master_job->kc->nr_jobs); 694 695 atomic_set(&master_job->sub_jobs, SPLIT_COUNT); 696 for (i = 0; i < SPLIT_COUNT; i++) { 697 master_job[i + 1].master_job = master_job; 698 segment_complete(0, 0u, &master_job[i + 1]); 699 } 700 } 701 702 int dm_kcopyd_copy(struct dm_kcopyd_client *kc, struct dm_io_region *from, 703 unsigned int num_dests, struct dm_io_region *dests, 704 unsigned int flags, dm_kcopyd_notify_fn fn, void *context) 705 { 706 struct kcopyd_job *job; 707 int i; 708 709 /* 710 * Allocate an array of jobs consisting of one master job 711 * followed by SPLIT_COUNT sub jobs. 712 */ 713 job = mempool_alloc(kc->job_pool, GFP_NOIO); 714 715 /* 716 * set up for the read. 717 */ 718 job->kc = kc; 719 job->flags = flags; 720 job->read_err = 0; 721 job->write_err = 0; 722 723 job->num_dests = num_dests; 724 memcpy(&job->dests, dests, sizeof(*dests) * num_dests); 725 726 if (from) { 727 job->source = *from; 728 job->pages = NULL; 729 job->rw = READ; 730 } else { 731 memset(&job->source, 0, sizeof job->source); 732 job->source.count = job->dests[0].count; 733 job->pages = &zero_page_list; 734 735 /* 736 * Use WRITE SAME to optimize zeroing if all dests support it. 737 */ 738 job->rw = REQ_OP_WRITE_SAME; 739 for (i = 0; i < job->num_dests; i++) 740 if (!bdev_write_same(job->dests[i].bdev)) { 741 job->rw = WRITE; 742 break; 743 } 744 } 745 746 job->fn = fn; 747 job->context = context; 748 job->master_job = job; 749 750 if (job->source.count <= SUB_JOB_SIZE) 751 dispatch_job(job); 752 else { 753 mutex_init(&job->lock); 754 job->progress = 0; 755 split_job(job); 756 } 757 758 return 0; 759 } 760 EXPORT_SYMBOL(dm_kcopyd_copy); 761 762 int dm_kcopyd_zero(struct dm_kcopyd_client *kc, 763 unsigned num_dests, struct dm_io_region *dests, 764 unsigned flags, dm_kcopyd_notify_fn fn, void *context) 765 { 766 return dm_kcopyd_copy(kc, NULL, num_dests, dests, flags, fn, context); 767 } 768 EXPORT_SYMBOL(dm_kcopyd_zero); 769 770 void *dm_kcopyd_prepare_callback(struct dm_kcopyd_client *kc, 771 dm_kcopyd_notify_fn fn, void *context) 772 { 773 struct kcopyd_job *job; 774 775 job = mempool_alloc(kc->job_pool, GFP_NOIO); 776 777 memset(job, 0, sizeof(struct kcopyd_job)); 778 job->kc = kc; 779 job->fn = fn; 780 job->context = context; 781 job->master_job = job; 782 783 atomic_inc(&kc->nr_jobs); 784 785 return job; 786 } 787 EXPORT_SYMBOL(dm_kcopyd_prepare_callback); 788 789 void dm_kcopyd_do_callback(void *j, int read_err, unsigned long write_err) 790 { 791 struct kcopyd_job *job = j; 792 struct dm_kcopyd_client *kc = job->kc; 793 794 job->read_err = read_err; 795 job->write_err = write_err; 796 797 push(&kc->complete_jobs, job); 798 wake(kc); 799 } 800 EXPORT_SYMBOL(dm_kcopyd_do_callback); 801 802 /* 803 * Cancels a kcopyd job, eg. someone might be deactivating a 804 * mirror. 805 */ 806 #if 0 807 int kcopyd_cancel(struct kcopyd_job *job, int block) 808 { 809 /* FIXME: finish */ 810 return -1; 811 } 812 #endif /* 0 */ 813 814 /*----------------------------------------------------------------- 815 * Client setup 816 *---------------------------------------------------------------*/ 817 struct dm_kcopyd_client *dm_kcopyd_client_create(struct dm_kcopyd_throttle *throttle) 818 { 819 int r = -ENOMEM; 820 struct dm_kcopyd_client *kc; 821 822 kc = kmalloc(sizeof(*kc), GFP_KERNEL); 823 if (!kc) 824 return ERR_PTR(-ENOMEM); 825 826 spin_lock_init(&kc->job_lock); 827 INIT_LIST_HEAD(&kc->complete_jobs); 828 INIT_LIST_HEAD(&kc->io_jobs); 829 INIT_LIST_HEAD(&kc->pages_jobs); 830 kc->throttle = throttle; 831 832 kc->job_pool = mempool_create_slab_pool(MIN_JOBS, _job_cache); 833 if (!kc->job_pool) 834 goto bad_slab; 835 836 INIT_WORK(&kc->kcopyd_work, do_work); 837 kc->kcopyd_wq = alloc_workqueue("kcopyd", WQ_MEM_RECLAIM, 0); 838 if (!kc->kcopyd_wq) 839 goto bad_workqueue; 840 841 kc->pages = NULL; 842 kc->nr_reserved_pages = kc->nr_free_pages = 0; 843 r = client_reserve_pages(kc, RESERVE_PAGES); 844 if (r) 845 goto bad_client_pages; 846 847 kc->io_client = dm_io_client_create(); 848 if (IS_ERR(kc->io_client)) { 849 r = PTR_ERR(kc->io_client); 850 goto bad_io_client; 851 } 852 853 init_waitqueue_head(&kc->destroyq); 854 atomic_set(&kc->nr_jobs, 0); 855 856 return kc; 857 858 bad_io_client: 859 client_free_pages(kc); 860 bad_client_pages: 861 destroy_workqueue(kc->kcopyd_wq); 862 bad_workqueue: 863 mempool_destroy(kc->job_pool); 864 bad_slab: 865 kfree(kc); 866 867 return ERR_PTR(r); 868 } 869 EXPORT_SYMBOL(dm_kcopyd_client_create); 870 871 void dm_kcopyd_client_destroy(struct dm_kcopyd_client *kc) 872 { 873 /* Wait for completion of all jobs submitted by this client. */ 874 wait_event(kc->destroyq, !atomic_read(&kc->nr_jobs)); 875 876 BUG_ON(!list_empty(&kc->complete_jobs)); 877 BUG_ON(!list_empty(&kc->io_jobs)); 878 BUG_ON(!list_empty(&kc->pages_jobs)); 879 destroy_workqueue(kc->kcopyd_wq); 880 dm_io_client_destroy(kc->io_client); 881 client_free_pages(kc); 882 mempool_destroy(kc->job_pool); 883 kfree(kc); 884 } 885 EXPORT_SYMBOL(dm_kcopyd_client_destroy); 886