xref: /linux/kernel/workqueue.c (revision f3d9478b2ce468c3115b02ecae7e975990697f15)
1 /*
2  * linux/kernel/workqueue.c
3  *
4  * Generic mechanism for defining kernel helper threads for running
5  * arbitrary tasks in process context.
6  *
7  * Started by Ingo Molnar, Copyright (C) 2002
8  *
9  * Derived from the taskqueue/keventd code by:
10  *
11  *   David Woodhouse <dwmw2@infradead.org>
12  *   Andrew Morton <andrewm@uow.edu.au>
13  *   Kai Petzke <wpp@marie.physik.tu-berlin.de>
14  *   Theodore Ts'o <tytso@mit.edu>
15  *
16  * Made to use alloc_percpu by Christoph Lameter <clameter@sgi.com>.
17  */
18 
19 #include <linux/module.h>
20 #include <linux/kernel.h>
21 #include <linux/sched.h>
22 #include <linux/init.h>
23 #include <linux/signal.h>
24 #include <linux/completion.h>
25 #include <linux/workqueue.h>
26 #include <linux/slab.h>
27 #include <linux/cpu.h>
28 #include <linux/notifier.h>
29 #include <linux/kthread.h>
30 #include <linux/hardirq.h>
31 
32 /*
33  * The per-CPU workqueue (if single thread, we always use the first
34  * possible cpu).
35  *
36  * The sequence counters are for flush_scheduled_work().  It wants to wait
37  * until all currently-scheduled works are completed, but it doesn't
38  * want to be livelocked by new, incoming ones.  So it waits until
39  * remove_sequence is >= the insert_sequence which pertained when
40  * flush_scheduled_work() was called.
41  */
42 struct cpu_workqueue_struct {
43 
44 	spinlock_t lock;
45 
46 	long remove_sequence;	/* Least-recently added (next to run) */
47 	long insert_sequence;	/* Next to add */
48 
49 	struct list_head worklist;
50 	wait_queue_head_t more_work;
51 	wait_queue_head_t work_done;
52 
53 	struct workqueue_struct *wq;
54 	task_t *thread;
55 
56 	int run_depth;		/* Detect run_workqueue() recursion depth */
57 } ____cacheline_aligned;
58 
59 /*
60  * The externally visible workqueue abstraction is an array of
61  * per-CPU workqueues:
62  */
63 struct workqueue_struct {
64 	struct cpu_workqueue_struct *cpu_wq;
65 	const char *name;
66 	struct list_head list; 	/* Empty if single thread */
67 };
68 
69 /* All the per-cpu workqueues on the system, so that CPU hotplug can
70    add and remove worker threads for each one as CPUs come and go. */
71 static DEFINE_SPINLOCK(workqueue_lock);
72 static LIST_HEAD(workqueues);
73 
74 static int singlethread_cpu;
75 
76 /* If it's single threaded, it isn't in the list of workqueues. */
77 static inline int is_single_threaded(struct workqueue_struct *wq)
78 {
79 	return list_empty(&wq->list);
80 }
81 
82 /* The caller must have preemption disabled. */
83 static void __queue_work(struct cpu_workqueue_struct *cwq,
84 			 struct work_struct *work)
85 {
86 	unsigned long flags;
87 
88 	spin_lock_irqsave(&cwq->lock, flags);
89 	work->wq_data = cwq;
90 	list_add_tail(&work->entry, &cwq->worklist);
91 	cwq->insert_sequence++;
92 	wake_up(&cwq->more_work);
93 	spin_unlock_irqrestore(&cwq->lock, flags);
94 }
95 
96 /*
97  * Queue work on a workqueue. Return non-zero if it was successfully
98  * added.
99  *
100  * We queue the work to the CPU on which it was submitted, but there is no
101  * guarantee that it will be processed by that CPU.
102  */
103 int fastcall queue_work(struct workqueue_struct *wq, struct work_struct *work)
104 {
105 	int ret = 0, cpu = get_cpu();
106 
107 	if (!test_and_set_bit(0, &work->pending)) {
108 		if (unlikely(is_single_threaded(wq)))
109 			cpu = singlethread_cpu;
110 		BUG_ON(!list_empty(&work->entry));
111 		__queue_work(per_cpu_ptr(wq->cpu_wq, cpu), work);
112 		ret = 1;
113 	}
114 	put_cpu();
115 	return ret;
116 }
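
/*
 * Example (editor's illustrative sketch, not part of the original file):
 * a typical caller declares a work item with the three-argument
 * DECLARE_WORK()/INIT_WORK() of this era and submits it with queue_work().
 * The names example_handler, example_work and example_submit are
 * hypothetical.
 */
static void example_handler(void *data)
{
	/* Runs in process context, in one of the workqueue's worker threads. */
	printk(KERN_DEBUG "example work ran, data=%p\n", data);
}

static DECLARE_WORK(example_work, example_handler, NULL);

static void example_submit(struct workqueue_struct *example_wq)
{
	/* Returns 0 if example_work was already pending, non-zero otherwise. */
	if (!queue_work(example_wq, &example_work))
		printk(KERN_DEBUG "example work was already queued\n");
}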
117 
118 static void delayed_work_timer_fn(unsigned long __data)
119 {
120 	struct work_struct *work = (struct work_struct *)__data;
121 	struct workqueue_struct *wq = work->wq_data;
122 	int cpu = smp_processor_id();
123 
124 	if (unlikely(is_single_threaded(wq)))
125 		cpu = singlethread_cpu;
126 
127 	__queue_work(per_cpu_ptr(wq->cpu_wq, cpu), work);
128 }
129 
130 int fastcall queue_delayed_work(struct workqueue_struct *wq,
131 			struct work_struct *work, unsigned long delay)
132 {
133 	int ret = 0;
134 	struct timer_list *timer = &work->timer;
135 
136 	if (!test_and_set_bit(0, &work->pending)) {
137 		BUG_ON(timer_pending(timer));
138 		BUG_ON(!list_empty(&work->entry));
139 
140 		/* This stores wq for the moment, for the timer_fn */
141 		work->wq_data = wq;
142 		timer->expires = jiffies + delay;
143 		timer->data = (unsigned long)work;
144 		timer->function = delayed_work_timer_fn;
145 		add_timer(timer);
146 		ret = 1;
147 	}
148 	return ret;
149 }
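
/*
 * Example (editor's sketch, not part of the original file):
 * queue_delayed_work() is typically used for polling-style deferral.  This
 * handler does not rearm itself; see cancel_rearming_delayed_workqueue()
 * below for the rearming case.  example_poll_fn, example_poll and
 * example_start_poll are hypothetical names.
 */
static void example_poll_fn(void *data)
{
	/* Poll the hardware, check a status flag, etc. */
}

static DECLARE_WORK(example_poll, example_poll_fn, NULL);

static void example_start_poll(struct workqueue_struct *example_poll_wq)
{
	/* Run example_poll_fn() roughly one second from now. */
	queue_delayed_work(example_poll_wq, &example_poll, HZ);
}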
150 
151 static void run_workqueue(struct cpu_workqueue_struct *cwq)
152 {
153 	unsigned long flags;
154 
155 	/*
156 	 * Keep taking work off the queue until it is
157 	 * empty.
158 	 */
159 	spin_lock_irqsave(&cwq->lock, flags);
160 	cwq->run_depth++;
161 	if (cwq->run_depth > 3) {
162 		/* morton gets to eat his hat */
163 		printk("%s: recursion depth exceeded: %d\n",
164 			__FUNCTION__, cwq->run_depth);
165 		dump_stack();
166 	}
167 	while (!list_empty(&cwq->worklist)) {
168 		struct work_struct *work = list_entry(cwq->worklist.next,
169 						struct work_struct, entry);
170 		void (*f) (void *) = work->func;
171 		void *data = work->data;
172 
173 		list_del_init(cwq->worklist.next);
174 		spin_unlock_irqrestore(&cwq->lock, flags);
175 
176 		BUG_ON(work->wq_data != cwq);
177 		clear_bit(0, &work->pending);
178 		f(data);
179 
180 		spin_lock_irqsave(&cwq->lock, flags);
181 		cwq->remove_sequence++;
182 		wake_up(&cwq->work_done);
183 	}
184 	cwq->run_depth--;
185 	spin_unlock_irqrestore(&cwq->lock, flags);
186 }
187 
188 static int worker_thread(void *__cwq)
189 {
190 	struct cpu_workqueue_struct *cwq = __cwq;
191 	DECLARE_WAITQUEUE(wait, current);
192 	struct k_sigaction sa;
193 	sigset_t blocked;
194 
195 	current->flags |= PF_NOFREEZE;
196 
197 	set_user_nice(current, -5);
198 
199 	/* Block and flush all signals */
200 	sigfillset(&blocked);
201 	sigprocmask(SIG_BLOCK, &blocked, NULL);
202 	flush_signals(current);
203 
204 	/* SIG_IGN makes children autoreap: see do_notify_parent(). */
205 	sa.sa.sa_handler = SIG_IGN;
206 	sa.sa.sa_flags = 0;
207 	siginitset(&sa.sa.sa_mask, sigmask(SIGCHLD));
208 	do_sigaction(SIGCHLD, &sa, (struct k_sigaction *)0);
209 
210 	set_current_state(TASK_INTERRUPTIBLE);
211 	while (!kthread_should_stop()) {
212 		add_wait_queue(&cwq->more_work, &wait);
213 		if (list_empty(&cwq->worklist))
214 			schedule();
215 		else
216 			__set_current_state(TASK_RUNNING);
217 		remove_wait_queue(&cwq->more_work, &wait);
218 
219 		if (!list_empty(&cwq->worklist))
220 			run_workqueue(cwq);
221 		set_current_state(TASK_INTERRUPTIBLE);
222 	}
223 	__set_current_state(TASK_RUNNING);
224 	return 0;
225 }
226 
227 static void flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)
228 {
229 	if (cwq->thread == current) {
230 		/*
231 		 * Probably keventd trying to flush its own queue. So simply run
232 		 * it by hand rather than deadlocking.
233 		 */
234 		run_workqueue(cwq);
235 	} else {
236 		DEFINE_WAIT(wait);
237 		long sequence_needed;
238 
239 		spin_lock_irq(&cwq->lock);
240 		sequence_needed = cwq->insert_sequence;
241 
242 		while (sequence_needed - cwq->remove_sequence > 0) {
243 			prepare_to_wait(&cwq->work_done, &wait,
244 					TASK_UNINTERRUPTIBLE);
245 			spin_unlock_irq(&cwq->lock);
246 			schedule();
247 			spin_lock_irq(&cwq->lock);
248 		}
249 		finish_wait(&cwq->work_done, &wait);
250 		spin_unlock_irq(&cwq->lock);
251 	}
252 }
253 
254 /*
255  * flush_workqueue - ensure that any scheduled work has run to completion.
256  *
257  * Forces execution of the workqueue and blocks until its completion.
258  * This is typically used in driver shutdown handlers.
259  *
260  * This function will sample each CPU workqueue's current insert_sequence
261  * number and will sleep until its remove_sequence is greater than or equal
262  * to that.  This means that we sleep until all works which were queued on
263  * entry have been handled, but we are not livelocked by new incoming ones.
264  *
265  * This function used to run the workqueues itself.  Now we just wait for the
266  * helper threads to do it.
267  */
268 void fastcall flush_workqueue(struct workqueue_struct *wq)
269 {
270 	might_sleep();
271 
272 	if (is_single_threaded(wq)) {
273 		/* Always use first cpu's area. */
274 		flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, singlethread_cpu));
275 	} else {
276 		int cpu;
277 
278 		lock_cpu_hotplug();
279 		for_each_online_cpu(cpu)
280 			flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu));
281 		unlock_cpu_hotplug();
282 	}
283 }
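
/*
 * Example (editor's sketch, not part of the original file): the usual
 * shutdown pattern is to stop submitting new work and then call
 * flush_workqueue() so that every previously queued handler has finished
 * before shared state is freed.  struct example_dev and its members are
 * hypothetical.
 */
struct example_dev {
	struct workqueue_struct *wq;
	struct work_struct work;	/* its handler touches dev->state */
	void *state;
};

static void example_dev_shutdown(struct example_dev *dev)
{
	/* Caller guarantees no new work is queued from here on. */
	flush_workqueue(dev->wq);	/* wait for any pending dev->work */
	kfree(dev->state);		/* now safe: no handler can touch it */
}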
284 
285 static struct task_struct *create_workqueue_thread(struct workqueue_struct *wq,
286 						   int cpu)
287 {
288 	struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu);
289 	struct task_struct *p;
290 
291 	spin_lock_init(&cwq->lock);
292 	cwq->wq = wq;
293 	cwq->thread = NULL;
294 	cwq->insert_sequence = 0;
295 	cwq->remove_sequence = 0;
296 	INIT_LIST_HEAD(&cwq->worklist);
297 	init_waitqueue_head(&cwq->more_work);
298 	init_waitqueue_head(&cwq->work_done);
299 
300 	if (is_single_threaded(wq))
301 		p = kthread_create(worker_thread, cwq, "%s", wq->name);
302 	else
303 		p = kthread_create(worker_thread, cwq, "%s/%d", wq->name, cpu);
304 	if (IS_ERR(p))
305 		return NULL;
306 	cwq->thread = p;
307 	return p;
308 }
309 
310 struct workqueue_struct *__create_workqueue(const char *name,
311 					    int singlethread)
312 {
313 	int cpu, destroy = 0;
314 	struct workqueue_struct *wq;
315 	struct task_struct *p;
316 
317 	wq = kzalloc(sizeof(*wq), GFP_KERNEL);
318 	if (!wq)
319 		return NULL;
320 
321 	wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct);
322 	if (!wq->cpu_wq) {
323 		kfree(wq);
324 		return NULL;
325 	}
326 
327 	wq->name = name;
328 	/* We don't need the distraction of CPUs appearing and vanishing. */
329 	lock_cpu_hotplug();
330 	if (singlethread) {
331 		INIT_LIST_HEAD(&wq->list);
332 		p = create_workqueue_thread(wq, singlethread_cpu);
333 		if (!p)
334 			destroy = 1;
335 		else
336 			wake_up_process(p);
337 	} else {
338 		spin_lock(&workqueue_lock);
339 		list_add(&wq->list, &workqueues);
340 		spin_unlock(&workqueue_lock);
341 		for_each_online_cpu(cpu) {
342 			p = create_workqueue_thread(wq, cpu);
343 			if (p) {
344 				kthread_bind(p, cpu);
345 				wake_up_process(p);
346 			} else
347 				destroy = 1;
348 		}
349 	}
350 	unlock_cpu_hotplug();
351 
352 	/*
353 	 * Was there any error during startup? If so, clean up.
354 	 */
355 	if (destroy) {
356 		destroy_workqueue(wq);
357 		wq = NULL;
358 	}
359 	return wq;
360 }
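
/*
 * Example (editor's sketch, not part of the original file): callers normally
 * use the create_workqueue()/create_singlethread_workqueue() wrappers from
 * <linux/workqueue.h>, which invoke __create_workqueue() with singlethread
 * set to 0 or 1 respectively.  The "example" name is hypothetical.
 */
static struct workqueue_struct *example_wq_create(void)
{
	struct workqueue_struct *wq;

	/* One worker thread per online CPU, each named "example/<cpu>". */
	wq = create_workqueue("example");
	if (!wq)
		return NULL;	/* allocation or thread creation failed */
	return wq;
}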
361 
362 static void cleanup_workqueue_thread(struct workqueue_struct *wq, int cpu)
363 {
364 	struct cpu_workqueue_struct *cwq;
365 	unsigned long flags;
366 	struct task_struct *p;
367 
368 	cwq = per_cpu_ptr(wq->cpu_wq, cpu);
369 	spin_lock_irqsave(&cwq->lock, flags);
370 	p = cwq->thread;
371 	cwq->thread = NULL;
372 	spin_unlock_irqrestore(&cwq->lock, flags);
373 	if (p)
374 		kthread_stop(p);
375 }
376 
377 void destroy_workqueue(struct workqueue_struct *wq)
378 {
379 	int cpu;
380 
381 	flush_workqueue(wq);
382 
383 	/* We don't need the distraction of CPUs appearing and vanishing. */
384 	lock_cpu_hotplug();
385 	if (is_single_threaded(wq))
386 		cleanup_workqueue_thread(wq, singlethread_cpu);
387 	else {
388 		for_each_online_cpu(cpu)
389 			cleanup_workqueue_thread(wq, cpu);
390 		spin_lock(&workqueue_lock);
391 		list_del(&wq->list);
392 		spin_unlock(&workqueue_lock);
393 	}
394 	unlock_cpu_hotplug();
395 	free_percpu(wq->cpu_wq);
396 	kfree(wq);
397 }
398 
399 static struct workqueue_struct *keventd_wq;
400 
401 int fastcall schedule_work(struct work_struct *work)
402 {
403 	return queue_work(keventd_wq, work);
404 }
405 
406 int fastcall schedule_delayed_work(struct work_struct *work, unsigned long delay)
407 {
408 	return queue_delayed_work(keventd_wq, work, delay);
409 }
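
/*
 * Example (editor's sketch, not part of the original file): schedule_work()
 * is the classic way to defer processing from interrupt context into process
 * context via the shared keventd ("events") workqueue.  example_bottom_half,
 * example_irq_work and example_from_irq are hypothetical names.
 */
static void example_bottom_half(void *data)
{
	/* Runs later in keventd's process context; sleeping is allowed here. */
}

static DECLARE_WORK(example_irq_work, example_bottom_half, NULL);

static void example_from_irq(void)
{
	/* Called from interrupt context: cannot sleep, so punt to keventd. */
	schedule_work(&example_irq_work);

	/* To run it ~10ms from now instead, one could use:
	 * schedule_delayed_work(&example_irq_work, msecs_to_jiffies(10));
	 */
}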
410 
411 int schedule_delayed_work_on(int cpu,
412 			struct work_struct *work, unsigned long delay)
413 {
414 	int ret = 0;
415 	struct timer_list *timer = &work->timer;
416 
417 	if (!test_and_set_bit(0, &work->pending)) {
418 		BUG_ON(timer_pending(timer));
419 		BUG_ON(!list_empty(&work->entry));
420 		/* This stores keventd_wq for the moment, for the timer_fn */
421 		work->wq_data = keventd_wq;
422 		timer->expires = jiffies + delay;
423 		timer->data = (unsigned long)work;
424 		timer->function = delayed_work_timer_fn;
425 		add_timer_on(timer, cpu);
426 		ret = 1;
427 	}
428 	return ret;
429 }
430 
431 int schedule_on_each_cpu(void (*func) (void *info), void *info)
432 {
433 	int cpu;
434 	struct work_struct *work;
435 
436 	work = kmalloc(NR_CPUS * sizeof(struct work_struct), GFP_KERNEL);
437 
438 	if (!work)
439 		return -ENOMEM;
440 	for_each_online_cpu(cpu) {
441 		INIT_WORK(work + cpu, func, info);
442 		__queue_work(per_cpu_ptr(keventd_wq->cpu_wq, cpu),
443 				work + cpu);
444 	}
445 	flush_workqueue(keventd_wq);
446 	kfree(work);
447 	return 0;
448 }
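
/*
 * Example (editor's sketch, not part of the original file):
 * schedule_on_each_cpu() runs the function once in keventd context on every
 * online CPU and waits for all of them to finish, which suits per-CPU
 * cleanups.  example_drain_cpu and example_drain_all are hypothetical names.
 */
static void example_drain_cpu(void *info)
{
	/* Runs once on each online CPU, in that CPU's keventd thread. */
}

static int example_drain_all(void)
{
	/* Returns 0 on success, -ENOMEM if the work array allocation failed. */
	return schedule_on_each_cpu(example_drain_cpu, NULL);
}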
449 
450 void flush_scheduled_work(void)
451 {
452 	flush_workqueue(keventd_wq);
453 }
454 
455 /**
456  * cancel_rearming_delayed_workqueue - reliably kill off a delayed
457  *			work whose handler rearms the delayed work.
458  * @wq:   the controlling workqueue structure
459  * @work: the delayed work struct
460  */
461 void cancel_rearming_delayed_workqueue(struct workqueue_struct *wq,
462 				       struct work_struct *work)
463 {
464 	while (!cancel_delayed_work(work))
465 		flush_workqueue(wq);
466 }
467 EXPORT_SYMBOL(cancel_rearming_delayed_workqueue);
468 
469 /**
470  * cancel_rearming_delayed_work - reliably kill off a delayed keventd
471  *			work whose handler rearms the delayed work.
472  * @work: the delayed work struct
473  */
474 void cancel_rearming_delayed_work(struct work_struct *work)
475 {
476 	cancel_rearming_delayed_workqueue(keventd_wq, work);
477 }
478 EXPORT_SYMBOL(cancel_rearming_delayed_work);
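
/*
 * Example (editor's sketch, not part of the original file): a self-rearming
 * handler requeues itself every time it runs, so a bare cancel_delayed_work()
 * can race with that requeue.  cancel_rearming_delayed_work() closes the race
 * for keventd work.  example_rearm_fn, example_rearm_work and
 * example_rearm_stop are hypothetical names.
 */
static void example_rearm_fn(void *data);
static DECLARE_WORK(example_rearm_work, example_rearm_fn, NULL);

static void example_rearm_fn(void *data)
{
	/* Do the periodic job, then rearm to run again in one second. */
	schedule_delayed_work(&example_rearm_work, HZ);
}

static void example_rearm_stop(void)
{
	/* Safe even if example_rearm_fn() is requeueing itself right now. */
	cancel_rearming_delayed_work(&example_rearm_work);
}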
479 
480 /**
481  * execute_in_process_context - reliably execute the routine with user context
482  * @fn:		the function to execute
483  * @data:	data to pass to the function
484  * @ew:		guaranteed storage for the execute work structure (must
485  *		be available when the work executes)
486  *
487  * Executes the function immediately if process context is available,
488  * otherwise schedules the function for delayed execution.
489  *
490  * Returns:	0 - function was executed
491  *		1 - function was scheduled for execution
492  */
493 int execute_in_process_context(void (*fn)(void *data), void *data,
494 			       struct execute_work *ew)
495 {
496 	if (!in_interrupt()) {
497 		fn(data);
498 		return 0;
499 	}
500 
501 	INIT_WORK(&ew->work, fn, data);
502 	schedule_work(&ew->work);
503 
504 	return 1;
505 }
506 EXPORT_SYMBOL_GPL(execute_in_process_context);
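
/*
 * Example (editor's sketch, not part of the original file):
 * execute_in_process_context() lets a release path that may be entered from
 * either process or interrupt context run its cleanup as soon as it safely
 * can.  The execute_work storage must stay valid until the handler runs;
 * here it is static.  example_release, example_put and example_ew are
 * hypothetical names.
 */
static struct execute_work example_ew;

static void example_release(void *data)
{
	/* May sleep: runs either directly or later from keventd. */
	kfree(data);
}

static void example_put(void *obj)
{
	/* Runs example_release(obj) now, or schedules it if in_interrupt(). */
	execute_in_process_context(example_release, obj, &example_ew);
}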
507 
508 int keventd_up(void)
509 {
510 	return keventd_wq != NULL;
511 }
512 
513 int current_is_keventd(void)
514 {
515 	struct cpu_workqueue_struct *cwq;
516 	int cpu = smp_processor_id();	/* preempt-safe: keventd is per-cpu */
517 	int ret = 0;
518 
519 	BUG_ON(!keventd_wq);
520 
521 	cwq = per_cpu_ptr(keventd_wq->cpu_wq, cpu);
522 	if (current == cwq->thread)
523 		ret = 1;
524 
525 	return ret;
526 
527 }
528 
529 #ifdef CONFIG_HOTPLUG_CPU
530 /* Take the work from this (downed) CPU. */
531 static void take_over_work(struct workqueue_struct *wq, unsigned int cpu)
532 {
533 	struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu);
534 	LIST_HEAD(list);
535 	struct work_struct *work;
536 
537 	spin_lock_irq(&cwq->lock);
538 	list_splice_init(&cwq->worklist, &list);
539 
540 	while (!list_empty(&list)) {
541 		printk("Taking work for %s\n", wq->name);
542 		work = list_entry(list.next, struct work_struct, entry);
543 		list_del(&work->entry);
544 		__queue_work(per_cpu_ptr(wq->cpu_wq, smp_processor_id()), work);
545 	}
546 	spin_unlock_irq(&cwq->lock);
547 }
548 
549 /* We're holding the cpucontrol mutex here */
550 static int workqueue_cpu_callback(struct notifier_block *nfb,
551 				  unsigned long action,
552 				  void *hcpu)
553 {
554 	unsigned int hotcpu = (unsigned long)hcpu;
555 	struct workqueue_struct *wq;
556 
557 	switch (action) {
558 	case CPU_UP_PREPARE:
559 		/* Create a new workqueue thread for it. */
560 		list_for_each_entry(wq, &workqueues, list) {
561 			if (!create_workqueue_thread(wq, hotcpu)) {
562 				printk("workqueue for %i failed\n", hotcpu);
563 				return NOTIFY_BAD;
564 			}
565 		}
566 		break;
567 
568 	case CPU_ONLINE:
569 		/* Kick off worker threads. */
570 		list_for_each_entry(wq, &workqueues, list) {
571 			struct cpu_workqueue_struct *cwq;
572 
573 			cwq = per_cpu_ptr(wq->cpu_wq, hotcpu);
574 			kthread_bind(cwq->thread, hotcpu);
575 			wake_up_process(cwq->thread);
576 		}
577 		break;
578 
579 	case CPU_UP_CANCELED:
580 		list_for_each_entry(wq, &workqueues, list) {
581 			/* Unbind so it can run. */
582 			kthread_bind(per_cpu_ptr(wq->cpu_wq, hotcpu)->thread,
583 				     any_online_cpu(cpu_online_map));
584 			cleanup_workqueue_thread(wq, hotcpu);
585 		}
586 		break;
587 
588 	case CPU_DEAD:
589 		list_for_each_entry(wq, &workqueues, list)
590 			cleanup_workqueue_thread(wq, hotcpu);
591 		list_for_each_entry(wq, &workqueues, list)
592 			take_over_work(wq, hotcpu);
593 		break;
594 	}
595 
596 	return NOTIFY_OK;
597 }
598 #endif
599 
600 void init_workqueues(void)
601 {
602 	singlethread_cpu = first_cpu(cpu_possible_map);
603 	hotcpu_notifier(workqueue_cpu_callback, 0);
604 	keventd_wq = create_workqueue("events");
605 	BUG_ON(!keventd_wq);
606 }
607 
608 EXPORT_SYMBOL_GPL(__create_workqueue);
609 EXPORT_SYMBOL_GPL(queue_work);
610 EXPORT_SYMBOL_GPL(queue_delayed_work);
611 EXPORT_SYMBOL_GPL(flush_workqueue);
612 EXPORT_SYMBOL_GPL(destroy_workqueue);
613 
614 EXPORT_SYMBOL(schedule_work);
615 EXPORT_SYMBOL(schedule_delayed_work);
616 EXPORT_SYMBOL(schedule_delayed_work_on);
617 EXPORT_SYMBOL(flush_scheduled_work);
618