1 /* 2 * linux/kernel/workqueue.c 3 * 4 * Generic mechanism for defining kernel helper threads for running 5 * arbitrary tasks in process context. 6 * 7 * Started by Ingo Molnar, Copyright (C) 2002 8 * 9 * Derived from the taskqueue/keventd code by: 10 * 11 * David Woodhouse <dwmw2@infradead.org> 12 * Andrew Morton <andrewm@uow.edu.au> 13 * Kai Petzke <wpp@marie.physik.tu-berlin.de> 14 * Theodore Ts'o <tytso@mit.edu> 15 * 16 * Made to use alloc_percpu by Christoph Lameter <clameter@sgi.com>. 17 */ 18 19 #include <linux/module.h> 20 #include <linux/kernel.h> 21 #include <linux/sched.h> 22 #include <linux/init.h> 23 #include <linux/signal.h> 24 #include <linux/completion.h> 25 #include <linux/workqueue.h> 26 #include <linux/slab.h> 27 #include <linux/cpu.h> 28 #include <linux/notifier.h> 29 #include <linux/kthread.h> 30 #include <linux/hardirq.h> 31 32 /* 33 * The per-CPU workqueue (if single thread, we always use the first 34 * possible cpu). 35 * 36 * The sequence counters are for flush_scheduled_work(). It wants to wait 37 * until all currently-scheduled works are completed, but it doesn't 38 * want to be livelocked by new, incoming ones. So it waits until 39 * remove_sequence is >= the insert_sequence which pertained when 40 * flush_scheduled_work() was called. 
*/
struct cpu_workqueue_struct {

	/*
	 * Taken (irq-safe) around every update of worklist, the two
	 * sequence counters, run_depth and thread in this file.
	 */
	spinlock_t lock;

	long remove_sequence;	/* Least-recently added (next to run) */
	long insert_sequence;	/* Next to add */

	/* Pending work_structs, linked through work_struct.entry (FIFO). */
	struct list_head worklist;
	/* Woken by __queue_work(); the worker thread sleeps on it. */
	wait_queue_head_t more_work;
	/* Woken after each work item finishes; flushers sleep on it. */
	wait_queue_head_t work_done;

	/* Back pointer to the owning workqueue. */
	struct workqueue_struct *wq;
	/* Worker kthread serving this queue, or NULL when there is none. */
	task_t *thread;

	int run_depth;		/* Detect run_workqueue() recursion depth */
} ____cacheline_aligned;

/*
 * The externally visible workqueue abstraction is an array of
 * per-CPU workqueues:
 */
struct workqueue_struct {
	/* Per-CPU area obtained from alloc_percpu(); index with per_cpu_ptr(). */
	struct cpu_workqueue_struct *cpu_wq;
	/* Name pointer as passed to __create_workqueue(); it is not copied. */
	const char *name;
	struct list_head list;	/* Empty if single thread */
};

/* All the per-cpu workqueues on the system, for hotplug cpu to add/remove
   threads to each one as cpus come/go. */
static DEFINE_SPINLOCK(workqueue_lock);
static LIST_HEAD(workqueues);

/*
 * CPU whose per-CPU slot is used by single-threaded workqueues; set once
 * in init_workqueues() to the first possible CPU.
 */
static int singlethread_cpu;

/* If it's single threaded, it isn't in the list of workqueues. */
static inline int is_single_threaded(struct workqueue_struct *wq)
{
	return list_empty(&wq->list);
}

/*
 * Append @work to @cwq's worklist and wake the worker sleeping on
 * more_work.  The work's wq_data is pointed at @cwq, insert_sequence is
 * advanced by one, and all of it happens under cwq->lock.
 *
 * Preempt must be disabled.
 */
static void __queue_work(struct cpu_workqueue_struct *cwq,
			 struct work_struct *work)
{
	unsigned long flags;

