1 /* 2 * linux/kernel/workqueue.c 3 * 4 * Generic mechanism for defining kernel helper threads for running 5 * arbitrary tasks in process context. 6 * 7 * Started by Ingo Molnar, Copyright (C) 2002 8 * 9 * Derived from the taskqueue/keventd code by: 10 * 11 * David Woodhouse <dwmw2@infradead.org> 12 * Andrew Morton <andrewm@uow.edu.au> 13 * Kai Petzke <wpp@marie.physik.tu-berlin.de> 14 * Theodore Ts'o <tytso@mit.edu> 15 * 16 * Made to use alloc_percpu by Christoph Lameter <clameter@sgi.com>. 17 */ 18 19 #include <linux/module.h> 20 #include <linux/kernel.h> 21 #include <linux/sched.h> 22 #include <linux/init.h> 23 #include <linux/signal.h> 24 #include <linux/completion.h> 25 #include <linux/workqueue.h> 26 #include <linux/slab.h> 27 #include <linux/cpu.h> 28 #include <linux/notifier.h> 29 #include <linux/kthread.h> 30 #include <linux/hardirq.h> 31 #include <linux/mempolicy.h> 32 #include <linux/freezer.h> 33 #include <linux/kallsyms.h> 34 #include <linux/debug_locks.h> 35 36 /* 37 * The per-CPU workqueue (if single thread, we always use the first 38 * possible cpu). 39 * 40 * The sequence counters are for flush_scheduled_work(). It wants to wait 41 * until all currently-scheduled works are completed, but it doesn't 42 * want to be livelocked by new, incoming ones. So it waits until 43 * remove_sequence is >= the insert_sequence which pertained when 44 * flush_scheduled_work() was called. 45 */ 46 struct cpu_workqueue_struct { 47 48 spinlock_t lock; 49 50 long remove_sequence; /* Least-recently added (next to run) */ 51 long insert_sequence; /* Next to add */ 52 53 struct list_head worklist; 54 wait_queue_head_t more_work; 55 wait_queue_head_t work_done; 56 57 struct workqueue_struct *wq; 58 struct task_struct *thread; 59 60 int run_depth; /* Detect run_workqueue() recursion depth */ 61 62 int freezeable; /* Freeze the thread during suspend */ 63 } ____cacheline_aligned; 64 65 /* 66 * The externally visible workqueue abstraction is an array of 67 * per-CPU workqueues: 68 */ 69 struct workqueue_struct { 70 struct cpu_workqueue_struct *cpu_wq; 71 const char *name; 72 struct list_head list; /* Empty if single thread */ 73 }; 74 75 /* All the per-cpu workqueues on the system, for hotplug cpu to add/remove 76 threads to each one as cpus come/go. */ 77 static DEFINE_MUTEX(workqueue_mutex); 78 static LIST_HEAD(workqueues); 79 80 static int singlethread_cpu; 81 82 /* If it's single threaded, it isn't in the list of workqueues. */ 83 static inline int is_single_threaded(struct workqueue_struct *wq) 84 { 85 return list_empty(&wq->list); 86 } 87 88 static inline void set_wq_data(struct work_struct *work, void *wq) 89 { 90 unsigned long new, old, res; 91 92 /* assume the pending flag is already set and that the task has already 93 * been queued on this workqueue */ 94 new = (unsigned long) wq | (1UL << WORK_STRUCT_PENDING); 95 res = work->management; 96 if (res != new) { 97 do { 98 old = res; 99 new = (unsigned long) wq; 100 new |= (old & WORK_STRUCT_FLAG_MASK); 101 res = cmpxchg(&work->management, old, new); 102 } while (res != old); 103 } 104 } 105 106 static inline void *get_wq_data(struct work_struct *work) 107 { 108 return (void *) (work->management & WORK_STRUCT_WQ_DATA_MASK); 109 } 110 111 static int __run_work(struct cpu_workqueue_struct *cwq, struct work_struct *work) 112 { 113 int ret = 0; 114 unsigned long flags; 115 116 spin_lock_irqsave(&cwq->lock, flags); 117 /* 118 * We need to re-validate the work info after we've gotten 119 * the cpu_workqueue lock. We can run the work now iff: 120 * 121 * - the wq_data still matches the cpu_workqueue_struct 122 * - AND the work is still marked pending 123 * - AND the work is still on a list (which will be this 124 * workqueue_struct list) 125 * 126 * All these conditions are important, because we 127 * need to protect against the work being run right 128 * now on another CPU (all but the last one might be 129 * true if it's currently running and has not been 130 * released yet, for example). 131 */ 132 if (get_wq_data(work) == cwq 133 && work_pending(work) 134 && !list_empty(&work->entry)) { 135 work_func_t f = work->func; 136 list_del_init(&work->entry); 137 spin_unlock_irqrestore(&cwq->lock, flags); 138 139 if (!test_bit(WORK_STRUCT_NOAUTOREL, &work->management)) 140 work_release(work); 141 f(work); 142 143 spin_lock_irqsave(&cwq->lock, flags); 144 cwq->remove_sequence++; 145 wake_up(&cwq->work_done); 146 ret = 1; 147 } 148 spin_unlock_irqrestore(&cwq->lock, flags); 149 return ret; 150 } 151 152 /** 153 * run_scheduled_work - run scheduled work synchronously 154 * @work: work to run 155 * 156 * This checks if the work was pending, and runs it 157 * synchronously if so. It returns a boolean to indicate 158 * whether it had any scheduled work to run or not. 159 * 160 * NOTE! This _only_ works for normal work_structs. You 161 * CANNOT use this for delayed work, because the wq data 162 * for delayed work will not point properly to the per- 163 * CPU workqueue struct, but will change! 164 */ 165 int fastcall run_scheduled_work(struct work_struct *work) 166 { 167 for (;;) { 168 struct cpu_workqueue_struct *cwq; 169 170 if (!work_pending(work)) 171 return 0; 172 if (list_empty(&work->entry)) 173 return 0; 174 /* NOTE! This depends intimately on __queue_work! */ 175 cwq = get_wq_data(work); 176 if (!cwq) 177 return 0; 178 if (__run_work(cwq, work)) 179 return 1; 180 } 181 } 182 EXPORT_SYMBOL(run_scheduled_work); 183 184 /* Preempt must be disabled. */ 185 static void __queue_work(struct cpu_workqueue_struct *cwq, 186 struct work_struct *work) 187 { 188 unsigned long flags; 189 190 spin_lock_irqsave(&cwq->lock, flags); 191 set_wq_data(work, cwq); 192 list_add_tail(&work->entry, &cwq->worklist); 193 cwq->insert_sequence++; 194 wake_up(&cwq->more_work); 195 spin_unlock_irqrestore(&cwq->lock, flags); 196 } 197 198 /** 199 * queue_work - queue work on a workqueue 200 * @wq: workqueue to use 201 * @work: work to queue 202 * 203 * Returns 0 if @work was already on a queue, non-zero otherwise. 204 * 205 * We queue the work to the CPU it was submitted, but there is no 206 * guarantee that it will be processed by that CPU. 207 */ 208 int fastcall queue_work(struct workqueue_struct *wq, struct work_struct *work) 209 { 210 int ret = 0, cpu = get_cpu(); 211 212 if (!test_and_set_bit(WORK_STRUCT_PENDING, &work->management)) { 213 if (unlikely(is_single_threaded(wq))) 214 cpu = singlethread_cpu; 215 BUG_ON(!list_empty(&work->entry)); 216 __queue_work(per_cpu_ptr(wq->cpu_wq, cpu), work); 217 ret = 1; 218 } 219 put_cpu(); 220 return ret; 221 } 222 EXPORT_SYMBOL_GPL(queue_work); 223 224 static void delayed_work_timer_fn(unsigned long __data) 225 { 226 struct delayed_work *dwork = (struct delayed_work *)__data; 227 struct workqueue_struct *wq = get_wq_data(&dwork->work); 228 int cpu = smp_processor_id(); 229 230 if (unlikely(is_single_threaded(wq))) 231 cpu = singlethread_cpu; 232 233 __queue_work(per_cpu_ptr(wq->cpu_wq, cpu), &dwork->work); 234 } 235 236 /** 237 * queue_delayed_work - queue work on a workqueue after delay 238 * @wq: workqueue to use 239 * @work: delayable work to queue 240 * @delay: number of jiffies to wait before queueing 241 * 242 * Returns 0 if @work was already on a queue, non-zero otherwise. 243 */ 244 int fastcall queue_delayed_work(struct workqueue_struct *wq, 245 struct delayed_work *dwork, unsigned long delay) 246 { 247 int ret = 0; 248 struct timer_list *timer = &dwork->timer; 249 struct work_struct *work = &dwork->work; 250 251 if (delay == 0) 252 return queue_work(wq, work); 253 254 if (!test_and_set_bit(WORK_STRUCT_PENDING, &work->management)) { 255 BUG_ON(timer_pending(timer)); 256 BUG_ON(!list_empty(&work->entry)); 257 258 /* This stores wq for the moment, for the timer_fn */ 259 set_wq_data(work, wq); 260 timer->expires = jiffies + delay; 261 timer->data = (unsigned long)dwork; 262 timer->function = delayed_work_timer_fn; 263 add_timer(timer); 264 ret = 1; 265 } 266 return ret; 267 } 268 EXPORT_SYMBOL_GPL(queue_delayed_work); 269 270 /** 271 * queue_delayed_work_on - queue work on specific CPU after delay 272 * @cpu: CPU number to execute work on 273 * @wq: workqueue to use 274 * @work: work to queue 275 * @delay: number of jiffies to wait before queueing 276 * 277 * Returns 0 if @work was already on a queue, non-zero otherwise. 278 */ 279 int queue_delayed_work_on(int cpu, struct workqueue_struct *wq, 280 struct delayed_work *dwork, unsigned long delay) 281 { 282 int ret = 0; 283 struct timer_list *timer = &dwork->timer; 284 struct work_struct *work = &dwork->work; 285 286 if (!test_and_set_bit(WORK_STRUCT_PENDING, &work->management)) { 287 BUG_ON(timer_pending(timer)); 288 BUG_ON(!list_empty(&work->entry)); 289 290 /* This stores wq for the moment, for the timer_fn */ 291 set_wq_data(work, wq); 292 timer->expires = jiffies + delay; 293 timer->data = (unsigned long)dwork; 294 timer->function = delayed_work_timer_fn; 295 add_timer_on(timer, cpu); 296 ret = 1; 297 } 298 return ret; 299 } 300 EXPORT_SYMBOL_GPL(queue_delayed_work_on); 301 302 static void run_workqueue(struct cpu_workqueue_struct *cwq) 303 { 304 unsigned long flags; 305 306 /* 307 * Keep taking off work from the queue until 308 * done. 309 */ 310 spin_lock_irqsave(&cwq->lock, flags); 311 cwq->run_depth++; 312 if (cwq->run_depth > 3) { 313 /* morton gets to eat his hat */ 314 printk("%s: recursion depth exceeded: %d\n", 315 __FUNCTION__, cwq->run_depth); 316 dump_stack(); 317 } 318 while (!list_empty(&cwq->worklist)) { 319 struct work_struct *work = list_entry(cwq->worklist.next, 320 struct work_struct, entry); 321 work_func_t f = work->func; 322 323 list_del_init(cwq->worklist.next); 324 spin_unlock_irqrestore(&cwq->lock, flags); 325 326 BUG_ON(get_wq_data(work) != cwq); 327 if (!test_bit(WORK_STRUCT_NOAUTOREL, &work->management)) 328 work_release(work); 329 f(work); 330 331 if (unlikely(in_atomic() || lockdep_depth(current) > 0)) { 332 printk(KERN_ERR "BUG: workqueue leaked lock or atomic: " 333 "%s/0x%08x/%d\n", 334 current->comm, preempt_count(), 335 current->pid); 336 printk(KERN_ERR " last function: "); 337 print_symbol("%s\n", (unsigned long)f); 338 debug_show_held_locks(current); 339 dump_stack(); 340 } 341 342 spin_lock_irqsave(&cwq->lock, flags); 343 cwq->remove_sequence++; 344 wake_up(&cwq->work_done); 345 } 346 cwq->run_depth--; 347 spin_unlock_irqrestore(&cwq->lock, flags); 348 } 349 350 static int worker_thread(void *__cwq) 351 { 352 struct cpu_workqueue_struct *cwq = __cwq; 353 DECLARE_WAITQUEUE(wait, current); 354 struct k_sigaction sa; 355 sigset_t blocked; 356 357 if (!cwq->freezeable) 358 current->flags |= PF_NOFREEZE; 359 360 set_user_nice(current, -5); 361 362 /* Block and flush all signals */ 363 sigfillset(&blocked); 364 sigprocmask(SIG_BLOCK, &blocked, NULL); 365 flush_signals(current); 366 367 /* 368 * We inherited MPOL_INTERLEAVE from the booting kernel. 369 * Set MPOL_DEFAULT to insure node local allocations. 370 */ 371 numa_default_policy(); 372 373 /* SIG_IGN makes children autoreap: see do_notify_parent(). */ 374 sa.sa.sa_handler = SIG_IGN; 375 sa.sa.sa_flags = 0; 376 siginitset(&sa.sa.sa_mask, sigmask(SIGCHLD)); 377 do_sigaction(SIGCHLD, &sa, (struct k_sigaction *)0); 378 379 set_current_state(TASK_INTERRUPTIBLE); 380 while (!kthread_should_stop()) { 381 if (cwq->freezeable) 382 try_to_freeze(); 383 384 add_wait_queue(&cwq->more_work, &wait); 385 if (list_empty(&cwq->worklist)) 386 schedule(); 387 else 388 __set_current_state(TASK_RUNNING); 389 remove_wait_queue(&cwq->more_work, &wait); 390 391 if (!list_empty(&cwq->worklist)) 392 run_workqueue(cwq); 393 set_current_state(TASK_INTERRUPTIBLE); 394 } 395 __set_current_state(TASK_RUNNING); 396 return 0; 397 } 398 399 static void flush_cpu_workqueue(struct cpu_workqueue_struct *cwq) 400 { 401 if (cwq->thread == current) { 402 /* 403 * Probably keventd trying to flush its own queue. So simply run 404 * it by hand rather than deadlocking. 405 */ 406 run_workqueue(cwq); 407 } else { 408 DEFINE_WAIT(wait); 409 long sequence_needed; 410 411 spin_lock_irq(&cwq->lock); 412 sequence_needed = cwq->insert_sequence; 413 414 while (sequence_needed - cwq->remove_sequence > 0) { 415 prepare_to_wait(&cwq->work_done, &wait, 416 TASK_UNINTERRUPTIBLE); 417 spin_unlock_irq(&cwq->lock); 418 schedule(); 419 spin_lock_irq(&cwq->lock); 420 } 421 finish_wait(&cwq->work_done, &wait); 422 spin_unlock_irq(&cwq->lock); 423 } 424 } 425 426 /** 427 * flush_workqueue - ensure that any scheduled work has run to completion. 428 * @wq: workqueue to flush 429 * 430 * Forces execution of the workqueue and blocks until its completion. 431 * This is typically used in driver shutdown handlers. 432 * 433 * This function will sample each workqueue's current insert_sequence number and 434 * will sleep until the head sequence is greater than or equal to that. This 435 * means that we sleep until all works which were queued on entry have been 436 * handled, but we are not livelocked by new incoming ones. 437 * 438 * This function used to run the workqueues itself. Now we just wait for the 439 * helper threads to do it. 440 */ 441 void fastcall flush_workqueue(struct workqueue_struct *wq) 442 { 443 might_sleep(); 444 445 if (is_single_threaded(wq)) { 446 /* Always use first cpu's area. */ 447 flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, singlethread_cpu)); 448 } else { 449 int cpu; 450 451 mutex_lock(&workqueue_mutex); 452 for_each_online_cpu(cpu) 453 flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu)); 454 mutex_unlock(&workqueue_mutex); 455 } 456 } 457 EXPORT_SYMBOL_GPL(flush_workqueue); 458 459 static struct task_struct *create_workqueue_thread(struct workqueue_struct *wq, 460 int cpu, int freezeable) 461 { 462 struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu); 463 struct task_struct *p; 464 465 spin_lock_init(&cwq->lock); 466 cwq->wq = wq; 467 cwq->thread = NULL; 468 cwq->insert_sequence = 0; 469 cwq->remove_sequence = 0; 470 cwq->freezeable = freezeable; 471 INIT_LIST_HEAD(&cwq->worklist); 472 init_waitqueue_head(&cwq->more_work); 473 init_waitqueue_head(&cwq->work_done); 474 475 if (is_single_threaded(wq)) 476 p = kthread_create(worker_thread, cwq, "%s", wq->name); 477 else 478 p = kthread_create(worker_thread, cwq, "%s/%d", wq->name, cpu); 479 if (IS_ERR(p)) 480 return NULL; 481 cwq->thread = p; 482 return p; 483 } 484 485 struct workqueue_struct *__create_workqueue(const char *name, 486 int singlethread, int freezeable) 487 { 488 int cpu, destroy = 0; 489 struct workqueue_struct *wq; 490 struct task_struct *p; 491 492 wq = kzalloc(sizeof(*wq), GFP_KERNEL); 493 if (!wq) 494 return NULL; 495 496 wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct); 497 if (!wq->cpu_wq) { 498 kfree(wq); 499 return NULL; 500 } 501 502 wq->name = name; 503 mutex_lock(&workqueue_mutex); 504 if (singlethread) { 505 INIT_LIST_HEAD(&wq->list); 506 p = create_workqueue_thread(wq, singlethread_cpu, freezeable); 507 if (!p) 508 destroy = 1; 509 else 510 wake_up_process(p); 511 } else { 512 list_add(&wq->list, &workqueues); 513 for_each_online_cpu(cpu) { 514 p = create_workqueue_thread(wq, cpu, freezeable); 515 if (p) { 516 kthread_bind(p, cpu); 517 wake_up_process(p); 518 } else 519 destroy = 1; 520 } 521 } 522 mutex_unlock(&workqueue_mutex); 523 524 /* 525 * Was there any error during startup? If yes then clean up: 526 */ 527 if (destroy) { 528 destroy_workqueue(wq); 529 wq = NULL; 530 } 531 return wq; 532 } 533 EXPORT_SYMBOL_GPL(__create_workqueue); 534 535 static void cleanup_workqueue_thread(struct workqueue_struct *wq, int cpu) 536 { 537 struct cpu_workqueue_struct *cwq; 538 unsigned long flags; 539 struct task_struct *p; 540 541 cwq = per_cpu_ptr(wq->cpu_wq, cpu); 542 spin_lock_irqsave(&cwq->lock, flags); 543 p = cwq->thread; 544 cwq->thread = NULL; 545 spin_unlock_irqrestore(&cwq->lock, flags); 546 if (p) 547 kthread_stop(p); 548 } 549 550 /** 551 * destroy_workqueue - safely terminate a workqueue 552 * @wq: target workqueue 553 * 554 * Safely destroy a workqueue. All work currently pending will be done first. 555 */ 556 void destroy_workqueue(struct workqueue_struct *wq) 557 { 558 int cpu; 559 560 flush_workqueue(wq); 561 562 /* We don't need the distraction of CPUs appearing and vanishing. */ 563 mutex_lock(&workqueue_mutex); 564 if (is_single_threaded(wq)) 565 cleanup_workqueue_thread(wq, singlethread_cpu); 566 else { 567 for_each_online_cpu(cpu) 568 cleanup_workqueue_thread(wq, cpu); 569 list_del(&wq->list); 570 } 571 mutex_unlock(&workqueue_mutex); 572 free_percpu(wq->cpu_wq); 573 kfree(wq); 574 } 575 EXPORT_SYMBOL_GPL(destroy_workqueue); 576 577 static struct workqueue_struct *keventd_wq; 578 579 /** 580 * schedule_work - put work task in global workqueue 581 * @work: job to be done 582 * 583 * This puts a job in the kernel-global workqueue. 584 */ 585 int fastcall schedule_work(struct work_struct *work) 586 { 587 return queue_work(keventd_wq, work); 588 } 589 EXPORT_SYMBOL(schedule_work); 590 591 /** 592 * schedule_delayed_work - put work task in global workqueue after delay 593 * @dwork: job to be done 594 * @delay: number of jiffies to wait or 0 for immediate execution 595 * 596 * After waiting for a given time this puts a job in the kernel-global 597 * workqueue. 598 */ 599 int fastcall schedule_delayed_work(struct delayed_work *dwork, unsigned long delay) 600 { 601 return queue_delayed_work(keventd_wq, dwork, delay); 602 } 603 EXPORT_SYMBOL(schedule_delayed_work); 604 605 /** 606 * schedule_delayed_work_on - queue work in global workqueue on CPU after delay 607 * @cpu: cpu to use 608 * @dwork: job to be done 609 * @delay: number of jiffies to wait 610 * 611 * After waiting for a given time this puts a job in the kernel-global 612 * workqueue on the specified CPU. 613 */ 614 int schedule_delayed_work_on(int cpu, 615 struct delayed_work *dwork, unsigned long delay) 616 { 617 return queue_delayed_work_on(cpu, keventd_wq, dwork, delay); 618 } 619 EXPORT_SYMBOL(schedule_delayed_work_on); 620 621 /** 622 * schedule_on_each_cpu - call a function on each online CPU from keventd 623 * @func: the function to call 624 * 625 * Returns zero on success. 626 * Returns -ve errno on failure. 627 * 628 * Appears to be racy against CPU hotplug. 629 * 630 * schedule_on_each_cpu() is very slow. 631 */ 632 int schedule_on_each_cpu(work_func_t func) 633 { 634 int cpu; 635 struct work_struct *works; 636 637 works = alloc_percpu(struct work_struct); 638 if (!works) 639 return -ENOMEM; 640 641 mutex_lock(&workqueue_mutex); 642 for_each_online_cpu(cpu) { 643 INIT_WORK(per_cpu_ptr(works, cpu), func); 644 __queue_work(per_cpu_ptr(keventd_wq->cpu_wq, cpu), 645 per_cpu_ptr(works, cpu)); 646 } 647 mutex_unlock(&workqueue_mutex); 648 flush_workqueue(keventd_wq); 649 free_percpu(works); 650 return 0; 651 } 652 653 void flush_scheduled_work(void) 654 { 655 flush_workqueue(keventd_wq); 656 } 657 EXPORT_SYMBOL(flush_scheduled_work); 658 659 /** 660 * cancel_rearming_delayed_workqueue - reliably kill off a delayed 661 * work whose handler rearms the delayed work. 662 * @wq: the controlling workqueue structure 663 * @dwork: the delayed work struct 664 */ 665 void cancel_rearming_delayed_workqueue(struct workqueue_struct *wq, 666 struct delayed_work *dwork) 667 { 668 while (!cancel_delayed_work(dwork)) 669 flush_workqueue(wq); 670 } 671 EXPORT_SYMBOL(cancel_rearming_delayed_workqueue); 672 673 /** 674 * cancel_rearming_delayed_work - reliably kill off a delayed keventd 675 * work whose handler rearms the delayed work. 676 * @dwork: the delayed work struct 677 */ 678 void cancel_rearming_delayed_work(struct delayed_work *dwork) 679 { 680 cancel_rearming_delayed_workqueue(keventd_wq, dwork); 681 } 682 EXPORT_SYMBOL(cancel_rearming_delayed_work); 683 684 /** 685 * execute_in_process_context - reliably execute the routine with user context 686 * @fn: the function to execute 687 * @ew: guaranteed storage for the execute work structure (must 688 * be available when the work executes) 689 * 690 * Executes the function immediately if process context is available, 691 * otherwise schedules the function for delayed execution. 692 * 693 * Returns: 0 - function was executed 694 * 1 - function was scheduled for execution 695 */ 696 int execute_in_process_context(work_func_t fn, struct execute_work *ew) 697 { 698 if (!in_interrupt()) { 699 fn(&ew->work); 700 return 0; 701 } 702 703 INIT_WORK(&ew->work, fn); 704 schedule_work(&ew->work); 705 706 return 1; 707 } 708 EXPORT_SYMBOL_GPL(execute_in_process_context); 709 710 int keventd_up(void) 711 { 712 return keventd_wq != NULL; 713 } 714 715 int current_is_keventd(void) 716 { 717 struct cpu_workqueue_struct *cwq; 718 int cpu = smp_processor_id(); /* preempt-safe: keventd is per-cpu */ 719 int ret = 0; 720 721 BUG_ON(!keventd_wq); 722 723 cwq = per_cpu_ptr(keventd_wq->cpu_wq, cpu); 724 if (current == cwq->thread) 725 ret = 1; 726 727 return ret; 728 729 } 730 731 /* Take the work from this (downed) CPU. */ 732 static void take_over_work(struct workqueue_struct *wq, unsigned int cpu) 733 { 734 struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu); 735 struct list_head list; 736 struct work_struct *work; 737 738 spin_lock_irq(&cwq->lock); 739 list_replace_init(&cwq->worklist, &list); 740 741 while (!list_empty(&list)) { 742 printk("Taking work for %s\n", wq->name); 743 work = list_entry(list.next,struct work_struct,entry); 744 list_del(&work->entry); 745 __queue_work(per_cpu_ptr(wq->cpu_wq, smp_processor_id()), work); 746 } 747 spin_unlock_irq(&cwq->lock); 748 } 749 750 /* We're holding the cpucontrol mutex here */ 751 static int __devinit workqueue_cpu_callback(struct notifier_block *nfb, 752 unsigned long action, 753 void *hcpu) 754 { 755 unsigned int hotcpu = (unsigned long)hcpu; 756 struct workqueue_struct *wq; 757 758 switch (action) { 759 case CPU_UP_PREPARE: 760 mutex_lock(&workqueue_mutex); 761 /* Create a new workqueue thread for it. */ 762 list_for_each_entry(wq, &workqueues, list) { 763 if (!create_workqueue_thread(wq, hotcpu, 0)) { 764 printk("workqueue for %i failed\n", hotcpu); 765 return NOTIFY_BAD; 766 } 767 } 768 break; 769 770 case CPU_ONLINE: 771 /* Kick off worker threads. */ 772 list_for_each_entry(wq, &workqueues, list) { 773 struct cpu_workqueue_struct *cwq; 774 775 cwq = per_cpu_ptr(wq->cpu_wq, hotcpu); 776 kthread_bind(cwq->thread, hotcpu); 777 wake_up_process(cwq->thread); 778 } 779 mutex_unlock(&workqueue_mutex); 780 break; 781 782 case CPU_UP_CANCELED: 783 list_for_each_entry(wq, &workqueues, list) { 784 if (!per_cpu_ptr(wq->cpu_wq, hotcpu)->thread) 785 continue; 786 /* Unbind so it can run. */ 787 kthread_bind(per_cpu_ptr(wq->cpu_wq, hotcpu)->thread, 788 any_online_cpu(cpu_online_map)); 789 cleanup_workqueue_thread(wq, hotcpu); 790 } 791 mutex_unlock(&workqueue_mutex); 792 break; 793 794 case CPU_DOWN_PREPARE: 795 mutex_lock(&workqueue_mutex); 796 break; 797 798 case CPU_DOWN_FAILED: 799 mutex_unlock(&workqueue_mutex); 800 break; 801 802 case CPU_DEAD: 803 list_for_each_entry(wq, &workqueues, list) 804 cleanup_workqueue_thread(wq, hotcpu); 805 list_for_each_entry(wq, &workqueues, list) 806 take_over_work(wq, hotcpu); 807 mutex_unlock(&workqueue_mutex); 808 break; 809 } 810 811 return NOTIFY_OK; 812 } 813 814 void init_workqueues(void) 815 { 816 singlethread_cpu = first_cpu(cpu_possible_map); 817 hotcpu_notifier(workqueue_cpu_callback, 0); 818 keventd_wq = create_workqueue("events"); 819 BUG_ON(!keventd_wq); 820 } 821 822