/*
 * Copyright (C) 2007 Oracle.  All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License v2 as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 */

#include <linux/version.h>
#include <linux/kthread.h>
#include <linux/list.h>
#include <linux/spinlock.h>
#include <linux/freezer.h>
#include "async-thread.h"

#define WORK_QUEUED_BIT 0
#define WORK_DONE_BIT 1
#define WORK_ORDER_DONE_BIT 2

/*
 * container for the kthread task pointer and the list of pending work.
 * One of these is allocated per thread.
 */
struct btrfs_worker_thread {
	/* pool we belong to */
	struct btrfs_workers *workers;

	/* list of struct btrfs_work that are waiting for service */
	struct list_head pending;

	/* list of worker threads from struct btrfs_workers */
	struct list_head worker_list;

	/* kthread */
	struct task_struct *task;

	/* number of things on the pending list */
	atomic_t num_pending;

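	/*
	 * incremented by next_worker() each time new work is pushed onto
	 * this thread while it is busy; used to rotate the thread to the
	 * tail of the worker list once every idle_thresh submissions
	 */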
	unsigned long sequence;

	/* protects the pending list. */
	spinlock_t lock;

	/* set to non-zero when this thread is already awake and kicking */
	int working;

	/* are we currently idle */
	int idle;
};

/*
 * helper function to move a thread onto the idle list after it
 * has finished some requests.
 */
static void check_idle_worker(struct btrfs_worker_thread *worker)
{
	if (!worker->idle && atomic_read(&worker->num_pending) <
	    worker->workers->idle_thresh / 2) {
		unsigned long flags;
		spin_lock_irqsave(&worker->workers->lock, flags);
		worker->idle = 1;
		list_move(&worker->worker_list, &worker->workers->idle_list);
		spin_unlock_irqrestore(&worker->workers->lock, flags);
	}
}

/*
 * helper function to move a thread off the idle list after new
 * pending work is added.
 */
static void check_busy_worker(struct btrfs_worker_thread *worker)
{
	if (worker->idle && atomic_read(&worker->num_pending) >=
	    worker->workers->idle_thresh) {
		unsigned long flags;
		spin_lock_irqsave(&worker->workers->lock, flags);
		worker->idle = 0;
		list_move_tail(&worker->worker_list,
			       &worker->workers->worker_list);
		spin_unlock_irqrestore(&worker->workers->lock, flags);
	}
}

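/*
 * run the ordered completion functions for work on workers->order_list.
 * Items stay on the list until it is their turn: ordered_func is called
 * strictly in queueing order once a work item has WORK_DONE_BIT set, and
 * the item is then removed from the list and handed to ordered_free.
 */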
static noinline int run_ordered_completions(struct btrfs_workers *workers,
					    struct btrfs_work *work)
{
	unsigned long flags;

	if (!workers->ordered)
		return 0;

	set_bit(WORK_DONE_BIT, &work->flags);

	spin_lock_irqsave(&workers->lock, flags);

	while (!list_empty(&workers->order_list)) {
		work = list_entry(workers->order_list.next,
				  struct btrfs_work, order_list);

		if (!test_bit(WORK_DONE_BIT, &work->flags))
			break;

		/* we are going to call the ordered done function, but
		 * we leave the work item on the list as a barrier so
		 * that later work items that are done don't have their
		 * functions called before this one returns
		 */
		if (test_and_set_bit(WORK_ORDER_DONE_BIT, &work->flags))
			break;

		spin_unlock_irqrestore(&workers->lock, flags);

		work->ordered_func(work);

		/* now take the lock again and call the freeing code */
		spin_lock_irqsave(&workers->lock, flags);
		list_del(&work->order_list);
		work->ordered_free(work);
	}

	spin_unlock_irqrestore(&workers->lock, flags);
	return 0;
}

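/*
 * Illustrative sketch, not part of the original file: one way a caller
 * might split an ordered work item across the three callbacks used above.
 * ->func does the heavy lifting and may complete out of order,
 * ->ordered_func runs strictly in queueing order, and ->ordered_free
 * releases the item once it has left the order_list.  "struct example_op"
 * and these helpers are hypothetical names, not btrfs code.
 */
struct example_op {
	struct btrfs_work work;	/* must stay valid until ordered_free */
	int result;
};

static void example_op_func(struct btrfs_work *work)
{
	struct example_op *op = container_of(work, struct example_op, work);

	/* expensive part; may finish out of order with other items */
	op->result = 0;
}

static void example_op_ordered_func(struct btrfs_work *work)
{
	/* runs in the order the items were queued on the ordered pool */
}

static void example_op_ordered_free(struct btrfs_work *work)
{
	kfree(container_of(work, struct example_op, work));
}

/*
 * a submitter would set op->work.func, op->work.ordered_func and
 * op->work.ordered_free to the helpers above and hand &op->work to
 * btrfs_queue_worker() on a pool that has workers->ordered set.
 */
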
/*
 * main loop for servicing work items
 */
static int worker_loop(void *arg)
{
	struct btrfs_worker_thread *worker = arg;
	struct list_head *cur;
	struct btrfs_work *work;
	do {
		spin_lock_irq(&worker->lock);
		while (!list_empty(&worker->pending)) {
			cur = worker->pending.next;
			work = list_entry(cur, struct btrfs_work, list);
			list_del(&work->list);
			clear_bit(WORK_QUEUED_BIT, &work->flags);

			work->worker = worker;
			spin_unlock_irq(&worker->lock);

			work->func(work);

			atomic_dec(&worker->num_pending);
			/*
			 * unless this is an ordered work queue,
			 * 'work' was probably freed by func above.
			 */
			run_ordered_completions(worker->workers, work);

			spin_lock_irq(&worker->lock);
			check_idle_worker(worker);
		}
		worker->working = 0;
		if (freezing(current)) {
			/* drop the lock before sleeping in the refrigerator */
			spin_unlock_irq(&worker->lock);
			refrigerator();
		} else {
			set_current_state(TASK_INTERRUPTIBLE);
			spin_unlock_irq(&worker->lock);
			if (!kthread_should_stop())
				schedule();
			__set_current_state(TASK_RUNNING);
		}
	} while (!kthread_should_stop());
	return 0;
}

/*
 * this will wait for all the worker threads to shut down
 */
int btrfs_stop_workers(struct btrfs_workers *workers)
{
	struct list_head *cur;
	struct btrfs_worker_thread *worker;

	list_splice_init(&workers->idle_list, &workers->worker_list);
	while (!list_empty(&workers->worker_list)) {
		cur = workers->worker_list.next;
		worker = list_entry(cur, struct btrfs_worker_thread,
				    worker_list);
		kthread_stop(worker->task);
		list_del(&worker->worker_list);
		kfree(worker);
	}
	return 0;
}

/*
 * simple init on struct btrfs_workers
 */
void btrfs_init_workers(struct btrfs_workers *workers, char *name, int max)
{
	workers->num_workers = 0;
	INIT_LIST_HEAD(&workers->worker_list);
	INIT_LIST_HEAD(&workers->idle_list);
	INIT_LIST_HEAD(&workers->order_list);
	spin_lock_init(&workers->lock);
	workers->max_workers = max;
	workers->idle_thresh = 32;
	workers->name = name;
	workers->ordered = 0;
}

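/*
 * Illustrative sketch, not part of the original file: the usual lifecycle
 * of a pool built from these helpers.  A caller initializes the pool,
 * optionally marks it ordered, starts a few threads, queues work with
 * btrfs_queue_worker(), and finally tears everything down with
 * btrfs_stop_workers().  example_setup_pool() and the thread counts are
 * hypothetical.
 */
static int example_setup_pool(struct btrfs_workers *pool)
{
	int ret;

	btrfs_init_workers(pool, "example", 8);	/* never more than 8 threads */
	pool->ordered = 1;			/* completions in queueing order */

	ret = btrfs_start_workers(pool, 2);	/* spawn 2 threads up front */
	if (ret)
		return ret;		/* start path already stopped the pool */

	/* ... btrfs_queue_worker(pool, ...) as work arrives ... */

	btrfs_stop_workers(pool);
	return 0;
}
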
/*
 * starts new worker threads.  This does not enforce the max worker
 * count in case you need to temporarily go past it.
 */
int btrfs_start_workers(struct btrfs_workers *workers, int num_workers)
{
	struct btrfs_worker_thread *worker;
	int ret = 0;
	int i;

	for (i = 0; i < num_workers; i++) {
		worker = kzalloc(sizeof(*worker), GFP_NOFS);
		if (!worker) {
			ret = -ENOMEM;
			goto fail;
		}

		INIT_LIST_HEAD(&worker->pending);
		INIT_LIST_HEAD(&worker->worker_list);
		spin_lock_init(&worker->lock);
		atomic_set(&worker->num_pending, 0);
		worker->task = kthread_run(worker_loop, worker,
					   "btrfs-%s-%d", workers->name,
					   workers->num_workers + i);
		worker->workers = workers;
		if (IS_ERR(worker->task)) {
			ret = PTR_ERR(worker->task);
			kfree(worker);
			goto fail;
		}

		spin_lock_irq(&workers->lock);
		list_add_tail(&worker->worker_list, &workers->idle_list);
		worker->idle = 1;
		workers->num_workers++;
		spin_unlock_irq(&workers->lock);
	}
	return 0;
fail:
	btrfs_stop_workers(workers);
	return ret;
}

/*
 * run through the list and find a worker thread that doesn't have a lot
 * to do right now.  This can return NULL if we aren't yet at the thread
 * count limit and all of the threads are busy.
 */
static struct btrfs_worker_thread *next_worker(struct btrfs_workers *workers)
{
	struct btrfs_worker_thread *worker;
	struct list_head *next;
	int enforce_min = workers->num_workers < workers->max_workers;

	/*
	 * if we find an idle thread, don't move it to the end of the
	 * idle list.  This improves the chance that the next submission
	 * will reuse the same thread, and maybe catch it while it is still
	 * working
	 */
	if (!list_empty(&workers->idle_list)) {
		next = workers->idle_list.next;
		worker = list_entry(next, struct btrfs_worker_thread,
				    worker_list);
		return worker;
	}
	if (enforce_min || list_empty(&workers->worker_list))
		return NULL;

	/*
	 * if we pick a busy task, move the task to the end of the list.
	 * hopefully this will keep things somewhat evenly balanced.
	 * Do the move in batches based on the sequence number.  This groups
	 * requests submitted at roughly the same time onto the same worker.
	 */
	next = workers->worker_list.next;
	worker = list_entry(next, struct btrfs_worker_thread, worker_list);
	atomic_inc(&worker->num_pending);
	worker->sequence++;

	if (worker->sequence % workers->idle_thresh == 0)
		list_move_tail(next, &workers->worker_list);
	return worker;
}

/*
 * selects a worker thread to take the next job.  This will either find
 * an idle worker, start a new worker up to the max count, or just return
 * one of the existing busy workers.
 */
static struct btrfs_worker_thread *find_worker(struct btrfs_workers *workers)
{
	struct btrfs_worker_thread *worker;
	unsigned long flags;

again:
	spin_lock_irqsave(&workers->lock, flags);
	worker = next_worker(workers);
	spin_unlock_irqrestore(&workers->lock, flags);

	if (!worker) {
		spin_lock_irqsave(&workers->lock, flags);
		if (workers->num_workers >= workers->max_workers) {
			struct list_head *fallback = NULL;
			/*
			 * we have failed to find any workers, so just
			 * fall back to whichever one we can find and
			 * force the work onto it
			 */
			if (!list_empty(&workers->worker_list))
				fallback = workers->worker_list.next;
			if (!list_empty(&workers->idle_list))
				fallback = workers->idle_list.next;
			BUG_ON(!fallback);
			worker = list_entry(fallback,
				  struct btrfs_worker_thread, worker_list);
			spin_unlock_irqrestore(&workers->lock, flags);
		} else {
			spin_unlock_irqrestore(&workers->lock, flags);
			/* we're below the limit, start another worker */
			btrfs_start_workers(workers, 1);
			goto again;
		}
	}
	return worker;
}

/*
 * btrfs_requeue_work just puts the work item back on the tail of the list
 * it was taken from.  It is intended for use with long-running work functions
 * that make some progress and want to give the cpu up for others.
 */
int btrfs_requeue_work(struct btrfs_work *work)
{
	struct btrfs_worker_thread *worker = work->worker;
	unsigned long flags;

	if (test_and_set_bit(WORK_QUEUED_BIT, &work->flags))
		goto out;

	spin_lock_irqsave(&worker->lock, flags);
	atomic_inc(&worker->num_pending);
	list_add_tail(&work->list, &worker->pending);

	/* by definition we're busy, take ourselves off the idle
	 * list
	 */
	if (worker->idle) {
		/*
		 * interrupts are already off from the irqsave above, so take
		 * the pool lock without touching 'flags' again; reusing it
		 * here would clobber the state we restore on the way out
		 */
		spin_lock(&worker->workers->lock);
		worker->idle = 0;
		list_move_tail(&worker->worker_list,
			       &worker->workers->worker_list);
		spin_unlock(&worker->workers->lock);
	}

	spin_unlock_irqrestore(&worker->lock, flags);

out:
	return 0;
}

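/*
 * Illustrative sketch, not part of the original file: a long-running work
 * function that handles a bounded chunk per pass and then puts itself back
 * on its worker's pending list with btrfs_requeue_work(), as described in
 * the comment above.  Both helpers are hypothetical.
 */
static int example_make_progress(struct btrfs_work *work)
{
	/* hypothetical: consume one unit of work, return non-zero if more remains */
	return 0;
}

static void example_throttled_func(struct btrfs_work *work)
{
	if (example_make_progress(work)) {
		/* more to do: go back on the pending list and yield the cpu */
		btrfs_requeue_work(work);
		return;
	}
	/* finished: fall through without requeueing; the item may now be freed */
}
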
/*
 * places a struct btrfs_work into the pending queue of one of the kthreads
 */
int btrfs_queue_worker(struct btrfs_workers *workers, struct btrfs_work *work)
{
	struct btrfs_worker_thread *worker;
	unsigned long flags;
	int wake = 0;

	/* don't requeue something already on a list */
	if (test_and_set_bit(WORK_QUEUED_BIT, &work->flags))
		goto out;

	worker = find_worker(workers);
	if (workers->ordered) {
		spin_lock_irqsave(&workers->lock, flags);
		list_add_tail(&work->order_list, &workers->order_list);
		spin_unlock_irqrestore(&workers->lock, flags);
	} else {
		INIT_LIST_HEAD(&work->order_list);
	}

	spin_lock_irqsave(&worker->lock, flags);
	atomic_inc(&worker->num_pending);
	check_busy_worker(worker);
	list_add_tail(&work->list, &worker->pending);

	/*
	 * avoid calling into wake_up_process if this thread has already
	 * been kicked
	 */
	if (!worker->working)
		wake = 1;
	worker->working = 1;

	spin_unlock_irqrestore(&worker->lock, flags);

	if (wake)
		wake_up_process(worker->task);
out:
	return 0;
}
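
/*
 * Illustrative sketch, not part of the original file: queueing a simple,
 * unordered item.  kzalloc() leaves ->flags clear so WORK_QUEUED_BIT starts
 * unset, ->func points at the handler, and on an unordered pool the handler
 * owns the item once it runs and may free it (see the comment in
 * worker_loop()).  example_handler() and example_submit() are hypothetical.
 */
static void example_handler(struct btrfs_work *work)
{
	/* do the deferred work, then release the item */
	kfree(work);
}

static int example_submit(struct btrfs_workers *pool)
{
	struct btrfs_work *work;

	work = kzalloc(sizeof(*work), GFP_NOFS);
	if (!work)
		return -ENOMEM;

	work->func = example_handler;
	return btrfs_queue_worker(pool, work);
}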
420