1 /* 2 * linux/kernel/workqueue.c 3 * 4 * Generic mechanism for defining kernel helper threads for running 5 * arbitrary tasks in process context. 6 * 7 * Started by Ingo Molnar, Copyright (C) 2002 8 * 9 * Derived from the taskqueue/keventd code by: 10 * 11 * David Woodhouse <dwmw2@infradead.org> 12 * Andrew Morton <andrewm@uow.edu.au> 13 * Kai Petzke <wpp@marie.physik.tu-berlin.de> 14 * Theodore Ts'o <tytso@mit.edu> 15 * 16 * Made to use alloc_percpu by Christoph Lameter <clameter@sgi.com>. 17 */ 18 19 #include <linux/module.h> 20 #include <linux/kernel.h> 21 #include <linux/sched.h> 22 #include <linux/init.h> 23 #include <linux/signal.h> 24 #include <linux/completion.h> 25 #include <linux/workqueue.h> 26 #include <linux/slab.h> 27 #include <linux/cpu.h> 28 #include <linux/notifier.h> 29 #include <linux/kthread.h> 30 #include <linux/hardirq.h> 31 #include <linux/mempolicy.h> 32 #include <linux/freezer.h> 33 #include <linux/kallsyms.h> 34 #include <linux/debug_locks.h> 35 36 /* 37 * The per-CPU workqueue (if single thread, we always use the first 38 * possible cpu). 39 * 40 * The sequence counters are for flush_scheduled_work(). It wants to wait 41 * until all currently-scheduled works are completed, but it doesn't 42 * want to be livelocked by new, incoming ones. So it waits until 43 * remove_sequence is >= the insert_sequence which pertained when 44 * flush_scheduled_work() was called. 45 */ 46 struct cpu_workqueue_struct { 47 48 spinlock_t lock; 49 50 long remove_sequence; /* Least-recently added (next to run) */ 51 long insert_sequence; /* Next to add */ 52 53 struct list_head worklist; 54 wait_queue_head_t more_work; 55 wait_queue_head_t work_done; 56 57 struct workqueue_struct *wq; 58 struct task_struct *thread; 59 60 int run_depth; /* Detect run_workqueue() recursion depth */ 61 62 int freezeable; /* Freeze the thread during suspend */ 63 } ____cacheline_aligned; 64 65 /* 66 * The externally visible workqueue abstraction is an array of 67 * per-CPU workqueues: 68 */ 69 struct workqueue_struct { 70 struct cpu_workqueue_struct *cpu_wq; 71 const char *name; 72 struct list_head list; /* Empty if single thread */ 73 }; 74 75 /* All the per-cpu workqueues on the system, for hotplug cpu to add/remove 76 threads to each one as cpus come/go. */ 77 static DEFINE_MUTEX(workqueue_mutex); 78 static LIST_HEAD(workqueues); 79 80 static int singlethread_cpu; 81 82 /* If it's single threaded, it isn't in the list of workqueues. */ 83 static inline int is_single_threaded(struct workqueue_struct *wq) 84 { 85 return list_empty(&wq->list); 86 } 87 88 /* 89 * Set the workqueue on which a work item is to be run 90 * - Must *only* be called if the pending flag is set 91 */ 92 static inline void set_wq_data(struct work_struct *work, void *wq) 93 { 94 unsigned long new; 95 96 BUG_ON(!work_pending(work)); 97 98 new = (unsigned long) wq | (1UL << WORK_STRUCT_PENDING); 99 new |= WORK_STRUCT_FLAG_MASK & *work_data_bits(work); 100 atomic_long_set(&work->data, new); 101 } 102 103 static inline void *get_wq_data(struct work_struct *work) 104 { 105 return (void *) (atomic_long_read(&work->data) & WORK_STRUCT_WQ_DATA_MASK); 106 } 107 108 static int __run_work(struct cpu_workqueue_struct *cwq, struct work_struct *work) 109 { 110 int ret = 0; 111 unsigned long flags; 112 113 spin_lock_irqsave(&cwq->lock, flags); 114 /* 115 * We need to re-validate the work info after we've gotten 116 * the cpu_workqueue lock. We can run the work now iff: 117 * 118 * - the wq_data still matches the cpu_workqueue_struct 119 * - AND the work is still marked pending 120 * - AND the work is still on a list (which will be this 121 * workqueue_struct list) 122 * 123 * All these conditions are important, because we 124 * need to protect against the work being run right 125 * now on another CPU (all but the last one might be 126 * true if it's currently running and has not been 127 * released yet, for example). 128 */ 129 if (get_wq_data(work) == cwq 130 && work_pending(work) 131 && !list_empty(&work->entry)) { 132 work_func_t f = work->func; 133 list_del_init(&work->entry); 134 spin_unlock_irqrestore(&cwq->lock, flags); 135 136 if (!test_bit(WORK_STRUCT_NOAUTOREL, work_data_bits(work))) 137 work_release(work); 138 f(work); 139 140 spin_lock_irqsave(&cwq->lock, flags); 141 cwq->remove_sequence++; 142 wake_up(&cwq->work_done); 143 ret = 1; 144 } 145 spin_unlock_irqrestore(&cwq->lock, flags); 146 return ret; 147 } 148 149 /** 150 * run_scheduled_work - run scheduled work synchronously 151 * @work: work to run 152 * 153 * This checks if the work was pending, and runs it 154 * synchronously if so. It returns a boolean to indicate 155 * whether it had any scheduled work to run or not. 156 * 157 * NOTE! This _only_ works for normal work_structs. You 158 * CANNOT use this for delayed work, because the wq data 159 * for delayed work will not point properly to the per- 160 * CPU workqueue struct, but will change! 161 */ 162 int fastcall run_scheduled_work(struct work_struct *work) 163 { 164 for (;;) { 165 struct cpu_workqueue_struct *cwq; 166 167 if (!work_pending(work)) 168 return 0; 169 if (list_empty(&work->entry)) 170 return 0; 171 /* NOTE! This depends intimately on __queue_work! */ 172 cwq = get_wq_data(work); 173 if (!cwq) 174 return 0; 175 if (__run_work(cwq, work)) 176 return 1; 177 } 178 } 179 EXPORT_SYMBOL(run_scheduled_work); 180 181 /* Preempt must be disabled. */ 182 static void __queue_work(struct cpu_workqueue_struct *cwq, 183 struct work_struct *work) 184 { 185 unsigned long flags; 186 187 spin_lock_irqsave(&cwq->lock, flags); 188 set_wq_data(work, cwq); 189 list_add_tail(&work->entry, &cwq->worklist); 190 cwq->insert_sequence++; 191 wake_up(&cwq->more_work); 192 spin_unlock_irqrestore(&cwq->lock, flags); 193 } 194 195 /** 196 * queue_work - queue work on a workqueue 197 * @wq: workqueue to use 198 * @work: work to queue 199 * 200 * Returns 0 if @work was already on a queue, non-zero otherwise. 201 * 202 * We queue the work to the CPU it was submitted, but there is no 203 * guarantee that it will be processed by that CPU. 204 */ 205 int fastcall queue_work(struct workqueue_struct *wq, struct work_struct *work) 206 { 207 int ret = 0, cpu = get_cpu(); 208 209 if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) { 210 if (unlikely(is_single_threaded(wq))) 211 cpu = singlethread_cpu; 212 BUG_ON(!list_empty(&work->entry)); 213 __queue_work(per_cpu_ptr(wq->cpu_wq, cpu), work); 214 ret = 1; 215 } 216 put_cpu(); 217 return ret; 218 } 219 EXPORT_SYMBOL_GPL(queue_work); 220 221 static void delayed_work_timer_fn(unsigned long __data) 222 { 223 struct delayed_work *dwork = (struct delayed_work *)__data; 224 struct workqueue_struct *wq = get_wq_data(&dwork->work); 225 int cpu = smp_processor_id(); 226 227 if (unlikely(is_single_threaded(wq))) 228 cpu = singlethread_cpu; 229 230 __queue_work(per_cpu_ptr(wq->cpu_wq, cpu), &dwork->work); 231 } 232 233 /** 234 * queue_delayed_work - queue work on a workqueue after delay 235 * @wq: workqueue to use 236 * @dwork: delayable work to queue 237 * @delay: number of jiffies to wait before queueing 238 * 239 * Returns 0 if @work was already on a queue, non-zero otherwise. 240 */ 241 int fastcall queue_delayed_work(struct workqueue_struct *wq, 242 struct delayed_work *dwork, unsigned long delay) 243 { 244 int ret = 0; 245 struct timer_list *timer = &dwork->timer; 246 struct work_struct *work = &dwork->work; 247 248 if (delay == 0) 249 return queue_work(wq, work); 250 251 if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) { 252 BUG_ON(timer_pending(timer)); 253 BUG_ON(!list_empty(&work->entry)); 254 255 /* This stores wq for the moment, for the timer_fn */ 256 set_wq_data(work, wq); 257 timer->expires = jiffies + delay; 258 timer->data = (unsigned long)dwork; 259 timer->function = delayed_work_timer_fn; 260 add_timer(timer); 261 ret = 1; 262 } 263 return ret; 264 } 265 EXPORT_SYMBOL_GPL(queue_delayed_work); 266 267 /** 268 * queue_delayed_work_on - queue work on specific CPU after delay 269 * @cpu: CPU number to execute work on 270 * @wq: workqueue to use 271 * @dwork: work to queue 272 * @delay: number of jiffies to wait before queueing 273 * 274 * Returns 0 if @work was already on a queue, non-zero otherwise. 275 */ 276 int queue_delayed_work_on(int cpu, struct workqueue_struct *wq, 277 struct delayed_work *dwork, unsigned long delay) 278 { 279 int ret = 0; 280 struct timer_list *timer = &dwork->timer; 281 struct work_struct *work = &dwork->work; 282 283 if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) { 284 BUG_ON(timer_pending(timer)); 285 BUG_ON(!list_empty(&work->entry)); 286 287 /* This stores wq for the moment, for the timer_fn */ 288 set_wq_data(work, wq); 289 timer->expires = jiffies + delay; 290 timer->data = (unsigned long)dwork; 291 timer->function = delayed_work_timer_fn; 292 add_timer_on(timer, cpu); 293 ret = 1; 294 } 295 return ret; 296 } 297 EXPORT_SYMBOL_GPL(queue_delayed_work_on); 298 299 static void run_workqueue(struct cpu_workqueue_struct *cwq) 300 { 301 unsigned long flags; 302 303 /* 304 * Keep taking off work from the queue until 305 * done. 306 */ 307 spin_lock_irqsave(&cwq->lock, flags); 308 cwq->run_depth++; 309 if (cwq->run_depth > 3) { 310 /* morton gets to eat his hat */ 311 printk("%s: recursion depth exceeded: %d\n", 312 __FUNCTION__, cwq->run_depth); 313 dump_stack(); 314 } 315 while (!list_empty(&cwq->worklist)) { 316 struct work_struct *work = list_entry(cwq->worklist.next, 317 struct work_struct, entry); 318 work_func_t f = work->func; 319 320 list_del_init(cwq->worklist.next); 321 spin_unlock_irqrestore(&cwq->lock, flags); 322 323 BUG_ON(get_wq_data(work) != cwq); 324 if (!test_bit(WORK_STRUCT_NOAUTOREL, work_data_bits(work))) 325 work_release(work); 326 f(work); 327 328 if (unlikely(in_atomic() || lockdep_depth(current) > 0)) { 329 printk(KERN_ERR "BUG: workqueue leaked lock or atomic: " 330 "%s/0x%08x/%d\n", 331 current->comm, preempt_count(), 332 current->pid); 333 printk(KERN_ERR " last function: "); 334 print_symbol("%s\n", (unsigned long)f); 335 debug_show_held_locks(current); 336 dump_stack(); 337 } 338 339 spin_lock_irqsave(&cwq->lock, flags); 340 cwq->remove_sequence++; 341 wake_up(&cwq->work_done); 342 } 343 cwq->run_depth--; 344 spin_unlock_irqrestore(&cwq->lock, flags); 345 } 346 347 static int worker_thread(void *__cwq) 348 { 349 struct cpu_workqueue_struct *cwq = __cwq; 350 DECLARE_WAITQUEUE(wait, current); 351 struct k_sigaction sa; 352 sigset_t blocked; 353 354 if (!cwq->freezeable) 355 current->flags |= PF_NOFREEZE; 356 357 set_user_nice(current, -5); 358 359 /* Block and flush all signals */ 360 sigfillset(&blocked); 361 sigprocmask(SIG_BLOCK, &blocked, NULL); 362 flush_signals(current); 363 364 /* 365 * We inherited MPOL_INTERLEAVE from the booting kernel. 366 * Set MPOL_DEFAULT to insure node local allocations. 367 */ 368 numa_default_policy(); 369 370 /* SIG_IGN makes children autoreap: see do_notify_parent(). */ 371 sa.sa.sa_handler = SIG_IGN; 372 sa.sa.sa_flags = 0; 373 siginitset(&sa.sa.sa_mask, sigmask(SIGCHLD)); 374 do_sigaction(SIGCHLD, &sa, (struct k_sigaction *)0); 375 376 set_current_state(TASK_INTERRUPTIBLE); 377 while (!kthread_should_stop()) { 378 if (cwq->freezeable) 379 try_to_freeze(); 380 381 add_wait_queue(&cwq->more_work, &wait); 382 if (list_empty(&cwq->worklist)) 383 schedule(); 384 else 385 __set_current_state(TASK_RUNNING); 386 remove_wait_queue(&cwq->more_work, &wait); 387 388 if (!list_empty(&cwq->worklist)) 389 run_workqueue(cwq); 390 set_current_state(TASK_INTERRUPTIBLE); 391 } 392 __set_current_state(TASK_RUNNING); 393 return 0; 394 } 395 396 static void flush_cpu_workqueue(struct cpu_workqueue_struct *cwq) 397 { 398 if (cwq->thread == current) { 399 /* 400 * Probably keventd trying to flush its own queue. So simply run 401 * it by hand rather than deadlocking. 402 */ 403 run_workqueue(cwq); 404 } else { 405 DEFINE_WAIT(wait); 406 long sequence_needed; 407 408 spin_lock_irq(&cwq->lock); 409 sequence_needed = cwq->insert_sequence; 410 411 while (sequence_needed - cwq->remove_sequence > 0) { 412 prepare_to_wait(&cwq->work_done, &wait, 413 TASK_UNINTERRUPTIBLE); 414 spin_unlock_irq(&cwq->lock); 415 schedule(); 416 spin_lock_irq(&cwq->lock); 417 } 418 finish_wait(&cwq->work_done, &wait); 419 spin_unlock_irq(&cwq->lock); 420 } 421 } 422 423 /** 424 * flush_workqueue - ensure that any scheduled work has run to completion. 425 * @wq: workqueue to flush 426 * 427 * Forces execution of the workqueue and blocks until its completion. 428 * This is typically used in driver shutdown handlers. 429 * 430 * This function will sample each workqueue's current insert_sequence number and 431 * will sleep until the head sequence is greater than or equal to that. This 432 * means that we sleep until all works which were queued on entry have been 433 * handled, but we are not livelocked by new incoming ones. 434 * 435 * This function used to run the workqueues itself. Now we just wait for the 436 * helper threads to do it. 437 */ 438 void fastcall flush_workqueue(struct workqueue_struct *wq) 439 { 440 might_sleep(); 441 442 if (is_single_threaded(wq)) { 443 /* Always use first cpu's area. */ 444 flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, singlethread_cpu)); 445 } else { 446 int cpu; 447 448 mutex_lock(&workqueue_mutex); 449 for_each_online_cpu(cpu) 450 flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu)); 451 mutex_unlock(&workqueue_mutex); 452 } 453 } 454 EXPORT_SYMBOL_GPL(flush_workqueue); 455 456 static struct task_struct *create_workqueue_thread(struct workqueue_struct *wq, 457 int cpu, int freezeable) 458 { 459 struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu); 460 struct task_struct *p; 461 462 spin_lock_init(&cwq->lock); 463 cwq->wq = wq; 464 cwq->thread = NULL; 465 cwq->insert_sequence = 0; 466 cwq->remove_sequence = 0; 467 cwq->freezeable = freezeable; 468 INIT_LIST_HEAD(&cwq->worklist); 469 init_waitqueue_head(&cwq->more_work); 470 init_waitqueue_head(&cwq->work_done); 471 472 if (is_single_threaded(wq)) 473 p = kthread_create(worker_thread, cwq, "%s", wq->name); 474 else 475 p = kthread_create(worker_thread, cwq, "%s/%d", wq->name, cpu); 476 if (IS_ERR(p)) 477 return NULL; 478 cwq->thread = p; 479 return p; 480 } 481 482 struct workqueue_struct *__create_workqueue(const char *name, 483 int singlethread, int freezeable) 484 { 485 int cpu, destroy = 0; 486 struct workqueue_struct *wq; 487 struct task_struct *p; 488 489 wq = kzalloc(sizeof(*wq), GFP_KERNEL); 490 if (!wq) 491 return NULL; 492 493 wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct); 494 if (!wq->cpu_wq) { 495 kfree(wq); 496 return NULL; 497 } 498 499 wq->name = name; 500 mutex_lock(&workqueue_mutex); 501 if (singlethread) { 502 INIT_LIST_HEAD(&wq->list); 503 p = create_workqueue_thread(wq, singlethread_cpu, freezeable); 504 if (!p) 505 destroy = 1; 506 else 507 wake_up_process(p); 508 } else { 509 list_add(&wq->list, &workqueues); 510 for_each_online_cpu(cpu) { 511 p = create_workqueue_thread(wq, cpu, freezeable); 512 if (p) { 513 kthread_bind(p, cpu); 514 wake_up_process(p); 515 } else 516 destroy = 1; 517 } 518 } 519 mutex_unlock(&workqueue_mutex); 520 521 /* 522 * Was there any error during startup? If yes then clean up: 523 */ 524 if (destroy) { 525 destroy_workqueue(wq); 526 wq = NULL; 527 } 528 return wq; 529 } 530 EXPORT_SYMBOL_GPL(__create_workqueue); 531 532 static void cleanup_workqueue_thread(struct workqueue_struct *wq, int cpu) 533 { 534 struct cpu_workqueue_struct *cwq; 535 unsigned long flags; 536 struct task_struct *p; 537 538 cwq = per_cpu_ptr(wq->cpu_wq, cpu); 539 spin_lock_irqsave(&cwq->lock, flags); 540 p = cwq->thread; 541 cwq->thread = NULL; 542 spin_unlock_irqrestore(&cwq->lock, flags); 543 if (p) 544 kthread_stop(p); 545 } 546 547 /** 548 * destroy_workqueue - safely terminate a workqueue 549 * @wq: target workqueue 550 * 551 * Safely destroy a workqueue. All work currently pending will be done first. 552 */ 553 void destroy_workqueue(struct workqueue_struct *wq) 554 { 555 int cpu; 556 557 flush_workqueue(wq); 558 559 /* We don't need the distraction of CPUs appearing and vanishing. */ 560 mutex_lock(&workqueue_mutex); 561 if (is_single_threaded(wq)) 562 cleanup_workqueue_thread(wq, singlethread_cpu); 563 else { 564 for_each_online_cpu(cpu) 565 cleanup_workqueue_thread(wq, cpu); 566 list_del(&wq->list); 567 } 568 mutex_unlock(&workqueue_mutex); 569 free_percpu(wq->cpu_wq); 570 kfree(wq); 571 } 572 EXPORT_SYMBOL_GPL(destroy_workqueue); 573 574 static struct workqueue_struct *keventd_wq; 575 576 /** 577 * schedule_work - put work task in global workqueue 578 * @work: job to be done 579 * 580 * This puts a job in the kernel-global workqueue. 581 */ 582 int fastcall schedule_work(struct work_struct *work) 583 { 584 return queue_work(keventd_wq, work); 585 } 586 EXPORT_SYMBOL(schedule_work); 587 588 /** 589 * schedule_delayed_work - put work task in global workqueue after delay 590 * @dwork: job to be done 591 * @delay: number of jiffies to wait or 0 for immediate execution 592 * 593 * After waiting for a given time this puts a job in the kernel-global 594 * workqueue. 595 */ 596 int fastcall schedule_delayed_work(struct delayed_work *dwork, unsigned long delay) 597 { 598 return queue_delayed_work(keventd_wq, dwork, delay); 599 } 600 EXPORT_SYMBOL(schedule_delayed_work); 601 602 /** 603 * schedule_delayed_work_on - queue work in global workqueue on CPU after delay 604 * @cpu: cpu to use 605 * @dwork: job to be done 606 * @delay: number of jiffies to wait 607 * 608 * After waiting for a given time this puts a job in the kernel-global 609 * workqueue on the specified CPU. 610 */ 611 int schedule_delayed_work_on(int cpu, 612 struct delayed_work *dwork, unsigned long delay) 613 { 614 return queue_delayed_work_on(cpu, keventd_wq, dwork, delay); 615 } 616 EXPORT_SYMBOL(schedule_delayed_work_on); 617 618 /** 619 * schedule_on_each_cpu - call a function on each online CPU from keventd 620 * @func: the function to call 621 * 622 * Returns zero on success. 623 * Returns -ve errno on failure. 624 * 625 * Appears to be racy against CPU hotplug. 626 * 627 * schedule_on_each_cpu() is very slow. 628 */ 629 int schedule_on_each_cpu(work_func_t func) 630 { 631 int cpu; 632 struct work_struct *works; 633 634 works = alloc_percpu(struct work_struct); 635 if (!works) 636 return -ENOMEM; 637 638 mutex_lock(&workqueue_mutex); 639 for_each_online_cpu(cpu) { 640 struct work_struct *work = per_cpu_ptr(works, cpu); 641 642 INIT_WORK(work, func); 643 set_bit(WORK_STRUCT_PENDING, work_data_bits(work)); 644 __queue_work(per_cpu_ptr(keventd_wq->cpu_wq, cpu), work); 645 } 646 mutex_unlock(&workqueue_mutex); 647 flush_workqueue(keventd_wq); 648 free_percpu(works); 649 return 0; 650 } 651 652 void flush_scheduled_work(void) 653 { 654 flush_workqueue(keventd_wq); 655 } 656 EXPORT_SYMBOL(flush_scheduled_work); 657 658 /** 659 * cancel_rearming_delayed_workqueue - reliably kill off a delayed 660 * work whose handler rearms the delayed work. 661 * @wq: the controlling workqueue structure 662 * @dwork: the delayed work struct 663 */ 664 void cancel_rearming_delayed_workqueue(struct workqueue_struct *wq, 665 struct delayed_work *dwork) 666 { 667 while (!cancel_delayed_work(dwork)) 668 flush_workqueue(wq); 669 } 670 EXPORT_SYMBOL(cancel_rearming_delayed_workqueue); 671 672 /** 673 * cancel_rearming_delayed_work - reliably kill off a delayed keventd 674 * work whose handler rearms the delayed work. 675 * @dwork: the delayed work struct 676 */ 677 void cancel_rearming_delayed_work(struct delayed_work *dwork) 678 { 679 cancel_rearming_delayed_workqueue(keventd_wq, dwork); 680 } 681 EXPORT_SYMBOL(cancel_rearming_delayed_work); 682 683 /** 684 * execute_in_process_context - reliably execute the routine with user context 685 * @fn: the function to execute 686 * @ew: guaranteed storage for the execute work structure (must 687 * be available when the work executes) 688 * 689 * Executes the function immediately if process context is available, 690 * otherwise schedules the function for delayed execution. 691 * 692 * Returns: 0 - function was executed 693 * 1 - function was scheduled for execution 694 */ 695 int execute_in_process_context(work_func_t fn, struct execute_work *ew) 696 { 697 if (!in_interrupt()) { 698 fn(&ew->work); 699 return 0; 700 } 701 702 INIT_WORK(&ew->work, fn); 703 schedule_work(&ew->work); 704 705 return 1; 706 } 707 EXPORT_SYMBOL_GPL(execute_in_process_context); 708 709 int keventd_up(void) 710 { 711 return keventd_wq != NULL; 712 } 713 714 int current_is_keventd(void) 715 { 716 struct cpu_workqueue_struct *cwq; 717 int cpu = smp_processor_id(); /* preempt-safe: keventd is per-cpu */ 718 int ret = 0; 719 720 BUG_ON(!keventd_wq); 721 722 cwq = per_cpu_ptr(keventd_wq->cpu_wq, cpu); 723 if (current == cwq->thread) 724 ret = 1; 725 726 return ret; 727 728 } 729 730 /* Take the work from this (downed) CPU. */ 731 static void take_over_work(struct workqueue_struct *wq, unsigned int cpu) 732 { 733 struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu); 734 struct list_head list; 735 struct work_struct *work; 736 737 spin_lock_irq(&cwq->lock); 738 list_replace_init(&cwq->worklist, &list); 739 740 while (!list_empty(&list)) { 741 printk("Taking work for %s\n", wq->name); 742 work = list_entry(list.next,struct work_struct,entry); 743 list_del(&work->entry); 744 __queue_work(per_cpu_ptr(wq->cpu_wq, smp_processor_id()), work); 745 } 746 spin_unlock_irq(&cwq->lock); 747 } 748 749 /* We're holding the cpucontrol mutex here */ 750 static int __devinit workqueue_cpu_callback(struct notifier_block *nfb, 751 unsigned long action, 752 void *hcpu) 753 { 754 unsigned int hotcpu = (unsigned long)hcpu; 755 struct workqueue_struct *wq; 756 757 switch (action) { 758 case CPU_UP_PREPARE: 759 mutex_lock(&workqueue_mutex); 760 /* Create a new workqueue thread for it. */ 761 list_for_each_entry(wq, &workqueues, list) { 762 if (!create_workqueue_thread(wq, hotcpu, 0)) { 763 printk("workqueue for %i failed\n", hotcpu); 764 return NOTIFY_BAD; 765 } 766 } 767 break; 768 769 case CPU_ONLINE: 770 /* Kick off worker threads. */ 771 list_for_each_entry(wq, &workqueues, list) { 772 struct cpu_workqueue_struct *cwq; 773 774 cwq = per_cpu_ptr(wq->cpu_wq, hotcpu); 775 kthread_bind(cwq->thread, hotcpu); 776 wake_up_process(cwq->thread); 777 } 778 mutex_unlock(&workqueue_mutex); 779 break; 780 781 case CPU_UP_CANCELED: 782 list_for_each_entry(wq, &workqueues, list) { 783 if (!per_cpu_ptr(wq->cpu_wq, hotcpu)->thread) 784 continue; 785 /* Unbind so it can run. */ 786 kthread_bind(per_cpu_ptr(wq->cpu_wq, hotcpu)->thread, 787 any_online_cpu(cpu_online_map)); 788 cleanup_workqueue_thread(wq, hotcpu); 789 } 790 mutex_unlock(&workqueue_mutex); 791 break; 792 793 case CPU_DOWN_PREPARE: 794 mutex_lock(&workqueue_mutex); 795 break; 796 797 case CPU_DOWN_FAILED: 798 mutex_unlock(&workqueue_mutex); 799 break; 800 801 case CPU_DEAD: 802 list_for_each_entry(wq, &workqueues, list) 803 cleanup_workqueue_thread(wq, hotcpu); 804 list_for_each_entry(wq, &workqueues, list) 805 take_over_work(wq, hotcpu); 806 mutex_unlock(&workqueue_mutex); 807 break; 808 } 809 810 return NOTIFY_OK; 811 } 812 813 void init_workqueues(void) 814 { 815 singlethread_cpu = first_cpu(cpu_possible_map); 816 hotcpu_notifier(workqueue_cpu_callback, 0); 817 keventd_wq = create_workqueue("events"); 818 BUG_ON(!keventd_wq); 819 } 820 821