1 /* 2 * linux/kernel/workqueue.c 3 * 4 * Generic mechanism for defining kernel helper threads for running 5 * arbitrary tasks in process context. 6 * 7 * Started by Ingo Molnar, Copyright (C) 2002 8 * 9 * Derived from the taskqueue/keventd code by: 10 * 11 * David Woodhouse <dwmw2@infradead.org> 12 * Andrew Morton <andrewm@uow.edu.au> 13 * Kai Petzke <wpp@marie.physik.tu-berlin.de> 14 * Theodore Ts'o <tytso@mit.edu> 15 * 16 * Made to use alloc_percpu by Christoph Lameter <clameter@sgi.com>. 17 */ 18 19 #include <linux/module.h> 20 #include <linux/kernel.h> 21 #include <linux/sched.h> 22 #include <linux/init.h> 23 #include <linux/signal.h> 24 #include <linux/completion.h> 25 #include <linux/workqueue.h> 26 #include <linux/slab.h> 27 #include <linux/cpu.h> 28 #include <linux/notifier.h> 29 #include <linux/kthread.h> 30 #include <linux/hardirq.h> 31 32 /* 33 * The per-CPU workqueue (if single thread, we always use the first 34 * possible cpu). 35 * 36 * The sequence counters are for flush_scheduled_work(). It wants to wait 37 * until until all currently-scheduled works are completed, but it doesn't 38 * want to be livelocked by new, incoming ones. So it waits until 39 * remove_sequence is >= the insert_sequence which pertained when 40 * flush_scheduled_work() was called. 41 */ 42 struct cpu_workqueue_struct { 43 44 spinlock_t lock; 45 46 long remove_sequence; /* Least-recently added (next to run) */ 47 long insert_sequence; /* Next to add */ 48 49 struct list_head worklist; 50 wait_queue_head_t more_work; 51 wait_queue_head_t work_done; 52 53 struct workqueue_struct *wq; 54 struct task_struct *thread; 55 56 int run_depth; /* Detect run_workqueue() recursion depth */ 57 } ____cacheline_aligned; 58 59 /* 60 * The externally visible workqueue abstraction is an array of 61 * per-CPU workqueues: 62 */ 63 struct workqueue_struct { 64 struct cpu_workqueue_struct *cpu_wq; 65 const char *name; 66 struct list_head list; /* Empty if single thread */ 67 }; 68 69 /* All the per-cpu workqueues on the system, for hotplug cpu to add/remove 70 threads to each one as cpus come/go. */ 71 static DEFINE_SPINLOCK(workqueue_lock); 72 static LIST_HEAD(workqueues); 73 74 static int singlethread_cpu; 75 76 /* If it's single threaded, it isn't in the list of workqueues. */ 77 static inline int is_single_threaded(struct workqueue_struct *wq) 78 { 79 return list_empty(&wq->list); 80 } 81 82 /* Preempt must be disabled. */ 83 static void __queue_work(struct cpu_workqueue_struct *cwq, 84 struct work_struct *work) 85 { 86 unsigned long flags; 87 88 spin_lock_irqsave(&cwq->lock, flags); 89 work->wq_data = cwq; 90 list_add_tail(&work->entry, &cwq->worklist); 91 cwq->insert_sequence++; 92 wake_up(&cwq->more_work); 93 spin_unlock_irqrestore(&cwq->lock, flags); 94 } 95 96 /** 97 * queue_work - queue work on a workqueue 98 * @wq: workqueue to use 99 * @work: work to queue 100 * 101 * Returns non-zero if it was successfully added. 102 * 103 * We queue the work to the CPU it was submitted, but there is no 104 * guarantee that it will be processed by that CPU. 105 */ 106 int fastcall queue_work(struct workqueue_struct *wq, struct work_struct *work) 107 { 108 int ret = 0, cpu = get_cpu(); 109 110 if (!test_and_set_bit(0, &work->pending)) { 111 if (unlikely(is_single_threaded(wq))) 112 cpu = singlethread_cpu; 113 BUG_ON(!list_empty(&work->entry)); 114 __queue_work(per_cpu_ptr(wq->cpu_wq, cpu), work); 115 ret = 1; 116 } 117 put_cpu(); 118 return ret; 119 } 120 EXPORT_SYMBOL_GPL(queue_work); 121 122 static void delayed_work_timer_fn(unsigned long __data) 123 { 124 struct work_struct *work = (struct work_struct *)__data; 125 struct workqueue_struct *wq = work->wq_data; 126 int cpu = smp_processor_id(); 127 128 if (unlikely(is_single_threaded(wq))) 129 cpu = singlethread_cpu; 130 131 __queue_work(per_cpu_ptr(wq->cpu_wq, cpu), work); 132 } 133 134 /** 135 * queue_delayed_work - queue work on a workqueue after delay 136 * @wq: workqueue to use 137 * @work: work to queue 138 * @delay: number of jiffies to wait before queueing 139 * 140 * Returns non-zero if it was successfully added. 141 */ 142 int fastcall queue_delayed_work(struct workqueue_struct *wq, 143 struct work_struct *work, unsigned long delay) 144 { 145 int ret = 0; 146 struct timer_list *timer = &work->timer; 147 148 if (!test_and_set_bit(0, &work->pending)) { 149 BUG_ON(timer_pending(timer)); 150 BUG_ON(!list_empty(&work->entry)); 151 152 /* This stores wq for the moment, for the timer_fn */ 153 work->wq_data = wq; 154 timer->expires = jiffies + delay; 155 timer->data = (unsigned long)work; 156 timer->function = delayed_work_timer_fn; 157 add_timer(timer); 158 ret = 1; 159 } 160 return ret; 161 } 162 EXPORT_SYMBOL_GPL(queue_delayed_work); 163 164 /** 165 * queue_delayed_work_on - queue work on specific CPU after delay 166 * @cpu: CPU number to execute work on 167 * @wq: workqueue to use 168 * @work: work to queue 169 * @delay: number of jiffies to wait before queueing 170 * 171 * Returns non-zero if it was successfully added. 172 */ 173 int queue_delayed_work_on(int cpu, struct workqueue_struct *wq, 174 struct work_struct *work, unsigned long delay) 175 { 176 int ret = 0; 177 struct timer_list *timer = &work->timer; 178 179 if (!test_and_set_bit(0, &work->pending)) { 180 BUG_ON(timer_pending(timer)); 181 BUG_ON(!list_empty(&work->entry)); 182 183 /* This stores wq for the moment, for the timer_fn */ 184 work->wq_data = wq; 185 timer->expires = jiffies + delay; 186 timer->data = (unsigned long)work; 187 timer->function = delayed_work_timer_fn; 188 add_timer_on(timer, cpu); 189 ret = 1; 190 } 191 return ret; 192 } 193 EXPORT_SYMBOL_GPL(queue_delayed_work_on); 194 195 static void run_workqueue(struct cpu_workqueue_struct *cwq) 196 { 197 unsigned long flags; 198 199 /* 200 * Keep taking off work from the queue until 201 * done. 202 */ 203 spin_lock_irqsave(&cwq->lock, flags); 204 cwq->run_depth++; 205 if (cwq->run_depth > 3) { 206 /* morton gets to eat his hat */ 207 printk("%s: recursion depth exceeded: %d\n", 208 __FUNCTION__, cwq->run_depth); 209 dump_stack(); 210 } 211 while (!list_empty(&cwq->worklist)) { 212 struct work_struct *work = list_entry(cwq->worklist.next, 213 struct work_struct, entry); 214 void (*f) (void *) = work->func; 215 void *data = work->data; 216 217 list_del_init(cwq->worklist.next); 218 spin_unlock_irqrestore(&cwq->lock, flags); 219 220 BUG_ON(work->wq_data != cwq); 221 clear_bit(0, &work->pending); 222 f(data); 223 224 spin_lock_irqsave(&cwq->lock, flags); 225 cwq->remove_sequence++; 226 wake_up(&cwq->work_done); 227 } 228 cwq->run_depth--; 229 spin_unlock_irqrestore(&cwq->lock, flags); 230 } 231 232 static int worker_thread(void *__cwq) 233 { 234 struct cpu_workqueue_struct *cwq = __cwq; 235 DECLARE_WAITQUEUE(wait, current); 236 struct k_sigaction sa; 237 sigset_t blocked; 238 239 current->flags |= PF_NOFREEZE; 240 241 set_user_nice(current, -5); 242 243 /* Block and flush all signals */ 244 sigfillset(&blocked); 245 sigprocmask(SIG_BLOCK, &blocked, NULL); 246 flush_signals(current); 247 248 /* SIG_IGN makes children autoreap: see do_notify_parent(). */ 249 sa.sa.sa_handler = SIG_IGN; 250 sa.sa.sa_flags = 0; 251 siginitset(&sa.sa.sa_mask, sigmask(SIGCHLD)); 252 do_sigaction(SIGCHLD, &sa, (struct k_sigaction *)0); 253 254 set_current_state(TASK_INTERRUPTIBLE); 255 while (!kthread_should_stop()) { 256 add_wait_queue(&cwq->more_work, &wait); 257 if (list_empty(&cwq->worklist)) 258 schedule(); 259 else 260 __set_current_state(TASK_RUNNING); 261 remove_wait_queue(&cwq->more_work, &wait); 262 263 if (!list_empty(&cwq->worklist)) 264 run_workqueue(cwq); 265 set_current_state(TASK_INTERRUPTIBLE); 266 } 267 __set_current_state(TASK_RUNNING); 268 return 0; 269 } 270 271 static void flush_cpu_workqueue(struct cpu_workqueue_struct *cwq) 272 { 273 if (cwq->thread == current) { 274 /* 275 * Probably keventd trying to flush its own queue. So simply run 276 * it by hand rather than deadlocking. 277 */ 278 run_workqueue(cwq); 279 } else { 280 DEFINE_WAIT(wait); 281 long sequence_needed; 282 283 spin_lock_irq(&cwq->lock); 284 sequence_needed = cwq->insert_sequence; 285 286 while (sequence_needed - cwq->remove_sequence > 0) { 287 prepare_to_wait(&cwq->work_done, &wait, 288 TASK_UNINTERRUPTIBLE); 289 spin_unlock_irq(&cwq->lock); 290 schedule(); 291 spin_lock_irq(&cwq->lock); 292 } 293 finish_wait(&cwq->work_done, &wait); 294 spin_unlock_irq(&cwq->lock); 295 } 296 } 297 298 /** 299 * flush_workqueue - ensure that any scheduled work has run to completion. 300 * @wq: workqueue to flush 301 * 302 * Forces execution of the workqueue and blocks until its completion. 303 * This is typically used in driver shutdown handlers. 304 * 305 * This function will sample each workqueue's current insert_sequence number and 306 * will sleep until the head sequence is greater than or equal to that. This 307 * means that we sleep until all works which were queued on entry have been 308 * handled, but we are not livelocked by new incoming ones. 309 * 310 * This function used to run the workqueues itself. Now we just wait for the 311 * helper threads to do it. 312 */ 313 void fastcall flush_workqueue(struct workqueue_struct *wq) 314 { 315 might_sleep(); 316 317 if (is_single_threaded(wq)) { 318 /* Always use first cpu's area. */ 319 flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, singlethread_cpu)); 320 } else { 321 int cpu; 322 323 lock_cpu_hotplug(); 324 for_each_online_cpu(cpu) 325 flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu)); 326 unlock_cpu_hotplug(); 327 } 328 } 329 EXPORT_SYMBOL_GPL(flush_workqueue); 330 331 static struct task_struct *create_workqueue_thread(struct workqueue_struct *wq, 332 int cpu) 333 { 334 struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu); 335 struct task_struct *p; 336 337 spin_lock_init(&cwq->lock); 338 cwq->wq = wq; 339 cwq->thread = NULL; 340 cwq->insert_sequence = 0; 341 cwq->remove_sequence = 0; 342 INIT_LIST_HEAD(&cwq->worklist); 343 init_waitqueue_head(&cwq->more_work); 344 init_waitqueue_head(&cwq->work_done); 345 346 if (is_single_threaded(wq)) 347 p = kthread_create(worker_thread, cwq, "%s", wq->name); 348 else 349 p = kthread_create(worker_thread, cwq, "%s/%d", wq->name, cpu); 350 if (IS_ERR(p)) 351 return NULL; 352 cwq->thread = p; 353 return p; 354 } 355 356 struct workqueue_struct *__create_workqueue(const char *name, 357 int singlethread) 358 { 359 int cpu, destroy = 0; 360 struct workqueue_struct *wq; 361 struct task_struct *p; 362 363 wq = kzalloc(sizeof(*wq), GFP_KERNEL); 364 if (!wq) 365 return NULL; 366 367 wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct); 368 if (!wq->cpu_wq) { 369 kfree(wq); 370 return NULL; 371 } 372 373 wq->name = name; 374 /* We don't need the distraction of CPUs appearing and vanishing. */ 375 lock_cpu_hotplug(); 376 if (singlethread) { 377 INIT_LIST_HEAD(&wq->list); 378 p = create_workqueue_thread(wq, singlethread_cpu); 379 if (!p) 380 destroy = 1; 381 else 382 wake_up_process(p); 383 } else { 384 spin_lock(&workqueue_lock); 385 list_add(&wq->list, &workqueues); 386 spin_unlock(&workqueue_lock); 387 for_each_online_cpu(cpu) { 388 p = create_workqueue_thread(wq, cpu); 389 if (p) { 390 kthread_bind(p, cpu); 391 wake_up_process(p); 392 } else 393 destroy = 1; 394 } 395 } 396 unlock_cpu_hotplug(); 397 398 /* 399 * Was there any error during startup? If yes then clean up: 400 */ 401 if (destroy) { 402 destroy_workqueue(wq); 403 wq = NULL; 404 } 405 return wq; 406 } 407 EXPORT_SYMBOL_GPL(__create_workqueue); 408 409 static void cleanup_workqueue_thread(struct workqueue_struct *wq, int cpu) 410 { 411 struct cpu_workqueue_struct *cwq; 412 unsigned long flags; 413 struct task_struct *p; 414 415 cwq = per_cpu_ptr(wq->cpu_wq, cpu); 416 spin_lock_irqsave(&cwq->lock, flags); 417 p = cwq->thread; 418 cwq->thread = NULL; 419 spin_unlock_irqrestore(&cwq->lock, flags); 420 if (p) 421 kthread_stop(p); 422 } 423 424 /** 425 * destroy_workqueue - safely terminate a workqueue 426 * @wq: target workqueue 427 * 428 * Safely destroy a workqueue. All work currently pending will be done first. 429 */ 430 void destroy_workqueue(struct workqueue_struct *wq) 431 { 432 int cpu; 433 434 flush_workqueue(wq); 435 436 /* We don't need the distraction of CPUs appearing and vanishing. */ 437 lock_cpu_hotplug(); 438 if (is_single_threaded(wq)) 439 cleanup_workqueue_thread(wq, singlethread_cpu); 440 else { 441 for_each_online_cpu(cpu) 442 cleanup_workqueue_thread(wq, cpu); 443 spin_lock(&workqueue_lock); 444 list_del(&wq->list); 445 spin_unlock(&workqueue_lock); 446 } 447 unlock_cpu_hotplug(); 448 free_percpu(wq->cpu_wq); 449 kfree(wq); 450 } 451 EXPORT_SYMBOL_GPL(destroy_workqueue); 452 453 static struct workqueue_struct *keventd_wq; 454 455 /** 456 * schedule_work - put work task in global workqueue 457 * @work: job to be done 458 * 459 * This puts a job in the kernel-global workqueue. 460 */ 461 int fastcall schedule_work(struct work_struct *work) 462 { 463 return queue_work(keventd_wq, work); 464 } 465 EXPORT_SYMBOL(schedule_work); 466 467 /** 468 * schedule_delayed_work - put work task in global workqueue after delay 469 * @work: job to be done 470 * @delay: number of jiffies to wait 471 * 472 * After waiting for a given time this puts a job in the kernel-global 473 * workqueue. 474 */ 475 int fastcall schedule_delayed_work(struct work_struct *work, unsigned long delay) 476 { 477 return queue_delayed_work(keventd_wq, work, delay); 478 } 479 EXPORT_SYMBOL(schedule_delayed_work); 480 481 /** 482 * schedule_delayed_work_on - queue work in global workqueue on CPU after delay 483 * @cpu: cpu to use 484 * @work: job to be done 485 * @delay: number of jiffies to wait 486 * 487 * After waiting for a given time this puts a job in the kernel-global 488 * workqueue on the specified CPU. 489 */ 490 int schedule_delayed_work_on(int cpu, 491 struct work_struct *work, unsigned long delay) 492 { 493 return queue_delayed_work_on(cpu, keventd_wq, work, delay); 494 } 495 EXPORT_SYMBOL(schedule_delayed_work_on); 496 497 /** 498 * schedule_on_each_cpu - call a function on each online CPU from keventd 499 * @func: the function to call 500 * @info: a pointer to pass to func() 501 * 502 * Returns zero on success. 503 * Returns -ve errno on failure. 504 * 505 * Appears to be racy against CPU hotplug. 506 * 507 * schedule_on_each_cpu() is very slow. 508 */ 509 int schedule_on_each_cpu(void (*func)(void *info), void *info) 510 { 511 int cpu; 512 struct work_struct *works; 513 514 works = alloc_percpu(struct work_struct); 515 if (!works) 516 return -ENOMEM; 517 518 for_each_online_cpu(cpu) { 519 INIT_WORK(per_cpu_ptr(works, cpu), func, info); 520 __queue_work(per_cpu_ptr(keventd_wq->cpu_wq, cpu), 521 per_cpu_ptr(works, cpu)); 522 } 523 flush_workqueue(keventd_wq); 524 free_percpu(works); 525 return 0; 526 } 527 528 void flush_scheduled_work(void) 529 { 530 flush_workqueue(keventd_wq); 531 } 532 EXPORT_SYMBOL(flush_scheduled_work); 533 534 /** 535 * cancel_rearming_delayed_workqueue - reliably kill off a delayed 536 * work whose handler rearms the delayed work. 537 * @wq: the controlling workqueue structure 538 * @work: the delayed work struct 539 */ 540 void cancel_rearming_delayed_workqueue(struct workqueue_struct *wq, 541 struct work_struct *work) 542 { 543 while (!cancel_delayed_work(work)) 544 flush_workqueue(wq); 545 } 546 EXPORT_SYMBOL(cancel_rearming_delayed_workqueue); 547 548 /** 549 * cancel_rearming_delayed_work - reliably kill off a delayed keventd 550 * work whose handler rearms the delayed work. 551 * @work: the delayed work struct 552 */ 553 void cancel_rearming_delayed_work(struct work_struct *work) 554 { 555 cancel_rearming_delayed_workqueue(keventd_wq, work); 556 } 557 EXPORT_SYMBOL(cancel_rearming_delayed_work); 558 559 /** 560 * execute_in_process_context - reliably execute the routine with user context 561 * @fn: the function to execute 562 * @data: data to pass to the function 563 * @ew: guaranteed storage for the execute work structure (must 564 * be available when the work executes) 565 * 566 * Executes the function immediately if process context is available, 567 * otherwise schedules the function for delayed execution. 568 * 569 * Returns: 0 - function was executed 570 * 1 - function was scheduled for execution 571 */ 572 int execute_in_process_context(void (*fn)(void *data), void *data, 573 struct execute_work *ew) 574 { 575 if (!in_interrupt()) { 576 fn(data); 577 return 0; 578 } 579 580 INIT_WORK(&ew->work, fn, data); 581 schedule_work(&ew->work); 582 583 return 1; 584 } 585 EXPORT_SYMBOL_GPL(execute_in_process_context); 586 587 int keventd_up(void) 588 { 589 return keventd_wq != NULL; 590 } 591 592 int current_is_keventd(void) 593 { 594 struct cpu_workqueue_struct *cwq; 595 int cpu = smp_processor_id(); /* preempt-safe: keventd is per-cpu */ 596 int ret = 0; 597 598 BUG_ON(!keventd_wq); 599 600 cwq = per_cpu_ptr(keventd_wq->cpu_wq, cpu); 601 if (current == cwq->thread) 602 ret = 1; 603 604 return ret; 605 606 } 607 608 #ifdef CONFIG_HOTPLUG_CPU 609 /* Take the work from this (downed) CPU. */ 610 static void take_over_work(struct workqueue_struct *wq, unsigned int cpu) 611 { 612 struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu); 613 struct list_head list; 614 struct work_struct *work; 615 616 spin_lock_irq(&cwq->lock); 617 list_replace_init(&cwq->worklist, &list); 618 619 while (!list_empty(&list)) { 620 printk("Taking work for %s\n", wq->name); 621 work = list_entry(list.next,struct work_struct,entry); 622 list_del(&work->entry); 623 __queue_work(per_cpu_ptr(wq->cpu_wq, smp_processor_id()), work); 624 } 625 spin_unlock_irq(&cwq->lock); 626 } 627 628 /* We're holding the cpucontrol mutex here */ 629 static int __devinit workqueue_cpu_callback(struct notifier_block *nfb, 630 unsigned long action, 631 void *hcpu) 632 { 633 unsigned int hotcpu = (unsigned long)hcpu; 634 struct workqueue_struct *wq; 635 636 switch (action) { 637 case CPU_UP_PREPARE: 638 /* Create a new workqueue thread for it. */ 639 list_for_each_entry(wq, &workqueues, list) { 640 if (!create_workqueue_thread(wq, hotcpu)) { 641 printk("workqueue for %i failed\n", hotcpu); 642 return NOTIFY_BAD; 643 } 644 } 645 break; 646 647 case CPU_ONLINE: 648 /* Kick off worker threads. */ 649 list_for_each_entry(wq, &workqueues, list) { 650 struct cpu_workqueue_struct *cwq; 651 652 cwq = per_cpu_ptr(wq->cpu_wq, hotcpu); 653 kthread_bind(cwq->thread, hotcpu); 654 wake_up_process(cwq->thread); 655 } 656 break; 657 658 case CPU_UP_CANCELED: 659 list_for_each_entry(wq, &workqueues, list) { 660 if (!per_cpu_ptr(wq->cpu_wq, hotcpu)->thread) 661 continue; 662 /* Unbind so it can run. */ 663 kthread_bind(per_cpu_ptr(wq->cpu_wq, hotcpu)->thread, 664 any_online_cpu(cpu_online_map)); 665 cleanup_workqueue_thread(wq, hotcpu); 666 } 667 break; 668 669 case CPU_DEAD: 670 list_for_each_entry(wq, &workqueues, list) 671 cleanup_workqueue_thread(wq, hotcpu); 672 list_for_each_entry(wq, &workqueues, list) 673 take_over_work(wq, hotcpu); 674 break; 675 } 676 677 return NOTIFY_OK; 678 } 679 #endif 680 681 void init_workqueues(void) 682 { 683 singlethread_cpu = first_cpu(cpu_possible_map); 684 hotcpu_notifier(workqueue_cpu_callback, 0); 685 keventd_wq = create_workqueue("events"); 686 BUG_ON(!keventd_wq); 687 } 688 689