	spin_lock_irqsave(&cwq->lock, flags);
	work->wq_data = cwq;
	list_add_tail(&work->entry, &cwq->worklist);
	cwq->insert_sequence++;
	wake_up(&cwq->more_work);
	spin_unlock_irqrestore(&cwq->lock, flags);
}

/*
 * Queue work on a workqueue. Return non-zero if it was successfully
 * added.
 *
 * We queue the work to the CPU it was submitted, but there is no
 * guarantee that it will be processed by that CPU.
*/
int fastcall queue_work(struct workqueue_struct *wq, struct work_struct *work)
{
	/*
	 * __queue_work() must run with preemption disabled; presumably the
	 * get_cpu()/put_cpu() pair provides that while also naming the
	 * submitting CPU.
	 */
	int ret = 0, cpu = get_cpu();

	/*
	 * Bit 0 of work->pending is the "already queued" marker: only the
	 * caller that flips it from 0 to 1 gets to queue the work.
	 */
	if (!test_and_set_bit(0, &work->pending)) {
		if (unlikely(is_single_threaded(wq)))
			cpu = singlethread_cpu;
		BUG_ON(!list_empty(&work->entry));
		__queue_work(per_cpu_ptr(wq->cpu_wq, cpu), work);
		ret = 1;
	}
	put_cpu();
	return ret;
}

/*
 * Timer callback installed by queue_delayed_work() and
 * schedule_delayed_work_on().  @__data is the work_struct; its wq_data
 * field holds the target workqueue_struct at this point.  The work is
 * queued on the CPU this callback runs on, or on singlethread_cpu for a
 * single-threaded workqueue.
 */
static void delayed_work_timer_fn(unsigned long __data)
{
	struct work_struct *work = (struct work_struct *)__data;
	struct workqueue_struct *wq = work->wq_data;
	int cpu = smp_processor_id();

	if (unlikely(is_single_threaded(wq)))
		cpu = singlethread_cpu;

	__queue_work(per_cpu_ptr(wq->cpu_wq, cpu), work);
}

/*
 * Arm @work's embedded timer so that the work is queued on @wq once
 * @delay jiffies have elapsed.
 *
 * Returns 1 if the timer was armed, 0 if the work was already pending
 * (bit 0 of work->pending already set), in which case nothing is changed.
 */
int fastcall queue_delayed_work(struct workqueue_struct *wq,
			struct work_struct *work, unsigned long delay)
{
	int ret = 0;
	struct timer_list *timer = &work->timer;

	if (!test_and_set_bit(0, &work->pending)) {
		BUG_ON(timer_pending(timer));
		BUG_ON(!list_empty(&work->entry));

		/* This stores wq for the moment, for the timer_fn */
		work->wq_data = wq;
		timer->expires = jiffies + delay;
		timer->data = (unsigned long)work;
		timer->function = delayed_work_timer_fn;
		add_timer(timer);
		ret = 1;
	}
	return ret;
}

/*
 * Drain @cwq's worklist, invoking each work's function with cwq->lock
 * released.  run_depth counts nested invocations (flush_cpu_workqueue()
 * calls this when invoked from the worker thread itself); a depth above
 * 3 is reported with printk() and dump_stack() but processing continues.
 */
static void run_workqueue(struct cpu_workqueue_struct *cwq)
{
	unsigned long flags;

	/*
	 * Keep taking off work from the queue until
	 * done.
	 */
	spin_lock_irqsave(&cwq->lock, flags);
	cwq->run_depth++;
	if (cwq->run_depth > 3) {
		/* morton gets to eat his hat */
		printk("%s: recursion depth exceeded: %d\n",
			__FUNCTION__, cwq->run_depth);
		dump_stack();
	}
	while (!list_empty(&cwq->worklist)) {
		struct work_struct *work = list_entry(cwq->worklist.next,
						struct work_struct, entry);
		/* Snapshot the handler and its argument while still locked. */
		void (*f) (void *) = work->func;
		void *data = work->data;

		list_del_init(cwq->worklist.next);
		spin_unlock_irqrestore(&cwq->lock, flags);

		BUG_ON(work->wq_data != cwq);
		/*
		 * pending is cleared before the handler runs, so from here on
		 * queue_work() will accept this work again.
		 */
		clear_bit(0, &work->pending);
		f(data);

		spin_lock_irqsave(&cwq->lock, flags);
		/* One more item completed: let flushers re-check their target. */
		cwq->remove_sequence++;
		wake_up(&cwq->work_done);
	}
	cwq->run_depth--;
	spin_unlock_irqrestore(&cwq->lock, flags);
}

