xref: /linux/kernel/workqueue.c (revision 5e8d780d745c1619aba81fe7166c5a4b5cad2b84)
/*
 * linux/kernel/workqueue.c
 *
 * Generic mechanism for defining kernel helper threads for running
 * arbitrary tasks in process context.
 *
 * Started by Ingo Molnar, Copyright (C) 2002
 *
 * Derived from the taskqueue/keventd code by:
 *
 *   David Woodhouse <dwmw2@infradead.org>
 *   Andrew Morton <andrewm@uow.edu.au>
 *   Kai Petzke <wpp@marie.physik.tu-berlin.de>
 *   Theodore Ts'o <tytso@mit.edu>
 *
 * Made to use alloc_percpu by Christoph Lameter <clameter@sgi.com>.
 */

#include <linux/module.h>
#include <linux/kernel.h>
#include <linux/sched.h>
#include <linux/init.h>
#include <linux/signal.h>
#include <linux/completion.h>
#include <linux/workqueue.h>
#include <linux/slab.h>
#include <linux/cpu.h>
#include <linux/notifier.h>
#include <linux/kthread.h>
#include <linux/hardirq.h>

/*
 * The per-CPU workqueue (if single thread, we always use the first
 * possible cpu).
 *
 * The sequence counters are for flush_scheduled_work().  It wants to wait
 * until all currently-scheduled works are completed, but it doesn't
 * want to be livelocked by new, incoming ones.  So it waits until
 * remove_sequence is >= the insert_sequence which pertained when
 * flush_scheduled_work() was called.
 */
struct cpu_workqueue_struct {

	spinlock_t lock;

	long remove_sequence;	/* Least-recently added (next to run) */
	long insert_sequence;	/* Next to add */

	struct list_head worklist;
	wait_queue_head_t more_work;
	wait_queue_head_t work_done;

	struct workqueue_struct *wq;
	task_t *thread;

	int run_depth;		/* Detect run_workqueue() recursion depth */
} ____cacheline_aligned;

/*
 * The externally visible workqueue abstraction is an array of
 * per-CPU workqueues:
 */
struct workqueue_struct {
	struct cpu_workqueue_struct *cpu_wq;
	const char *name;
	struct list_head list;	/* Empty if single thread */
};

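/*
 * Illustrative sketch, not part of the original file and compiled out:
 * how a typical client drives the interface declared above.  All
 * example_* names are hypothetical.
 */
#if 0
static struct workqueue_struct *example_wq;
static struct work_struct example_work;

static void example_work_func(void *data)
{
	/* Runs in process context, in one of example_wq's worker threads. */
	printk("example work executed, data=%p\n", data);
}

static int example_setup(void)
{
	/* Creates one worker thread (and one cpu_workqueue_struct) per CPU. */
	example_wq = create_workqueue("example");
	if (!example_wq)
		return -ENOMEM;
	INIT_WORK(&example_work, example_work_func, NULL);
	queue_work(example_wq, &example_work);
	return 0;
}

static void example_teardown(void)
{
	/* Waits for everything queued so far, then frees the workqueue. */
	flush_workqueue(example_wq);
	destroy_workqueue(example_wq);
}
#endif
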
/* All the per-cpu workqueues on the system, so that the CPU hotplug code
   can add/remove a thread to/from each one as cpus come and go. */
static DEFINE_SPINLOCK(workqueue_lock);
static LIST_HEAD(workqueues);

static int singlethread_cpu;

/* If it's single threaded, it isn't in the list of workqueues. */
static inline int is_single_threaded(struct workqueue_struct *wq)
{
	return list_empty(&wq->list);
}

/* Preempt must be disabled. */
static void __queue_work(struct cpu_workqueue_struct *cwq,
			 struct work_struct *work)
{
	unsigned long flags;

	spin_lock_irqsave(&cwq->lock, flags);
	work->wq_data = cwq;
	list_add_tail(&work->entry, &cwq->worklist);
	cwq->insert_sequence++;
	wake_up(&cwq->more_work);
	spin_unlock_irqrestore(&cwq->lock, flags);
}

/*
 * Queue work on a workqueue. Return non-zero if it was successfully
 * added.
 *
 * We queue the work to the CPU on which it was submitted, but there is no
 * guarantee that it will be processed by that CPU.
 */
int fastcall queue_work(struct workqueue_struct *wq, struct work_struct *work)
{
	int ret = 0, cpu = get_cpu();

	if (!test_and_set_bit(0, &work->pending)) {
		if (unlikely(is_single_threaded(wq)))
			cpu = singlethread_cpu;
		BUG_ON(!list_empty(&work->entry));
		__queue_work(per_cpu_ptr(wq->cpu_wq, cpu), work);
		ret = 1;
	}
	put_cpu();
	return ret;
}

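/*
 * Illustrative sketch, not part of the original file and compiled out:
 * because of the pending bit tested above, queueing an already-pending
 * work is a no-op that returns 0, so callers can fire requests freely and
 * let them coalesce.  example_* names are hypothetical.
 */
#if 0
static struct work_struct example_rx_work;	/* INIT_WORK()ed elsewhere */

static void example_kick_rx(struct workqueue_struct *wq)
{
	if (!queue_work(wq, &example_rx_work))
		/* Already queued and not yet run; it will see the new data. */
		printk("rx work coalesced\n");
}
#endif
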
static void delayed_work_timer_fn(unsigned long __data)
{
	struct work_struct *work = (struct work_struct *)__data;
	struct workqueue_struct *wq = work->wq_data;
	int cpu = smp_processor_id();

	if (unlikely(is_single_threaded(wq)))
		cpu = singlethread_cpu;

	__queue_work(per_cpu_ptr(wq->cpu_wq, cpu), work);
}

int fastcall queue_delayed_work(struct workqueue_struct *wq,
			struct work_struct *work, unsigned long delay)
{
	int ret = 0;
	struct timer_list *timer = &work->timer;

	if (!test_and_set_bit(0, &work->pending)) {
		BUG_ON(timer_pending(timer));
		BUG_ON(!list_empty(&work->entry));

		/* This stores wq for the moment, for the timer_fn */
		work->wq_data = wq;
		timer->expires = jiffies + delay;
		timer->data = (unsigned long)work;
		timer->function = delayed_work_timer_fn;
		add_timer(timer);
		ret = 1;
	}
	return ret;
}

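/*
 * Illustrative sketch, not part of the original file and compiled out:
 * arming a delayed work.  The delay is in jiffies; once the timer fires,
 * delayed_work_timer_fn() above queues the work for real.  example_*
 * names are hypothetical.
 */
#if 0
static struct work_struct example_poll_work;	/* INIT_WORK()ed elsewhere */

static void example_schedule_poll(struct workqueue_struct *wq)
{
	/* Run example_poll_work roughly one second from now. */
	queue_delayed_work(wq, &example_poll_work, HZ);
}
#endif
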
static void run_workqueue(struct cpu_workqueue_struct *cwq)
{
	unsigned long flags;

	/*
	 * Keep taking work off the queue until it is
	 * empty.
	 */
	spin_lock_irqsave(&cwq->lock, flags);
	cwq->run_depth++;
	if (cwq->run_depth > 3) {
		/* morton gets to eat his hat */
		printk("%s: recursion depth exceeded: %d\n",
			__FUNCTION__, cwq->run_depth);
		dump_stack();
	}
	while (!list_empty(&cwq->worklist)) {
		struct work_struct *work = list_entry(cwq->worklist.next,
						struct work_struct, entry);
		void (*f) (void *) = work->func;
		void *data = work->data;

		list_del_init(cwq->worklist.next);
		spin_unlock_irqrestore(&cwq->lock, flags);

		BUG_ON(work->wq_data != cwq);
		clear_bit(0, &work->pending);
		f(data);

		spin_lock_irqsave(&cwq->lock, flags);
		cwq->remove_sequence++;
		wake_up(&cwq->work_done);
	}
	cwq->run_depth--;
	spin_unlock_irqrestore(&cwq->lock, flags);
}

static int worker_thread(void *__cwq)
{
	struct cpu_workqueue_struct *cwq = __cwq;
	DECLARE_WAITQUEUE(wait, current);
	struct k_sigaction sa;
	sigset_t blocked;

	current->flags |= PF_NOFREEZE;

	set_user_nice(current, -5);

	/* Block and flush all signals */
	sigfillset(&blocked);
	sigprocmask(SIG_BLOCK, &blocked, NULL);
	flush_signals(current);

	/* SIG_IGN makes children autoreap: see do_notify_parent(). */
	sa.sa.sa_handler = SIG_IGN;
	sa.sa.sa_flags = 0;
	siginitset(&sa.sa.sa_mask, sigmask(SIGCHLD));
	do_sigaction(SIGCHLD, &sa, (struct k_sigaction *)0);

	set_current_state(TASK_INTERRUPTIBLE);
	while (!kthread_should_stop()) {
		add_wait_queue(&cwq->more_work, &wait);
		if (list_empty(&cwq->worklist))
			schedule();
		else
			__set_current_state(TASK_RUNNING);
		remove_wait_queue(&cwq->more_work, &wait);

		if (!list_empty(&cwq->worklist))
			run_workqueue(cwq);
		set_current_state(TASK_INTERRUPTIBLE);
	}
	__set_current_state(TASK_RUNNING);
	return 0;
}

static void flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)
{
	if (cwq->thread == current) {
		/*
		 * Probably keventd trying to flush its own queue. So simply run
		 * it by hand rather than deadlocking.
		 */
		run_workqueue(cwq);
	} else {
		DEFINE_WAIT(wait);
		long sequence_needed;

		spin_lock_irq(&cwq->lock);
		sequence_needed = cwq->insert_sequence;

		while (sequence_needed - cwq->remove_sequence > 0) {
			prepare_to_wait(&cwq->work_done, &wait,
					TASK_UNINTERRUPTIBLE);
			spin_unlock_irq(&cwq->lock);
			schedule();
			spin_lock_irq(&cwq->lock);
		}
		finish_wait(&cwq->work_done, &wait);
		spin_unlock_irq(&cwq->lock);
	}
}

/*
 * flush_workqueue - ensure that any scheduled work has run to completion.
 *
 * Forces execution of the workqueue and blocks until its completion.
 * This is typically used in driver shutdown handlers.
 *
 * This function will sample each CPU workqueue's current insert_sequence
 * number and will sleep until its remove_sequence has caught up with it.
 * This means that we sleep until all works which were queued on entry have
 * been handled, but we are not livelocked by new incoming ones.
 *
 * This function used to run the workqueues itself.  Now we just wait for the
 * helper threads to do it.
 */
void fastcall flush_workqueue(struct workqueue_struct *wq)
{
	might_sleep();

	if (is_single_threaded(wq)) {
		/* Always use first cpu's area. */
		flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, singlethread_cpu));
	} else {
		int cpu;

		lock_cpu_hotplug();
		for_each_online_cpu(cpu)
			flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu));
		unlock_cpu_hotplug();
	}
}

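/*
 * Illustrative sketch, not part of the original file and compiled out:
 * the usual shutdown ordering around flush_workqueue().  example_* names
 * are hypothetical.
 */
#if 0
static void example_shutdown(struct workqueue_struct *wq)
{
	/* 1. Stop queueing new work first (quiesce irqs, delete timers). */
	/* 2. Wait for everything that is already queued to finish. */
	flush_workqueue(wq);
	/* 3. Only now is it safe to free resources the work items used. */
}
#endif
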
static struct task_struct *create_workqueue_thread(struct workqueue_struct *wq,
						   int cpu)
{
	struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu);
	struct task_struct *p;

	spin_lock_init(&cwq->lock);
	cwq->wq = wq;
	cwq->thread = NULL;
	cwq->insert_sequence = 0;
	cwq->remove_sequence = 0;
	INIT_LIST_HEAD(&cwq->worklist);
	init_waitqueue_head(&cwq->more_work);
	init_waitqueue_head(&cwq->work_done);

	if (is_single_threaded(wq))
		p = kthread_create(worker_thread, cwq, "%s", wq->name);
	else
		p = kthread_create(worker_thread, cwq, "%s/%d", wq->name, cpu);
	if (IS_ERR(p))
		return NULL;
	cwq->thread = p;
	return p;
}

struct workqueue_struct *__create_workqueue(const char *name,
					    int singlethread)
{
	int cpu, destroy = 0;
	struct workqueue_struct *wq;
	struct task_struct *p;

	wq = kzalloc(sizeof(*wq), GFP_KERNEL);
	if (!wq)
		return NULL;

	wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct);
	if (!wq->cpu_wq) {
		kfree(wq);
		return NULL;
	}

	wq->name = name;
	/* We don't need the distraction of CPUs appearing and vanishing. */
	lock_cpu_hotplug();
	if (singlethread) {
		INIT_LIST_HEAD(&wq->list);
		p = create_workqueue_thread(wq, singlethread_cpu);
		if (!p)
			destroy = 1;
		else
			wake_up_process(p);
	} else {
		spin_lock(&workqueue_lock);
		list_add(&wq->list, &workqueues);
		spin_unlock(&workqueue_lock);
		for_each_online_cpu(cpu) {
			p = create_workqueue_thread(wq, cpu);
			if (p) {
				kthread_bind(p, cpu);
				wake_up_process(p);
			} else
				destroy = 1;
		}
	}
	unlock_cpu_hotplug();

	/*
	 * Was there any error during startup? If yes then clean up:
	 */
	if (destroy) {
		destroy_workqueue(wq);
		wq = NULL;
	}
	return wq;
}

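/*
 * Illustrative sketch, not part of the original file and compiled out:
 * choosing between the two flavours of __create_workqueue().  In this
 * kernel the workqueue.h header is expected to wrap it in the
 * create_workqueue()/create_singlethread_workqueue() macros; a
 * single-threaded queue runs everything on one thread bound to
 * singlethread_cpu and is never put on the workqueues list.
 * example_* names are hypothetical.
 */
#if 0
static struct workqueue_struct *example_percpu_wq;
static struct workqueue_struct *example_ordered_wq;

static int example_create(void)
{
	/* Per-CPU flavour: one worker thread for every online CPU. */
	example_percpu_wq = __create_workqueue("ex-percpu", 0);
	if (!example_percpu_wq)
		return -ENOMEM;
	/* Single-threaded flavour: one thread total, strictly ordered. */
	example_ordered_wq = __create_workqueue("ex-single", 1);
	if (!example_ordered_wq) {
		destroy_workqueue(example_percpu_wq);
		return -ENOMEM;
	}
	return 0;
}
#endif
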
static void cleanup_workqueue_thread(struct workqueue_struct *wq, int cpu)
{
	struct cpu_workqueue_struct *cwq;
	unsigned long flags;
	struct task_struct *p;

	cwq = per_cpu_ptr(wq->cpu_wq, cpu);
	spin_lock_irqsave(&cwq->lock, flags);
	p = cwq->thread;
	cwq->thread = NULL;
	spin_unlock_irqrestore(&cwq->lock, flags);
	if (p)
		kthread_stop(p);
}

void destroy_workqueue(struct workqueue_struct *wq)
{
	int cpu;

	flush_workqueue(wq);

	/* We don't need the distraction of CPUs appearing and vanishing. */
	lock_cpu_hotplug();
	if (is_single_threaded(wq))
		cleanup_workqueue_thread(wq, singlethread_cpu);
	else {
		for_each_online_cpu(cpu)
			cleanup_workqueue_thread(wq, cpu);
		spin_lock(&workqueue_lock);
		list_del(&wq->list);
		spin_unlock(&workqueue_lock);
	}
	unlock_cpu_hotplug();
	free_percpu(wq->cpu_wq);
	kfree(wq);
}

static struct workqueue_struct *keventd_wq;

int fastcall schedule_work(struct work_struct *work)
{
	return queue_work(keventd_wq, work);
}

int fastcall schedule_delayed_work(struct work_struct *work, unsigned long delay)
{
	return queue_delayed_work(keventd_wq, work, delay);
}

int schedule_delayed_work_on(int cpu,
			struct work_struct *work, unsigned long delay)
{
	int ret = 0;
	struct timer_list *timer = &work->timer;

	if (!test_and_set_bit(0, &work->pending)) {
		BUG_ON(timer_pending(timer));
		BUG_ON(!list_empty(&work->entry));
		/* This stores keventd_wq for the moment, for the timer_fn */
		work->wq_data = keventd_wq;
		timer->expires = jiffies + delay;
		timer->data = (unsigned long)work;
		timer->function = delayed_work_timer_fn;
		add_timer_on(timer, cpu);
		ret = 1;
	}
	return ret;
}

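/*
 * Illustrative sketch, not part of the original file and compiled out:
 * most users never create their own workqueue and simply defer work to
 * keventd through the wrappers above, typically from interrupt context.
 * example_* names are hypothetical.
 */
#if 0
static void example_bottom_half(void *data)
{
	/* Runs in keventd's per-CPU thread: may sleep, use GFP_KERNEL, etc. */
}

static struct work_struct example_bh_work;	/* INIT_WORK()ed at init time */

static void example_interrupt(void)
{
	/* Cheap to call from irq context; the real work runs in keventd. */
	schedule_work(&example_bh_work);
	/* Delayed variant: schedule_delayed_work(&example_bh_work, HZ / 10); */
}
#endif
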
/**
 * schedule_on_each_cpu - call a function on each online CPU from keventd
 * @func: the function to call
 * @info: a pointer to pass to func()
 *
 * Returns zero on success.
 * Returns a negative errno on failure.
 *
 * Appears to be racy against CPU hotplug.
 *
 * schedule_on_each_cpu() is very slow.
 */
int schedule_on_each_cpu(void (*func)(void *info), void *info)
{
	int cpu;
	struct work_struct *works;

	works = alloc_percpu(struct work_struct);
	if (!works)
		return -ENOMEM;

	for_each_online_cpu(cpu) {
		INIT_WORK(per_cpu_ptr(works, cpu), func, info);
		__queue_work(per_cpu_ptr(keventd_wq->cpu_wq, cpu),
				per_cpu_ptr(works, cpu));
	}
	flush_workqueue(keventd_wq);
	free_percpu(works);
	return 0;
}

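/*
 * Illustrative sketch, not part of the original file and compiled out:
 * running a function once in the keventd thread of every online CPU,
 * e.g. to drain per-CPU caches.  As the comment above notes, this is slow
 * and not hotplug-safe.  example_* names are hypothetical.
 */
#if 0
static void example_drain_local(void *info)
{
	/* Executed by each CPU's keventd thread; *info is shared. */
}

static int example_drain_all(void)
{
	return schedule_on_each_cpu(example_drain_local, NULL);
}
#endif
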
void flush_scheduled_work(void)
{
	flush_workqueue(keventd_wq);
}

/**
 * cancel_rearming_delayed_workqueue - reliably kill off a delayed
 *			work whose handler rearms the delayed work.
 * @wq:   the controlling workqueue structure
 * @work: the delayed work struct
 */
void cancel_rearming_delayed_workqueue(struct workqueue_struct *wq,
				       struct work_struct *work)
{
	while (!cancel_delayed_work(work))
		flush_workqueue(wq);
}
EXPORT_SYMBOL(cancel_rearming_delayed_workqueue);

/**
 * cancel_rearming_delayed_work - reliably kill off a delayed keventd
 *			work whose handler rearms the delayed work.
 * @work: the delayed work struct
 */
void cancel_rearming_delayed_work(struct work_struct *work)
{
	cancel_rearming_delayed_workqueue(keventd_wq, work);
}
EXPORT_SYMBOL(cancel_rearming_delayed_work);

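/*
 * Illustrative sketch, not part of the original file and compiled out:
 * stopping a self-rearming delayed work.  A bare cancel_delayed_work()
 * can lose the race with the handler re-arming itself, hence the
 * cancel-then-flush loop in the helpers above.  example_* names are
 * hypothetical.
 */
#if 0
static struct work_struct example_poll;	/* INIT_WORK()ed elsewhere */

static void example_poll_func(void *data)
{
	/* ... poll the hardware ..., then rearm: */
	schedule_delayed_work(&example_poll, HZ);
}

static void example_stop_polling(void)
{
	/* Breaks the rearm cycle, flushing keventd as often as needed. */
	cancel_rearming_delayed_work(&example_poll);
}
#endif
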
/**
 * execute_in_process_context - reliably execute the routine in process context
 * @fn:		the function to execute
 * @data:	data to pass to the function
 * @ew:		guaranteed storage for the execute work structure (must
 *		be available when the work executes)
 *
 * Executes the function immediately if process context is available,
 * otherwise schedules the function for delayed execution.
 *
 * Returns:	0 - function was executed
 *		1 - function was scheduled for execution
 */
int execute_in_process_context(void (*fn)(void *data), void *data,
			       struct execute_work *ew)
{
	if (!in_interrupt()) {
		fn(data);
		return 0;
	}

	INIT_WORK(&ew->work, fn, data);
	schedule_work(&ew->work);

	return 1;
}
EXPORT_SYMBOL_GPL(execute_in_process_context);

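/*
 * Illustrative sketch, not part of the original file and compiled out:
 * a release path that may be entered from either interrupt or process
 * context.  The execute_work storage must stay valid until the deferred
 * call runs, so it is embedded in the object being released.  example_*
 * names are hypothetical.
 */
#if 0
struct example_object {
	struct execute_work ew;
	/* ... */
};

static void example_release(void *data)
{
	struct example_object *obj = data;

	/* ... cleanup that may sleep ... */
	kfree(obj);
}

static void example_put(struct example_object *obj)
{
	/* Runs example_release() now, or defers it to keventd from irq. */
	execute_in_process_context(example_release, obj, &obj->ew);
}
#endif
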
int keventd_up(void)
{
	return keventd_wq != NULL;
}

int current_is_keventd(void)
{
	struct cpu_workqueue_struct *cwq;
	int cpu = smp_processor_id();	/* preempt-safe: keventd is per-cpu */
	int ret = 0;

	BUG_ON(!keventd_wq);

	cwq = per_cpu_ptr(keventd_wq->cpu_wq, cpu);
	if (current == cwq->thread)
		ret = 1;

	return ret;
}

#ifdef CONFIG_HOTPLUG_CPU
/* Take the work from this (downed) CPU. */
static void take_over_work(struct workqueue_struct *wq, unsigned int cpu)
{
	struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu);
	struct list_head list;
	struct work_struct *work;

	spin_lock_irq(&cwq->lock);
	list_replace_init(&cwq->worklist, &list);

	while (!list_empty(&list)) {
		printk("Taking work for %s\n", wq->name);
		work = list_entry(list.next, struct work_struct, entry);
		list_del(&work->entry);
		__queue_work(per_cpu_ptr(wq->cpu_wq, smp_processor_id()), work);
	}
	spin_unlock_irq(&cwq->lock);
}

/* We're holding the cpucontrol mutex here */
static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
				  unsigned long action,
				  void *hcpu)
{
	unsigned int hotcpu = (unsigned long)hcpu;
	struct workqueue_struct *wq;

	switch (action) {
	case CPU_UP_PREPARE:
		/* Create a new workqueue thread for it. */
		list_for_each_entry(wq, &workqueues, list) {
			if (!create_workqueue_thread(wq, hotcpu)) {
				printk("workqueue for %i failed\n", hotcpu);
				return NOTIFY_BAD;
			}
		}
		break;

	case CPU_ONLINE:
		/* Kick off worker threads. */
		list_for_each_entry(wq, &workqueues, list) {
			struct cpu_workqueue_struct *cwq;

			cwq = per_cpu_ptr(wq->cpu_wq, hotcpu);
			kthread_bind(cwq->thread, hotcpu);
			wake_up_process(cwq->thread);
		}
		break;

	case CPU_UP_CANCELED:
		list_for_each_entry(wq, &workqueues, list) {
			if (!per_cpu_ptr(wq->cpu_wq, hotcpu)->thread)
				continue;
			/* Unbind so it can run. */
			kthread_bind(per_cpu_ptr(wq->cpu_wq, hotcpu)->thread,
				     any_online_cpu(cpu_online_map));
			cleanup_workqueue_thread(wq, hotcpu);
		}
		break;

	case CPU_DEAD:
		list_for_each_entry(wq, &workqueues, list)
			cleanup_workqueue_thread(wq, hotcpu);
		list_for_each_entry(wq, &workqueues, list)
			take_over_work(wq, hotcpu);
		break;
	}

	return NOTIFY_OK;
}
#endif

void init_workqueues(void)
{
	singlethread_cpu = first_cpu(cpu_possible_map);
	hotcpu_notifier(workqueue_cpu_callback, 0);
	keventd_wq = create_workqueue("events");
	BUG_ON(!keventd_wq);
}

EXPORT_SYMBOL_GPL(__create_workqueue);
EXPORT_SYMBOL_GPL(queue_work);
EXPORT_SYMBOL_GPL(queue_delayed_work);
EXPORT_SYMBOL_GPL(flush_workqueue);
EXPORT_SYMBOL_GPL(destroy_workqueue);

EXPORT_SYMBOL(schedule_work);
EXPORT_SYMBOL(schedule_delayed_work);
EXPORT_SYMBOL(schedule_delayed_work_on);
EXPORT_SYMBOL(flush_scheduled_work);