/*
 * linux/kernel/workqueue.c
 *
 * Generic mechanism for defining kernel helper threads for running
 * arbitrary tasks in process context.
 *
 * Started by Ingo Molnar, Copyright (C) 2002
 *
 * Derived from the taskqueue/keventd code by:
 *
 *   David Woodhouse <dwmw2@infradead.org>
 *   Andrew Morton <andrewm@uow.edu.au>
 *   Kai Petzke <wpp@marie.physik.tu-berlin.de>
 *   Theodore Ts'o <tytso@mit.edu>
 *
 * Made to use alloc_percpu by Christoph Lameter <clameter@sgi.com>.
 */

#include <linux/module.h>
#include <linux/kernel.h>
#include <linux/sched.h>
#include <linux/init.h>
#include <linux/signal.h>
#include <linux/completion.h>
#include <linux/workqueue.h>
#include <linux/slab.h>
#include <linux/cpu.h>
#include <linux/notifier.h>
#include <linux/kthread.h>
#include <linux/hardirq.h>
#include <linux/mempolicy.h>
#include <linux/freezer.h>
#include <linux/kallsyms.h>
#include <linux/debug_locks.h>

/*
 * The per-CPU workqueue (if single thread, we always use the first
 * possible cpu).
 *
 * The sequence counters are for flush_scheduled_work().  It wants to wait
 * until all currently-scheduled works are completed, but it doesn't
 * want to be livelocked by new, incoming ones.  So it waits until
 * remove_sequence is >= the insert_sequence which pertained when
 * flush_scheduled_work() was called.
 */
struct cpu_workqueue_struct {

	spinlock_t lock;

	long remove_sequence;	/* Least-recently added (next to run) */
	long insert_sequence;	/* Next to add */

	struct list_head worklist;
	wait_queue_head_t more_work;
	wait_queue_head_t work_done;

	struct workqueue_struct *wq;
	struct task_struct *thread;

	int run_depth;		/* Detect run_workqueue() recursion depth */

	int freezeable;		/* Freeze the thread during suspend */
} ____cacheline_aligned;

/*
 * The externally visible workqueue abstraction is an array of
 * per-CPU workqueues:
 */
struct workqueue_struct {
	struct cpu_workqueue_struct *cpu_wq;
	const char *name;
	struct list_head list;	/* Empty if single thread */
};

/* All the per-cpu workqueues on the system, for hotplug cpu to add/remove
   threads to each one as cpus come/go. */
static DEFINE_MUTEX(workqueue_mutex);
static LIST_HEAD(workqueues);

static int singlethread_cpu;

/* If it's single threaded, it isn't in the list of workqueues. */
static inline int is_single_threaded(struct workqueue_struct *wq)
{
	return list_empty(&wq->list);
}

/*
 * Set the workqueue on which a work item is to be run
 * - Must *only* be called if the pending flag is set
 */
static inline void set_wq_data(struct work_struct *work, void *wq)
{
	unsigned long new;

	BUG_ON(!work_pending(work));

	new = (unsigned long) wq | (1UL << WORK_STRUCT_PENDING);
	new |= WORK_STRUCT_FLAG_MASK & *work_data_bits(work);
	atomic_long_set(&work->data, new);
}

static inline void *get_wq_data(struct work_struct *work)
{
	return (void *) (atomic_long_read(&work->data) & WORK_STRUCT_WQ_DATA_MASK);
}
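/*
 * Illustrative note (not part of the original file): work->data packs the
 * queue pointer and the flag bits into a single word.  With the layout
 * from <linux/workqueue.h>, something like
 *
 *	data = (unsigned long)cwq | (1UL << WORK_STRUCT_PENDING)
 *
 * decodes via get_wq_data() back to the cpu_workqueue_struct, while the
 * low WORK_STRUCT_FLAG_MASK bits carry the pending/autorelease state.
 * The exact bit positions are whatever workqueue.h defines; the point is
 * only that pointer and flags never overlap, because cpu_workqueue_struct
 * is sufficiently aligned (____cacheline_aligned above).
 */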
static int __run_work(struct cpu_workqueue_struct *cwq, struct work_struct *work)
{
	int ret = 0;
	unsigned long flags;

	spin_lock_irqsave(&cwq->lock, flags);
	/*
	 * We need to re-validate the work info after we've gotten
	 * the cpu_workqueue lock. We can run the work now iff:
	 *
	 *  - the wq_data still matches the cpu_workqueue_struct
	 *  - AND the work is still marked pending
	 *  - AND the work is still on a list (which will be this
	 *    workqueue_struct list)
	 *
	 * All these conditions are important, because we
	 * need to protect against the work being run right
	 * now on another CPU (all but the last one might be
	 * true if it's currently running and has not been
	 * released yet, for example).
	 */
	if (get_wq_data(work) == cwq
	    && work_pending(work)
	    && !list_empty(&work->entry)) {
		work_func_t f = work->func;
		list_del_init(&work->entry);
		spin_unlock_irqrestore(&cwq->lock, flags);

		if (!test_bit(WORK_STRUCT_NOAUTOREL, work_data_bits(work)))
			work_release(work);
		f(work);

		spin_lock_irqsave(&cwq->lock, flags);
		cwq->remove_sequence++;
		wake_up(&cwq->work_done);
		ret = 1;
	}
	spin_unlock_irqrestore(&cwq->lock, flags);
	return ret;
}

/**
 * run_scheduled_work - run scheduled work synchronously
 * @work: work to run
 *
 * This checks if the work was pending, and runs it
 * synchronously if so.  It returns a boolean to indicate
 * whether it had any scheduled work to run or not.
 *
 * NOTE! This _only_ works for normal work_structs. You
 * CANNOT use this for delayed work, because the wq data
 * for delayed work will not point properly to the per-
 * CPU workqueue struct, but will change!
 */
int fastcall run_scheduled_work(struct work_struct *work)
{
	for (;;) {
		struct cpu_workqueue_struct *cwq;

		if (!work_pending(work))
			return 0;
		if (list_empty(&work->entry))
			return 0;
		/* NOTE! This depends intimately on __queue_work! */
		cwq = get_wq_data(work);
		if (!cwq)
			return 0;
		if (__run_work(cwq, work))
			return 1;
	}
}
EXPORT_SYMBOL(run_scheduled_work);
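/*
 * Illustrative sketch (not part of the original file): a caller that wants
 * to force a queued, non-delayed work item to run right now might do the
 * following; the req/done_work/finish_request names are hypothetical.
 *
 *	INIT_WORK(&req->done_work, finish_request);
 *	schedule_work(&req->done_work);
 *	...
 *	if (!run_scheduled_work(&req->done_work))
 *		;	// it had already run, or was never pending
 *
 * For a delayed_work this would be wrong: until the timer fires, the data
 * word set by queue_delayed_work() holds the workqueue_struct rather than
 * a cpu_workqueue_struct, so the get_wq_data()/__run_work() validation
 * above cannot be applied to it.
 */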
/* Preempt must be disabled. */
static void __queue_work(struct cpu_workqueue_struct *cwq,
			 struct work_struct *work)
{
	unsigned long flags;

	spin_lock_irqsave(&cwq->lock, flags);
	set_wq_data(work, cwq);
	list_add_tail(&work->entry, &cwq->worklist);
	cwq->insert_sequence++;
	wake_up(&cwq->more_work);
	spin_unlock_irqrestore(&cwq->lock, flags);
}

/**
 * queue_work - queue work on a workqueue
 * @wq: workqueue to use
 * @work: work to queue
 *
 * Returns 0 if @work was already on a queue, non-zero otherwise.
 *
 * We queue the work to the CPU on which it was submitted, but there is no
 * guarantee that it will be processed by that CPU.
 */
int fastcall queue_work(struct workqueue_struct *wq, struct work_struct *work)
{
	int ret = 0, cpu = get_cpu();

	if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {
		if (unlikely(is_single_threaded(wq)))
			cpu = singlethread_cpu;
		BUG_ON(!list_empty(&work->entry));
		__queue_work(per_cpu_ptr(wq->cpu_wq, cpu), work);
		ret = 1;
	}
	put_cpu();
	return ret;
}
EXPORT_SYMBOL_GPL(queue_work);

void delayed_work_timer_fn(unsigned long __data)
{
	struct delayed_work *dwork = (struct delayed_work *)__data;
	struct workqueue_struct *wq = get_wq_data(&dwork->work);
	int cpu = smp_processor_id();

	if (unlikely(is_single_threaded(wq)))
		cpu = singlethread_cpu;

	__queue_work(per_cpu_ptr(wq->cpu_wq, cpu), &dwork->work);
}

/**
 * queue_delayed_work - queue work on a workqueue after delay
 * @wq: workqueue to use
 * @dwork: delayable work to queue
 * @delay: number of jiffies to wait before queueing
 *
 * Returns 0 if @dwork was already on a queue, non-zero otherwise.
 */
int fastcall queue_delayed_work(struct workqueue_struct *wq,
			struct delayed_work *dwork, unsigned long delay)
{
	int ret = 0;
	struct timer_list *timer = &dwork->timer;
	struct work_struct *work = &dwork->work;

	timer_stats_timer_set_start_info(timer);
	if (delay == 0)
		return queue_work(wq, work);

	if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {
		BUG_ON(timer_pending(timer));
		BUG_ON(!list_empty(&work->entry));

		/* This stores wq for the moment, for the timer_fn */
		set_wq_data(work, wq);
		timer->expires = jiffies + delay;
		timer->data = (unsigned long)dwork;
		timer->function = delayed_work_timer_fn;
		add_timer(timer);
		ret = 1;
	}
	return ret;
}
EXPORT_SYMBOL_GPL(queue_delayed_work);

/**
 * queue_delayed_work_on - queue work on specific CPU after delay
 * @cpu: CPU number to execute work on
 * @wq: workqueue to use
 * @dwork: work to queue
 * @delay: number of jiffies to wait before queueing
 *
 * Returns 0 if @dwork was already on a queue, non-zero otherwise.
 */
int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
			struct delayed_work *dwork, unsigned long delay)
{
	int ret = 0;
	struct timer_list *timer = &dwork->timer;
	struct work_struct *work = &dwork->work;

	if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {
		BUG_ON(timer_pending(timer));
		BUG_ON(!list_empty(&work->entry));

		/* This stores wq for the moment, for the timer_fn */
		set_wq_data(work, wq);
		timer->expires = jiffies + delay;
		timer->data = (unsigned long)dwork;
		timer->function = delayed_work_timer_fn;
		add_timer_on(timer, cpu);
		ret = 1;
	}
	return ret;
}
EXPORT_SYMBOL_GPL(queue_delayed_work_on);
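/*
 * Illustrative sketch (not part of the original file): a typical user of
 * the delayed-work API looks roughly like this; the mydrv names are
 * hypothetical.
 *
 *	struct mydrv {
 *		struct workqueue_struct *wq;
 *		struct delayed_work watchdog;
 *	};
 *
 *	static void mydrv_watchdog(struct work_struct *work)
 *	{
 *		struct mydrv *drv = container_of(work, struct mydrv,
 *						 watchdog.work);
 *		...
 *	}
 *
 *	INIT_DELAYED_WORK(&drv->watchdog, mydrv_watchdog);
 *	queue_delayed_work(drv->wq, &drv->watchdog, HZ);
 *
 * Until the timer fires, only the PENDING bit and the wq pointer are set;
 * the work reaches a cpu_workqueue_struct (and hence a worker thread) only
 * from delayed_work_timer_fn() above.
 */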
static void run_workqueue(struct cpu_workqueue_struct *cwq)
{
	unsigned long flags;

	/*
	 * Keep taking off work from the queue until
	 * done.
	 */
	spin_lock_irqsave(&cwq->lock, flags);
	cwq->run_depth++;
	if (cwq->run_depth > 3) {
		/* morton gets to eat his hat */
		printk("%s: recursion depth exceeded: %d\n",
			__FUNCTION__, cwq->run_depth);
		dump_stack();
	}
	while (!list_empty(&cwq->worklist)) {
		struct work_struct *work = list_entry(cwq->worklist.next,
						struct work_struct, entry);
		work_func_t f = work->func;

		list_del_init(cwq->worklist.next);
		spin_unlock_irqrestore(&cwq->lock, flags);

		BUG_ON(get_wq_data(work) != cwq);
		if (!test_bit(WORK_STRUCT_NOAUTOREL, work_data_bits(work)))
			work_release(work);
		f(work);

		if (unlikely(in_atomic() || lockdep_depth(current) > 0)) {
			printk(KERN_ERR "BUG: workqueue leaked lock or atomic: "
					"%s/0x%08x/%d\n",
					current->comm, preempt_count(),
					current->pid);
			printk(KERN_ERR "    last function: ");
			print_symbol("%s\n", (unsigned long)f);
			debug_show_held_locks(current);
			dump_stack();
		}

		spin_lock_irqsave(&cwq->lock, flags);
		cwq->remove_sequence++;
		wake_up(&cwq->work_done);
	}
	cwq->run_depth--;
	spin_unlock_irqrestore(&cwq->lock, flags);
}

static int worker_thread(void *__cwq)
{
	struct cpu_workqueue_struct *cwq = __cwq;
	DECLARE_WAITQUEUE(wait, current);
	struct k_sigaction sa;
	sigset_t blocked;

	if (!cwq->freezeable)
		current->flags |= PF_NOFREEZE;

	set_user_nice(current, -5);

	/* Block and flush all signals */
	sigfillset(&blocked);
	sigprocmask(SIG_BLOCK, &blocked, NULL);
	flush_signals(current);

	/*
	 * We inherited MPOL_INTERLEAVE from the booting kernel.
	 * Set MPOL_DEFAULT to ensure node local allocations.
	 */
	numa_default_policy();

	/* SIG_IGN makes children autoreap: see do_notify_parent(). */
	sa.sa.sa_handler = SIG_IGN;
	sa.sa.sa_flags = 0;
	siginitset(&sa.sa.sa_mask, sigmask(SIGCHLD));
	do_sigaction(SIGCHLD, &sa, (struct k_sigaction *)0);

	set_current_state(TASK_INTERRUPTIBLE);
	while (!kthread_should_stop()) {
		if (cwq->freezeable)
			try_to_freeze();

		add_wait_queue(&cwq->more_work, &wait);
		if (list_empty(&cwq->worklist))
			schedule();
		else
			__set_current_state(TASK_RUNNING);
		remove_wait_queue(&cwq->more_work, &wait);

		if (!list_empty(&cwq->worklist))
			run_workqueue(cwq);
		set_current_state(TASK_INTERRUPTIBLE);
	}
	__set_current_state(TASK_RUNNING);
	return 0;
}

static void flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)
{
	if (cwq->thread == current) {
		/*
		 * Probably keventd trying to flush its own queue. So simply run
		 * it by hand rather than deadlocking.
		 */
		run_workqueue(cwq);
	} else {
		DEFINE_WAIT(wait);
		long sequence_needed;

		spin_lock_irq(&cwq->lock);
		sequence_needed = cwq->insert_sequence;

		while (sequence_needed - cwq->remove_sequence > 0) {
			prepare_to_wait(&cwq->work_done, &wait,
					TASK_UNINTERRUPTIBLE);
			spin_unlock_irq(&cwq->lock);
			schedule();
			spin_lock_irq(&cwq->lock);
		}
		finish_wait(&cwq->work_done, &wait);
		spin_unlock_irq(&cwq->lock);
	}
}
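/*
 * Worked example (illustrative, not from the original file): suppose a
 * flusher samples insert_sequence == 12 while remove_sequence == 9.  It
 * then sleeps on work_done until three more works have completed and
 * remove_sequence reaches 12.  Works queued after the sample push
 * insert_sequence on to 13, 14, ..., but only the saved sequence_needed is
 * compared against, so the flusher is not livelocked by the newcomers.
 */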
/**
 * flush_workqueue - ensure that any scheduled work has run to completion.
 * @wq: workqueue to flush
 *
 * Forces execution of the workqueue and blocks until its completion.
 * This is typically used in driver shutdown handlers.
 *
 * This function will sample each workqueue's current insert_sequence number and
 * will sleep until the head sequence is greater than or equal to that.  This
 * means that we sleep until all works which were queued on entry have been
 * handled, but we are not livelocked by new incoming ones.
 *
 * This function used to run the workqueues itself.  Now we just wait for the
 * helper threads to do it.
 */
void fastcall flush_workqueue(struct workqueue_struct *wq)
{
	might_sleep();

	if (is_single_threaded(wq)) {
		/* Always use first cpu's area. */
		flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, singlethread_cpu));
	} else {
		int cpu;

		mutex_lock(&workqueue_mutex);
		for_each_online_cpu(cpu)
			flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu));
		mutex_unlock(&workqueue_mutex);
	}
}
EXPORT_SYMBOL_GPL(flush_workqueue);

static struct task_struct *create_workqueue_thread(struct workqueue_struct *wq,
						   int cpu, int freezeable)
{
	struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu);
	struct task_struct *p;

	spin_lock_init(&cwq->lock);
	cwq->wq = wq;
	cwq->thread = NULL;
	cwq->insert_sequence = 0;
	cwq->remove_sequence = 0;
	cwq->freezeable = freezeable;
	INIT_LIST_HEAD(&cwq->worklist);
	init_waitqueue_head(&cwq->more_work);
	init_waitqueue_head(&cwq->work_done);

	if (is_single_threaded(wq))
		p = kthread_create(worker_thread, cwq, "%s", wq->name);
	else
		p = kthread_create(worker_thread, cwq, "%s/%d", wq->name, cpu);
	if (IS_ERR(p))
		return NULL;
	cwq->thread = p;
	return p;
}

struct workqueue_struct *__create_workqueue(const char *name,
					    int singlethread, int freezeable)
{
	int cpu, destroy = 0;
	struct workqueue_struct *wq;
	struct task_struct *p;

	wq = kzalloc(sizeof(*wq), GFP_KERNEL);
	if (!wq)
		return NULL;

	wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct);
	if (!wq->cpu_wq) {
		kfree(wq);
		return NULL;
	}

	wq->name = name;
	mutex_lock(&workqueue_mutex);
	if (singlethread) {
		INIT_LIST_HEAD(&wq->list);
		p = create_workqueue_thread(wq, singlethread_cpu, freezeable);
		if (!p)
			destroy = 1;
		else
			wake_up_process(p);
	} else {
		list_add(&wq->list, &workqueues);
		for_each_online_cpu(cpu) {
			p = create_workqueue_thread(wq, cpu, freezeable);
			if (p) {
				kthread_bind(p, cpu);
				wake_up_process(p);
			} else
				destroy = 1;
		}
	}
	mutex_unlock(&workqueue_mutex);

	/*
	 * Was there any error during startup? If yes then clean up:
	 */
	if (destroy) {
		destroy_workqueue(wq);
		wq = NULL;
	}
	return wq;
}
EXPORT_SYMBOL_GPL(__create_workqueue);
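/*
 * Illustrative sketch (not part of the original file): drivers normally do
 * not call __create_workqueue() directly but go through the wrappers in
 * <linux/workqueue.h>, roughly:
 *
 *	struct workqueue_struct *wq;
 *
 *	wq = create_workqueue("mydrv");		// one worker thread per CPU
 *	// or: wq = create_singlethread_workqueue("mydrv");
 *	if (!wq)
 *		return -ENOMEM;
 *	queue_work(wq, &my_work);		// my_work is hypothetical
 *	...
 *	flush_workqueue(wq);
 *	destroy_workqueue(wq);
 *
 * In the multi-threaded case the "mydrv" name shows up in ps as mydrv/0,
 * mydrv/1, ..., matching the "%s/%d" kthread_create() format above.
 */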
static void cleanup_workqueue_thread(struct workqueue_struct *wq, int cpu)
{
	struct cpu_workqueue_struct *cwq;
	unsigned long flags;
	struct task_struct *p;

	cwq = per_cpu_ptr(wq->cpu_wq, cpu);
	spin_lock_irqsave(&cwq->lock, flags);
	p = cwq->thread;
	cwq->thread = NULL;
	spin_unlock_irqrestore(&cwq->lock, flags);
	if (p)
		kthread_stop(p);
}

/**
 * destroy_workqueue - safely terminate a workqueue
 * @wq: target workqueue
 *
 * Safely destroy a workqueue. All work currently pending will be done first.
 */
void destroy_workqueue(struct workqueue_struct *wq)
{
	int cpu;

	flush_workqueue(wq);

	/* We don't need the distraction of CPUs appearing and vanishing. */
	mutex_lock(&workqueue_mutex);
	if (is_single_threaded(wq))
		cleanup_workqueue_thread(wq, singlethread_cpu);
	else {
		for_each_online_cpu(cpu)
			cleanup_workqueue_thread(wq, cpu);
		list_del(&wq->list);
	}
	mutex_unlock(&workqueue_mutex);
	free_percpu(wq->cpu_wq);
	kfree(wq);
}
EXPORT_SYMBOL_GPL(destroy_workqueue);

static struct workqueue_struct *keventd_wq;

/**
 * schedule_work - put work task in global workqueue
 * @work: job to be done
 *
 * This puts a job in the kernel-global workqueue.
 */
int fastcall schedule_work(struct work_struct *work)
{
	return queue_work(keventd_wq, work);
}
EXPORT_SYMBOL(schedule_work);

/**
 * schedule_delayed_work - put work task in global workqueue after delay
 * @dwork: job to be done
 * @delay: number of jiffies to wait or 0 for immediate execution
 *
 * After waiting for a given time this puts a job in the kernel-global
 * workqueue.
 */
int fastcall schedule_delayed_work(struct delayed_work *dwork,
					unsigned long delay)
{
	timer_stats_timer_set_start_info(&dwork->timer);
	return queue_delayed_work(keventd_wq, dwork, delay);
}
EXPORT_SYMBOL(schedule_delayed_work);

/**
 * schedule_delayed_work_on - queue work in global workqueue on CPU after delay
 * @cpu: cpu to use
 * @dwork: job to be done
 * @delay: number of jiffies to wait
 *
 * After waiting for a given time this puts a job in the kernel-global
 * workqueue on the specified CPU.
 */
int schedule_delayed_work_on(int cpu,
			struct delayed_work *dwork, unsigned long delay)
{
	return queue_delayed_work_on(cpu, keventd_wq, dwork, delay);
}
EXPORT_SYMBOL(schedule_delayed_work_on);

/**
 * schedule_on_each_cpu - call a function on each online CPU from keventd
 * @func: the function to call
 *
 * Returns zero on success.
 * Returns -ve errno on failure.
 *
 * Appears to be racy against CPU hotplug.
 *
 * schedule_on_each_cpu() is very slow.
 */
int schedule_on_each_cpu(work_func_t func)
{
	int cpu;
	struct work_struct *works;

	works = alloc_percpu(struct work_struct);
	if (!works)
		return -ENOMEM;

	mutex_lock(&workqueue_mutex);
	for_each_online_cpu(cpu) {
		struct work_struct *work = per_cpu_ptr(works, cpu);

		INIT_WORK(work, func);
		set_bit(WORK_STRUCT_PENDING, work_data_bits(work));
		__queue_work(per_cpu_ptr(keventd_wq->cpu_wq, cpu), work);
	}
	mutex_unlock(&workqueue_mutex);
	flush_workqueue(keventd_wq);
	free_percpu(works);
	return 0;
}

void flush_scheduled_work(void)
{
	flush_workqueue(keventd_wq);
}
EXPORT_SYMBOL(flush_scheduled_work);
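/*
 * Illustrative sketch (not part of the original file): the classic use of
 * the global queue is deferring work out of an interrupt handler; the
 * mydev_bh/mydev_irq/mydev_work names are hypothetical.
 *
 *	static void mydev_bh(struct work_struct *work)
 *	{
 *		// runs later in process context, in a keventd thread
 *	}
 *	static DECLARE_WORK(mydev_work, mydev_bh);
 *
 *	static irqreturn_t mydev_irq(int irq, void *dev_id)
 *	{
 *		schedule_work(&mydev_work);	// safe from hard irq context
 *		return IRQ_HANDLED;
 *	}
 *
 * A module using this must call flush_scheduled_work() before unloading,
 * so that no queued instance can run after the handler's code is gone.
 */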
/**
 * cancel_rearming_delayed_workqueue - reliably kill off a delayed work whose handler rearms the delayed work.
 * @wq:   the controlling workqueue structure
 * @dwork: the delayed work struct
 */
void cancel_rearming_delayed_workqueue(struct workqueue_struct *wq,
				       struct delayed_work *dwork)
{
	while (!cancel_delayed_work(dwork))
		flush_workqueue(wq);
}
EXPORT_SYMBOL(cancel_rearming_delayed_workqueue);

/**
 * cancel_rearming_delayed_work - reliably kill off a delayed keventd work whose handler rearms the delayed work.
 * @dwork: the delayed work struct
 */
void cancel_rearming_delayed_work(struct delayed_work *dwork)
{
	cancel_rearming_delayed_workqueue(keventd_wq, dwork);
}
EXPORT_SYMBOL(cancel_rearming_delayed_work);
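/*
 * Illustrative sketch (not part of the original file): the "rearming"
 * pattern these helpers are meant to stop looks roughly like this, with
 * hypothetical mydev names:
 *
 *	static void mydev_poll(struct work_struct *work)
 *	{
 *		struct mydev *dev = container_of(work, struct mydev,
 *						 poll.work);
 *
 *		mydev_check_hardware(dev);
 *		schedule_delayed_work(&dev->poll, HZ);	// rearms itself
 *	}
 *
 * A single cancel_delayed_work() can lose the race with the handler
 * re-queueing itself, so teardown uses
 *
 *	cancel_rearming_delayed_work(&dev->poll);
 *
 * which keeps cancelling and flushing until the timer is really gone.
 */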
/**
 * execute_in_process_context - reliably execute the routine with user context
 * @fn:		the function to execute
 * @ew:		guaranteed storage for the execute work structure (must
 *		be available when the work executes)
 *
 * Executes the function immediately if process context is available,
 * otherwise schedules the function for delayed execution.
 *
 * Returns:	0 - function was executed
 *		1 - function was scheduled for execution
 */
int execute_in_process_context(work_func_t fn, struct execute_work *ew)
{
	if (!in_interrupt()) {
		fn(&ew->work);
		return 0;
	}

	INIT_WORK(&ew->work, fn);
	schedule_work(&ew->work);

	return 1;
}
EXPORT_SYMBOL_GPL(execute_in_process_context);

int keventd_up(void)
{
	return keventd_wq != NULL;
}

int current_is_keventd(void)
{
	struct cpu_workqueue_struct *cwq;
	int cpu = smp_processor_id();	/* preempt-safe: keventd is per-cpu */
	int ret = 0;

	BUG_ON(!keventd_wq);

	cwq = per_cpu_ptr(keventd_wq->cpu_wq, cpu);
	if (current == cwq->thread)
		ret = 1;

	return ret;

}

/* Take the work from this (downed) CPU. */
static void take_over_work(struct workqueue_struct *wq, unsigned int cpu)
{
	struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu);
	struct list_head list;
	struct work_struct *work;

	spin_lock_irq(&cwq->lock);
	list_replace_init(&cwq->worklist, &list);

	while (!list_empty(&list)) {
		printk("Taking work for %s\n", wq->name);
		work = list_entry(list.next, struct work_struct, entry);
		list_del(&work->entry);
		__queue_work(per_cpu_ptr(wq->cpu_wq, smp_processor_id()), work);
	}
	spin_unlock_irq(&cwq->lock);
}

/* We're holding the cpucontrol mutex here */
static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
				  unsigned long action,
				  void *hcpu)
{
	unsigned int hotcpu = (unsigned long)hcpu;
	struct workqueue_struct *wq;

	switch (action) {
	case CPU_UP_PREPARE:
		mutex_lock(&workqueue_mutex);
		/* Create a new workqueue thread for it. */
		list_for_each_entry(wq, &workqueues, list) {
			if (!create_workqueue_thread(wq, hotcpu, 0)) {
				printk("workqueue for %i failed\n", hotcpu);
				return NOTIFY_BAD;
			}
		}
		break;

	case CPU_ONLINE:
		/* Kick off worker threads. */
		list_for_each_entry(wq, &workqueues, list) {
			struct cpu_workqueue_struct *cwq;

			cwq = per_cpu_ptr(wq->cpu_wq, hotcpu);
			kthread_bind(cwq->thread, hotcpu);
			wake_up_process(cwq->thread);
		}
		mutex_unlock(&workqueue_mutex);
		break;

	case CPU_UP_CANCELED:
		list_for_each_entry(wq, &workqueues, list) {
			if (!per_cpu_ptr(wq->cpu_wq, hotcpu)->thread)
				continue;
			/* Unbind so it can run. */
			kthread_bind(per_cpu_ptr(wq->cpu_wq, hotcpu)->thread,
				     any_online_cpu(cpu_online_map));
			cleanup_workqueue_thread(wq, hotcpu);
		}
		mutex_unlock(&workqueue_mutex);
		break;

	case CPU_DOWN_PREPARE:
		mutex_lock(&workqueue_mutex);
		break;

	case CPU_DOWN_FAILED:
		mutex_unlock(&workqueue_mutex);
		break;

	case CPU_DEAD:
		list_for_each_entry(wq, &workqueues, list)
			cleanup_workqueue_thread(wq, hotcpu);
		list_for_each_entry(wq, &workqueues, list)
			take_over_work(wq, hotcpu);
		mutex_unlock(&workqueue_mutex);
		break;
	}

	return NOTIFY_OK;
}

void init_workqueues(void)
{
	singlethread_cpu = first_cpu(cpu_possible_map);
	hotcpu_notifier(workqueue_cpu_callback, 0);
	keventd_wq = create_workqueue("events");
	BUG_ON(!keventd_wq);
}
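/*
 * Illustrative note (not part of the original file): on a 4-CPU machine the
 * code above boots with keventd threads events/0 ... events/3, plus one
 * thread per CPU for every multi-threaded workqueue created later.  If,
 * say, CPU 2 is taken offline, the notifier runs CPU_DOWN_PREPARE (taking
 * workqueue_mutex), then on CPU_DEAD stops events/2 via
 * cleanup_workqueue_thread() and requeues whatever was left on its
 * worklist onto the CPU running the notifier via take_over_work().
 */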