// SPDX-License-Identifier: GPL-2.0
/*
 * padata.c - generic interface to process data streams in parallel
 *
 * See Documentation/core-api/padata.rst for more information.
 */
static void padata_free_pd(struct parallel_data *pd);

static inline void padata_get_pd(struct parallel_data *pd)
{
	refcount_inc(&pd->refcnt);
}

/* Drop @cnt references at once; free @pd when the count hits zero. */
static inline void padata_put_pd_cnt(struct parallel_data *pd, int cnt)
{
	if (refcount_sub_and_test(cnt, &pd->refcnt))
		padata_free_pd(pd);
}

static inline void padata_put_pd(struct parallel_data *pd)
{
	padata_put_pd_cnt(pd, 1);
}
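/*
 * Example (illustrative, not part of padata.c): padata_do_parallel() takes
 * one pd reference per submitted object and padata_serial_worker() drops
 * them in one batched call.  A minimal userspace analogue of the
 * refcount_sub_and_test() semantics this relies on:
 */
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

/* Stand-in for refcount_sub_and_test(): subtract cnt, report zero. */
static bool ref_sub_and_test(atomic_int *ref, int cnt)
{
	return atomic_fetch_sub(ref, cnt) == cnt;
}

int main(void)
{
	atomic_int refcnt = 1;		/* base reference held at allocation */

	atomic_fetch_add(&refcnt, 3);	/* three objects submitted */
	if (ref_sub_and_test(&refcnt, 3))	/* batched put for all three */
		puts("freed too early");	/* not printed: base ref remains */
	if (ref_sub_and_test(&refcnt, 1))	/* final put */
		puts("last put frees the parallel_data");
	return 0;
}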
/* Map a sequence number to the nth CPU in the parallel cpumask. */
static int padata_cpu_hash(struct parallel_data *pd, unsigned int seq_nr)
{
	int cpu_index = seq_nr % cpumask_weight(pd->cpumask.pcpu);

	return cpumask_nth(cpu_index, pd->cpumask.pcpu);
}
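/*
 * Example (illustrative, not part of padata.c): the round-robin mapping
 * above, with a plain array standing in for the parallel cpumask and
 * made-up CPU numbers.
 */
#include <stdio.h>

int main(void)
{
	const int pcpu[] = { 1, 3, 6 };	/* CPUs set in cpumask.pcpu */
	const unsigned int weight = 3;	/* cpumask_weight() */

	for (unsigned int seq_nr = 0; seq_nr < 7; seq_nr++)
		printf("seq_nr %u -> CPU %d\n", seq_nr, pcpu[seq_nr % weight]);
	return 0;
}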
/* padata_work helpers (fragments): idle works are recycled through the
 * global padata_free_works list; pw_data carries the caller's payload. */
	list_del(&pw->pw_list);
	/* ... */
	if (flags & PADATA_WORK_ONSTACK)
		INIT_WORK_ONSTACK(&pw->pw_work, work_fn);
	else
		INIT_WORK(&pw->pw_work, work_fn);
	pw->pw_data = data;
	/* ... */
	list_add(&pw->pw_list, head);
	/* ... */
	list_add(&pw->pw_list, &padata_free_works);
	/* ... */
	list_del(&cur->pw_list);

/* padata_parallel_worker() (fragment): run the object's parallel() hook. */
	struct padata_priv *padata = pw->pw_data;

	padata->parallel(padata);
/**
 * padata_do_parallel - padata parallelization function
 * @cb_cpu: preferred CPU for the serialization callback; if it is not in
 *          the serial cpumask, a fallback CPU is chosen and written back,
 *          and if none found, returns -EINVAL.
 */
int padata_do_parallel(struct padata_shell *ps,
		       struct padata_priv *padata, int *cb_cpu)
{
	struct padata_instance *pinst = ps->pinst;
	struct parallel_data *pd;
	/* ... */
	pd = rcu_dereference_bh(ps->pd);

	err = -EINVAL;
	if (!(pinst->flags & PADATA_INIT) || pinst->flags & PADATA_INVALID)
		goto out;

	if (!cpumask_test_cpu(*cb_cpu, pd->cpumask.cbcpu)) {
		if (cpumask_empty(pd->cpumask.cbcpu))
			goto out;
		/* Select a fallback CPU and report it via *cb_cpu. */
		cpu_index = *cb_cpu % cpumask_weight(pd->cpumask.cbcpu);
		*cb_cpu = cpumask_nth(cpu_index, pd->cpumask.cbcpu);
	}

	err = -EBUSY;
	if ((pinst->flags & PADATA_RESET))
		goto out;

	padata_get_pd(pd);
	padata->pd = pd;
	padata->cb_cpu = *cb_cpu;
	padata->seq_nr = ++pd->seq_nr;
	/* ... with no free padata_work, run the callback in the caller: */
		padata->parallel(padata);
	/* ... otherwise queue the object to the parallel workqueue: */
	queue_work(pinst->parallel_wq, &pw->pw_work);
	/* ... */
}
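/*
 * Example (hedged sketch, not part of padata.c): how a client typically
 * drives this API.  struct my_request, my_parallel(), my_serial() and
 * my_submit() are hypothetical names; the padata calls are the real API.
 */
#include <linux/padata.h>

struct my_request {
	struct padata_priv padata;	/* embedded control structure */
	/* ... request payload ... */
};

static void my_parallel(struct padata_priv *padata)
{
	struct my_request *req = container_of(padata, struct my_request,
					      padata);

	/* CPU-intensive work on req runs here, on some parallel CPU. */
	(void)req;
	padata_do_serial(padata);	/* mandatory for every parallelized object */
}

static void my_serial(struct padata_priv *padata)
{
	/* Runs on the callback CPU, in original submission order. */
}

static int my_submit(struct padata_shell *ps, struct my_request *req)
{
	int cb_cpu = 0;	/* remapped into cpumask.cbcpu above if needed */

	req->padata.parallel = my_parallel;
	req->padata.serial = my_serial;
	return padata_do_parallel(ps, &req->padata, &cb_cpu);
}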
/* padata_find_next - Find the next object that needs serialization. */
static struct padata_priv *padata_find_next(struct parallel_data *pd, int cpu,
					    unsigned int processed)
{
	struct padata_priv *padata;
	struct padata_list *reorder = per_cpu_ptr(pd->reorder_list, cpu);

	spin_lock(&reorder->lock);
	if (list_empty(&reorder->list))
		goto notfound;
	padata = list_entry(reorder->list.next, struct padata_priv, list);
	/* The list head must carry the next expected sequence number. */
	if (padata->seq_nr != processed)
		goto notfound;
	list_del_init(&padata->list);
	spin_unlock(&reorder->lock);
	return padata;
notfound:
	/* Record where to resume once the missing object arrives. */
	pd->processed = processed;
	pd->cpu = cpu;
	spin_unlock(&reorder->lock);
	return NULL;
}
/* padata_reorder() (fragments): release objects in sequence-number order,
 * cycling through the parallel CPUs round-robin, and queue each released
 * object to the serial workqueue on its callback CPU. */
	struct parallel_data *pd = padata->pd;
	struct padata_instance *pinst = pd->ps->pinst;
	/* ... */
	processed = pd->processed;
	cpu = pd->cpu;
	/* ... advance to the CPU expected to hold the next object ... */
		cpu = cpumask_first(pd->cpumask.pcpu);
	/* ... */
		cpu = cpumask_next_wrap(cpu, pd->cpumask.pcpu);
	/* ... */
	cb_cpu = padata->cb_cpu;
	squeue = per_cpu_ptr(pd->squeue, cb_cpu);

	spin_lock(&squeue->serial.lock);
	list_add_tail(&padata->list, &squeue->serial.list);
	queue_work_on(cb_cpu, pinst->serial_wq, &squeue->work);
	/* ... */
	padata = padata_find_next(pd, cpu, processed);
	spin_unlock(&squeue->serial.lock);
/* padata_serial_worker() (fragments): splice the completed objects off
 * this CPU's serial queue, run their serial() callbacks in order, then
 * drop one pd reference per object in a single batched put. */
	struct parallel_data *pd;
	/* ... */
	pd = squeue->pd;

	spin_lock(&squeue->serial.lock);
	list_replace_init(&squeue->serial.list, &local_list);
	spin_unlock(&squeue->serial.lock);
	/* ... for each object on local_list, counting them in cnt ... */
		list_del_init(&padata->list);
		padata->serial(padata);
	/* ... */
	padata_put_pd_cnt(pd, cnt);
/* padata_do_serial - padata serialization function.  Must be called for
 * every object that went through padata_do_parallel(). */
void padata_do_serial(struct padata_priv *padata)
{
	struct parallel_data *pd = padata->pd;
	int hashed_cpu = padata_cpu_hash(pd, padata->seq_nr);
	struct padata_list *reorder = per_cpu_ptr(pd->reorder_list, hashed_cpu);
	/* ... */
	spin_lock(&reorder->lock);
	/* Insert in ascending seq_nr order; the signed difference below
	 * copes with sequence-number wraparound. */
	list_for_each_prev(pos, &reorder->list) {
		if ((signed int)(cur->seq_nr - padata->seq_nr) < 0)
			break;
	}
	if (padata->seq_nr != pd->processed) {
		/* Not the next expected object: park it for later. */
		list_add(&padata->list, pos);
		spin_unlock(&reorder->lock);
		return;
	}
	/* ... */
}
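/*
 * Example (illustrative, not part of padata.c): the signed-difference
 * compare is the classic serial-number trick; it orders sequence numbers
 * correctly across unsigned wraparound as long as in-flight objects are
 * less than 2^31 apart.
 */
#include <stdio.h>

int main(void)
{
	/* cur was submitted just before the counter wrapped, incoming
	 * just after it. */
	unsigned int cur = 0xfffffffeu, incoming = 2u;

	if (cur > incoming)
		puts("plain compare: cur looks newer (wrong)");
	if ((signed int)(cur - incoming) < 0)
		puts("signed difference: cur is older (right)");
	return 0;
}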
/* padata_setup_cpumasks() (fragment): pin the unbound parallel workqueue
 * to the instance's parallel cpumask via workqueue attrs. */
	/* ... */
		return -ENOMEM;

	/* Restrict parallel_wq workers to pd->cpumask.pcpu. */
	cpumask_copy(attrs->cpumask, pinst->cpumask.pcpu);
	err = apply_workqueue_attrs(pinst->parallel_wq, attrs);
static void padata_mt_helper(struct work_struct *w)
{
	struct padata_work *pw = container_of(w, struct padata_work, pw_work);
	struct padata_mt_job_state *ps = pw->pw_data;
	struct padata_mt_job *job = ps->job;
	bool done;

	spin_lock(&ps->lock);
	while (job->size > 0) {
		unsigned long start, size, end;

		start = job->start;
		/* So end is chunk size aligned if enough work remains. */
		size = roundup(start + 1, ps->chunk_size) - start;
		size = min(size, job->size);
		end = start + size;

		job->start = end;
		job->size -= size;

		spin_unlock(&ps->lock);
		job->thread_fn(start, end, job->fn_arg);
		spin_lock(&ps->lock);
	}

	++ps->nworks_fini;
	done = (ps->nworks_fini == ps->nworks);
	spin_unlock(&ps->lock);

	if (done)
		complete(&ps->completion);
}
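/*
 * Example (illustrative, not part of padata.c): a single-threaded
 * simulation of the claiming loop above, consuming a 100-unit job in
 * chunk_size-aligned pieces (numbers are arbitrary).
 */
#include <stdio.h>

int main(void)
{
	unsigned long start = 0, size = 100;
	const unsigned long chunk_size = 32;

	while (size > 0) {
		unsigned long s = start;
		/* roundup(s + 1, chunk_size) - s, spelled out: */
		unsigned long n = (s / chunk_size + 1) * chunk_size - s;

		if (n > size)
			n = size;
		start = s + n;
		size -= n;
		printf("chunk [%3lu, %3lu)\n", s, s + n);
	}
	return 0;	/* prints [0,32) [32,64) [64,96) [96,100) */
}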
/* padata_do_multithreaded - run a multithreaded job */
void padata_do_multithreaded(struct padata_mt_job *job)
{
	if (job->size == 0)
		return;
	/* Ensure at least one thread when size < min_chunk. */
	nworks = max(job->size / max(job->min_chunk, job->align), 1ul);
	nworks = min(nworks, job->max_threads);
	if (nworks == 1) {
		/* Single thread, no coordination needed. */
		job->thread_fn(job->start, job->start + job->size, job->fn_arg);
		return;
	}
	/* ... ensure chunk_size is at least 1 to prevent divide-by-0
	 * panic in padata_mt_helper(): */
	ps.chunk_size = job->size / (ps.nworks * load_balance_factor);
	ps.chunk_size = max(ps.chunk_size, job->min_chunk);
	ps.chunk_size = max(ps.chunk_size, 1ul);
	ps.chunk_size = roundup(ps.chunk_size, job->align);
	/* ... queue one helper work per thread: */
	if (job->numa_aware) {
		/* ... round-robin across NUMA nodes ... */
		queue_work_node(nid, system_dfl_wq, &pw->pw_work);
	} else {
		queue_work(system_dfl_wq, &pw->pw_work);
	}
	/* ... wait for all helpers to finish ... */
}
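/*
 * Example (hedged sketch, not part of padata.c): a caller in the spirit
 * of the kernel's deferred memory initialization.  All my_* names and the
 * numeric values are hypothetical; the padata_mt_job fields match recent
 * kernels.
 */
#include <linux/padata.h>

static void my_init_range(unsigned long start, unsigned long end, void *arg)
{
	unsigned long i;

	for (i = start; i < end; i++)
		;	/* initialize item i of the table passed via arg */
}

static void my_init_all(void *table, unsigned long nr_items)
{
	struct padata_mt_job job = {
		.thread_fn	= my_init_range,
		.fn_arg		= table,
		.start		= 0,
		.size		= nr_items,
		.align		= 1,
		.min_chunk	= 1024,	/* below this, one thread is enough */
		.max_threads	= 16,
		.numa_aware	= false,
	};

	/* Returns only once the whole [0, nr_items) range is processed. */
	padata_do_multithreaded(&job);
}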
static void __padata_list_init(struct padata_list *pd_list)
{
	INIT_LIST_HEAD(&pd_list->list);
	spin_lock_init(&pd_list->lock);
}

/* Initialize all percpu queues used by serial workers */
static void padata_init_squeues(struct parallel_data *pd)
{
	int cpu;
	struct padata_serial_queue *squeue;

	for_each_cpu(cpu, pd->cpumask.cbcpu) {
		squeue = per_cpu_ptr(pd->squeue, cpu);
		squeue->pd = pd;
		__padata_list_init(&squeue->serial);
		INIT_WORK(&squeue->work, padata_serial_worker);
	}
}

/* Initialize per-CPU reorder lists */
static void padata_init_reorder_list(struct parallel_data *pd)
{
	int cpu;
	struct padata_list *list;

	for_each_cpu(cpu, pd->cpumask.pcpu) {
		list = per_cpu_ptr(pd->reorder_list, cpu);
		__padata_list_init(list);
	}
}
/* Allocate and initialize the internal cpumask dependent resources. */
static struct parallel_data *padata_alloc_pd(struct padata_shell *ps)
{
	struct padata_instance *pinst = ps->pinst;
	struct parallel_data *pd;

	pd = kzalloc(sizeof(struct parallel_data), GFP_KERNEL);
	if (!pd)
		goto err;

	pd->reorder_list = alloc_percpu(struct padata_list);
	if (!pd->reorder_list)
		goto err_free_pd;

	pd->squeue = alloc_percpu(struct padata_serial_queue);
	if (!pd->squeue)
		goto err_free_reorder_list;

	pd->ps = ps;

	if (!alloc_cpumask_var(&pd->cpumask.pcpu, GFP_KERNEL))
		goto err_free_squeue;
	if (!alloc_cpumask_var(&pd->cpumask.cbcpu, GFP_KERNEL))
		goto err_free_pcpu;

	/* Restrict the masks to the currently online CPUs. */
	cpumask_and(pd->cpumask.pcpu, pinst->cpumask.pcpu, cpu_online_mask);
	cpumask_and(pd->cpumask.cbcpu, pinst->cpumask.cbcpu, cpu_online_mask);

	padata_init_reorder_list(pd);
	padata_init_squeues(pd);
	pd->seq_nr = -1;
	refcount_set(&pd->refcnt, 1);
	pd->cpu = cpumask_first(pd->cpumask.pcpu);

	return pd;

err_free_pcpu:
	free_cpumask_var(pd->cpumask.pcpu);
err_free_squeue:
	free_percpu(pd->squeue);
err_free_reorder_list:
	free_percpu(pd->reorder_list);
err_free_pd:
	kfree(pd);
err:
	return NULL;
}
static void padata_free_pd(struct parallel_data *pd)
{
	free_cpumask_var(pd->cpumask.pcpu);
	free_cpumask_var(pd->cpumask.cbcpu);
	free_percpu(pd->reorder_list);
	free_percpu(pd->squeue);
	kfree(pd);
}
/* __padata_start()/__padata_stop() (fragments): set and clear PADATA_INIT. */
	pinst->flags |= PADATA_INIT;
	/* ... */
	if (!(pinst->flags & PADATA_INIT))
		return;
	pinst->flags &= ~PADATA_INIT;
/* padata_replace_one() (fragment): publish a freshly allocated
 * parallel_data and stash the old one in ps->opd. */
	/* ... */
		return -ENOMEM;

	ps->opd = rcu_dereference_protected(ps->pd, 1);
	rcu_assign_pointer(ps->pd, pd_new);

/* padata_replace() (fragments): swap every shell's pd under PADATA_RESET,
 * then put the old pds after an RCU grace period. */
	pinst->flags |= PADATA_RESET;

	list_for_each_entry(ps, &pinst->pslist, list) {
	/* ... synchronize_rcu() ... */
	list_for_each_entry_continue_reverse(ps, &pinst->pslist, list)
		padata_put_pd(ps->opd);

	pinst->flags &= ~PADATA_RESET;
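/*
 * Example (illustrative, not part of padata.c): the swap above is the
 * classic RCU publish-then-reclaim idiom; padata_replace() runs
 * synchronize_rcu() between swapping and putting the old pds.  A generic
 * sketch with hypothetical names:
 */
#include <linux/mutex.h>
#include <linux/rcupdate.h>
#include <linux/slab.h>

struct cfg { int value; };

static struct cfg __rcu *active_cfg;
static DEFINE_MUTEX(cfg_lock);

static int cfg_read(void)	/* reader side */
{
	struct cfg *c;
	int v;

	rcu_read_lock();
	c = rcu_dereference(active_cfg);
	v = c ? c->value : -1;
	rcu_read_unlock();
	return v;
}

static int cfg_update(int value)	/* update side */
{
	struct cfg *new, *old;

	new = kzalloc(sizeof(*new), GFP_KERNEL);
	if (!new)
		return -ENOMEM;
	new->value = value;

	mutex_lock(&cfg_lock);
	old = rcu_dereference_protected(active_cfg,
					lockdep_is_held(&cfg_lock));
	rcu_assign_pointer(active_cfg, new);	/* publish */
	mutex_unlock(&cfg_lock);

	synchronize_rcu();	/* wait out all pre-existing readers */
	kfree(old);		/* now safe to reclaim */
	return 0;
}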
/* padata_validate_cpumask() (fragment): mark the instance invalid when a
 * cpumask no longer intersects the online CPUs. */
		pinst->flags |= PADATA_INVALID;
	/* ... */
	pinst->flags &= ~PADATA_INVALID;
/* __padata_set_cpumasks() (fragment): install the new masks before
 * rebuilding the parallel_data via padata_replace(). */
	cpumask_copy(pinst->cpumask.pcpu, pcpumask);
	cpumask_copy(pinst->cpumask.cbcpu, cbcpumask);
/**
 * padata_set_cpumask - set the cpumask indicated by @cpumask_type to @cpumask
 */
	int err = -EINVAL;
	/* ... */
	mutex_lock(&pinst->lock);
	/* ... PADATA_CPU_PARALLEL keeps the current serial mask: */
		serial_mask = pinst->cpumask.cbcpu;
	/* ... PADATA_CPU_SERIAL keeps the current parallel mask: */
		parallel_mask = pinst->cpumask.pcpu;
	/* ... __padata_set_cpumasks(pinst, parallel_mask, serial_mask) ... */
	mutex_unlock(&pinst->lock);
/* __padata_add_cpu()/__padata_remove_cpu() (fragments): (re)start or stop
 * the instance depending on whether both masks remain valid. */
	if (padata_validate_cpumask(pinst, pinst->cpumask.pcpu) &&
	    padata_validate_cpumask(pinst, pinst->cpumask.cbcpu))
		__padata_start(pinst);
	/* ... */
	if (!padata_validate_cpumask(pinst, pinst->cpumask.pcpu) ||
	    !padata_validate_cpumask(pinst, pinst->cpumask.cbcpu))
		__padata_stop(pinst);

/* Does this instance use the given CPU in either cpumask? */
	return cpumask_test_cpu(cpu, pinst->cpumask.pcpu) ||
	       cpumask_test_cpu(cpu, pinst->cpumask.cbcpu);
static int padata_cpu_online(unsigned int cpu, struct hlist_node *node)
{
	/* ... */
	pinst = hlist_entry_safe(node, struct padata_instance, cpu_online_node);
	/* ... skip instances that don't use this CPU ... */
	mutex_lock(&pinst->lock);
	ret = __padata_add_cpu(pinst, cpu);
	mutex_unlock(&pinst->lock);
	return ret;
}

static int padata_cpu_dead(unsigned int cpu, struct hlist_node *node)
{
	/* ... */
	pinst = hlist_entry_safe(node, struct padata_instance, cpu_dead_node);
	/* ... */
	mutex_lock(&pinst->lock);
	ret = __padata_remove_cpu(pinst, cpu);
	mutex_unlock(&pinst->lock);
	return ret;
}
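/*
 * Example (hedged sketch, not padata's actual init code): these are
 * multi-instance hotplug callbacks, so they are registered once via
 * cpuhp_setup_state_multi() and each instance attaches its own
 * hlist_node afterwards.
 */
#include <linux/cpuhotplug.h>

static enum cpuhp_state hp_online;

static int __init my_hotplug_init(void)
{
	int ret;

	ret = cpuhp_setup_state_multi(CPUHP_AP_ONLINE_DYN, "padata:online",
				      padata_cpu_online, NULL);
	if (ret < 0)
		return ret;
	hp_online = ret;	/* dynamic state id, used at instance teardown */

	/* Later, per instance:
	 * cpuhp_state_add_instance_nocalls(hp_online, &pinst->cpu_online_node);
	 */
	return 0;
}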
static void __padata_free(struct padata_instance *pinst)
{
#ifdef CONFIG_HOTPLUG_CPU
	cpuhp_state_remove_instance_nocalls(CPUHP_PADATA_DEAD,
					    &pinst->cpu_dead_node);
	cpuhp_state_remove_instance_nocalls(hp_online, &pinst->cpu_online_node);
#endif
	WARN_ON(!list_empty(&pinst->pslist));
	free_cpumask_var(pinst->cpumask.pcpu);
	free_cpumask_var(pinst->cpumask.cbcpu);
	destroy_workqueue(pinst->serial_wq);
	destroy_workqueue(pinst->parallel_wq);
	kfree(pinst);
}
/* show_cpumask() (fragment): format the selected cpumask for sysfs. */
	mutex_lock(&pinst->lock);
	if (!strcmp(attr->name, "serial_cpumask"))
		cpumask = pinst->cpumask.cbcpu;
	else
		cpumask = pinst->cpumask.pcpu;
	/* ... len = snprintf(buf, PAGE_SIZE, "%*pb\n", ...) ... */
	mutex_unlock(&pinst->lock);
	return len < PAGE_SIZE ? len : -EINVAL;

/* store_cpumask() (fragments): parse the mask and apply it. */
		return -ENOMEM;
	/* ... */
	mask_type = !strcmp(attr->name, "serial_cpumask") ?
		PADATA_CPU_SERIAL : PADATA_CPU_PARALLEL;
/*
 * Padata sysfs provides the following objects:
 * serial_cpumask   [RW] - cpumask for serial workers
 * parallel_cpumask [RW] - cpumask for parallel workers
 */
/* padata_sysfs_show()/padata_sysfs_store() (fragments): dispatch to the
 * attribute's handler, defaulting to -EIO when none is set. */
	ssize_t ret = -EIO;
	/* ... */
	if (pentry->show)
		ret = pentry->show(pinst, attr, buf);
	/* ... */
	ssize_t ret = -EIO;
	/* ... */
	if (pentry->store)
		ret = pentry->store(pinst, attr, buf, count);
/**
 * padata_alloc - allocate and initialize a padata instance
 * @name: used to identify the instance
 */
struct padata_instance *padata_alloc(const char *name)
{
	/* ... */
	pinst->parallel_wq = alloc_workqueue("%s_parallel", WQ_UNBOUND, 0,
					     name);
	if (!pinst->parallel_wq)
		goto err_free_inst;
	/* ... */
	pinst->serial_wq = alloc_workqueue("%s_serial", WQ_MEM_RECLAIM |
					   WQ_CPU_INTENSIVE, 1, name);
	if (!pinst->serial_wq)
		goto err_put_cpus;
	if (!alloc_cpumask_var(&pinst->cpumask.pcpu, GFP_KERNEL))
		goto err_free_serial_wq;
	if (!alloc_cpumask_var(&pinst->cpumask.cbcpu, GFP_KERNEL)) {
		free_cpumask_var(pinst->cpumask.pcpu);
		goto err_free_serial_wq;
	}
	INIT_LIST_HEAD(&pinst->pslist);
	/* Both masks default to every possible CPU. */
	cpumask_copy(pinst->cpumask.pcpu, cpu_possible_mask);
	cpumask_copy(pinst->cpumask.cbcpu, cpu_possible_mask);
	/* ... */
	kobject_init(&pinst->kobj, &padata_attr_type);
	mutex_init(&pinst->lock);
	/* ... register the CPU hotplug callbacks ... */
	cpuhp_state_add_instance_nocalls_cpuslocked(hp_online,
						    &pinst->cpu_online_node);
	cpuhp_state_add_instance_nocalls_cpuslocked(CPUHP_PADATA_DEAD,
						    &pinst->cpu_dead_node);
	/* ... */
err_free_masks:
	free_cpumask_var(pinst->cpumask.pcpu);
	free_cpumask_var(pinst->cpumask.cbcpu);
err_free_serial_wq:
	destroy_workqueue(pinst->serial_wq);
err_put_cpus:
	cpus_read_unlock();
	destroy_workqueue(pinst->parallel_wq);
	/* ... */
}
/* padata_free - free a padata instance */
void padata_free(struct padata_instance *pinst)
{
	kobject_put(&pinst->kobj);
}
/* padata_alloc_shell - Allocate and initialize padata shell. */
struct padata_shell *padata_alloc_shell(struct padata_instance *pinst)
{
	struct parallel_data *pd;
	/* ... */
	ps->pinst = pinst;
	/* ... */
	pd = padata_alloc_pd(ps);
	/* ... */
	if (!pd)
		goto out_free_ps;
	mutex_lock(&pinst->lock);
	RCU_INIT_POINTER(ps->pd, pd);
	list_add(&ps->list, &pinst->pslist);
	mutex_unlock(&pinst->lock);
	return ps;
	/* ... */
}
/* padata_free_shell - free a padata shell */
void padata_free_shell(struct padata_shell *ps)
{
	struct parallel_data *pd;
	/* ... wait for in-flight padata_do_serial() users ... */
	mutex_lock(&ps->pinst->lock);
	list_del(&ps->list);
	pd = rcu_dereference_protected(ps->pd, 1);
	padata_put_pd(pd);
	mutex_unlock(&ps->pinst->lock);
	/* ... */
}
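/*
 * Example (hedged sketch, not part of padata.c): the whole lifecycle.
 * One instance, one shell, torn down in reverse order; my_* names are
 * hypothetical.
 */
#include <linux/padata.h>

static struct padata_instance *my_pinst;
static struct padata_shell *my_ps;

static int my_setup(void)
{
	my_pinst = padata_alloc("my_engine");
	if (!my_pinst)
		return -ENOMEM;

	my_ps = padata_alloc_shell(my_pinst);
	if (!my_ps) {
		padata_free(my_pinst);
		return -ENOMEM;
	}
	return 0;
}

static void my_teardown(void)
{
	/* Shells go before their instance; padata_free_shell() waits for
	 * in-flight padata_do_serial() users before dropping the pd. */
	padata_free_shell(my_ps);
	padata_free(my_pinst);
}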