/*
 * Copyright (C) 2002 Sistina Software (UK) Limited.
 * Copyright (C) 2006 Red Hat GmbH
 *
 * This file is released under the GPL.
 *
 * Kcopyd provides a simple interface for copying an area of one
 * block-device to one or more other block-devices, with an asynchronous
 * completion notification.
 */

#include <linux/types.h>
#include <linux/atomic.h>
#include <linux/blkdev.h>
#include <linux/fs.h>
#include <linux/init.h>
#include <linux/list.h>
#include <linux/mempool.h>
#include <linux/module.h>
#include <linux/pagemap.h>
#include <linux/slab.h>
#include <linux/vmalloc.h>
#include <linux/workqueue.h>
#include <linux/mutex.h>
#include <linux/device-mapper.h>
#include <linux/dm-kcopyd.h>

#include "dm.h"

#define SUB_JOB_SIZE	128
#define SPLIT_COUNT	8
#define MIN_JOBS	8
#define RESERVE_PAGES	(DIV_ROUND_UP(SUB_JOB_SIZE << SECTOR_SHIFT, PAGE_SIZE))

/*-----------------------------------------------------------------
 * Each kcopyd client has its own little pool of preallocated
 * pages for kcopyd io.
 *---------------------------------------------------------------*/
struct dm_kcopyd_client {
	struct page_list *pages;
	unsigned nr_reserved_pages;
	unsigned nr_free_pages;

	struct dm_io_client *io_client;

	wait_queue_head_t destroyq;
	atomic_t nr_jobs;

	mempool_t *job_pool;

	struct workqueue_struct *kcopyd_wq;
	struct work_struct kcopyd_work;

	/*
	 * We maintain three lists of jobs:
	 *
	 * i)   jobs waiting for pages
	 * ii)  jobs that have pages, and are waiting for the io to be issued.
	 * iii) jobs that have completed.
	 *
	 * All three of these are protected by job_lock.
	 */
	spinlock_t job_lock;
	struct list_head complete_jobs;
	struct list_head io_jobs;
	struct list_head pages_jobs;
};

static void wake(struct dm_kcopyd_client *kc)
{
	queue_work(kc->kcopyd_wq, &kc->kcopyd_work);
}

/*
 * Obtain one page for the use of kcopyd.
 */
static struct page_list *alloc_pl(gfp_t gfp)
{
	struct page_list *pl;

	pl = kmalloc(sizeof(*pl), gfp);
	if (!pl)
		return NULL;

	pl->page = alloc_page(gfp);
	if (!pl->page) {
		kfree(pl);
		return NULL;
	}

	return pl;
}

static void free_pl(struct page_list *pl)
{
	__free_page(pl->page);
	kfree(pl);
}

/*
 * Add the provided pages to a client's free page list, releasing
 * back to the system any beyond the reserved_pages limit.
 */
static void kcopyd_put_pages(struct dm_kcopyd_client *kc, struct page_list *pl)
{
	struct page_list *next;

	do {
		next = pl->next;

		if (kc->nr_free_pages >= kc->nr_reserved_pages)
			free_pl(pl);
		else {
			pl->next = kc->pages;
			kc->pages = pl;
			kc->nr_free_pages++;
		}

		pl = next;
	} while (pl);
}

static int kcopyd_get_pages(struct dm_kcopyd_client *kc,
			    unsigned int nr, struct page_list **pages)
{
	struct page_list *pl;

	*pages = NULL;

	do {
		pl = alloc_pl(__GFP_NOWARN | __GFP_NORETRY);
		if (unlikely(!pl)) {
			/* Use reserved pages */
			pl = kc->pages;
			if (unlikely(!pl))
				goto out_of_memory;
			kc->pages = pl->next;
			kc->nr_free_pages--;
		}
		pl->next = *pages;
		*pages = pl;
	} while (--nr);

	return 0;

out_of_memory:
	if (*pages)
		kcopyd_put_pages(kc, *pages);
	return -ENOMEM;
}
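/*
 * A rough worked example, assuming 4 KiB pages: RESERVE_PAGES evaluates to
 * DIV_ROUND_UP(SUB_JOB_SIZE << SECTOR_SHIFT, PAGE_SIZE)
 *	= DIV_ROUND_UP(128 * 512, 4096) = 16 pages,
 * i.e. the per-client reserve covers one full SUB_JOB_SIZE (64 KiB) sub job.
 * kcopyd_get_pages() prefers a fresh __GFP_NOWARN | __GFP_NORETRY
 * allocation and only dips into this reserve when that fails, so a job can
 * still make progress under memory pressure.
 */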
/*
 * These three functions resize the page pool.
 */
static void drop_pages(struct page_list *pl)
{
	struct page_list *next;

	while (pl) {
		next = pl->next;
		free_pl(pl);
		pl = next;
	}
}

/*
 * Allocate and reserve nr_pages for the use of a specific client.
 */
static int client_reserve_pages(struct dm_kcopyd_client *kc, unsigned nr_pages)
{
	unsigned i;
	struct page_list *pl = NULL, *next;

	for (i = 0; i < nr_pages; i++) {
		next = alloc_pl(GFP_KERNEL);
		if (!next) {
			if (pl)
				drop_pages(pl);
			return -ENOMEM;
		}
		next->next = pl;
		pl = next;
	}

	kc->nr_reserved_pages += nr_pages;
	kcopyd_put_pages(kc, pl);

	return 0;
}

static void client_free_pages(struct dm_kcopyd_client *kc)
{
	BUG_ON(kc->nr_free_pages != kc->nr_reserved_pages);
	drop_pages(kc->pages);
	kc->pages = NULL;
	kc->nr_free_pages = kc->nr_reserved_pages = 0;
}

/*-----------------------------------------------------------------
 * kcopyd_jobs need to be allocated by the *clients* of kcopyd,
 * for this reason we use a mempool to prevent the client from
 * ever having to do io (which could cause a deadlock).
 *---------------------------------------------------------------*/
struct kcopyd_job {
	struct dm_kcopyd_client *kc;
	struct list_head list;
	unsigned long flags;

	/*
	 * Error state of the job.
	 */
	int read_err;
	unsigned long write_err;

	/*
	 * Either READ or WRITE
	 */
	int rw;
	struct dm_io_region source;

	/*
	 * The destinations for the transfer.
	 */
	unsigned int num_dests;
	struct dm_io_region dests[DM_KCOPYD_MAX_REGIONS];

	struct page_list *pages;

	/*
	 * Set this to ensure you are notified when the job has
	 * completed.  'context' is for callback to use.
	 */
	dm_kcopyd_notify_fn fn;
	void *context;

	/*
	 * These fields are only used if the job has been split
	 * into more manageable parts.
	 */
	struct mutex lock;
	atomic_t sub_jobs;
	sector_t progress;

	struct kcopyd_job *master_job;
};

static struct kmem_cache *_job_cache;

int __init dm_kcopyd_init(void)
{
	_job_cache = kmem_cache_create("kcopyd_job",
				sizeof(struct kcopyd_job) * (SPLIT_COUNT + 1),
				__alignof__(struct kcopyd_job), 0, NULL);
	if (!_job_cache)
		return -ENOMEM;

	return 0;
}

void dm_kcopyd_exit(void)
{
	kmem_cache_destroy(_job_cache);
	_job_cache = NULL;
}
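/*
 * A note on sizing (derived from the code above and from dm_kcopyd_copy()
 * below): each "kcopyd_job" slab object is really an array of
 * SPLIT_COUNT + 1 jobs - one master job followed by its sub jobs - so a
 * single mempool_alloc() covers an entire split and split_job() can simply
 * address the sub jobs as master_job[i + 1].
 */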
/*
 * Functions to push and pop a job onto the head of a given job
 * list.
 */
static struct kcopyd_job *pop(struct list_head *jobs,
			      struct dm_kcopyd_client *kc)
{
	struct kcopyd_job *job = NULL;
	unsigned long flags;

	spin_lock_irqsave(&kc->job_lock, flags);

	if (!list_empty(jobs)) {
		job = list_entry(jobs->next, struct kcopyd_job, list);
		list_del(&job->list);
	}
	spin_unlock_irqrestore(&kc->job_lock, flags);

	return job;
}

static void push(struct list_head *jobs, struct kcopyd_job *job)
{
	unsigned long flags;
	struct dm_kcopyd_client *kc = job->kc;

	spin_lock_irqsave(&kc->job_lock, flags);
	list_add_tail(&job->list, jobs);
	spin_unlock_irqrestore(&kc->job_lock, flags);
}

static void push_head(struct list_head *jobs, struct kcopyd_job *job)
{
	unsigned long flags;
	struct dm_kcopyd_client *kc = job->kc;

	spin_lock_irqsave(&kc->job_lock, flags);
	list_add(&job->list, jobs);
	spin_unlock_irqrestore(&kc->job_lock, flags);
}

/*
 * These three functions process 1 item from the corresponding
 * job list.
 *
 * They return:
 * < 0: error
 *   0: success
 * > 0: can't process yet.
 */
static int run_complete_job(struct kcopyd_job *job)
{
	void *context = job->context;
	int read_err = job->read_err;
	unsigned long write_err = job->write_err;
	dm_kcopyd_notify_fn fn = job->fn;
	struct dm_kcopyd_client *kc = job->kc;

	if (job->pages)
		kcopyd_put_pages(kc, job->pages);
	/*
	 * If this is the master job, the sub jobs have already
	 * completed so we can free everything.
	 */
	if (job->master_job == job)
		mempool_free(job, kc->job_pool);
	fn(read_err, write_err, context);

	if (atomic_dec_and_test(&kc->nr_jobs))
		wake_up(&kc->destroyq);

	return 0;
}

static void complete_io(unsigned long error, void *context)
{
	struct kcopyd_job *job = (struct kcopyd_job *) context;
	struct dm_kcopyd_client *kc = job->kc;

	if (error) {
		if (job->rw == WRITE)
			job->write_err |= error;
		else
			job->read_err = 1;

		if (!test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) {
			push(&kc->complete_jobs, job);
			wake(kc);
			return;
		}
	}

	if (job->rw == WRITE)
		push(&kc->complete_jobs, job);

	else {
		job->rw = WRITE;
		push(&kc->io_jobs, job);
	}

	wake(kc);
}

/*
 * Request io on as many buffer heads as we can currently get for
 * a particular job.
 */
static int run_io_job(struct kcopyd_job *job)
{
	int r;
	struct dm_io_request io_req = {
		.bi_rw = job->rw,
		.mem.type = DM_IO_PAGE_LIST,
		.mem.ptr.pl = job->pages,
		.mem.offset = 0,
		.notify.fn = complete_io,
		.notify.context = job,
		.client = job->kc->io_client,
	};

	if (job->rw == READ)
		r = dm_io(&io_req, 1, &job->source, NULL);
	else
		r = dm_io(&io_req, job->num_dests, job->dests, NULL);

	return r;
}

static int run_pages_job(struct kcopyd_job *job)
{
	int r;
	unsigned nr_pages = dm_div_up(job->dests[0].count, PAGE_SIZE >> 9);

	r = kcopyd_get_pages(job->kc, nr_pages, &job->pages);
	if (!r) {
		/* this job is ready for io */
		push(&job->kc->io_jobs, job);
		return 0;
	}

	if (r == -ENOMEM)
		/* can't complete now */
		return 1;

	return r;
}
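/*
 * Taken together, the helpers above give each job a simple life cycle:
 * dispatch_job() queues a new job on pages_jobs (or straight onto
 * complete_jobs for an empty copy); run_pages_job() moves it to io_jobs
 * once pages are available; run_io_job() issues the READ and, in
 * complete_io(), the same job is flipped to WRITE and requeued on io_jobs;
 * once the WRITE (or an error that is not ignored) finishes, the job lands
 * on complete_jobs, where run_complete_job() returns the pages and calls
 * the notify function.
 */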
/*
 * Run through a list for as long as possible.  Returns the count
 * of successful jobs.
 */
static int process_jobs(struct list_head *jobs, struct dm_kcopyd_client *kc,
			int (*fn) (struct kcopyd_job *))
{
	struct kcopyd_job *job;
	int r, count = 0;

	while ((job = pop(jobs, kc))) {

		r = fn(job);

		if (r < 0) {
			/* error this rogue job */
			if (job->rw == WRITE)
				job->write_err = (unsigned long) -1L;
			else
				job->read_err = 1;
			push(&kc->complete_jobs, job);
			break;
		}

		if (r > 0) {
			/*
			 * We couldn't service this job ATM, so
			 * push this job back onto the list.
			 */
			push_head(jobs, job);
			break;
		}

		count++;
	}

	return count;
}

/*
 * kcopyd does this every time it's woken up.
 */
static void do_work(struct work_struct *work)
{
	struct dm_kcopyd_client *kc = container_of(work,
					struct dm_kcopyd_client, kcopyd_work);
	struct blk_plug plug;

	/*
	 * The order that these are called is *very* important.
	 * complete jobs can free some pages for pages jobs.
	 * Pages jobs when successful will jump onto the io jobs
	 * list.  io jobs call wake when they complete and it all
	 * starts again.
	 */
	blk_start_plug(&plug);
	process_jobs(&kc->complete_jobs, kc, run_complete_job);
	process_jobs(&kc->pages_jobs, kc, run_pages_job);
	process_jobs(&kc->io_jobs, kc, run_io_job);
	blk_finish_plug(&plug);
}

/*
 * If we are copying a small region we just dispatch a single job
 * to do the copy, otherwise the io has to be split up into many
 * jobs.
 */
static void dispatch_job(struct kcopyd_job *job)
{
	struct dm_kcopyd_client *kc = job->kc;
	atomic_inc(&kc->nr_jobs);
	if (unlikely(!job->source.count))
		push(&kc->complete_jobs, job);
	else
		push(&kc->pages_jobs, job);
	wake(kc);
}
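/*
 * A copy larger than SUB_JOB_SIZE is shared between SPLIT_COUNT sub jobs
 * working in parallel.  Each sub job claims the next SUB_JOB_SIZE chunk
 * from job->progress (under job->lock) and, from segment_complete(),
 * re-dispatches itself for the following chunk until the whole source
 * region has been covered; the master job completes when the last sub job
 * drops job->sub_jobs to zero.
 */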
static void segment_complete(int read_err, unsigned long write_err,
			     void *context)
{
	/* FIXME: tidy this function */
	sector_t progress = 0;
	sector_t count = 0;
	struct kcopyd_job *sub_job = (struct kcopyd_job *) context;
	struct kcopyd_job *job = sub_job->master_job;
	struct dm_kcopyd_client *kc = job->kc;

	mutex_lock(&job->lock);

	/* update the error */
	if (read_err)
		job->read_err = 1;

	if (write_err)
		job->write_err |= write_err;

	/*
	 * Only dispatch more work if there hasn't been an error.
	 */
	if ((!job->read_err && !job->write_err) ||
	    test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) {
		/* get the next chunk of work */
		progress = job->progress;
		count = job->source.count - progress;
		if (count) {
			if (count > SUB_JOB_SIZE)
				count = SUB_JOB_SIZE;

			job->progress += count;
		}
	}
	mutex_unlock(&job->lock);

	if (count) {
		int i;

		*sub_job = *job;
		sub_job->source.sector += progress;
		sub_job->source.count = count;

		for (i = 0; i < job->num_dests; i++) {
			sub_job->dests[i].sector += progress;
			sub_job->dests[i].count = count;
		}

		sub_job->fn = segment_complete;
		sub_job->context = sub_job;
		dispatch_job(sub_job);

	} else if (atomic_dec_and_test(&job->sub_jobs)) {

		/*
		 * Queue the completion callback to the kcopyd thread.
		 *
		 * Some callers assume that all the completions are called
		 * from a single thread and don't race with each other.
		 *
		 * We must not call the callback directly here because this
		 * code may not be executing in the thread.
		 */
		push(&kc->complete_jobs, job);
		wake(kc);
	}
}

/*
 * Create some sub jobs to share the work between them.
 */
static void split_job(struct kcopyd_job *master_job)
{
	int i;

	atomic_inc(&master_job->kc->nr_jobs);

	atomic_set(&master_job->sub_jobs, SPLIT_COUNT);
	for (i = 0; i < SPLIT_COUNT; i++) {
		master_job[i + 1].master_job = master_job;
		segment_complete(0, 0u, &master_job[i + 1]);
	}
}

int dm_kcopyd_copy(struct dm_kcopyd_client *kc, struct dm_io_region *from,
		   unsigned int num_dests, struct dm_io_region *dests,
		   unsigned int flags, dm_kcopyd_notify_fn fn, void *context)
{
	struct kcopyd_job *job;

	/*
	 * Allocate an array of jobs consisting of one master job
	 * followed by SPLIT_COUNT sub jobs.
	 */
	job = mempool_alloc(kc->job_pool, GFP_NOIO);

	/*
	 * set up for the read.
	 */
	job->kc = kc;
	job->flags = flags;
	job->read_err = 0;
	job->write_err = 0;
	job->rw = READ;

	job->source = *from;

	job->num_dests = num_dests;
	memcpy(&job->dests, dests, sizeof(*dests) * num_dests);

	job->pages = NULL;

	job->fn = fn;
	job->context = context;
	job->master_job = job;

	if (job->source.count <= SUB_JOB_SIZE)
		dispatch_job(job);
	else {
		mutex_init(&job->lock);
		job->progress = 0;
		split_job(job);
	}

	return 0;
}
EXPORT_SYMBOL(dm_kcopyd_copy);

void *dm_kcopyd_prepare_callback(struct dm_kcopyd_client *kc,
				 dm_kcopyd_notify_fn fn, void *context)
{
	struct kcopyd_job *job;

	job = mempool_alloc(kc->job_pool, GFP_NOIO);

	memset(job, 0, sizeof(struct kcopyd_job));
	job->kc = kc;
	job->fn = fn;
	job->context = context;
	job->master_job = job;

	atomic_inc(&kc->nr_jobs);

	return job;
}
EXPORT_SYMBOL(dm_kcopyd_prepare_callback);

void dm_kcopyd_do_callback(void *j, int read_err, unsigned long write_err)
{
	struct kcopyd_job *job = j;
	struct dm_kcopyd_client *kc = job->kc;

	job->read_err = read_err;
	job->write_err = write_err;

	push(&kc->complete_jobs, job);
	wake(kc);
}
EXPORT_SYMBOL(dm_kcopyd_do_callback);
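/*
 * dm_kcopyd_prepare_callback() and dm_kcopyd_do_callback() let a caller use
 * kcopyd purely as a completion thread, without any copy.  A minimal sketch
 * (hypothetical caller; my_fn and my_context are placeholders):
 *
 *	void *j = dm_kcopyd_prepare_callback(kc, my_fn, my_context);
 *	... caller does its own work ...
 *	dm_kcopyd_do_callback(j, 0, 0);
 *
 * dm_kcopyd_do_callback() pushes the job onto complete_jobs, so my_fn() is
 * then invoked from the kcopyd workqueue with the same single-threaded
 * ordering as ordinary copy completions.
 */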
/*
 * Cancels a kcopyd job, eg. someone might be deactivating a
 * mirror.
 */
#if 0
int kcopyd_cancel(struct kcopyd_job *job, int block)
{
	/* FIXME: finish */
	return -1;
}
#endif  /*  0  */

/*-----------------------------------------------------------------
 * Client setup
 *---------------------------------------------------------------*/
struct dm_kcopyd_client *dm_kcopyd_client_create(void)
{
	int r = -ENOMEM;
	struct dm_kcopyd_client *kc;

	kc = kmalloc(sizeof(*kc), GFP_KERNEL);
	if (!kc)
		return ERR_PTR(-ENOMEM);

	spin_lock_init(&kc->job_lock);
	INIT_LIST_HEAD(&kc->complete_jobs);
	INIT_LIST_HEAD(&kc->io_jobs);
	INIT_LIST_HEAD(&kc->pages_jobs);

	kc->job_pool = mempool_create_slab_pool(MIN_JOBS, _job_cache);
	if (!kc->job_pool)
		goto bad_slab;

	INIT_WORK(&kc->kcopyd_work, do_work);
	kc->kcopyd_wq = alloc_workqueue("kcopyd",
					WQ_NON_REENTRANT | WQ_MEM_RECLAIM, 0);
	if (!kc->kcopyd_wq)
		goto bad_workqueue;

	kc->pages = NULL;
	kc->nr_reserved_pages = kc->nr_free_pages = 0;
	r = client_reserve_pages(kc, RESERVE_PAGES);
	if (r)
		goto bad_client_pages;

	kc->io_client = dm_io_client_create();
	if (IS_ERR(kc->io_client)) {
		r = PTR_ERR(kc->io_client);
		goto bad_io_client;
	}

	init_waitqueue_head(&kc->destroyq);
	atomic_set(&kc->nr_jobs, 0);

	return kc;

bad_io_client:
	client_free_pages(kc);
bad_client_pages:
	destroy_workqueue(kc->kcopyd_wq);
bad_workqueue:
	mempool_destroy(kc->job_pool);
bad_slab:
	kfree(kc);

	return ERR_PTR(r);
}
EXPORT_SYMBOL(dm_kcopyd_client_create);

void dm_kcopyd_client_destroy(struct dm_kcopyd_client *kc)
{
	/* Wait for completion of all jobs submitted by this client. */
	wait_event(kc->destroyq, !atomic_read(&kc->nr_jobs));

	BUG_ON(!list_empty(&kc->complete_jobs));
	BUG_ON(!list_empty(&kc->io_jobs));
	BUG_ON(!list_empty(&kc->pages_jobs));
	destroy_workqueue(kc->kcopyd_wq);
	dm_io_client_destroy(kc->io_client);
	client_free_pages(kc);
	mempool_destroy(kc->job_pool);
	kfree(kc);
}
EXPORT_SYMBOL(dm_kcopyd_client_destroy);
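/*
 * A minimal usage sketch (illustrative only; the devices, the helper names
 * my_copy()/my_copy_done() and the synchronous wait are hypothetical, not
 * part of this interface).  dm_kcopyd_client_destroy() waits for nr_jobs to
 * drop to zero, so it is safe to call once the notify function has fired:
 *
 *	static void my_copy_done(int read_err, unsigned long write_err,
 *				 void *context)
 *	{
 *		struct completion *done = context;
 *
 *		if (read_err || write_err)
 *			DMERR("copy failed: read_err=%d write_err=0x%lx",
 *			      read_err, write_err);
 *		complete(done);
 *	}
 *
 *	static int my_copy(struct block_device *src, struct block_device *dst,
 *			   sector_t sector, sector_t count)
 *	{
 *		struct dm_kcopyd_client *kc;
 *		struct dm_io_region from, to;
 *		DECLARE_COMPLETION_ONSTACK(done);
 *
 *		kc = dm_kcopyd_client_create();
 *		if (IS_ERR(kc))
 *			return PTR_ERR(kc);
 *
 *		from.bdev = src;
 *		from.sector = sector;
 *		from.count = count;
 *
 *		to.bdev = dst;
 *		to.sector = sector;
 *		to.count = count;
 *
 *		dm_kcopyd_copy(kc, &from, 1, &to, 0, my_copy_done, &done);
 *		wait_for_completion(&done);
 *
 *		dm_kcopyd_client_destroy(kc);
 *		return 0;
 *	}
 */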