/*
 * Copyright (C) 2002 Sistina Software (UK) Limited.
 * Copyright (C) 2006 Red Hat GmbH
 *
 * This file is released under the GPL.
 *
 * Kcopyd provides a simple interface for copying an area of one
 * block-device to one or more other block-devices, with an asynchronous
 * completion notification.
 */
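/*
 * Illustrative sketch (not part of the upstream file): roughly how a
 * device-mapper target might drive the interface exported below, e.g.
 * from its constructor.  src_bdev, dst_bdev, len and ctx are placeholders
 * and error handling is omitted.
 *
 *	static void copy_done(int read_err, unsigned long write_err, void *ctx)
 *	{
 *		... called from the kcopyd workqueue once all writes finish ...
 *	}
 *
 *	struct dm_kcopyd_client *kc = dm_kcopyd_client_create();
 *	struct dm_io_region from = { .bdev = src_bdev, .sector = 0, .count = len };
 *	struct dm_io_region dest = { .bdev = dst_bdev, .sector = 0, .count = len };
 *
 *	if (!IS_ERR(kc))
 *		dm_kcopyd_copy(kc, &from, 1, &dest, 0, copy_done, ctx);
 *	...
 *	dm_kcopyd_client_destroy(kc);
 */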

#include <linux/types.h>
#include <linux/atomic.h>
#include <linux/blkdev.h>
#include <linux/fs.h>
#include <linux/init.h>
#include <linux/list.h>
#include <linux/mempool.h>
#include <linux/module.h>
#include <linux/pagemap.h>
#include <linux/slab.h>
#include <linux/vmalloc.h>
#include <linux/workqueue.h>
#include <linux/mutex.h>
#include <linux/device-mapper.h>
#include <linux/dm-kcopyd.h>

#include "dm.h"

#define SUB_JOB_SIZE	128
#define SPLIT_COUNT	8
#define MIN_JOBS	8
#define RESERVE_PAGES	(DIV_ROUND_UP(SUB_JOB_SIZE << SECTOR_SHIFT, PAGE_SIZE))
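/*
 * SUB_JOB_SIZE is in 512-byte sectors, so each sub job moves at most
 * 128 << SECTOR_SHIFT = 64 KiB.  With 4 KiB pages (the common case)
 * RESERVE_PAGES therefore works out to 16 pages per client; other page
 * sizes change the figure accordingly.
 */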

/*-----------------------------------------------------------------
 * Each kcopyd client has its own little pool of preallocated
 * pages for kcopyd io.
 *---------------------------------------------------------------*/
struct dm_kcopyd_client {
	struct page_list *pages;
	unsigned nr_reserved_pages;
	unsigned nr_free_pages;

	struct dm_io_client *io_client;

	wait_queue_head_t destroyq;
	atomic_t nr_jobs;

	mempool_t *job_pool;

	struct workqueue_struct *kcopyd_wq;
	struct work_struct kcopyd_work;

/*
 * We maintain three lists of jobs:
 *
 * i)   jobs waiting for pages
 * ii)  jobs that have pages, and are waiting for the io to be issued.
 * iii) jobs that have completed.
 *
 * All three of these are protected by job_lock.
 */
	spinlock_t job_lock;
	struct list_head complete_jobs;
	struct list_head io_jobs;
	struct list_head pages_jobs;
};

static void wake(struct dm_kcopyd_client *kc)
{
	queue_work(kc->kcopyd_wq, &kc->kcopyd_work);
}

/*
 * Obtain one page for the use of kcopyd.
 */
static struct page_list *alloc_pl(gfp_t gfp)
{
	struct page_list *pl;

	pl = kmalloc(sizeof(*pl), gfp);
	if (!pl)
		return NULL;

	pl->page = alloc_page(gfp);
	if (!pl->page) {
		kfree(pl);
		return NULL;
	}

	return pl;
}

static void free_pl(struct page_list *pl)
{
	__free_page(pl->page);
	kfree(pl);
}

/*
 * Add the provided pages to a client's free page list, releasing
 * back to the system any beyond the reserved_pages limit.
 */
static void kcopyd_put_pages(struct dm_kcopyd_client *kc, struct page_list *pl)
{
	struct page_list *next;

	do {
		next = pl->next;

		if (kc->nr_free_pages >= kc->nr_reserved_pages)
			free_pl(pl);
		else {
			pl->next = kc->pages;
			kc->pages = pl;
			kc->nr_free_pages++;
		}

		pl = next;
	} while (pl);
}

static int kcopyd_get_pages(struct dm_kcopyd_client *kc,
			    unsigned int nr, struct page_list **pages)
{
	struct page_list *pl;

	*pages = NULL;

	do {
		pl = alloc_pl(__GFP_NOWARN | __GFP_NORETRY);
		if (unlikely(!pl)) {
			/* Use reserved pages */
			pl = kc->pages;
			if (unlikely(!pl))
				goto out_of_memory;
			kc->pages = pl->next;
			kc->nr_free_pages--;
		}
		pl->next = *pages;
		*pages = pl;
	} while (--nr);

	return 0;

out_of_memory:
	if (*pages)
		kcopyd_put_pages(kc, *pages);
	return -ENOMEM;
}

/*
 * These three functions resize the page pool.
 */
static void drop_pages(struct page_list *pl)
{
	struct page_list *next;

	while (pl) {
		next = pl->next;
		free_pl(pl);
		pl = next;
	}
}

/*
 * Allocate and reserve nr_pages for the use of a specific client.
 */
static int client_reserve_pages(struct dm_kcopyd_client *kc, unsigned nr_pages)
{
	unsigned i;
	struct page_list *pl = NULL, *next;

	for (i = 0; i < nr_pages; i++) {
		next = alloc_pl(GFP_KERNEL);
		if (!next) {
			if (pl)
				drop_pages(pl);
			return -ENOMEM;
		}
		next->next = pl;
		pl = next;
	}

	kc->nr_reserved_pages += nr_pages;
	kcopyd_put_pages(kc, pl);

	return 0;
}

static void client_free_pages(struct dm_kcopyd_client *kc)
{
	BUG_ON(kc->nr_free_pages != kc->nr_reserved_pages);
	drop_pages(kc->pages);
	kc->pages = NULL;
	kc->nr_free_pages = kc->nr_reserved_pages = 0;
}

/*-----------------------------------------------------------------
 * kcopyd_jobs need to be allocated by the *clients* of kcopyd, so
 * we use a mempool to prevent the client from ever having to do io
 * (which could cause a deadlock).
 *---------------------------------------------------------------*/
struct kcopyd_job {
	struct dm_kcopyd_client *kc;
	struct list_head list;
	unsigned long flags;

	/*
	 * Error state of the job.
	 */
	int read_err;
	unsigned long write_err;

	/*
	 * Either READ or WRITE
	 */
	int rw;
	struct dm_io_region source;

	/*
	 * The destinations for the transfer.
	 */
	unsigned int num_dests;
	struct dm_io_region dests[DM_KCOPYD_MAX_REGIONS];

	struct page_list *pages;

	/*
	 * Set this to ensure you are notified when the job has
	 * completed.  'context' is for callback to use.
	 */
	dm_kcopyd_notify_fn fn;
	void *context;

	/*
	 * These fields are only used if the job has been split
	 * into more manageable parts.
	 */
	struct mutex lock;
	atomic_t sub_jobs;
	sector_t progress;

	struct kcopyd_job *master_job;
};

static struct kmem_cache *_job_cache;

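/*
 * Note: each object in this cache holds space for one master job followed
 * by SPLIT_COUNT sub jobs, so a single mempool_alloc() in dm_kcopyd_copy()
 * provides the whole array (see the comment there).
 */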
int __init dm_kcopyd_init(void)
{
	_job_cache = kmem_cache_create("kcopyd_job",
				sizeof(struct kcopyd_job) * (SPLIT_COUNT + 1),
				__alignof__(struct kcopyd_job), 0, NULL);
	if (!_job_cache)
		return -ENOMEM;

	return 0;
}

void dm_kcopyd_exit(void)
{
	kmem_cache_destroy(_job_cache);
	_job_cache = NULL;
}

/*
 * Functions to push and pop a job on a given job list.  pop() takes the
 * job at the head of the list, push() appends to the tail and push_head()
 * returns a job to the head.
 */
static struct kcopyd_job *pop(struct list_head *jobs,
			      struct dm_kcopyd_client *kc)
{
	struct kcopyd_job *job = NULL;
	unsigned long flags;

	spin_lock_irqsave(&kc->job_lock, flags);

	if (!list_empty(jobs)) {
		job = list_entry(jobs->next, struct kcopyd_job, list);
		list_del(&job->list);
	}
	spin_unlock_irqrestore(&kc->job_lock, flags);

	return job;
}

static void push(struct list_head *jobs, struct kcopyd_job *job)
{
	unsigned long flags;
	struct dm_kcopyd_client *kc = job->kc;

	spin_lock_irqsave(&kc->job_lock, flags);
	list_add_tail(&job->list, jobs);
	spin_unlock_irqrestore(&kc->job_lock, flags);
}


static void push_head(struct list_head *jobs, struct kcopyd_job *job)
{
	unsigned long flags;
	struct dm_kcopyd_client *kc = job->kc;

	spin_lock_irqsave(&kc->job_lock, flags);
	list_add(&job->list, jobs);
	spin_unlock_irqrestore(&kc->job_lock, flags);
}

/*
 * These three functions process 1 item from the corresponding
 * job list.
 *
 * They return:
 * < 0: error
 *   0: success
 * > 0: can't process yet.
 */
static int run_complete_job(struct kcopyd_job *job)
{
	void *context = job->context;
	int read_err = job->read_err;
	unsigned long write_err = job->write_err;
	dm_kcopyd_notify_fn fn = job->fn;
	struct dm_kcopyd_client *kc = job->kc;

	if (job->pages)
		kcopyd_put_pages(kc, job->pages);
	/*
	 * If this is the master job, the sub jobs have already
	 * completed so we can free everything.
	 */
	if (job->master_job == job)
		mempool_free(job, kc->job_pool);
	fn(read_err, write_err, context);

	if (atomic_dec_and_test(&kc->nr_jobs))
		wake_up(&kc->destroyq);

	return 0;
}

static void complete_io(unsigned long error, void *context)
{
	struct kcopyd_job *job = (struct kcopyd_job *) context;
	struct dm_kcopyd_client *kc = job->kc;

	if (error) {
		if (job->rw == WRITE)
			job->write_err |= error;
		else
			job->read_err = 1;

		if (!test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) {
			push(&kc->complete_jobs, job);
			wake(kc);
			return;
		}
	}

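	/*
	 * A completed WRITE finishes the job; a completed READ is turned
	 * into the WRITE phase, reusing the same pages for the
	 * destination(s).
	 */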
	if (job->rw == WRITE)
		push(&kc->complete_jobs, job);

	else {
		job->rw = WRITE;
		push(&kc->io_jobs, job);
	}

	wake(kc);
}

/*
 * Issue the io for a job: either the single read from the source or
 * the writes to every destination, using the pages the job holds.
 */
static int run_io_job(struct kcopyd_job *job)
{
	int r;
	struct dm_io_request io_req = {
		.bi_rw = job->rw,
		.mem.type = DM_IO_PAGE_LIST,
		.mem.ptr.pl = job->pages,
		.mem.offset = 0,
		.notify.fn = complete_io,
		.notify.context = job,
		.client = job->kc->io_client,
	};

	if (job->rw == READ)
		r = dm_io(&io_req, 1, &job->source, NULL);
	else
		r = dm_io(&io_req, job->num_dests, job->dests, NULL);

	return r;
}

static int run_pages_job(struct kcopyd_job *job)
{
	int r;
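	/*
	 * dests[0].count is in 512-byte sectors and PAGE_SIZE >> 9 is the
	 * number of sectors per page (8 with 4 KiB pages), so this rounds
	 * up to the number of pages needed to buffer the region.
	 */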
	unsigned nr_pages = dm_div_up(job->dests[0].count, PAGE_SIZE >> 9);

	r = kcopyd_get_pages(job->kc, nr_pages, &job->pages);
	if (!r) {
		/* this job is ready for io */
		push(&job->kc->io_jobs, job);
		return 0;
	}

	if (r == -ENOMEM)
		/* can't complete now */
		return 1;

	return r;
}

/*
 * Run through a list for as long as possible.  Returns the count
 * of successful jobs.
 */
static int process_jobs(struct list_head *jobs, struct dm_kcopyd_client *kc,
			int (*fn) (struct kcopyd_job *))
{
	struct kcopyd_job *job;
	int r, count = 0;

	while ((job = pop(jobs, kc))) {

		r = fn(job);

		if (r < 0) {
			/* error this rogue job */
			if (job->rw == WRITE)
				job->write_err = (unsigned long) -1L;
			else
				job->read_err = 1;
			push(&kc->complete_jobs, job);
			break;
		}

		if (r > 0) {
			/*
			 * We couldn't service this job ATM, so
			 * push this job back onto the list.
			 */
			push_head(jobs, job);
			break;
		}

		count++;
	}

	return count;
}

/*
 * kcopyd does this every time it's woken up.
 */
static void do_work(struct work_struct *work)
{
	struct dm_kcopyd_client *kc = container_of(work,
					struct dm_kcopyd_client, kcopyd_work);
	struct blk_plug plug;

	/*
	 * The order that these are called is *very* important.
	 * complete jobs can free some pages for pages jobs.
	 * Pages jobs when successful will jump onto the io jobs
	 * list.  io jobs call wake when they complete and it all
	 * starts again.
	 */
	blk_start_plug(&plug);
	process_jobs(&kc->complete_jobs, kc, run_complete_job);
	process_jobs(&kc->pages_jobs, kc, run_pages_job);
	process_jobs(&kc->io_jobs, kc, run_io_job);
	blk_finish_plug(&plug);
}

/*
 * If we are copying a small region we just dispatch a single job
 * to do the copy, otherwise the io has to be split up into many
 * jobs.
 */
static void dispatch_job(struct kcopyd_job *job)
{
	struct dm_kcopyd_client *kc = job->kc;
	atomic_inc(&kc->nr_jobs);
	if (unlikely(!job->source.count))
		push(&kc->complete_jobs, job);
	else
		push(&kc->pages_jobs, job);
	wake(kc);
}

static void segment_complete(int read_err, unsigned long write_err,
			     void *context)
{
	/* FIXME: tidy this function */
	sector_t progress = 0;
	sector_t count = 0;
	struct kcopyd_job *sub_job = (struct kcopyd_job *) context;
	struct kcopyd_job *job = sub_job->master_job;
	struct dm_kcopyd_client *kc = job->kc;

	mutex_lock(&job->lock);

	/* update the error */
	if (read_err)
		job->read_err = 1;

	if (write_err)
		job->write_err |= write_err;

	/*
	 * Only dispatch more work if there hasn't been an error.
	 */
	if ((!job->read_err && !job->write_err) ||
	    test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) {
		/* get the next chunk of work */
		progress = job->progress;
		count = job->source.count - progress;
		if (count) {
			if (count > SUB_JOB_SIZE)
				count = SUB_JOB_SIZE;

			job->progress += count;
		}
	}
	mutex_unlock(&job->lock);

	if (count) {
		int i;

		*sub_job = *job;
		sub_job->source.sector += progress;
		sub_job->source.count = count;

		for (i = 0; i < job->num_dests; i++) {
			sub_job->dests[i].sector += progress;
			sub_job->dests[i].count = count;
		}

		sub_job->fn = segment_complete;
		sub_job->context = sub_job;
		dispatch_job(sub_job);

	} else if (atomic_dec_and_test(&job->sub_jobs)) {

		/*
		 * Queue the completion callback to the kcopyd thread.
		 *
		 * Some callers assume that all the completions are called
		 * from a single thread and don't race with each other.
		 *
		 * We must not call the callback directly here because this
		 * code may not be executing in the thread.
		 */
		push(&kc->complete_jobs, job);
		wake(kc);
	}
}

/*
 * Create some sub jobs to share the work between them.
 */
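/*
 * For example (illustrative figures only): a 1 MiB copy is 2048 sectors,
 * which exceeds SUB_JOB_SIZE (128 sectors), so split_job() starts
 * SPLIT_COUNT sub jobs.  Each one repeatedly claims the next 128-sector
 * chunk via segment_complete() until the master job's progress reaches
 * source.count, at which point the last sub job to finish queues the
 * master's completion callback.
 */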
static void split_job(struct kcopyd_job *master_job)
{
	int i;

	atomic_inc(&master_job->kc->nr_jobs);

	atomic_set(&master_job->sub_jobs, SPLIT_COUNT);
	for (i = 0; i < SPLIT_COUNT; i++) {
		master_job[i + 1].master_job = master_job;
		segment_complete(0, 0u, &master_job[i + 1]);
	}
}

int dm_kcopyd_copy(struct dm_kcopyd_client *kc, struct dm_io_region *from,
		   unsigned int num_dests, struct dm_io_region *dests,
		   unsigned int flags, dm_kcopyd_notify_fn fn, void *context)
{
	struct kcopyd_job *job;

	/*
	 * Allocate an array of jobs consisting of one master job
	 * followed by SPLIT_COUNT sub jobs.
	 */
	job = mempool_alloc(kc->job_pool, GFP_NOIO);

	/*
	 * set up for the read.
	 */
	job->kc = kc;
	job->flags = flags;
	job->read_err = 0;
	job->write_err = 0;
	job->rw = READ;

	job->source = *from;

	job->num_dests = num_dests;
	memcpy(&job->dests, dests, sizeof(*dests) * num_dests);

	job->pages = NULL;

	job->fn = fn;
	job->context = context;
	job->master_job = job;

	if (job->source.count <= SUB_JOB_SIZE)
		dispatch_job(job);
	else {
		mutex_init(&job->lock);
		job->progress = 0;
		split_job(job);
	}

	return 0;
}
EXPORT_SYMBOL(dm_kcopyd_copy);

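/*
 * Prepare a job that carries no data, only a completion callback.
 * dm_kcopyd_do_callback() then queues it so that the callback runs from
 * the kcopyd workqueue, in the same context as ordinary copy completions.
 */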
void *dm_kcopyd_prepare_callback(struct dm_kcopyd_client *kc,
				 dm_kcopyd_notify_fn fn, void *context)
{
	struct kcopyd_job *job;

	job = mempool_alloc(kc->job_pool, GFP_NOIO);

	memset(job, 0, sizeof(struct kcopyd_job));
	job->kc = kc;
	job->fn = fn;
	job->context = context;
	job->master_job = job;

	atomic_inc(&kc->nr_jobs);

	return job;
}
EXPORT_SYMBOL(dm_kcopyd_prepare_callback);

void dm_kcopyd_do_callback(void *j, int read_err, unsigned long write_err)
{
	struct kcopyd_job *job = j;
	struct dm_kcopyd_client *kc = job->kc;

	job->read_err = read_err;
	job->write_err = write_err;

	push(&kc->complete_jobs, job);
	wake(kc);
}
EXPORT_SYMBOL(dm_kcopyd_do_callback);

/*
 * Cancels a kcopyd job, e.g. someone might be deactivating a
 * mirror.
 */
#if 0
int kcopyd_cancel(struct kcopyd_job *job, int block)
{
	/* FIXME: finish */
	return -1;
}
#endif  /*  0  */

/*-----------------------------------------------------------------
 * Client setup
 *---------------------------------------------------------------*/
struct dm_kcopyd_client *dm_kcopyd_client_create(void)
{
	int r = -ENOMEM;
	struct dm_kcopyd_client *kc;

	kc = kmalloc(sizeof(*kc), GFP_KERNEL);
	if (!kc)
		return ERR_PTR(-ENOMEM);

	spin_lock_init(&kc->job_lock);
	INIT_LIST_HEAD(&kc->complete_jobs);
	INIT_LIST_HEAD(&kc->io_jobs);
	INIT_LIST_HEAD(&kc->pages_jobs);

	kc->job_pool = mempool_create_slab_pool(MIN_JOBS, _job_cache);
	if (!kc->job_pool)
		goto bad_slab;

	INIT_WORK(&kc->kcopyd_work, do_work);
	kc->kcopyd_wq = alloc_workqueue("kcopyd",
					WQ_NON_REENTRANT | WQ_MEM_RECLAIM, 0);
	if (!kc->kcopyd_wq)
		goto bad_workqueue;

	kc->pages = NULL;
	kc->nr_reserved_pages = kc->nr_free_pages = 0;
	r = client_reserve_pages(kc, RESERVE_PAGES);
	if (r)
		goto bad_client_pages;

	kc->io_client = dm_io_client_create();
	if (IS_ERR(kc->io_client)) {
		r = PTR_ERR(kc->io_client);
		goto bad_io_client;
	}

	init_waitqueue_head(&kc->destroyq);
	atomic_set(&kc->nr_jobs, 0);

	return kc;

bad_io_client:
	client_free_pages(kc);
bad_client_pages:
	destroy_workqueue(kc->kcopyd_wq);
bad_workqueue:
	mempool_destroy(kc->job_pool);
bad_slab:
	kfree(kc);

	return ERR_PTR(r);
}
EXPORT_SYMBOL(dm_kcopyd_client_create);

void dm_kcopyd_client_destroy(struct dm_kcopyd_client *kc)
{
	/* Wait for completion of all jobs submitted by this client. */
	wait_event(kc->destroyq, !atomic_read(&kc->nr_jobs));

	BUG_ON(!list_empty(&kc->complete_jobs));
	BUG_ON(!list_empty(&kc->io_jobs));
	BUG_ON(!list_empty(&kc->pages_jobs));
	destroy_workqueue(kc->kcopyd_wq);
	dm_io_client_destroy(kc->io_client);
	client_free_pages(kc);
	mempool_destroy(kc->job_pool);
	kfree(kc);
}
EXPORT_SYMBOL(dm_kcopyd_client_destroy);
736