/*
 * Body of every workqueue kthread; @__cwq is the cpu_workqueue_struct it
 * serves.  After one-time setup (PF_NOFREEZE, nice -5, all signals
 * blocked, SIGCHLD ignored) it loops until kthread_should_stop():
 * sleeping on more_work while the worklist is empty and calling
 * run_workqueue() when it is not.  Always returns 0.
 */
static int worker_thread(void *__cwq)
{
	struct cpu_workqueue_struct *cwq = __cwq;
	DECLARE_WAITQUEUE(wait, current);
	struct k_sigaction sa;
	sigset_t blocked;

	current->flags |= PF_NOFREEZE;

	set_user_nice(current, -5);

	/* Block and flush all signals */
	sigfillset(&blocked);
	sigprocmask(SIG_BLOCK, &blocked, NULL);
	flush_signals(current);

	/* SIG_IGN makes children autoreap: see do_notify_parent(). */
	sa.sa.sa_handler = SIG_IGN;
	sa.sa.sa_flags = 0;
	siginitset(&sa.sa.sa_mask, sigmask(SIGCHLD));
	do_sigaction(SIGCHLD, &sa, (struct k_sigaction *)0);

	/*
	 * The task state is switched to TASK_INTERRUPTIBLE before the
	 * worklist is examined - presumably so that a wake_up() issued by
	 * __queue_work() between the check and schedule() is not lost.
	 */
	set_current_state(TASK_INTERRUPTIBLE);
	while (!kthread_should_stop()) {
		add_wait_queue(&cwq->more_work, &wait);
		if (list_empty(&cwq->worklist))
			schedule();
		else
			__set_current_state(TASK_RUNNING);
		remove_wait_queue(&cwq->more_work, &wait);

		if (!list_empty(&cwq->worklist))
			run_workqueue(cwq);
		set_current_state(TASK_INTERRUPTIBLE);
	}
	__set_current_state(TASK_RUNNING);
	return 0;
}

/*
 * Wait until every work item that was queued on @cwq before this call
 * has finished.  When the caller is @cwq's own worker thread the queue
 * is run directly instead of waiting.
 */
static void flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)
{
	if (cwq->thread == current) {
		/*
		 * Probably keventd trying to flush its own queue. So simply run
		 * it by hand rather than deadlocking.
		 */
		run_workqueue(cwq);
	} else {
		DEFINE_WAIT(wait);
		long sequence_needed;

		spin_lock_irq(&cwq->lock);
		/* Target: everything inserted up to now; later inserts are ignored. */
		sequence_needed = cwq->insert_sequence;

		/* The counters are compared through their signed difference. */
		while (sequence_needed - cwq->remove_sequence > 0) {
			prepare_to_wait(&cwq->work_done, &wait,
					TASK_UNINTERRUPTIBLE);
			spin_unlock_irq(&cwq->lock);
			schedule();
			spin_lock_irq(&cwq->lock);
		}
		finish_wait(&cwq->work_done, &wait);
		spin_unlock_irq(&cwq->lock);
	}
}

