/*
 * padata.c - generic interface to process data streams in parallel
 *
 * Copyright (C) 2008, 2009 secunet Security Networks AG
 * Copyright (C) 2008, 2009 Steffen Klassert <steffen.klassert@secunet.com>
 *
 * This program is free software; you can redistribute it and/or modify it
 * under the terms and conditions of the GNU General Public License,
 * version 2, as published by the Free Software Foundation.
 *
 * This program is distributed in the hope it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for
 * more details.
 *
 * You should have received a copy of the GNU General Public License along with
 * this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin St - Fifth Floor, Boston, MA 02110-1301 USA.
 */

#include <linux/module.h>
#include <linux/cpumask.h>
#include <linux/err.h>
#include <linux/cpu.h>
#include <linux/padata.h>
#include <linux/mutex.h>
#include <linux/sched.h>
#include <linux/slab.h>
#include <linux/rcupdate.h>

#define MAX_SEQ_NR (INT_MAX - NR_CPUS)
#define MAX_OBJ_NUM (10000 * NR_CPUS)

static int padata_index_to_cpu(struct parallel_data *pd, int cpu_index)
{
	int cpu, target_cpu;

	target_cpu = cpumask_first(pd->cpumask);
	for (cpu = 0; cpu < cpu_index; cpu++)
		target_cpu = cpumask_next(target_cpu, pd->cpumask);

	return target_cpu;
}

static int padata_cpu_hash(struct padata_priv *padata)
{
	int cpu_index;
	struct parallel_data *pd;

	pd = padata->pd;

	/*
	 * Hash the sequence numbers to the cpus by taking
	 * seq_nr mod. number of cpus in use.
	 */
	cpu_index = padata->seq_nr % cpumask_weight(pd->cpumask);

	return padata_index_to_cpu(pd, cpu_index);
}

static void padata_parallel_worker(struct work_struct *work)
{
	struct padata_queue *queue;
	struct parallel_data *pd;
	struct padata_instance *pinst;
	LIST_HEAD(local_list);

	local_bh_disable();
	queue = container_of(work, struct padata_queue, pwork);
	pd = queue->pd;
	pinst = pd->pinst;

	spin_lock(&queue->parallel.lock);
	list_replace_init(&queue->parallel.list, &local_list);
	spin_unlock(&queue->parallel.lock);

	while (!list_empty(&local_list)) {
		struct padata_priv *padata;

		padata = list_entry(local_list.next,
				    struct padata_priv, list);

		list_del_init(&padata->list);

		padata->parallel(padata);
	}

	local_bh_enable();
}

/*
 * padata_do_parallel - padata parallelization function
 *
 * @pinst: padata instance
 * @padata: object to be parallelized
 * @cb_cpu: cpu the serialization callback function will run on,
 *          must be in the cpumask of padata.
 *
 * The parallelization callback function will run with BHs off.
 * Note: Every object which is parallelized by padata_do_parallel
 * must be seen by padata_do_serial.
 */
int padata_do_parallel(struct padata_instance *pinst,
		       struct padata_priv *padata, int cb_cpu)
{
	int target_cpu, err;
	struct padata_queue *queue;
	struct parallel_data *pd;

	rcu_read_lock_bh();

	pd = rcu_dereference(pinst->pd);

	err = 0;
	if (!(pinst->flags & PADATA_INIT))
		goto out;

	err = -EBUSY;
	if ((pinst->flags & PADATA_RESET))
		goto out;

	if (atomic_read(&pd->refcnt) >= MAX_OBJ_NUM)
		goto out;

	err = -EINVAL;
	if (!cpumask_test_cpu(cb_cpu, pd->cpumask))
		goto out;

	err = -EINPROGRESS;
	atomic_inc(&pd->refcnt);
	padata->pd = pd;
	padata->cb_cpu = cb_cpu;

	if (unlikely(atomic_read(&pd->seq_nr) == pd->max_seq_nr))
		atomic_set(&pd->seq_nr, -1);

	padata->seq_nr = atomic_inc_return(&pd->seq_nr);

	target_cpu = padata_cpu_hash(padata);
	queue = per_cpu_ptr(pd->queue, target_cpu);

	spin_lock(&queue->parallel.lock);
	list_add_tail(&padata->list, &queue->parallel.list);
	spin_unlock(&queue->parallel.lock);

	queue_work_on(target_cpu, pinst->wq, &queue->pwork);

out:
	rcu_read_unlock_bh();

	return err;
}
EXPORT_SYMBOL(padata_do_parallel);
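/*
 * Illustration (a reading aid, not part of the algorithm itself): with
 * three cpus in the cpumask, cpu_index 0, 1 and 2, padata_cpu_hash()
 * spreads the sequence numbers round robin:
 *
 *	seq_nr:     0  1  2  3  4  5  6 ...
 *	cpu_index:  0  1  2  0  1  2  0 ...
 *
 * The object a given reorder queue should deliver next therefore carries
 * seq_nr == num_obj * num_cpus + cpu_index, where num_obj counts the
 * objects already taken from that queue.  padata_get_next() below relies
 * on exactly this invariant to find the queue that holds the next object
 * in submission order.
 */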
static struct padata_priv *padata_get_next(struct parallel_data *pd)
{
	int cpu, num_cpus, empty, calc_seq_nr;
	int seq_nr, next_nr, overrun, next_overrun;
	struct padata_queue *queue, *next_queue;
	struct padata_priv *padata;
	struct padata_list *reorder;

	empty = 0;
	next_nr = -1;
	next_overrun = 0;
	next_queue = NULL;

	num_cpus = cpumask_weight(pd->cpumask);

	for_each_cpu(cpu, pd->cpumask) {
		queue = per_cpu_ptr(pd->queue, cpu);
		reorder = &queue->reorder;

		/*
		 * Calculate the seq_nr of the object that should be
		 * next in this queue.
		 */
		overrun = 0;
		calc_seq_nr = (atomic_read(&queue->num_obj) * num_cpus)
			       + queue->cpu_index;

		if (unlikely(calc_seq_nr > pd->max_seq_nr)) {
			calc_seq_nr = calc_seq_nr - pd->max_seq_nr - 1;
			overrun = 1;
		}

		if (!list_empty(&reorder->list)) {
			padata = list_entry(reorder->list.next,
					    struct padata_priv, list);

			seq_nr = padata->seq_nr;
			BUG_ON(calc_seq_nr != seq_nr);
		} else {
			seq_nr = calc_seq_nr;
			empty++;
		}

		if (next_nr < 0 || seq_nr < next_nr
		    || (next_overrun && !overrun)) {
			next_nr = seq_nr;
			next_overrun = overrun;
			next_queue = queue;
		}
	}

	padata = NULL;

	if (empty == num_cpus)
		goto out;

	reorder = &next_queue->reorder;

	if (!list_empty(&reorder->list)) {
		padata = list_entry(reorder->list.next,
				    struct padata_priv, list);

		if (unlikely(next_overrun)) {
			for_each_cpu(cpu, pd->cpumask) {
				queue = per_cpu_ptr(pd->queue, cpu);
				atomic_set(&queue->num_obj, 0);
			}
		}

		spin_lock(&reorder->lock);
		list_del_init(&padata->list);
		atomic_dec(&pd->reorder_objects);
		spin_unlock(&reorder->lock);

		atomic_inc(&next_queue->num_obj);

		goto out;
	}

	if (next_nr % num_cpus == next_queue->cpu_index) {
		padata = ERR_PTR(-ENODATA);
		goto out;
	}

	padata = ERR_PTR(-EINPROGRESS);
out:
	return padata;
}

static void padata_reorder(struct parallel_data *pd)
{
	struct padata_priv *padata;
	struct padata_queue *queue;
	struct padata_instance *pinst = pd->pinst;

try_again:
	if (!spin_trylock_bh(&pd->lock))
		goto out;

	while (1) {
		padata = padata_get_next(pd);

		if (!padata || PTR_ERR(padata) == -EINPROGRESS)
			break;

		if (PTR_ERR(padata) == -ENODATA) {
			spin_unlock_bh(&pd->lock);
			goto out;
		}

		queue = per_cpu_ptr(pd->queue, padata->cb_cpu);

		spin_lock(&queue->serial.lock);
		list_add_tail(&padata->list, &queue->serial.list);
		spin_unlock(&queue->serial.lock);

		queue_work_on(padata->cb_cpu, pinst->wq, &queue->swork);
	}

	spin_unlock_bh(&pd->lock);

	if (atomic_read(&pd->reorder_objects))
		goto try_again;

out:
	return;
}

static void padata_serial_worker(struct work_struct *work)
{
	struct padata_queue *queue;
	struct parallel_data *pd;
	LIST_HEAD(local_list);

	local_bh_disable();
	queue = container_of(work, struct padata_queue, swork);
	pd = queue->pd;

	spin_lock(&queue->serial.lock);
	list_replace_init(&queue->serial.list, &local_list);
	spin_unlock(&queue->serial.lock);

	while (!list_empty(&local_list)) {
		struct padata_priv *padata;

		padata = list_entry(local_list.next,
				    struct padata_priv, list);

		list_del_init(&padata->list);

		padata->serial(padata);
		atomic_dec(&pd->refcnt);
	}
	local_bh_enable();
}
/*
 * padata_do_serial - padata serialization function
 *
 * @padata: object to be serialized.
 *
 * padata_do_serial must be called for every parallelized object.
 * The serialization callback function will run with BHs off.
 */
void padata_do_serial(struct padata_priv *padata)
{
	int cpu;
	struct padata_queue *queue;
	struct parallel_data *pd;

	pd = padata->pd;

	cpu = get_cpu();
	queue = per_cpu_ptr(pd->queue, cpu);

	spin_lock(&queue->reorder.lock);
	atomic_inc(&pd->reorder_objects);
	list_add_tail(&padata->list, &queue->reorder.list);
	spin_unlock(&queue->reorder.lock);

	put_cpu();

	padata_reorder(pd);
}
EXPORT_SYMBOL(padata_do_serial);

static struct parallel_data *padata_alloc_pd(struct padata_instance *pinst,
					     const struct cpumask *cpumask)
{
	int cpu, cpu_index, num_cpus;
	struct padata_queue *queue;
	struct parallel_data *pd;

	cpu_index = 0;

	pd = kzalloc(sizeof(struct parallel_data), GFP_KERNEL);
	if (!pd)
		goto err;

	pd->queue = alloc_percpu(struct padata_queue);
	if (!pd->queue)
		goto err_free_pd;

	if (!alloc_cpumask_var(&pd->cpumask, GFP_KERNEL))
		goto err_free_queue;

	for_each_possible_cpu(cpu) {
		queue = per_cpu_ptr(pd->queue, cpu);

		queue->pd = pd;

		if (cpumask_test_cpu(cpu, cpumask)
		    && cpumask_test_cpu(cpu, cpu_active_mask)) {
			queue->cpu_index = cpu_index;
			cpu_index++;
		} else
			queue->cpu_index = -1;

		INIT_LIST_HEAD(&queue->reorder.list);
		INIT_LIST_HEAD(&queue->parallel.list);
		INIT_LIST_HEAD(&queue->serial.list);
		spin_lock_init(&queue->reorder.lock);
		spin_lock_init(&queue->parallel.lock);
		spin_lock_init(&queue->serial.lock);

		INIT_WORK(&queue->pwork, padata_parallel_worker);
		INIT_WORK(&queue->swork, padata_serial_worker);
		atomic_set(&queue->num_obj, 0);
	}

	cpumask_and(pd->cpumask, cpumask, cpu_active_mask);

	num_cpus = cpumask_weight(pd->cpumask);
	pd->max_seq_nr = (MAX_SEQ_NR / num_cpus) * num_cpus - 1;

	atomic_set(&pd->seq_nr, -1);
	atomic_set(&pd->reorder_objects, 0);
	atomic_set(&pd->refcnt, 0);
	pd->pinst = pinst;
	spin_lock_init(&pd->lock);

	return pd;

err_free_queue:
	free_percpu(pd->queue);
err_free_pd:
	kfree(pd);
err:
	return NULL;
}

static void padata_free_pd(struct parallel_data *pd)
{
	free_cpumask_var(pd->cpumask);
	free_percpu(pd->queue);
	kfree(pd);
}

static void padata_replace(struct padata_instance *pinst,
			   struct parallel_data *pd_new)
{
	struct parallel_data *pd_old = pinst->pd;

	pinst->flags |= PADATA_RESET;

	rcu_assign_pointer(pinst->pd, pd_new);

	synchronize_rcu();

	while (atomic_read(&pd_old->refcnt) != 0)
		yield();

	flush_workqueue(pinst->wq);

	padata_free_pd(pd_old);

	pinst->flags &= ~PADATA_RESET;
}

/*
 * padata_set_cpumask - set the cpumask that padata should use
 *
 * @pinst: padata instance
 * @cpumask: the cpumask to use
 */
int padata_set_cpumask(struct padata_instance *pinst,
		       cpumask_var_t cpumask)
{
	struct parallel_data *pd;
	int err = 0;

	might_sleep();

	mutex_lock(&pinst->lock);

	pd = padata_alloc_pd(pinst, cpumask);
	if (!pd) {
		err = -ENOMEM;
		goto out;
	}

	cpumask_copy(pinst->cpumask, cpumask);

	padata_replace(pinst, pd);

out:
	mutex_unlock(&pinst->lock);

	return err;
}
EXPORT_SYMBOL(padata_set_cpumask);
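/*
 * Minimal usage sketch for padata_set_cpumask() (illustrative only; the
 * instance "pinst" is assumed to come from padata_alloc()): build a
 * temporary cpumask, hand it to padata and release the local copy again.
 *
 *	cpumask_var_t mask;
 *	int err;
 *
 *	if (!alloc_cpumask_var(&mask, GFP_KERNEL))
 *		return -ENOMEM;
 *
 *	cpumask_clear(mask);
 *	cpumask_set_cpu(0, mask);
 *	cpumask_set_cpu(1, mask);
 *
 *	err = padata_set_cpumask(pinst, mask);
 *	free_cpumask_var(mask);
 */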
static int __padata_add_cpu(struct padata_instance *pinst, int cpu)
{
	struct parallel_data *pd;

	if (cpumask_test_cpu(cpu, cpu_active_mask)) {
		pd = padata_alloc_pd(pinst, pinst->cpumask);
		if (!pd)
			return -ENOMEM;

		padata_replace(pinst, pd);
	}

	return 0;
}

/*
 * padata_add_cpu - add a cpu to the padata cpumask
 *
 * @pinst: padata instance
 * @cpu: cpu to add
 */
int padata_add_cpu(struct padata_instance *pinst, int cpu)
{
	int err;

	might_sleep();

	mutex_lock(&pinst->lock);

	cpumask_set_cpu(cpu, pinst->cpumask);
	err = __padata_add_cpu(pinst, cpu);

	mutex_unlock(&pinst->lock);

	return err;
}
EXPORT_SYMBOL(padata_add_cpu);

static int __padata_remove_cpu(struct padata_instance *pinst, int cpu)
{
	struct parallel_data *pd;

	if (cpumask_test_cpu(cpu, cpu_online_mask)) {
		pd = padata_alloc_pd(pinst, pinst->cpumask);
		if (!pd)
			return -ENOMEM;

		padata_replace(pinst, pd);
	}

	return 0;
}

/*
 * padata_remove_cpu - remove a cpu from the padata cpumask
 *
 * @pinst: padata instance
 * @cpu: cpu to remove
 */
int padata_remove_cpu(struct padata_instance *pinst, int cpu)
{
	int err;

	might_sleep();

	mutex_lock(&pinst->lock);

	cpumask_clear_cpu(cpu, pinst->cpumask);
	err = __padata_remove_cpu(pinst, cpu);

	mutex_unlock(&pinst->lock);

	return err;
}
EXPORT_SYMBOL(padata_remove_cpu);

/*
 * padata_start - start the parallel processing
 *
 * @pinst: padata instance to start
 */
void padata_start(struct padata_instance *pinst)
{
	might_sleep();

	mutex_lock(&pinst->lock);
	pinst->flags |= PADATA_INIT;
	mutex_unlock(&pinst->lock);
}
EXPORT_SYMBOL(padata_start);

/*
 * padata_stop - stop the parallel processing
 *
 * @pinst: padata instance to stop
 */
void padata_stop(struct padata_instance *pinst)
{
	might_sleep();

	mutex_lock(&pinst->lock);
	pinst->flags &= ~PADATA_INIT;
	mutex_unlock(&pinst->lock);
}
EXPORT_SYMBOL(padata_stop);

static int __cpuinit padata_cpu_callback(struct notifier_block *nfb,
					 unsigned long action, void *hcpu)
{
	int err;
	struct padata_instance *pinst;
	int cpu = (unsigned long)hcpu;

	pinst = container_of(nfb, struct padata_instance, cpu_notifier);

	switch (action) {
	case CPU_ONLINE:
	case CPU_ONLINE_FROZEN:
		if (!cpumask_test_cpu(cpu, pinst->cpumask))
			break;
		mutex_lock(&pinst->lock);
		err = __padata_add_cpu(pinst, cpu);
		mutex_unlock(&pinst->lock);
		if (err)
			return NOTIFY_BAD;
		break;

	case CPU_DOWN_PREPARE:
	case CPU_DOWN_PREPARE_FROZEN:
		if (!cpumask_test_cpu(cpu, pinst->cpumask))
			break;
		mutex_lock(&pinst->lock);
		err = __padata_remove_cpu(pinst, cpu);
		mutex_unlock(&pinst->lock);
		if (err)
			return NOTIFY_BAD;
		break;

	case CPU_UP_CANCELED:
	case CPU_UP_CANCELED_FROZEN:
		if (!cpumask_test_cpu(cpu, pinst->cpumask))
			break;
		mutex_lock(&pinst->lock);
		__padata_remove_cpu(pinst, cpu);
		mutex_unlock(&pinst->lock);
		break;

	case CPU_DOWN_FAILED:
	case CPU_DOWN_FAILED_FROZEN:
		if (!cpumask_test_cpu(cpu, pinst->cpumask))
			break;
		mutex_lock(&pinst->lock);
		__padata_add_cpu(pinst, cpu);
		mutex_unlock(&pinst->lock);
	}

	return NOTIFY_OK;
}
/*
 * padata_alloc - allocate and initialize a padata instance
 *
 * @cpumask: cpumask that padata uses for parallelization
 * @wq: workqueue to use for the allocated padata instance
 */
struct padata_instance *padata_alloc(const struct cpumask *cpumask,
				     struct workqueue_struct *wq)
{
	int err;
	struct padata_instance *pinst;
	struct parallel_data *pd;

	pinst = kzalloc(sizeof(struct padata_instance), GFP_KERNEL);
	if (!pinst)
		goto err;

	pd = padata_alloc_pd(pinst, cpumask);
	if (!pd)
		goto err_free_inst;

	if (!alloc_cpumask_var(&pinst->cpumask, GFP_KERNEL))
		goto err_free_pd;

	rcu_assign_pointer(pinst->pd, pd);

	pinst->wq = wq;

	cpumask_copy(pinst->cpumask, cpumask);

	pinst->flags = 0;

	pinst->cpu_notifier.notifier_call = padata_cpu_callback;
	pinst->cpu_notifier.priority = 0;
	err = register_hotcpu_notifier(&pinst->cpu_notifier);
	if (err)
		goto err_free_cpumask;

	mutex_init(&pinst->lock);

	return pinst;

err_free_cpumask:
	free_cpumask_var(pinst->cpumask);
err_free_pd:
	padata_free_pd(pd);
err_free_inst:
	kfree(pinst);
err:
	return NULL;
}
EXPORT_SYMBOL(padata_alloc);

/*
 * padata_free - free a padata instance
 *
 * @pinst: padata instance to free
 */
void padata_free(struct padata_instance *pinst)
{
	padata_stop(pinst);

	synchronize_rcu();

	while (atomic_read(&pinst->pd->refcnt) != 0)
		yield();

	unregister_hotcpu_notifier(&pinst->cpu_notifier);
	padata_free_pd(pinst->pd);
	free_cpumask_var(pinst->cpumask);
	kfree(pinst);
}
EXPORT_SYMBOL(padata_free);
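/*
 * Usage sketch (illustrative only; my_request, my_parallel, my_serial,
 * do_expensive_work, complete_request and the workqueue name are
 * hypothetical): a user embeds struct padata_priv in its own request
 * structure, assigns the parallel and serial callbacks and submits the
 * object with padata_do_parallel().  cb_cpu must be part of the
 * instance's cpumask; the serial callbacks then run in submission order.
 *
 *	struct my_request {
 *		struct padata_priv padata;
 *		...
 *	};
 *
 *	static void my_parallel(struct padata_priv *padata)
 *	{
 *		struct my_request *req = container_of(padata,
 *					struct my_request, padata);
 *
 *		do_expensive_work(req);
 *		padata_do_serial(padata);
 *	}
 *
 *	static void my_serial(struct padata_priv *padata)
 *	{
 *		struct my_request *req = container_of(padata,
 *					struct my_request, padata);
 *
 *		complete_request(req);
 *	}
 *
 *	wq = create_workqueue("my_padata");
 *	pinst = padata_alloc(cpu_possible_mask, wq);
 *	padata_start(pinst);
 *
 *	req->padata.parallel = my_parallel;
 *	req->padata.serial = my_serial;
 *	err = padata_do_parallel(pinst, &req->padata, cb_cpu);
 *
 *	padata_stop(pinst);
 *	padata_free(pinst);
 *	destroy_workqueue(wq);
 */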