xref: /linux/kernel/padata.c (revision a33f32244d8550da8b4a26e277ce07d5c6d158b5)
/*
 * padata.c - generic interface to process data streams in parallel
 *
 * Copyright (C) 2008, 2009 secunet Security Networks AG
 * Copyright (C) 2008, 2009 Steffen Klassert <steffen.klassert@secunet.com>
 *
 * This program is free software; you can redistribute it and/or modify it
 * under the terms and conditions of the GNU General Public License,
 * version 2, as published by the Free Software Foundation.
 *
 * This program is distributed in the hope it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for
 * more details.
 *
 * You should have received a copy of the GNU General Public License along with
 * this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin St - Fifth Floor, Boston, MA 02110-1301 USA.
 */

#include <linux/module.h>
#include <linux/cpumask.h>
#include <linux/err.h>
#include <linux/cpu.h>
#include <linux/padata.h>
#include <linux/mutex.h>
#include <linux/sched.h>
#include <linux/slab.h>
#include <linux/rcupdate.h>

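/*
 * Keep the sequence counter well below INT_MAX so that concurrent increments
 * cannot overflow it, and cap the number of objects that may be in flight at
 * once (checked against pd->refcnt in padata_do_parallel()).
 */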
#define MAX_SEQ_NR (INT_MAX - NR_CPUS)
#define MAX_OBJ_NUM (10000 * NR_CPUS)

static int padata_index_to_cpu(struct parallel_data *pd, int cpu_index)
{
	int cpu, target_cpu;

	target_cpu = cpumask_first(pd->cpumask);
	for (cpu = 0; cpu < cpu_index; cpu++)
		target_cpu = cpumask_next(target_cpu, pd->cpumask);

	return target_cpu;
}

static int padata_cpu_hash(struct padata_priv *padata)
{
	int cpu_index;
	struct parallel_data *pd;

	pd = padata->pd;

	/*
	 * Hash the sequence numbers to the cpus by taking
	 * seq_nr mod. number of cpus in use.
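	 * For example, with four cpus in the mask, seq_nr 0..3 map to
	 * cpu_index 0..3 and seq_nr 4 wraps around to cpu_index 0 again.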
	 */
	cpu_index = padata->seq_nr % cpumask_weight(pd->cpumask);

	return padata_index_to_cpu(pd, cpu_index);
}

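/*
 * Per-cpu work function for the parallel path: it grabs the objects queued
 * on this cpu's parallel list and runs their parallel() callbacks with
 * bottom halves disabled.
 */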
static void padata_parallel_worker(struct work_struct *work)
{
	struct padata_queue *queue;
	struct parallel_data *pd;
	struct padata_instance *pinst;
	LIST_HEAD(local_list);

	local_bh_disable();
	queue = container_of(work, struct padata_queue, pwork);
	pd = queue->pd;
	pinst = pd->pinst;

	spin_lock(&queue->parallel.lock);
	list_replace_init(&queue->parallel.list, &local_list);
	spin_unlock(&queue->parallel.lock);

	while (!list_empty(&local_list)) {
		struct padata_priv *padata;

		padata = list_entry(local_list.next,
				    struct padata_priv, list);

		list_del_init(&padata->list);

		padata->parallel(padata);
	}

	local_bh_enable();
}

/*
 * padata_do_parallel - padata parallelization function
 *
 * @pinst: padata instance
 * @padata: object to be parallelized
 * @cb_cpu: cpu the serialization callback function will run on,
 *          must be in the cpumask of padata.
 *
 * The parallelization callback function will run with BHs off.
 * Note: Every object which is parallelized by padata_do_parallel
 * must be seen by padata_do_serial.
 */
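/*
 * Illustrative sketch (not part of this file): a caller typically embeds
 * struct padata_priv in its own request, sets the parallel() and serial()
 * callbacks and submits the request.  The request type, the callbacks and
 * "pinst"/"cb_cpu" below are hypothetical; on success padata_do_parallel()
 * returns -EINPROGRESS.
 *
 *	struct my_request {
 *		struct padata_priv padata;
 *		void *data;
 *	};
 *
 *	static void my_parallel(struct padata_priv *padata)
 *	{
 *		struct my_request *req =
 *			container_of(padata, struct my_request, padata);
 *
 *		process(req->data);
 *		padata_do_serial(padata);
 *	}
 *
 *	req->padata.parallel = my_parallel;
 *	req->padata.serial = my_serial;
 *	err = padata_do_parallel(pinst, &req->padata, cb_cpu);
 */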
int padata_do_parallel(struct padata_instance *pinst,
		       struct padata_priv *padata, int cb_cpu)
{
	int target_cpu, err;
	struct padata_queue *queue;
	struct parallel_data *pd;

	rcu_read_lock_bh();

	pd = rcu_dereference(pinst->pd);

	err = 0;
	if (!(pinst->flags & PADATA_INIT))
		goto out;

	err = -EBUSY;
	if ((pinst->flags & PADATA_RESET))
		goto out;

	if (atomic_read(&pd->refcnt) >= MAX_OBJ_NUM)
		goto out;

	err = -EINVAL;
	if (!cpumask_test_cpu(cb_cpu, pd->cpumask))
		goto out;

	err = -EINPROGRESS;
	atomic_inc(&pd->refcnt);
	padata->pd = pd;
	padata->cb_cpu = cb_cpu;

	if (unlikely(atomic_read(&pd->seq_nr) == pd->max_seq_nr))
		atomic_set(&pd->seq_nr, -1);

	padata->seq_nr = atomic_inc_return(&pd->seq_nr);

	target_cpu = padata_cpu_hash(padata);
	queue = per_cpu_ptr(pd->queue, target_cpu);

	spin_lock(&queue->parallel.lock);
	list_add_tail(&padata->list, &queue->parallel.list);
	spin_unlock(&queue->parallel.lock);

	queue_work_on(target_cpu, pinst->wq, &queue->pwork);

out:
	rcu_read_unlock_bh();

	return err;
}
EXPORT_SYMBOL(padata_do_parallel);

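/*
 * padata_get_next - find the next object that has to be serialized.
 *
 * Sequence numbers are distributed round robin across the cpus in use, so
 * each per-cpu reorder queue holds the numbers congruent to its cpu_index
 * modulo the number of cpus.  The object that has to leave the reorder
 * queues next is therefore the one with the smallest expected sequence
 * number.  For example, with two cpus in use, cpu_index 0 receives objects
 * 0, 2, 4, ... and cpu_index 1 receives objects 1, 3, 5, ...; after object
 * 3 has been handed out, the next one must be object 4 from cpu_index 0.
 *
 * Returns the next object, NULL if all reorder queues are empty, or an
 * ERR_PTR if the next object has not been queued for serialization yet.
 */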
static struct padata_priv *padata_get_next(struct parallel_data *pd)
{
	int cpu, num_cpus, empty, calc_seq_nr;
	int seq_nr, next_nr, overrun, next_overrun;
	struct padata_queue *queue, *next_queue;
	struct padata_priv *padata;
	struct padata_list *reorder;

	empty = 0;
	next_nr = -1;
	next_overrun = 0;
	next_queue = NULL;

	num_cpus = cpumask_weight(pd->cpumask);

	for_each_cpu(cpu, pd->cpumask) {
		queue = per_cpu_ptr(pd->queue, cpu);
		reorder = &queue->reorder;

		/*
		 * Calculate the seq_nr of the object that should be
		 * next in this queue.
		 */
		overrun = 0;
		calc_seq_nr = (atomic_read(&queue->num_obj) * num_cpus)
			       + queue->cpu_index;

		if (unlikely(calc_seq_nr > pd->max_seq_nr)) {
			calc_seq_nr = calc_seq_nr - pd->max_seq_nr - 1;
			overrun = 1;
		}

		if (!list_empty(&reorder->list)) {
			padata = list_entry(reorder->list.next,
					    struct padata_priv, list);

			seq_nr = padata->seq_nr;
			BUG_ON(calc_seq_nr != seq_nr);
		} else {
			seq_nr = calc_seq_nr;
			empty++;
		}

		if (next_nr < 0 || seq_nr < next_nr
		    || (next_overrun && !overrun)) {
			next_nr = seq_nr;
			next_overrun = overrun;
			next_queue = queue;
		}
	}

	padata = NULL;

	if (empty == num_cpus)
		goto out;

	reorder = &next_queue->reorder;

	if (!list_empty(&reorder->list)) {
		padata = list_entry(reorder->list.next,
				    struct padata_priv, list);

		if (unlikely(next_overrun)) {
			for_each_cpu(cpu, pd->cpumask) {
				queue = per_cpu_ptr(pd->queue, cpu);
				atomic_set(&queue->num_obj, 0);
			}
		}

		spin_lock(&reorder->lock);
		list_del_init(&padata->list);
		atomic_dec(&pd->reorder_objects);
		spin_unlock(&reorder->lock);

		atomic_inc(&next_queue->num_obj);

		goto out;
	}

	if (next_nr % num_cpus == next_queue->cpu_index) {
		padata = ERR_PTR(-ENODATA);
		goto out;
	}

	padata = ERR_PTR(-EINPROGRESS);
out:
	return padata;
}

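/*
 * padata_reorder - move completed objects from the reorder queues to the
 * serialization queues in the order of their sequence numbers.
 *
 * Only one cpu reorders at a time (pd->lock is taken with a trylock); anyone
 * who loses the race simply returns, while the winner keeps going as long as
 * objects remain in the reorder queues.
 */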
static void padata_reorder(struct parallel_data *pd)
{
	struct padata_priv *padata;
	struct padata_queue *queue;
	struct padata_instance *pinst = pd->pinst;

try_again:
	if (!spin_trylock_bh(&pd->lock))
		goto out;

	while (1) {
		padata = padata_get_next(pd);

		if (!padata || PTR_ERR(padata) == -EINPROGRESS)
			break;

		if (PTR_ERR(padata) == -ENODATA) {
			spin_unlock_bh(&pd->lock);
			goto out;
		}

		queue = per_cpu_ptr(pd->queue, padata->cb_cpu);

		spin_lock(&queue->serial.lock);
		list_add_tail(&padata->list, &queue->serial.list);
		spin_unlock(&queue->serial.lock);

		queue_work_on(padata->cb_cpu, pinst->wq, &queue->swork);
	}

	spin_unlock_bh(&pd->lock);

	if (atomic_read(&pd->reorder_objects))
		goto try_again;

out:
	return;
}

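/*
 * Per-cpu work function for the serial path: it takes the objects queued on
 * this cpu's serialization list, runs their serial() callbacks in order and
 * drops the reference that padata_do_parallel() took on the parallel_data.
 */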
static void padata_serial_worker(struct work_struct *work)
{
	struct padata_queue *queue;
	struct parallel_data *pd;
	LIST_HEAD(local_list);

	local_bh_disable();
	queue = container_of(work, struct padata_queue, swork);
	pd = queue->pd;

	spin_lock(&queue->serial.lock);
	list_replace_init(&queue->serial.list, &local_list);
	spin_unlock(&queue->serial.lock);

	while (!list_empty(&local_list)) {
		struct padata_priv *padata;

		padata = list_entry(local_list.next,
				    struct padata_priv, list);

		list_del_init(&padata->list);

		padata->serial(padata);
		atomic_dec(&pd->refcnt);
	}
	local_bh_enable();
}

/*
 * padata_do_serial - padata serialization function
 *
 * @padata: object to be serialized.
 *
 * padata_do_serial must be called for every parallelized object.
 * The serialization callback function will run with BHs off.
 */
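/*
 * Illustrative sketch (not part of this file): the serial() callback runs on
 * the cb_cpu chosen at submission time, in the original submission order,
 * once padata_do_serial() has been called for the object.  The request type
 * and helpers below are hypothetical.
 *
 *	static void my_serial(struct padata_priv *padata)
 *	{
 *		struct my_request *req =
 *			container_of(padata, struct my_request, padata);
 *
 *		complete_request(req);
 *	}
 *
 *	// e.g. from an asynchronous completion handler:
 *	padata_do_serial(&req->padata);
 */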
void padata_do_serial(struct padata_priv *padata)
{
	int cpu;
	struct padata_queue *queue;
	struct parallel_data *pd;

	pd = padata->pd;

	cpu = get_cpu();
	queue = per_cpu_ptr(pd->queue, cpu);

	spin_lock(&queue->reorder.lock);
	atomic_inc(&pd->reorder_objects);
	list_add_tail(&padata->list, &queue->reorder.list);
	spin_unlock(&queue->reorder.lock);

	put_cpu();

	padata_reorder(pd);
}
EXPORT_SYMBOL(padata_do_serial);

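/*
 * Allocate and initialize the internal control structure (parallel_data)
 * for one padata instance: the per-cpu queues, the effective cpumask and
 * the sequence number bookkeeping.
 */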
static struct parallel_data *padata_alloc_pd(struct padata_instance *pinst,
					     const struct cpumask *cpumask)
{
	int cpu, cpu_index, num_cpus;
	struct padata_queue *queue;
	struct parallel_data *pd;

	cpu_index = 0;

	pd = kzalloc(sizeof(struct parallel_data), GFP_KERNEL);
	if (!pd)
		goto err;

	pd->queue = alloc_percpu(struct padata_queue);
	if (!pd->queue)
		goto err_free_pd;

	if (!alloc_cpumask_var(&pd->cpumask, GFP_KERNEL))
		goto err_free_queue;

	for_each_possible_cpu(cpu) {
		queue = per_cpu_ptr(pd->queue, cpu);

		queue->pd = pd;

		if (cpumask_test_cpu(cpu, cpumask)
		    && cpumask_test_cpu(cpu, cpu_active_mask)) {
			queue->cpu_index = cpu_index;
			cpu_index++;
		} else
			queue->cpu_index = -1;

		INIT_LIST_HEAD(&queue->reorder.list);
		INIT_LIST_HEAD(&queue->parallel.list);
		INIT_LIST_HEAD(&queue->serial.list);
		spin_lock_init(&queue->reorder.lock);
		spin_lock_init(&queue->parallel.lock);
		spin_lock_init(&queue->serial.lock);

		INIT_WORK(&queue->pwork, padata_parallel_worker);
		INIT_WORK(&queue->swork, padata_serial_worker);
		atomic_set(&queue->num_obj, 0);
	}

	cpumask_and(pd->cpumask, cpumask, cpu_active_mask);

	num_cpus = cpumask_weight(pd->cpumask);
	pd->max_seq_nr = (MAX_SEQ_NR / num_cpus) * num_cpus - 1;

	atomic_set(&pd->seq_nr, -1);
	atomic_set(&pd->reorder_objects, 0);
	atomic_set(&pd->refcnt, 0);
	pd->pinst = pinst;
	spin_lock_init(&pd->lock);

	return pd;

err_free_queue:
	free_percpu(pd->queue);
err_free_pd:
	kfree(pd);
err:
	return NULL;
}

static void padata_free_pd(struct parallel_data *pd)
{
	free_cpumask_var(pd->cpumask);
	free_percpu(pd->queue);
	kfree(pd);
}

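/*
 * Replace the internal control structure with a new one.  The new
 * parallel_data is published with RCU, then we wait until all in-flight
 * objects that still reference the old one have drained before it is freed.
 * PADATA_RESET makes padata_do_parallel() refuse new objects while the
 * switch is in progress.
 */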
static void padata_replace(struct padata_instance *pinst,
			   struct parallel_data *pd_new)
{
	struct parallel_data *pd_old = pinst->pd;

	pinst->flags |= PADATA_RESET;

	rcu_assign_pointer(pinst->pd, pd_new);

	synchronize_rcu();

	while (atomic_read(&pd_old->refcnt) != 0)
		yield();

	flush_workqueue(pinst->wq);

	padata_free_pd(pd_old);

	pinst->flags &= ~PADATA_RESET;
}

/*
 * padata_set_cpumask - set the cpumask that padata should use
 *
 * @pinst: padata instance
 * @cpumask: the cpumask to use
 */
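/*
 * Illustrative sketch (not part of this file): a user typically builds a
 * cpumask and hands it to the instance; "pinst", "wanted_cpus" and the error
 * handling below are hypothetical.
 *
 *	cpumask_var_t mask;
 *
 *	if (!alloc_cpumask_var(&mask, GFP_KERNEL))
 *		return -ENOMEM;
 *	cpumask_and(mask, cpu_online_mask, wanted_cpus);
 *	err = padata_set_cpumask(pinst, mask);
 *	free_cpumask_var(mask);
 */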
int padata_set_cpumask(struct padata_instance *pinst,
			cpumask_var_t cpumask)
{
	struct parallel_data *pd;
	int err = 0;

	might_sleep();

	mutex_lock(&pinst->lock);

	pd = padata_alloc_pd(pinst, cpumask);
	if (!pd) {
		err = -ENOMEM;
		goto out;
	}

	cpumask_copy(pinst->cpumask, cpumask);

	padata_replace(pinst, pd);

out:
	mutex_unlock(&pinst->lock);

	return err;
}
EXPORT_SYMBOL(padata_set_cpumask);

static int __padata_add_cpu(struct padata_instance *pinst, int cpu)
{
	struct parallel_data *pd;

	if (cpumask_test_cpu(cpu, cpu_active_mask)) {
		pd = padata_alloc_pd(pinst, pinst->cpumask);
		if (!pd)
			return -ENOMEM;

		padata_replace(pinst, pd);
	}

	return 0;
}

/*
 * padata_add_cpu - add a cpu to the padata cpumask
 *
 * @pinst: padata instance
 * @cpu: cpu to add
 */
int padata_add_cpu(struct padata_instance *pinst, int cpu)
{
	int err;

	might_sleep();

	mutex_lock(&pinst->lock);

	cpumask_set_cpu(cpu, pinst->cpumask);
	err = __padata_add_cpu(pinst, cpu);

	mutex_unlock(&pinst->lock);

	return err;
}
EXPORT_SYMBOL(padata_add_cpu);

static int __padata_remove_cpu(struct padata_instance *pinst, int cpu)
{
	struct parallel_data *pd;

	if (cpumask_test_cpu(cpu, cpu_online_mask)) {
		pd = padata_alloc_pd(pinst, pinst->cpumask);
		if (!pd)
			return -ENOMEM;

		padata_replace(pinst, pd);
	}

	return 0;
}

/*
 * padata_remove_cpu - remove a cpu from the padata cpumask
 *
 * @pinst: padata instance
 * @cpu: cpu to remove
 */
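/*
 * Illustrative sketch (not part of this file): padata_add_cpu() and
 * padata_remove_cpu() adjust the cpumask one cpu at a time, for example in
 * response to an administrative request; "pinst" is hypothetical.
 *
 *	err = padata_add_cpu(pinst, 3);
 *	...
 *	err = padata_remove_cpu(pinst, 3);
 */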
int padata_remove_cpu(struct padata_instance *pinst, int cpu)
{
	int err;

	might_sleep();

	mutex_lock(&pinst->lock);

	cpumask_clear_cpu(cpu, pinst->cpumask);
	err = __padata_remove_cpu(pinst, cpu);

	mutex_unlock(&pinst->lock);

	return err;
}
EXPORT_SYMBOL(padata_remove_cpu);

/*
 * padata_start - start the parallel processing
 *
 * @pinst: padata instance to start
 */
void padata_start(struct padata_instance *pinst)
{
	might_sleep();

	mutex_lock(&pinst->lock);
	pinst->flags |= PADATA_INIT;
	mutex_unlock(&pinst->lock);
}
EXPORT_SYMBOL(padata_start);

/*
 * padata_stop - stop the parallel processing
 *
 * @pinst: padata instance to stop
 */
void padata_stop(struct padata_instance *pinst)
{
	might_sleep();

	mutex_lock(&pinst->lock);
	pinst->flags &= ~PADATA_INIT;
	mutex_unlock(&pinst->lock);
}
EXPORT_SYMBOL(padata_stop);

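/*
 * CPU hotplug notifier: rebuild the internal control structure whenever a
 * cpu from the instance's cpumask goes online or offline, so that the
 * effective cpumask always matches the cpus that are actually usable.
 */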
static int __cpuinit padata_cpu_callback(struct notifier_block *nfb,
					 unsigned long action, void *hcpu)
{
	int err;
	struct padata_instance *pinst;
	int cpu = (unsigned long)hcpu;

	pinst = container_of(nfb, struct padata_instance, cpu_notifier);

	switch (action) {
	case CPU_ONLINE:
	case CPU_ONLINE_FROZEN:
		if (!cpumask_test_cpu(cpu, pinst->cpumask))
			break;
		mutex_lock(&pinst->lock);
		err = __padata_add_cpu(pinst, cpu);
		mutex_unlock(&pinst->lock);
		if (err)
			return NOTIFY_BAD;
		break;

	case CPU_DOWN_PREPARE:
	case CPU_DOWN_PREPARE_FROZEN:
		if (!cpumask_test_cpu(cpu, pinst->cpumask))
			break;
		mutex_lock(&pinst->lock);
		err = __padata_remove_cpu(pinst, cpu);
		mutex_unlock(&pinst->lock);
		if (err)
			return NOTIFY_BAD;
		break;

	case CPU_UP_CANCELED:
	case CPU_UP_CANCELED_FROZEN:
		if (!cpumask_test_cpu(cpu, pinst->cpumask))
			break;
		mutex_lock(&pinst->lock);
		__padata_remove_cpu(pinst, cpu);
		mutex_unlock(&pinst->lock);

	case CPU_DOWN_FAILED:
	case CPU_DOWN_FAILED_FROZEN:
		if (!cpumask_test_cpu(cpu, pinst->cpumask))
			break;
		mutex_lock(&pinst->lock);
		__padata_add_cpu(pinst, cpu);
		mutex_unlock(&pinst->lock);
	}

	return NOTIFY_OK;
}

/*
 * padata_alloc - allocate and initialize a padata instance
 *
 * @cpumask: cpumask that padata uses for parallelization
 * @wq: workqueue to use for the allocated padata instance
 */
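/*
 * Illustrative sketch (not part of this file): a typical lifecycle of an
 * instance.  The workqueue name and the error handling are hypothetical.
 *
 *	struct workqueue_struct *wq;
 *	struct padata_instance *pinst;
 *
 *	wq = create_workqueue("my_padata");
 *	if (!wq)
 *		return -ENOMEM;
 *	pinst = padata_alloc(cpu_possible_mask, wq);
 *	if (!pinst) {
 *		destroy_workqueue(wq);
 *		return -ENOMEM;
 *	}
 *	padata_start(pinst);
 *
 *	... submit objects with padata_do_parallel() ...
 *
 *	padata_stop(pinst);
 *	padata_free(pinst);
 *	destroy_workqueue(wq);
 */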
struct padata_instance *padata_alloc(const struct cpumask *cpumask,
				     struct workqueue_struct *wq)
{
	int err;
	struct padata_instance *pinst;
	struct parallel_data *pd;

	pinst = kzalloc(sizeof(struct padata_instance), GFP_KERNEL);
	if (!pinst)
		goto err;

	pd = padata_alloc_pd(pinst, cpumask);
	if (!pd)
		goto err_free_inst;

	if (!alloc_cpumask_var(&pinst->cpumask, GFP_KERNEL))
		goto err_free_pd;

	rcu_assign_pointer(pinst->pd, pd);

	pinst->wq = wq;

	cpumask_copy(pinst->cpumask, cpumask);

	pinst->flags = 0;

	pinst->cpu_notifier.notifier_call = padata_cpu_callback;
	pinst->cpu_notifier.priority = 0;
	err = register_hotcpu_notifier(&pinst->cpu_notifier);
	if (err)
		goto err_free_cpumask;

	mutex_init(&pinst->lock);

	return pinst;

err_free_cpumask:
	free_cpumask_var(pinst->cpumask);
err_free_pd:
	padata_free_pd(pd);
err_free_inst:
	kfree(pinst);
err:
	return NULL;
}
EXPORT_SYMBOL(padata_alloc);

/*
 * padata_free - free a padata instance
 *
 * @pinst: padata instance to free
 */
void padata_free(struct padata_instance *pinst)
{
	padata_stop(pinst);

	synchronize_rcu();

	while (atomic_read(&pinst->pd->refcnt) != 0)
		yield();

	unregister_hotcpu_notifier(&pinst->cpu_notifier);
	padata_free_pd(pinst->pd);
	free_cpumask_var(pinst->cpumask);
	kfree(pinst);
}
EXPORT_SYMBOL(padata_free);