/*
 * flush_workqueue - ensure that any scheduled work has run to completion.
 *
 * Forces execution of the workqueue and blocks until its completion.
 * This is typically used in driver shutdown handlers.
 *
 * This function will sample each workqueue's current insert_sequence number and
 * will sleep until the head sequence is greater than or equal to that.  This
 * means that we sleep until all works which were queued on entry have been
 * handled, but we are not livelocked by new incoming ones.
 *
 * This function used to run the workqueues itself.  Now we just wait for the
 * helper threads to do it.
267 */ 268 void fastcall flush_workqueue(struct workqueue_struct *wq) 269 { 270 might_sleep(); 271 272 if (is_single_threaded(wq)) { 273 /* Always use first cpu's area. */ 274 flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, singlethread_cpu)); 275 } else { 276 int cpu; 277 278 lock_cpu_hotplug(); 279 for_each_online_cpu(cpu) 280 flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu)); 281 unlock_cpu_hotplug(); 282 } 283 } 284 285 static struct task_struct *create_workqueue_thread(struct workqueue_struct *wq, 286 int cpu) 287 { 288 struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu); 289 struct task_struct *p; 290 291 spin_lock_init(&cwq->lock); 292 cwq->wq = wq; 293 cwq->thread = NULL; 294 cwq->insert_sequence = 0; 295 cwq->remove_sequence = 0; 296 INIT_LIST_HEAD(&cwq->worklist); 297 init_waitqueue_head(&cwq->more_work); 298 init_waitqueue_head(&cwq->work_done); 299 300 if (is_single_threaded(wq)) 301 p = kthread_create(worker_thread, cwq, "%s", wq->name); 302 else 303 p = kthread_create(worker_thread, cwq, "%s/%d", wq->name, cpu); 304 if (IS_ERR(p)) 305 return NULL; 306 cwq->thread = p; 307 return p; 308 } 309 310 struct workqueue_struct *__create_workqueue(const char *name, 311 int singlethread) 312 { 313 int cpu, destroy = 0; 314 struct workqueue_struct *wq; 315 struct task_struct *p; 316 317 wq = kzalloc(sizeof(*wq), GFP_KERNEL); 318 if (!wq) 319 return NULL; 320 321 wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct); 322 if (!wq->cpu_wq) { 323 kfree(wq); 324 return NULL; 325 } 326 327 wq->name = name; 328 /* We don't need the distraction of CPUs appearing and vanishing. 
*/ 329 lock_cpu_hotplug(); 330 if (singlethread) { 331 INIT_LIST_HEAD(&wq->list); 332 p = create_workqueue_thread(wq, singlethread_cpu); 333 if (!p) 334 destroy = 1; 335 else 336 wake_up_process(p); 337 } else { 338 spin_lock(&workqueue_lock); 339 list_add(&wq->list, &workqueues); 340 spin_unlock(&workqueue_lock); 341 for_each_online_cpu(cpu) { 342 p = create_workqueue_thread(wq, cpu); 343 if (p) { 344 kthread_bind(p, cpu); 345 wake_up_process(p); 346 } else 347 destroy = 1; 348 } 349 } 350 unlock_cpu_hotplug(); 351 352 /* 353 * Was there any error during startup? If yes then clean up: 354 */ 355 if (destroy) { 356 destroy_workqueue(wq); 357 wq = NULL; 358 } 359 return wq; 360 } 361 362 static void cleanup_workqueue_thread(struct workqueue_struct *wq, int cpu) 363 { 364 struct cpu_workqueue_struct *cwq; 365 unsigned long flags; 366 struct task_struct *p; 367 368 cwq = per_cpu_ptr(wq->cpu_wq, cpu); 369 spin_lock_irqsave(&cwq->lock, flags); 370 p = cwq->thread; 371 cwq->thread = NULL; 372 spin_unlock_irqrestore(&cwq->lock, flags); 373 if (p) 374 kthread_stop(p); 375 } 376 377 void destroy_workqueue(struct workqueue_struct *wq) 378 { 379 int cpu; 380 381 flush_workqueue(wq); 382 383 /* We don't need the distraction of CPUs appearing and vanishing. 
*/ 384 lock_cpu_hotplug(); 385 if (is_single_threaded(wq)) 386 cleanup_workqueue_thread(wq, singlethread_cpu); 387 else { 388 for_each_online_cpu(cpu) 389 cleanup_workqueue_thread(wq, cpu); 390 spin_lock(&workqueue_lock); 391 list_del(&wq->list); 392 spin_unlock(&workqueue_lock); 393 } 394 unlock_cpu_hotplug(); 395 free_percpu(wq->cpu_wq); 396 kfree(wq); 397 } 398 399 static struct workqueue_struct *keventd_wq; 400 401 int fastcall schedule_work(struct work_struct *work) 402 { 403 return queue_work(keventd_wq, work); 404 } 405 406 int fastcall schedule_delayed_work(struct work_struct *work, unsigned long delay) 407 { 408 return queue_delayed_work(keventd_wq, work, delay); 409 } 410 411 int schedule_delayed_work_on(int cpu, 412 struct work_struct *work, unsigned long delay) 413 { 414 int ret = 0; 415 struct timer_list *timer = &work->timer; 416 417 if (!test_and_set_bit(0, &work->pending)) { 418 BUG_ON(timer_pending(timer)); 419 BUG_ON(!list_empty(&work->entry)); 420 /* This stores keventd_wq for the moment, for the timer_fn */ 421 work->wq_data = keventd_wq; 422 timer->expires = jiffies + delay; 423 timer->data = (unsigned long)work; 424 timer->function = delayed_work_timer_fn; 425 add_timer_on(timer, cpu); 426 ret = 1; 427 } 428 return ret; 429 } 430 431 int schedule_on_each_cpu(void (*func) (void *info), void *info) 432 { 433 int cpu; 434 struct work_struct *work; 435 436 work = kmalloc(NR_CPUS * sizeof(struct work_struct), GFP_KERNEL); 437 438 if (!work) 439 return -ENOMEM; 440 for_each_online_cpu(cpu) { 441 INIT_WORK(work + cpu, func, info); 442 __queue_work(per_cpu_ptr(keventd_wq->cpu_wq, cpu), 443 work + cpu); 444 } 445 flush_workqueue(keventd_wq); 446 kfree(work); 447 return 0; 448 } 449 450 void flush_scheduled_work(void) 451 { 452 flush_workqueue(keventd_wq); 453 } 454 455 /** 456 * cancel_rearming_delayed_workqueue - reliably kill off a delayed 457 * work whose handler rearms the delayed work. 
458 * @wq: the controlling workqueue structure 459 * @work: the delayed work struct 460 */ 461 void cancel_rearming_delayed_workqueue(struct workqueue_struct *wq, 462 struct work_struct *work) 463 { 464 while (!cancel_delayed_work(work)) 465 flush_workqueue(wq); 466 } 467 EXPORT_SYMBOL(cancel_rearming_delayed_workqueue); 468 469 /** 470 * cancel_rearming_delayed_work - reliably kill off a delayed keventd 471 * work whose handler rearms the delayed work. 472 * @work: the delayed work struct 473 */ 474 void cancel_rearming_delayed_work(struct work_struct *work) 475 { 476 cancel_rearming_delayed_workqueue(keventd_wq, work); 477 } 478 EXPORT_SYMBOL(cancel_rearming_delayed_work); 479 480 /** 481 * execute_in_process_context - reliably execute the routine with user context 482 * @fn: the function to execute 483 * @data: data to pass to the function 484 * @ew: guaranteed storage for the execute work structure (must 485 * be available when the work executes) 486 * 487 * Executes the function immediately if process context is available, 488 * otherwise schedules the function for delayed execution. 489 * 490 * Returns: 0 - function was executed 491 * 1 - function was scheduled for execution 492 */ 493 int execute_in_process_context(void (*fn)(void *data), void *data, 494 struct execute_work *ew) 495 { 496 if (!in_interrupt()) { 497 fn(data); 498 return 0; 499 } 500 501 INIT_WORK(&ew->work, fn, data); 502 schedule_work(&ew->work); 503 504 return 1; 505 } 506 EXPORT_SYMBOL_GPL(execute_in_process_context); 507 508 int keventd_up(void) 509 { 510 return keventd_wq != NULL; 511 } 512 513 int current_is_keventd(void) 514 { 515 struct cpu_workqueue_struct *cwq; 516 int cpu = smp_processor_id(); /* preempt-safe: keventd is per-cpu */ 517 int ret = 0; 518 519 BUG_ON(!keventd_wq); 520 521 cwq = per_cpu_ptr(keventd_wq->cpu_wq, cpu); 522 if (current == cwq->thread) 523 ret = 1; 524 525 return ret; 526 527 } 528 529 #ifdef CONFIG_HOTPLUG_CPU 530 /* Take the work from this (downed) CPU. 
*/ 531 static void take_over_work(struct workqueue_struct *wq, unsigned int cpu) 532 { 533 struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu); 534 LIST_HEAD(list); 535 struct work_struct *work; 536 537 spin_lock_irq(&cwq->lock); 538 list_splice_init(&cwq->worklist, &list); 539 540 while (!list_empty(&list)) { 541 printk("Taking work for %s\n", wq->name); 542 work = list_entry(list.next,struct work_struct,entry); 543 list_del(&work->entry); 544 __queue_work(per_cpu_ptr(wq->cpu_wq, smp_processor_id()), work); 545 } 546 spin_unlock_irq(&cwq->lock); 547 } 548 549 /* We're holding the cpucontrol mutex here */ 550 static int workqueue_cpu_callback(struct notifier_block *nfb, 551 unsigned long action, 552 void *hcpu) 553 { 554 unsigned int hotcpu = (unsigned long)hcpu; 555 struct workqueue_struct *wq; 556 557 switch (action) { 558 case CPU_UP_PREPARE: 559 /* Create a new workqueue thread for it. */ 560 list_for_each_entry(wq, &workqueues, list) { 561 if (!create_workqueue_thread(wq, hotcpu)) { 562 printk("workqueue for %i failed\n", hotcpu); 563 return NOTIFY_BAD; 564 } 565 } 566 break; 567 568 case CPU_ONLINE: 569 /* Kick off worker threads. */ 570 list_for_each_entry(wq, &workqueues, list) { 571 struct cpu_workqueue_struct *cwq; 572 573 cwq = per_cpu_ptr(wq->cpu_wq, hotcpu); 574 kthread_bind(cwq->thread, hotcpu); 575 wake_up_process(cwq->thread); 576 } 577 break; 578 579 case CPU_UP_CANCELED: 580 list_for_each_entry(wq, &workqueues, list) { 581 /* Unbind so it can run. 
*/ 582 kthread_bind(per_cpu_ptr(wq->cpu_wq, hotcpu)->thread, 583 any_online_cpu(cpu_online_map)); 584 cleanup_workqueue_thread(wq, hotcpu); 585 } 586 break; 587 588 case CPU_DEAD: 589 list_for_each_entry(wq, &workqueues, list) 590 cleanup_workqueue_thread(wq, hotcpu); 591 list_for_each_entry(wq, &workqueues, list) 592 take_over_work(wq, hotcpu); 593 break; 594 } 595 596 return NOTIFY_OK; 597 } 598 #endif 599 600 void init_workqueues(void) 601 { 602 singlethread_cpu = first_cpu(cpu_possible_map); 603 hotcpu_notifier(workqueue_cpu_callback, 0); 604 keventd_wq = create_workqueue("events"); 605 BUG_ON(!keventd_wq); 606 } 607 608 EXPORT_SYMBOL_GPL(__create_workqueue); 609 EXPORT_SYMBOL_GPL(queue_work); 610 EXPORT_SYMBOL_GPL(queue_delayed_work); 611 EXPORT_SYMBOL_GPL(flush_workqueue); 612 EXPORT_SYMBOL_GPL(destroy_workqueue); 613 614 EXPORT_SYMBOL(schedule_work); 615 EXPORT_SYMBOL(schedule_delayed_work); 616 EXPORT_SYMBOL(schedule_delayed_work_on); 617 EXPORT_SYMBOL(flush_scheduled_work); 618