/*
 * padata.c - generic interface to process data streams in parallel
 *
 * Copyright (C) 2008, 2009 secunet Security Networks AG
 * Copyright (C) 2008, 2009 Steffen Klassert <steffen.klassert@secunet.com>
 *
 * This program is free software; you can redistribute it and/or modify it
 * under the terms and conditions of the GNU General Public License,
 * version 2, as published by the Free Software Foundation.
 *
 * This program is distributed in the hope it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for
 * more details.
 *
 * You should have received a copy of the GNU General Public License along with
 * this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin St - Fifth Floor, Boston, MA 02110-1301 USA.
 */

#include <linux/module.h>
#include <linux/cpumask.h>
#include <linux/err.h>
#include <linux/cpu.h>
#include <linux/padata.h>
#include <linux/mutex.h>
#include <linux/sched.h>
#include <linux/slab.h>
#include <linux/rcupdate.h>

#define MAX_SEQ_NR (INT_MAX - NR_CPUS)
#define MAX_OBJ_NUM 1000

static int padata_index_to_cpu(struct parallel_data *pd, int cpu_index)
{
        int cpu, target_cpu;

        target_cpu = cpumask_first(pd->cpumask);
        for (cpu = 0; cpu < cpu_index; cpu++)
                target_cpu = cpumask_next(target_cpu, pd->cpumask);

        return target_cpu;
}

static int padata_cpu_hash(struct padata_priv *padata)
{
        int cpu_index;
        struct parallel_data *pd;

        pd = padata->pd;

        /*
         * Hash the sequence numbers to the cpus by taking
         * seq_nr mod. number of cpus in use.
         */
        cpu_index = padata->seq_nr % cpumask_weight(pd->cpumask);

        return padata_index_to_cpu(pd, cpu_index);
}
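
/*
 * Example of the mapping above: with a cpumask covering cpus {0, 2, 3},
 * cpumask_weight() is 3, so an object with seq_nr 7 gets cpu_index
 * 7 % 3 == 1, which padata_index_to_cpu() resolves to cpu 2, the second
 * set bit in the mask. Consecutive sequence numbers are therefore
 * spread round-robin over the cpus in the mask.
 */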

static void padata_parallel_worker(struct work_struct *work)
{
        struct padata_queue *queue;
        struct parallel_data *pd;
        struct padata_instance *pinst;
        LIST_HEAD(local_list);

        local_bh_disable();
        queue = container_of(work, struct padata_queue, pwork);
        pd = queue->pd;
        pinst = pd->pinst;

        spin_lock(&queue->parallel.lock);
        list_replace_init(&queue->parallel.list, &local_list);
        spin_unlock(&queue->parallel.lock);

        while (!list_empty(&local_list)) {
                struct padata_priv *padata;

                padata = list_entry(local_list.next,
                                    struct padata_priv, list);

                list_del_init(&padata->list);

                padata->parallel(padata);
        }

        local_bh_enable();
}

/**
 * padata_do_parallel - padata parallelization function
 *
 * @pinst: padata instance
 * @padata: object to be parallelized
 * @cb_cpu: cpu the serialization callback function will run on,
 *          must be in the cpumask of padata.
 *
 * The parallelization callback function will run with BHs off.
 * Note: Every object which is parallelized by padata_do_parallel
 * must be seen by padata_do_serial.
 */
int padata_do_parallel(struct padata_instance *pinst,
                       struct padata_priv *padata, int cb_cpu)
{
        int target_cpu, err;
        struct padata_queue *queue;
        struct parallel_data *pd;

        rcu_read_lock_bh();

        pd = rcu_dereference(pinst->pd);

        err = 0;
        if (!(pinst->flags & PADATA_INIT))
                goto out;

        err = -EBUSY;
        if ((pinst->flags & PADATA_RESET))
                goto out;

        if (atomic_read(&pd->refcnt) >= MAX_OBJ_NUM)
                goto out;

        err = -EINVAL;
        if (!cpumask_test_cpu(cb_cpu, pd->cpumask))
                goto out;

        err = -EINPROGRESS;
        atomic_inc(&pd->refcnt);
        padata->pd = pd;
        padata->cb_cpu = cb_cpu;

        if (unlikely(atomic_read(&pd->seq_nr) == pd->max_seq_nr))
                atomic_set(&pd->seq_nr, -1);

        padata->seq_nr = atomic_inc_return(&pd->seq_nr);

        target_cpu = padata_cpu_hash(padata);
        queue = per_cpu_ptr(pd->queue, target_cpu);

        spin_lock(&queue->parallel.lock);
        list_add_tail(&padata->list, &queue->parallel.list);
        spin_unlock(&queue->parallel.lock);

        queue_work_on(target_cpu, pinst->wq, &queue->pwork);

out:
        rcu_read_unlock_bh();

        return err;
}
EXPORT_SYMBOL(padata_do_parallel);
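
/*
 * Illustrative sketch, not part of padata itself: how a hypothetical
 * user might submit an object. The names my_request, my_pinst and
 * my_cb_cpu are assumptions, modelled on users such as pcrypt, which
 * embed struct padata_priv in their own request structure. The
 * callbacks my_parallel and my_serial are sketched after
 * padata_do_serial() below.
 *
 *      struct my_request {
 *              struct padata_priv padata;
 *              ...
 *      };
 *
 *      static int my_submit(struct my_request *req)
 *      {
 *              int err;
 *
 *              req->padata.parallel = my_parallel;
 *              req->padata.serial = my_serial;
 *
 *              err = padata_do_parallel(my_pinst, &req->padata, my_cb_cpu);
 *              if (err == -EINPROGRESS)
 *                      return 0;
 *
 *              return err;
 *      }
 *
 * -EINPROGRESS means the object was queued and the parallel and serial
 * callbacks will run later; any other return value means the object was
 * not queued and the caller still owns it.
 */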

/*
 * padata_get_next - Get the next object that needs serialization.
 *
 * Return values are:
 *
 * A pointer to the control struct of the next object that needs
 * serialization, if present in one of the percpu reorder queues.
 *
 * NULL, if all percpu reorder queues are empty.
 *
 * -EINPROGRESS, if the next object that needs serialization will
 *  be parallel processed by another cpu and is not yet present in
 *  the cpu's reorder queue.
 *
 * -ENODATA, if this cpu has to do the parallel processing for
 *  the next object.
 */
static struct padata_priv *padata_get_next(struct parallel_data *pd)
{
        int cpu, num_cpus, empty, calc_seq_nr;
        int seq_nr, next_nr, overrun, next_overrun;
        struct padata_queue *queue, *next_queue;
        struct padata_priv *padata;
        struct padata_list *reorder;

        empty = 0;
        next_nr = -1;
        next_overrun = 0;
        next_queue = NULL;

        num_cpus = cpumask_weight(pd->cpumask);

        for_each_cpu(cpu, pd->cpumask) {
                queue = per_cpu_ptr(pd->queue, cpu);
                reorder = &queue->reorder;

                /*
                 * Calculate the seq_nr of the object that should be
                 * next in this reorder queue.
                 */
                overrun = 0;
                calc_seq_nr = (atomic_read(&queue->num_obj) * num_cpus)
                               + queue->cpu_index;

                if (unlikely(calc_seq_nr > pd->max_seq_nr)) {
                        calc_seq_nr = calc_seq_nr - pd->max_seq_nr - 1;
                        overrun = 1;
                }

                if (!list_empty(&reorder->list)) {
                        padata = list_entry(reorder->list.next,
                                            struct padata_priv, list);

                        seq_nr = padata->seq_nr;
                        BUG_ON(calc_seq_nr != seq_nr);
                } else {
                        seq_nr = calc_seq_nr;
                        empty++;
                }

                if (next_nr < 0 || seq_nr < next_nr
                    || (next_overrun && !overrun)) {
                        next_nr = seq_nr;
                        next_overrun = overrun;
                        next_queue = queue;
                }
        }

        padata = NULL;

        if (empty == num_cpus)
                goto out;

        reorder = &next_queue->reorder;

        if (!list_empty(&reorder->list)) {
                padata = list_entry(reorder->list.next,
                                    struct padata_priv, list);

                if (unlikely(next_overrun)) {
                        for_each_cpu(cpu, pd->cpumask) {
                                queue = per_cpu_ptr(pd->queue, cpu);
                                atomic_set(&queue->num_obj, 0);
                        }
                }

                spin_lock(&reorder->lock);
                list_del_init(&padata->list);
                atomic_dec(&pd->reorder_objects);
                spin_unlock(&reorder->lock);

                atomic_inc(&next_queue->num_obj);

                goto out;
        }

        queue = per_cpu_ptr(pd->queue, smp_processor_id());
        if (queue->cpu_index == next_queue->cpu_index) {
                padata = ERR_PTR(-ENODATA);
                goto out;
        }

        padata = ERR_PTR(-EINPROGRESS);
out:
        return padata;
}
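
/*
 * A worked example of the calc_seq_nr computation above: padata_cpu_hash()
 * maps seq_nr to cpus modulo num_cpus, and padata_do_serial() queues an
 * object on the reorder queue of the cpu it is called on, i.e. the cpu
 * that did the parallel work. With three cpus, the reorder queue with
 * cpu_index 1 therefore only ever sees seq_nr 1, 4, 7, ... in that order.
 * After it has passed two objects on to serialization (num_obj == 2),
 * the next object it can receive must have seq_nr 2 * 3 + 1 == 7, which
 * is exactly what calc_seq_nr computes.
 */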

static void padata_reorder(struct parallel_data *pd)
{
        struct padata_priv *padata;
        struct padata_queue *queue;
        struct padata_instance *pinst = pd->pinst;

        /*
         * We need to ensure that only one cpu can work on dequeueing of
         * the reorder queue at a time. Calculating in which percpu reorder
         * queue the next object will arrive takes some time. A spinlock
         * would be highly contended. Also it is not clear in which order
         * the objects arrive to the reorder queues. So a cpu could wait to
         * get the lock just to notice that there is nothing to do at the
         * moment. Therefore we use a trylock and let the holder of the lock
         * care for all the objects enqueued during the holdtime of the lock.
         */
        if (!spin_trylock_bh(&pd->lock))
                return;

        while (1) {
                padata = padata_get_next(pd);

                /*
                 * All reorder queues are empty, or the next object that needs
                 * serialization is parallel processed by another cpu and is
                 * still on its way to the cpu's reorder queue, nothing to
                 * do for now.
                 */
                if (!padata || PTR_ERR(padata) == -EINPROGRESS)
                        break;

                /*
                 * This cpu has to do the parallel processing of the next
                 * object. It's waiting in the cpu's parallelization queue,
                 * so exit immediately.
                 */
                if (PTR_ERR(padata) == -ENODATA) {
                        del_timer(&pd->timer);
                        spin_unlock_bh(&pd->lock);
                        return;
                }

                queue = per_cpu_ptr(pd->queue, padata->cb_cpu);

                spin_lock(&queue->serial.lock);
                list_add_tail(&padata->list, &queue->serial.list);
                spin_unlock(&queue->serial.lock);

                queue_work_on(padata->cb_cpu, pinst->wq, &queue->swork);
        }

        spin_unlock_bh(&pd->lock);

        /*
         * The next object that needs serialization might have arrived to
         * the reorder queues in the meantime, we will be called again
         * from the timer function if no one else cares for it.
         */
        if (atomic_read(&pd->reorder_objects)
            && !(pinst->flags & PADATA_RESET))
                mod_timer(&pd->timer, jiffies + HZ);
        else
                del_timer(&pd->timer);

        return;
}

static void padata_reorder_timer(unsigned long arg)
{
        struct parallel_data *pd = (struct parallel_data *)arg;

        padata_reorder(pd);
}

static void padata_serial_worker(struct work_struct *work)
{
        struct padata_queue *queue;
        struct parallel_data *pd;
        LIST_HEAD(local_list);

        local_bh_disable();
        queue = container_of(work, struct padata_queue, swork);
        pd = queue->pd;

        spin_lock(&queue->serial.lock);
        list_replace_init(&queue->serial.list, &local_list);
        spin_unlock(&queue->serial.lock);

        while (!list_empty(&local_list)) {
                struct padata_priv *padata;

                padata = list_entry(local_list.next,
                                    struct padata_priv, list);

                list_del_init(&padata->list);

                padata->serial(padata);
                atomic_dec(&pd->refcnt);
        }
        local_bh_enable();
}

/**
 * padata_do_serial - padata serialization function
 *
 * @padata: object to be serialized.
 *
 * padata_do_serial must be called for every parallelized object.
 * The serialization callback function will run with BHs off.
 */
void padata_do_serial(struct padata_priv *padata)
{
        int cpu;
        struct padata_queue *queue;
        struct parallel_data *pd;

        pd = padata->pd;

        cpu = get_cpu();
        queue = per_cpu_ptr(pd->queue, cpu);

        spin_lock(&queue->reorder.lock);
        atomic_inc(&pd->reorder_objects);
        list_add_tail(&padata->list, &queue->reorder.list);
        spin_unlock(&queue->reorder.lock);

        put_cpu();

        padata_reorder(pd);
}
EXPORT_SYMBOL(padata_do_serial);
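
/*
 * Illustrative sketch, continuing the hypothetical my_request user from
 * above: the parallel callback does the cpu-intensive work and ends by
 * handing the object back with padata_do_serial(); the serial callback
 * then runs on cb_cpu in the original submission order. Both callbacks
 * run with BHs off.
 *
 *      static void my_parallel(struct padata_priv *padata)
 *      {
 *              struct my_request *req;
 *
 *              req = container_of(padata, struct my_request, padata);
 *
 *              ... do the heavy work on req ...
 *
 *              padata_do_serial(padata);
 *      }
 *
 *      static void my_serial(struct padata_priv *padata)
 *      {
 *              struct my_request *req;
 *
 *              req = container_of(padata, struct my_request, padata);
 *
 *              ... complete req, results arrive in submission order ...
 *      }
 */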

/* Allocate and initialize the internal cpumask dependent resources. */
static struct parallel_data *padata_alloc_pd(struct padata_instance *pinst,
                                             const struct cpumask *cpumask)
{
        int cpu, cpu_index, num_cpus;
        struct padata_queue *queue;
        struct parallel_data *pd;

        cpu_index = 0;

        pd = kzalloc(sizeof(struct parallel_data), GFP_KERNEL);
        if (!pd)
                goto err;

        pd->queue = alloc_percpu(struct padata_queue);
        if (!pd->queue)
                goto err_free_pd;

        if (!alloc_cpumask_var(&pd->cpumask, GFP_KERNEL))
                goto err_free_queue;

        cpumask_and(pd->cpumask, cpumask, cpu_active_mask);

        for_each_cpu(cpu, pd->cpumask) {
                queue = per_cpu_ptr(pd->queue, cpu);

                queue->pd = pd;

                queue->cpu_index = cpu_index;
                cpu_index++;

                INIT_LIST_HEAD(&queue->reorder.list);
                INIT_LIST_HEAD(&queue->parallel.list);
                INIT_LIST_HEAD(&queue->serial.list);
                spin_lock_init(&queue->reorder.lock);
                spin_lock_init(&queue->parallel.lock);
                spin_lock_init(&queue->serial.lock);

                INIT_WORK(&queue->pwork, padata_parallel_worker);
                INIT_WORK(&queue->swork, padata_serial_worker);
                atomic_set(&queue->num_obj, 0);
        }

        num_cpus = cpumask_weight(pd->cpumask);
        pd->max_seq_nr = (MAX_SEQ_NR / num_cpus) * num_cpus - 1;

        setup_timer(&pd->timer, padata_reorder_timer, (unsigned long)pd);
        atomic_set(&pd->seq_nr, -1);
        atomic_set(&pd->reorder_objects, 0);
        atomic_set(&pd->refcnt, 0);
        pd->pinst = pinst;
        spin_lock_init(&pd->lock);

        return pd;

err_free_queue:
        free_percpu(pd->queue);
err_free_pd:
        kfree(pd);
err:
        return NULL;
}

static void padata_free_pd(struct parallel_data *pd)
{
        free_cpumask_var(pd->cpumask);
        free_percpu(pd->queue);
        kfree(pd);
}
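
/*
 * A note on the max_seq_nr choice in padata_alloc_pd() above: rounding
 * MAX_SEQ_NR down to a multiple of num_cpus and subtracting one makes
 * the number of usable sequence numbers (0 .. max_seq_nr) an exact
 * multiple of num_cpus, so the seq_nr to cpu mapping stays stable when
 * padata_do_parallel() wraps the counter around. As a purely
 * hypothetical example, if MAX_SEQ_NR were 1000 and three cpus were in
 * use, max_seq_nr would be 998 and the 999 sequence numbers 0..998
 * would split evenly into 333 per cpu before the wrap.
 */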

/* Flush all objects out of the padata queues. */
static void padata_flush_queues(struct parallel_data *pd)
{
        int cpu;
        struct padata_queue *queue;

        for_each_cpu(cpu, pd->cpumask) {
                queue = per_cpu_ptr(pd->queue, cpu);
                flush_work(&queue->pwork);
        }

        del_timer_sync(&pd->timer);

        if (atomic_read(&pd->reorder_objects))
                padata_reorder(pd);

        for_each_cpu(cpu, pd->cpumask) {
                queue = per_cpu_ptr(pd->queue, cpu);
                flush_work(&queue->swork);
        }

        BUG_ON(atomic_read(&pd->refcnt) != 0);
}

/* Replace the internal control structure with a new one. */
static void padata_replace(struct padata_instance *pinst,
                           struct parallel_data *pd_new)
{
        struct parallel_data *pd_old = pinst->pd;

        pinst->flags |= PADATA_RESET;

        rcu_assign_pointer(pinst->pd, pd_new);

        synchronize_rcu();

        padata_flush_queues(pd_old);
        padata_free_pd(pd_old);

        pinst->flags &= ~PADATA_RESET;
}

/**
 * padata_set_cpumask - set the cpumask that padata should use
 *
 * @pinst: padata instance
 * @cpumask: the cpumask to use
 */
int padata_set_cpumask(struct padata_instance *pinst,
                       cpumask_var_t cpumask)
{
        struct parallel_data *pd;
        int err = 0;

        mutex_lock(&pinst->lock);

        get_online_cpus();

        pd = padata_alloc_pd(pinst, cpumask);
        if (!pd) {
                err = -ENOMEM;
                goto out;
        }

        cpumask_copy(pinst->cpumask, cpumask);

        padata_replace(pinst, pd);

out:
        put_online_cpus();

        mutex_unlock(&pinst->lock);

        return err;
}
EXPORT_SYMBOL(padata_set_cpumask);

static int __padata_add_cpu(struct padata_instance *pinst, int cpu)
{
        struct parallel_data *pd;

        if (cpumask_test_cpu(cpu, cpu_active_mask)) {
                pd = padata_alloc_pd(pinst, pinst->cpumask);
                if (!pd)
                        return -ENOMEM;

                padata_replace(pinst, pd);
        }

        return 0;
}

/**
 * padata_add_cpu - add a cpu to the padata cpumask
 *
 * @pinst: padata instance
 * @cpu: cpu to add
 */
int padata_add_cpu(struct padata_instance *pinst, int cpu)
{
        int err;

        mutex_lock(&pinst->lock);

        get_online_cpus();
        cpumask_set_cpu(cpu, pinst->cpumask);
        err = __padata_add_cpu(pinst, cpu);
        put_online_cpus();

        mutex_unlock(&pinst->lock);

        return err;
}
EXPORT_SYMBOL(padata_add_cpu);

static int __padata_remove_cpu(struct padata_instance *pinst, int cpu)
{
        struct parallel_data *pd;

        if (cpumask_test_cpu(cpu, cpu_online_mask)) {
                pd = padata_alloc_pd(pinst, pinst->cpumask);
                if (!pd)
                        return -ENOMEM;

                padata_replace(pinst, pd);
        }

        return 0;
}

/**
 * padata_remove_cpu - remove a cpu from the padata cpumask
 *
 * @pinst: padata instance
 * @cpu: cpu to remove
 */
int padata_remove_cpu(struct padata_instance *pinst, int cpu)
{
        int err;

        mutex_lock(&pinst->lock);

        get_online_cpus();
        cpumask_clear_cpu(cpu, pinst->cpumask);
        err = __padata_remove_cpu(pinst, cpu);
        put_online_cpus();

        mutex_unlock(&pinst->lock);

        return err;
}
EXPORT_SYMBOL(padata_remove_cpu);

/**
 * padata_start - start the parallel processing
 *
 * @pinst: padata instance to start
 */
void padata_start(struct padata_instance *pinst)
{
        mutex_lock(&pinst->lock);
        pinst->flags |= PADATA_INIT;
        mutex_unlock(&pinst->lock);
}
EXPORT_SYMBOL(padata_start);

/**
 * padata_stop - stop the parallel processing
 *
 * @pinst: padata instance to stop
 */
void padata_stop(struct padata_instance *pinst)
{
        mutex_lock(&pinst->lock);
        pinst->flags &= ~PADATA_INIT;
        mutex_unlock(&pinst->lock);
}
EXPORT_SYMBOL(padata_stop);
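
/*
 * Illustrative sketch, not part of padata itself: changing the cpus a
 * running instance uses. "pinst" is assumed to be a previously
 * allocated instance and the cpu numbers are arbitrary examples.
 *
 *      cpumask_var_t mask;
 *
 *      if (!alloc_cpumask_var(&mask, GFP_KERNEL))
 *              return -ENOMEM;
 *
 *      cpumask_copy(mask, cpu_online_mask);
 *      cpumask_clear_cpu(3, mask);
 *      err = padata_set_cpumask(pinst, mask);
 *      free_cpumask_var(mask);
 *
 * Single cpus can also be added or removed without rebuilding the
 * whole mask:
 *
 *      err = padata_add_cpu(pinst, 3);
 *      err = padata_remove_cpu(pinst, 2);
 */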

#ifdef CONFIG_HOTPLUG_CPU
static int padata_cpu_callback(struct notifier_block *nfb,
                               unsigned long action, void *hcpu)
{
        int err;
        struct padata_instance *pinst;
        int cpu = (unsigned long)hcpu;

        pinst = container_of(nfb, struct padata_instance, cpu_notifier);

        switch (action) {
        case CPU_ONLINE:
        case CPU_ONLINE_FROZEN:
                if (!cpumask_test_cpu(cpu, pinst->cpumask))
                        break;
                mutex_lock(&pinst->lock);
                err = __padata_add_cpu(pinst, cpu);
                mutex_unlock(&pinst->lock);
                if (err)
                        return notifier_from_errno(err);
                break;

        case CPU_DOWN_PREPARE:
        case CPU_DOWN_PREPARE_FROZEN:
                if (!cpumask_test_cpu(cpu, pinst->cpumask))
                        break;
                mutex_lock(&pinst->lock);
                err = __padata_remove_cpu(pinst, cpu);
                mutex_unlock(&pinst->lock);
                if (err)
                        return notifier_from_errno(err);
                break;

        case CPU_UP_CANCELED:
        case CPU_UP_CANCELED_FROZEN:
                if (!cpumask_test_cpu(cpu, pinst->cpumask))
                        break;
                mutex_lock(&pinst->lock);
                __padata_remove_cpu(pinst, cpu);
                mutex_unlock(&pinst->lock);

        case CPU_DOWN_FAILED:
        case CPU_DOWN_FAILED_FROZEN:
                if (!cpumask_test_cpu(cpu, pinst->cpumask))
                        break;
                mutex_lock(&pinst->lock);
                __padata_add_cpu(pinst, cpu);
                mutex_unlock(&pinst->lock);
        }

        return NOTIFY_OK;
}
#endif

/**
 * padata_alloc - allocate and initialize a padata instance
 *
 * @cpumask: cpumask that padata uses for parallelization
 * @wq: workqueue to use for the allocated padata instance
 */
struct padata_instance *padata_alloc(const struct cpumask *cpumask,
                                     struct workqueue_struct *wq)
{
        struct padata_instance *pinst;
        struct parallel_data *pd;

        pinst = kzalloc(sizeof(struct padata_instance), GFP_KERNEL);
        if (!pinst)
                goto err;

        get_online_cpus();

        pd = padata_alloc_pd(pinst, cpumask);
        if (!pd)
                goto err_free_inst;

        if (!alloc_cpumask_var(&pinst->cpumask, GFP_KERNEL))
                goto err_free_pd;

        rcu_assign_pointer(pinst->pd, pd);

        pinst->wq = wq;

        cpumask_copy(pinst->cpumask, cpumask);

        pinst->flags = 0;

#ifdef CONFIG_HOTPLUG_CPU
        pinst->cpu_notifier.notifier_call = padata_cpu_callback;
        pinst->cpu_notifier.priority = 0;
        register_hotcpu_notifier(&pinst->cpu_notifier);
#endif

        put_online_cpus();

        mutex_init(&pinst->lock);

        return pinst;

err_free_pd:
        padata_free_pd(pd);
err_free_inst:
        kfree(pinst);
        put_online_cpus();
err:
        return NULL;
}
EXPORT_SYMBOL(padata_alloc);

/**
 * padata_free - free a padata instance
 *
 * @pinst: padata instance to free
 */
void padata_free(struct padata_instance *pinst)
{
        padata_stop(pinst);

        synchronize_rcu();

#ifdef CONFIG_HOTPLUG_CPU
        unregister_hotcpu_notifier(&pinst->cpu_notifier);
#endif
        get_online_cpus();
        padata_flush_queues(pinst->pd);
        put_online_cpus();

        padata_free_pd(pinst->pd);
        free_cpumask_var(pinst->cpumask);
        kfree(pinst);
}
EXPORT_SYMBOL(padata_free);
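
/*
 * Illustrative sketch, not part of padata itself: the life cycle of an
 * instance as a hypothetical user might drive it. The workqueue name
 * and the use of cpu_possible_mask are assumptions.
 *
 *      struct workqueue_struct *wq;
 *      struct padata_instance *pinst;
 *
 *      wq = create_workqueue("my_padata");
 *      if (!wq)
 *              return -ENOMEM;
 *
 *      pinst = padata_alloc(cpu_possible_mask, wq);
 *      if (!pinst) {
 *              destroy_workqueue(wq);
 *              return -ENOMEM;
 *      }
 *
 *      padata_start(pinst);
 *
 *      ... submit objects with padata_do_parallel(); each one is handed
 *      back through padata_do_serial() by its parallel callback ...
 *
 *      padata_stop(pinst);
 *      padata_free(pinst);
 *      destroy_workqueue(wq);
 */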