xref: /linux/net/sunrpc/sched.c (revision b889fcf63cb62e7fdb7816565e28f44dbe4a76a5)
1 /*
2  * linux/net/sunrpc/sched.c
3  *
4  * Scheduling for synchronous and asynchronous RPC requests.
5  *
6  * Copyright (C) 1996 Olaf Kirch, <okir@monad.swb.de>
7  *
8  * TCP NFS related read + write fixes
9  * (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
10  */
11 
12 #include <linux/module.h>
13 
14 #include <linux/sched.h>
15 #include <linux/interrupt.h>
16 #include <linux/slab.h>
17 #include <linux/mempool.h>
18 #include <linux/smp.h>
19 #include <linux/spinlock.h>
20 #include <linux/mutex.h>
21 #include <linux/freezer.h>
22 
23 #include <linux/sunrpc/clnt.h>
24 
25 #include "sunrpc.h"
26 
27 #ifdef RPC_DEBUG
28 #define RPCDBG_FACILITY		RPCDBG_SCHED
29 #endif
30 
31 #define CREATE_TRACE_POINTS
32 #include <trace/events/sunrpc.h>
33 
/*
 * RPC slabs and memory pools
 *
 * Two slab caches ("rpc_tasks" and "rpc_buffers") each back one mempool.
 * All four objects are created by rpc_init_mempool() and destroyed by
 * rpc_destroy_mempool().
 */
/* Object size of the "rpc_buffers" slab; rpc_malloc() uses kmalloc() above it */
#define RPC_BUFFER_MAXSIZE	(2048)
/* First argument to mempool_create_slab_pool() for the buffer pool */
#define RPC_BUFFER_POOLSIZE	(8)
/* First argument to mempool_create_slab_pool() for the task pool */
#define RPC_TASK_POOLSIZE	(8)
static struct kmem_cache	*rpc_task_slabp __read_mostly;
static struct kmem_cache	*rpc_buffer_slabp __read_mostly;
static mempool_t	*rpc_task_mempool __read_mostly;
static mempool_t	*rpc_buffer_mempool __read_mostly;
44 
/*
 * Forward declarations: these helpers are defined further down in this
 * file but are referenced before their definitions.
 */
static void			rpc_async_schedule(struct work_struct *);
static void			 rpc_release_task(struct rpc_task *task);
static void __rpc_queue_timer_fn(unsigned long ptr);
48 
/*
 * RPC tasks sit here while waiting for conditions to improve.
 *
 * rpc_delay() parks a task on this queue with a timeout; the queue itself
 * is initialised in rpc_init_mempool() and torn down in
 * rpc_destroy_mempool().
 */
static struct rpc_wait_queue delay_queue;
53 
/*
 * rpciod-related stuff
 *
 * Workqueue on which rpc_make_runnable() queues asynchronous tasks.
 * Set by rpciod_start() and reset to NULL by rpciod_stop().
 */
struct workqueue_struct *rpciod_workqueue;
58 
59 /*
60  * Disable the timer for a given RPC task. Should be called with
61  * queue->lock and bh_disabled in order to avoid races within
62  * rpc_run_timer().
63  */
64 static void
65 __rpc_disable_timer(struct rpc_wait_queue *queue, struct rpc_task *task)
66 {
67 	if (task->tk_timeout == 0)
68 		return;
69 	dprintk("RPC: %5u disabling timer\n", task->tk_pid);
70 	task->tk_timeout = 0;
71 	list_del(&task->u.tk_wait.timer_list);
72 	if (list_empty(&queue->timer_list.list))
73 		del_timer(&queue->timer_list.timer);
74 }
75 
76 static void
77 rpc_set_queue_timer(struct rpc_wait_queue *queue, unsigned long expires)
78 {
79 	queue->timer_list.expires = expires;
80 	mod_timer(&queue->timer_list.timer, expires);
81 }
82 
83 /*
84  * Set up a timer for the current task.
85  */
86 static void
87 __rpc_add_timer(struct rpc_wait_queue *queue, struct rpc_task *task)
88 {
89 	if (!task->tk_timeout)
90 		return;
91 
92 	dprintk("RPC: %5u setting alarm for %lu ms\n",
93 			task->tk_pid, task->tk_timeout * 1000 / HZ);
94 
95 	task->u.tk_wait.expires = jiffies + task->tk_timeout;
96 	if (list_empty(&queue->timer_list.list) || time_before(task->u.tk_wait.expires, queue->timer_list.expires))
97 		rpc_set_queue_timer(queue, task->u.tk_wait.expires);
98 	list_add(&task->u.tk_wait.timer_list, &queue->timer_list.list);
99 }
100 
101 static void rpc_set_waitqueue_priority(struct rpc_wait_queue *queue, int priority)
102 {
103 	queue->priority = priority;
104 }
105 
106 static void rpc_set_waitqueue_owner(struct rpc_wait_queue *queue, pid_t pid)
107 {
108 	queue->owner = pid;
109 	queue->nr = RPC_BATCH_COUNT;
110 }
111 
112 static void rpc_reset_waitqueue_priority(struct rpc_wait_queue *queue)
113 {
114 	rpc_set_waitqueue_priority(queue, queue->maxpriority);
115 	rpc_set_waitqueue_owner(queue, 0);
116 }
117 
118 /*
119  * Add new request to a priority queue.
120  */
121 static void __rpc_add_wait_queue_priority(struct rpc_wait_queue *queue,
122 		struct rpc_task *task,
123 		unsigned char queue_priority)
124 {
125 	struct list_head *q;
126 	struct rpc_task *t;
127 
128 	INIT_LIST_HEAD(&task->u.tk_wait.links);
129 	if (unlikely(queue_priority > queue->maxpriority))
130 		queue_priority = queue->maxpriority;
131 	if (queue_priority > queue->priority)
132 		rpc_set_waitqueue_priority(queue, queue_priority);
133 	q = &queue->tasks[queue_priority];
134 	list_for_each_entry(t, q, u.tk_wait.list) {
135 		if (t->tk_owner == task->tk_owner) {
136 			list_add_tail(&task->u.tk_wait.list, &t->u.tk_wait.links);
137 			return;
138 		}
139 	}
140 	list_add_tail(&task->u.tk_wait.list, q);
141 }
142 
143 /*
144  * Add new request to wait queue.
145  *
146  * Swapper tasks always get inserted at the head of the queue.
147  * This should avoid many nasty memory deadlocks and hopefully
148  * improve overall performance.
149  * Everyone else gets appended to the queue to ensure proper FIFO behavior.
150  */
151 static void __rpc_add_wait_queue(struct rpc_wait_queue *queue,
152 		struct rpc_task *task,
153 		unsigned char queue_priority)
154 {
155 	WARN_ON_ONCE(RPC_IS_QUEUED(task));
156 	if (RPC_IS_QUEUED(task))
157 		return;
158 
159 	if (RPC_IS_PRIORITY(queue))
160 		__rpc_add_wait_queue_priority(queue, task, queue_priority);
161 	else if (RPC_IS_SWAPPER(task))
162 		list_add(&task->u.tk_wait.list, &queue->tasks[0]);
163 	else
164 		list_add_tail(&task->u.tk_wait.list, &queue->tasks[0]);
165 	task->tk_waitqueue = queue;
166 	queue->qlen++;
167 	rpc_set_queued(task);
168 
169 	dprintk("RPC: %5u added to queue %p \"%s\"\n",
170 			task->tk_pid, queue, rpc_qname(queue));
171 }
172 
/*
 * Remove request from a priority queue.
 *
 * This only fixes up the per-owner chain; the caller
 * (__rpc_remove_wait_queue) unlinks @task itself afterwards.
 *
 * If other tasks are chained on @task's "links" list (see
 * __rpc_add_wait_queue_priority), the first of them is promoted to take
 * over @task's role as the head of that chain.
 */
static void __rpc_remove_wait_queue_priority(struct rpc_task *task)
{
	struct rpc_task *t;

	if (!list_empty(&task->u.tk_wait.links)) {
		/* First task chained behind us becomes the new chain head */
		t = list_entry(task->u.tk_wait.links.next, struct rpc_task, u.tk_wait.list);
		/* Unlink it from our links list and insert it right after us */
		list_move(&t->u.tk_wait.list, &task->u.tk_wait.list);
		/* Hand the remaining chained tasks over to the new head */
		list_splice_init(&task->u.tk_wait.links, &t->u.tk_wait.links);
	}
}
186 
187 /*
188  * Remove request from queue.
189  * Note: must be called with spin lock held.
190  */
191 static void __rpc_remove_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task)
192 {
193 	__rpc_disable_timer(queue, task);
194 	if (RPC_IS_PRIORITY(queue))
195 		__rpc_remove_wait_queue_priority(task);
196 	list_del(&task->u.tk_wait.list);
197 	queue->qlen--;
198 	dprintk("RPC: %5u removed from queue %p \"%s\"\n",
199 			task->tk_pid, queue, rpc_qname(queue));
200 }
201 
202 static void __rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const char *qname, unsigned char nr_queues)
203 {
204 	int i;
205 
206 	spin_lock_init(&queue->lock);
207 	for (i = 0; i < ARRAY_SIZE(queue->tasks); i++)
208 		INIT_LIST_HEAD(&queue->tasks[i]);
209 	queue->maxpriority = nr_queues - 1;
210 	rpc_reset_waitqueue_priority(queue);
211 	queue->qlen = 0;
212 	setup_timer(&queue->timer_list.timer, __rpc_queue_timer_fn, (unsigned long)queue);
213 	INIT_LIST_HEAD(&queue->timer_list.list);
214 	rpc_assign_waitqueue_name(queue, qname);
215 }
216 
217 void rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const char *qname)
218 {
219 	__rpc_init_priority_wait_queue(queue, qname, RPC_NR_PRIORITY);
220 }
221 EXPORT_SYMBOL_GPL(rpc_init_priority_wait_queue);
222 
/*
 * Initialise @queue as a wait queue with a single priority level,
 * labelled @qname.
 */
void rpc_init_wait_queue(struct rpc_wait_queue *queue,
			 const char *qname)
{
	__rpc_init_priority_wait_queue(queue, qname, 1);
}
EXPORT_SYMBOL_GPL(rpc_init_wait_queue);
228 
229 void rpc_destroy_wait_queue(struct rpc_wait_queue *queue)
230 {
231 	del_timer_sync(&queue->timer_list.timer);
232 }
233 EXPORT_SYMBOL_GPL(rpc_destroy_wait_queue);
234 
235 static int rpc_wait_bit_killable(void *word)
236 {
237 	if (fatal_signal_pending(current))
238 		return -ERESTARTSYS;
239 	freezable_schedule();
240 	return 0;
241 }
242 
#ifdef RPC_DEBUG
/*
 * Give @task a debugging id (tk_pid) drawn from a file-local atomic
 * counter. Compiles to an empty inline when RPC_DEBUG is not defined.
 */
static void rpc_task_set_debuginfo(struct rpc_task *task)
{
	static atomic_t next_rpc_pid;

	task->tk_pid = atomic_inc_return(&next_rpc_pid);
}
#else
static inline void rpc_task_set_debuginfo(struct rpc_task *task)
{
	/* nothing to record without RPC_DEBUG */
}
#endif
255 
256 static void rpc_set_active(struct rpc_task *task)
257 {
258 	trace_rpc_task_begin(task->tk_client, task, NULL);
259 
260 	rpc_task_set_debuginfo(task);
261 	set_bit(RPC_TASK_ACTIVE, &task->tk_runstate);
262 }
263 
/*
 * Mark an RPC call as having completed by clearing the 'active' bit
 * and then waking up all tasks that were sleeping.
 *
 * Also drops one reference on tk_count. Returns non-zero when that was
 * the last reference (atomic_dec_and_test() result), zero otherwise.
 */
static int rpc_complete_task(struct rpc_task *task)
{
	void *m = &task->tk_runstate;
	/* Wait queue and key that waiters on the RPC_TASK_ACTIVE bit use */
	wait_queue_head_t *wq = bit_waitqueue(m, RPC_TASK_ACTIVE);
	struct wait_bit_key k = __WAIT_BIT_KEY_INITIALIZER(m, RPC_TASK_ACTIVE);
	unsigned long flags;
	int ret;

	trace_rpc_task_complete(task->tk_client, task, NULL);

	/*
	 * Clear the bit, drop the reference and wake the waiters all under
	 * wq->lock, so that a waiter (see __rpc_wait_for_completion_task)
	 * cannot race with this sequence.
	 */
	spin_lock_irqsave(&wq->lock, flags);
	clear_bit(RPC_TASK_ACTIVE, &task->tk_runstate);
	ret = atomic_dec_and_test(&task->tk_count);
	if (waitqueue_active(wq))
		__wake_up_locked_key(wq, TASK_NORMAL, &k);
	spin_unlock_irqrestore(&wq->lock, flags);
	return ret;
}
286 
287 /*
288  * Allow callers to wait for completion of an RPC call
289  *
290  * Note the use of out_of_line_wait_on_bit() rather than wait_on_bit()
291  * to enforce taking of the wq->lock and hence avoid races with
292  * rpc_complete_task().
293  */
294 int __rpc_wait_for_completion_task(struct rpc_task *task, int (*action)(void *))
295 {
296 	if (action == NULL)
297 		action = rpc_wait_bit_killable;
298 	return out_of_line_wait_on_bit(&task->tk_runstate, RPC_TASK_ACTIVE,
299 			action, TASK_KILLABLE);
300 }
301 EXPORT_SYMBOL_GPL(__rpc_wait_for_completion_task);
302 
303 /*
304  * Make an RPC task runnable.
305  *
306  * Note: If the task is ASYNC, and is being made runnable after sitting on an
307  * rpc_wait_queue, this must be called with the queue spinlock held to protect
308  * the wait queue operation.
309  */
310 static void rpc_make_runnable(struct rpc_task *task)
311 {
312 	rpc_clear_queued(task);
313 	if (rpc_test_and_set_running(task))
314 		return;
315 	if (RPC_IS_ASYNC(task)) {
316 		INIT_WORK(&task->u.tk_work, rpc_async_schedule);
317 		queue_work(rpciod_workqueue, &task->u.tk_work);
318 	} else
319 		wake_up_bit(&task->tk_runstate, RPC_TASK_QUEUED);
320 }
321 
322 /*
323  * Prepare for sleeping on a wait queue.
324  * By always appending tasks to the list we ensure FIFO behavior.
325  * NB: An RPC task will only receive interrupt-driven events as long
326  * as it's on a wait queue.
327  */
328 static void __rpc_sleep_on_priority(struct rpc_wait_queue *q,
329 		struct rpc_task *task,
330 		rpc_action action,
331 		unsigned char queue_priority)
332 {
333 	dprintk("RPC: %5u sleep_on(queue \"%s\" time %lu)\n",
334 			task->tk_pid, rpc_qname(q), jiffies);
335 
336 	trace_rpc_task_sleep(task->tk_client, task, q);
337 
338 	__rpc_add_wait_queue(q, task, queue_priority);
339 
340 	WARN_ON_ONCE(task->tk_callback != NULL);
341 	task->tk_callback = action;
342 	__rpc_add_timer(q, task);
343 }
344 
345 void rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
346 				rpc_action action)
347 {
348 	/* We shouldn't ever put an inactive task to sleep */
349 	WARN_ON_ONCE(!RPC_IS_ACTIVATED(task));
350 	if (!RPC_IS_ACTIVATED(task)) {
351 		task->tk_status = -EIO;
352 		rpc_put_task_async(task);
353 		return;
354 	}
355 
356 	/*
357 	 * Protect the queue operations.
358 	 */
359 	spin_lock_bh(&q->lock);
360 	__rpc_sleep_on_priority(q, task, action, task->tk_priority);
361 	spin_unlock_bh(&q->lock);
362 }
363 EXPORT_SYMBOL_GPL(rpc_sleep_on);
364 
365 void rpc_sleep_on_priority(struct rpc_wait_queue *q, struct rpc_task *task,
366 		rpc_action action, int priority)
367 {
368 	/* We shouldn't ever put an inactive task to sleep */
369 	WARN_ON_ONCE(!RPC_IS_ACTIVATED(task));
370 	if (!RPC_IS_ACTIVATED(task)) {
371 		task->tk_status = -EIO;
372 		rpc_put_task_async(task);
373 		return;
374 	}
375 
376 	/*
377 	 * Protect the queue operations.
378 	 */
379 	spin_lock_bh(&q->lock);
380 	__rpc_sleep_on_priority(q, task, action, priority - RPC_PRIORITY_LOW);
381 	spin_unlock_bh(&q->lock);
382 }
383 EXPORT_SYMBOL_GPL(rpc_sleep_on_priority);
384 
385 /**
386  * __rpc_do_wake_up_task - wake up a single rpc_task
387  * @queue: wait queue
388  * @task: task to be woken up
389  *
390  * Caller must hold queue->lock, and have cleared the task queued flag.
391  */
392 static void __rpc_do_wake_up_task(struct rpc_wait_queue *queue, struct rpc_task *task)
393 {
394 	dprintk("RPC: %5u __rpc_wake_up_task (now %lu)\n",
395 			task->tk_pid, jiffies);
396 
397 	/* Has the task been executed yet? If not, we cannot wake it up! */
398 	if (!RPC_IS_ACTIVATED(task)) {
399 		printk(KERN_ERR "RPC: Inactive task (%p) being woken up!\n", task);
400 		return;
401 	}
402 
403 	trace_rpc_task_wakeup(task->tk_client, task, queue);
404 
405 	__rpc_remove_wait_queue(queue, task);
406 
407 	rpc_make_runnable(task);
408 
409 	dprintk("RPC:       __rpc_wake_up_task done\n");
410 }
411 
412 /*
413  * Wake up a queued task while the queue lock is being held
414  */
415 static void rpc_wake_up_task_queue_locked(struct rpc_wait_queue *queue, struct rpc_task *task)
416 {
417 	if (RPC_IS_QUEUED(task) && task->tk_waitqueue == queue)
418 		__rpc_do_wake_up_task(queue, task);
419 }
420 
421 /*
422  * Tests whether rpc queue is empty
423  */
424 int rpc_queue_empty(struct rpc_wait_queue *queue)
425 {
426 	int res;
427 
428 	spin_lock_bh(&queue->lock);
429 	res = queue->qlen;
430 	spin_unlock_bh(&queue->lock);
431 	return res == 0;
432 }
433 EXPORT_SYMBOL_GPL(rpc_queue_empty);
434 
435 /*
436  * Wake up a task on a specific queue
437  */
438 void rpc_wake_up_queued_task(struct rpc_wait_queue *queue, struct rpc_task *task)
439 {
440 	spin_lock_bh(&queue->lock);
441 	rpc_wake_up_task_queue_locked(queue, task);
442 	spin_unlock_bh(&queue->lock);
443 }
444 EXPORT_SYMBOL_GPL(rpc_wake_up_queued_task);
445 
/*
 * Pick the next task to service on a priority queue.
 *
 * This only selects a task and updates the queue's scheduling state
 * (priority, owner, batch counter); it does not wake the task or remove
 * it from the queue. Returns NULL when every priority list is empty.
 */
static struct rpc_task *__rpc_find_next_queued_priority(struct rpc_wait_queue *queue)
{
	struct list_head *q;
	struct rpc_task *task;

	/*
	 * Service a batch of tasks from a single owner.
	 */
	q = &queue->tasks[queue->priority];
	if (!list_empty(q)) {
		task = list_entry(q->next, struct rpc_task, u.tk_wait.list);
		if (queue->owner == task->tk_owner) {
			/* Same owner: keep going until the batch is used up */
			if (--queue->nr)
				goto out;
			/*
			 * Batch exhausted: rotate this task to the tail.
			 * NOTE(review): 'task' still refers to the rotated
			 * task, so it is the one returned below - confirm
			 * this is intended.
			 */
			list_move_tail(&task->u.tk_wait.list, q);
		}
		/*
		 * Check if we need to switch queues.
		 */
		goto new_owner;
	}

	/*
	 * Service the next queue.
	 *
	 * Walk downwards through the priority lists, wrapping from
	 * tasks[0] to tasks[maxpriority], until a non-empty list is found
	 * or we are back at the starting list.
	 */
	do {
		if (q == &queue->tasks[0])
			q = &queue->tasks[queue->maxpriority];
		else
			q = q - 1;
		if (!list_empty(q)) {
			task = list_entry(q->next, struct rpc_task, u.tk_wait.list);
			goto new_queue;
		}
	} while (q != &queue->tasks[queue->priority]);

	/* Nothing queued anywhere: go back to the initial state */
	rpc_reset_waitqueue_priority(queue);
	return NULL;

new_queue:
	/* The list index is the priority level */
	rpc_set_waitqueue_priority(queue, (unsigned int)(q - &queue->tasks[0]));
new_owner:
	/* Start a fresh batch for the owner of the selected task */
	rpc_set_waitqueue_owner(queue, task->tk_owner);
out:
	return task;
}
495 
496 static struct rpc_task *__rpc_find_next_queued(struct rpc_wait_queue *queue)
497 {
498 	if (RPC_IS_PRIORITY(queue))
499 		return __rpc_find_next_queued_priority(queue);
500 	if (!list_empty(&queue->tasks[0]))
501 		return list_first_entry(&queue->tasks[0], struct rpc_task, u.tk_wait.list);
502 	return NULL;
503 }
504 
505 /*
506  * Wake up the first task on the wait queue.
507  */
508 struct rpc_task *rpc_wake_up_first(struct rpc_wait_queue *queue,
509 		bool (*func)(struct rpc_task *, void *), void *data)
510 {
511 	struct rpc_task	*task = NULL;
512 
513 	dprintk("RPC:       wake_up_first(%p \"%s\")\n",
514 			queue, rpc_qname(queue));
515 	spin_lock_bh(&queue->lock);
516 	task = __rpc_find_next_queued(queue);
517 	if (task != NULL) {
518 		if (func(task, data))
519 			rpc_wake_up_task_queue_locked(queue, task);
520 		else
521 			task = NULL;
522 	}
523 	spin_unlock_bh(&queue->lock);
524 
525 	return task;
526 }
527 EXPORT_SYMBOL_GPL(rpc_wake_up_first);
528 
529 static bool rpc_wake_up_next_func(struct rpc_task *task, void *data)
530 {
531 	return true;
532 }
533 
534 /*
535  * Wake up the next task on the wait queue.
536 */
537 struct rpc_task *rpc_wake_up_next(struct rpc_wait_queue *queue)
538 {
539 	return rpc_wake_up_first(queue, rpc_wake_up_next_func, NULL);
540 }
541 EXPORT_SYMBOL_GPL(rpc_wake_up_next);
542 
543 /**
544  * rpc_wake_up - wake up all rpc_tasks
545  * @queue: rpc_wait_queue on which the tasks are sleeping
546  *
547  * Grabs queue->lock
548  */
549 void rpc_wake_up(struct rpc_wait_queue *queue)
550 {
551 	struct list_head *head;
552 
553 	spin_lock_bh(&queue->lock);
554 	head = &queue->tasks[queue->maxpriority];
555 	for (;;) {
556 		while (!list_empty(head)) {
557 			struct rpc_task *task;
558 			task = list_first_entry(head,
559 					struct rpc_task,
560 					u.tk_wait.list);
561 			rpc_wake_up_task_queue_locked(queue, task);
562 		}
563 		if (head == &queue->tasks[0])
564 			break;
565 		head--;
566 	}
567 	spin_unlock_bh(&queue->lock);
568 }
569 EXPORT_SYMBOL_GPL(rpc_wake_up);
570 
571 /**
572  * rpc_wake_up_status - wake up all rpc_tasks and set their status value.
573  * @queue: rpc_wait_queue on which the tasks are sleeping
574  * @status: status value to set
575  *
576  * Grabs queue->lock
577  */
578 void rpc_wake_up_status(struct rpc_wait_queue *queue, int status)
579 {
580 	struct list_head *head;
581 
582 	spin_lock_bh(&queue->lock);
583 	head = &queue->tasks[queue->maxpriority];
584 	for (;;) {
585 		while (!list_empty(head)) {
586 			struct rpc_task *task;
587 			task = list_first_entry(head,
588 					struct rpc_task,
589 					u.tk_wait.list);
590 			task->tk_status = status;
591 			rpc_wake_up_task_queue_locked(queue, task);
592 		}
593 		if (head == &queue->tasks[0])
594 			break;
595 		head--;
596 	}
597 	spin_unlock_bh(&queue->lock);
598 }
599 EXPORT_SYMBOL_GPL(rpc_wake_up_status);
600 
/*
 * Timer callback for a wait queue (installed by
 * __rpc_init_priority_wait_queue with the queue as its argument).
 *
 * Wakes every task on the queue's timer list whose deadline has passed,
 * with tk_status set to -ETIMEDOUT, and re-arms the timer for the earliest
 * remaining deadline.
 */
static void __rpc_queue_timer_fn(unsigned long ptr)
{
	struct rpc_wait_queue *queue = (struct rpc_wait_queue *)ptr;
	struct rpc_task *task, *n;
	unsigned long expires, now, timeo;

	/*
	 * Plain spin_lock rather than the _bh variant used elsewhere;
	 * NOTE(review): presumably because timer callbacks already run
	 * with bottom halves disabled - confirm.
	 */
	spin_lock(&queue->lock);
	/* expires == now doubles as "no pending deadline seen yet" */
	expires = now = jiffies;
	/* _safe iteration: waking a task unlinks it from the timer list */
	list_for_each_entry_safe(task, n, &queue->timer_list.list, u.tk_wait.timer_list) {
		timeo = task->u.tk_wait.expires;
		if (time_after_eq(now, timeo)) {
			dprintk("RPC: %5u timeout\n", task->tk_pid);
			task->tk_status = -ETIMEDOUT;
			rpc_wake_up_task_queue_locked(queue, task);
			continue;
		}
		/* Track the earliest deadline among the remaining tasks */
		if (expires == now || time_after(expires, timeo))
			expires = timeo;
	}
	if (!list_empty(&queue->timer_list.list))
		rpc_set_queue_timer(queue, expires);
	spin_unlock(&queue->lock);
}
624 
625 static void __rpc_atrun(struct rpc_task *task)
626 {
627 	task->tk_status = 0;
628 }
629 
630 /*
631  * Run a task at a later time
632  */
633 void rpc_delay(struct rpc_task *task, unsigned long delay)
634 {
635 	task->tk_timeout = delay;
636 	rpc_sleep_on(&delay_queue, task, __rpc_atrun);
637 }
638 EXPORT_SYMBOL_GPL(rpc_delay);
639 
640 /*
641  * Helper to call task->tk_ops->rpc_call_prepare
642  */
643 void rpc_prepare_task(struct rpc_task *task)
644 {
645 	task->tk_ops->rpc_call_prepare(task, task->tk_calldata);
646 }
647 
648 static void
649 rpc_init_task_statistics(struct rpc_task *task)
650 {
651 	/* Initialize retry counters */
652 	task->tk_garb_retry = 2;
653 	task->tk_cred_retry = 2;
654 	task->tk_rebind_retry = 2;
655 
656 	/* starting timestamp */
657 	task->tk_start = ktime_get();
658 }
659 
660 static void
661 rpc_reset_task_statistics(struct rpc_task *task)
662 {
663 	task->tk_timeouts = 0;
664 	task->tk_flags &= ~(RPC_CALL_MAJORSEEN|RPC_TASK_KILLED|RPC_TASK_SENT);
665 
666 	rpc_init_task_statistics(task);
667 }
668 
669 /*
670  * Helper that calls task->tk_ops->rpc_call_done if it exists
671  */
672 void rpc_exit_task(struct rpc_task *task)
673 {
674 	task->tk_action = NULL;
675 	if (task->tk_ops->rpc_call_done != NULL) {
676 		task->tk_ops->rpc_call_done(task, task->tk_calldata);
677 		if (task->tk_action != NULL) {
678 			WARN_ON(RPC_ASSASSINATED(task));
679 			/* Always release the RPC slot and buffer memory */
680 			xprt_release(task);
681 			rpc_reset_task_statistics(task);
682 		}
683 	}
684 }
685 
686 void rpc_exit(struct rpc_task *task, int status)
687 {
688 	task->tk_status = status;
689 	task->tk_action = rpc_exit_task;
690 	if (RPC_IS_QUEUED(task))
691 		rpc_wake_up_queued_task(task->tk_waitqueue, task);
692 }
693 EXPORT_SYMBOL_GPL(rpc_exit);
694 
695 void rpc_release_calldata(const struct rpc_call_ops *ops, void *calldata)
696 {
697 	if (ops->rpc_release != NULL)
698 		ops->rpc_release(calldata);
699 }
700 
701 /*
702  * This is the RPC `scheduler' (or rather, the finite state machine).
703  */
704 static void __rpc_execute(struct rpc_task *task)
705 {
706 	struct rpc_wait_queue *queue;
707 	int task_is_async = RPC_IS_ASYNC(task);
708 	int status = 0;
709 
710 	dprintk("RPC: %5u __rpc_execute flags=0x%x\n",
711 			task->tk_pid, task->tk_flags);
712 
713 	WARN_ON_ONCE(RPC_IS_QUEUED(task));
714 	if (RPC_IS_QUEUED(task))
715 		return;
716 
717 	for (;;) {
718 		void (*do_action)(struct rpc_task *);
719 
720 		/*
721 		 * Execute any pending callback first.
722 		 */
723 		do_action = task->tk_callback;
724 		task->tk_callback = NULL;
725 		if (do_action == NULL) {
726 			/*
727 			 * Perform the next FSM step.
728 			 * tk_action may be NULL if the task has been killed.
729 			 * In particular, note that rpc_killall_tasks may
730 			 * do this at any time, so beware when dereferencing.
731 			 */
732 			do_action = task->tk_action;
733 			if (do_action == NULL)
734 				break;
735 		}
736 		trace_rpc_task_run_action(task->tk_client, task, task->tk_action);
737 		do_action(task);
738 
739 		/*
740 		 * Lockless check for whether task is sleeping or not.
741 		 */
742 		if (!RPC_IS_QUEUED(task))
743 			continue;
744 		/*
745 		 * The queue->lock protects against races with
746 		 * rpc_make_runnable().
747 		 *
748 		 * Note that once we clear RPC_TASK_RUNNING on an asynchronous
749 		 * rpc_task, rpc_make_runnable() can assign it to a
750 		 * different workqueue. We therefore cannot assume that the
751 		 * rpc_task pointer may still be dereferenced.
752 		 */
753 		queue = task->tk_waitqueue;
754 		spin_lock_bh(&queue->lock);
755 		if (!RPC_IS_QUEUED(task)) {
756 			spin_unlock_bh(&queue->lock);
757 			continue;
758 		}
759 		rpc_clear_running(task);
760 		spin_unlock_bh(&queue->lock);
761 		if (task_is_async)
762 			return;
763 
764 		/* sync task: sleep here */
765 		dprintk("RPC: %5u sync task going to sleep\n", task->tk_pid);
766 		status = out_of_line_wait_on_bit(&task->tk_runstate,
767 				RPC_TASK_QUEUED, rpc_wait_bit_killable,
768 				TASK_KILLABLE);
769 		if (status == -ERESTARTSYS) {
770 			/*
771 			 * When a sync task receives a signal, it exits with
772 			 * -ERESTARTSYS. In order to catch any callbacks that
773 			 * clean up after sleeping on some queue, we don't
774 			 * break the loop here, but go around once more.
775 			 */
776 			dprintk("RPC: %5u got signal\n", task->tk_pid);
777 			task->tk_flags |= RPC_TASK_KILLED;
778 			rpc_exit(task, -ERESTARTSYS);
779 		}
780 		rpc_set_running(task);
781 		dprintk("RPC: %5u sync task resuming\n", task->tk_pid);
782 	}
783 
784 	dprintk("RPC: %5u return %d, status %d\n", task->tk_pid, status,
785 			task->tk_status);
786 	/* Release all resources associated with the task */
787 	rpc_release_task(task);
788 }
789 
/*
 * User-visible entry point to the scheduler.
 *
 * Marks the task active and runnable. A synchronous task is then executed
 * right here in the caller's context; an asynchronous one has been handed
 * to the workqueue by rpc_make_runnable() and this function just returns.
 *
 * This may be called recursively if e.g. an async NFS task updates
 * the attributes and finds that dirty pages must be flushed.
 * NOTE: Once a task has been run to completion it has been released. In
 *	 particular note that tk_release() will have been called, so your
 *	 task memory may have been freed.
 */
void rpc_execute(struct rpc_task *task)
{
	rpc_set_active(task);
	rpc_make_runnable(task);

	if (RPC_IS_ASYNC(task))
		return;
	__rpc_execute(task);
}
806 
807 static void rpc_async_schedule(struct work_struct *work)
808 {
809 	current->flags |= PF_FSTRANS;
810 	__rpc_execute(container_of(work, struct rpc_task, u.tk_work));
811 	current->flags &= ~PF_FSTRANS;
812 }
813 
814 /**
815  * rpc_malloc - allocate an RPC buffer
816  * @task: RPC task that will use this buffer
817  * @size: requested byte size
818  *
819  * To prevent rpciod from hanging, this allocator never sleeps,
820  * returning NULL if the request cannot be serviced immediately.
821  * The caller can arrange to sleep in a way that is safe for rpciod.
822  *
823  * Most requests are 'small' (under 2KiB) and can be serviced from a
824  * mempool, ensuring that NFS reads and writes can always proceed,
825  * and that there is good locality of reference for these buffers.
826  *
827  * In order to avoid memory starvation triggering more writebacks of
828  * NFS requests, we avoid using GFP_KERNEL.
829  */
830 void *rpc_malloc(struct rpc_task *task, size_t size)
831 {
832 	struct rpc_buffer *buf;
833 	gfp_t gfp = GFP_NOWAIT;
834 
835 	if (RPC_IS_SWAPPER(task))
836 		gfp |= __GFP_MEMALLOC;
837 
838 	size += sizeof(struct rpc_buffer);
839 	if (size <= RPC_BUFFER_MAXSIZE)
840 		buf = mempool_alloc(rpc_buffer_mempool, gfp);
841 	else
842 		buf = kmalloc(size, gfp);
843 
844 	if (!buf)
845 		return NULL;
846 
847 	buf->len = size;
848 	dprintk("RPC: %5u allocated buffer of size %zu at %p\n",
849 			task->tk_pid, size, buf);
850 	return &buf->data;
851 }
852 EXPORT_SYMBOL_GPL(rpc_malloc);
853 
854 /**
855  * rpc_free - free buffer allocated via rpc_malloc
856  * @buffer: buffer to free
857  *
858  */
859 void rpc_free(void *buffer)
860 {
861 	size_t size;
862 	struct rpc_buffer *buf;
863 
864 	if (!buffer)
865 		return;
866 
867 	buf = container_of(buffer, struct rpc_buffer, data);
868 	size = buf->len;
869 
870 	dprintk("RPC:       freeing buffer of size %zu at %p\n",
871 			size, buf);
872 
873 	if (size <= RPC_BUFFER_MAXSIZE)
874 		mempool_free(buf, rpc_buffer_mempool);
875 	else
876 		kfree(buf);
877 }
878 EXPORT_SYMBOL_GPL(rpc_free);
879 
880 /*
881  * Creation and deletion of RPC task structures
882  */
883 static void rpc_init_task(struct rpc_task *task, const struct rpc_task_setup *task_setup_data)
884 {
885 	memset(task, 0, sizeof(*task));
886 	atomic_set(&task->tk_count, 1);
887 	task->tk_flags  = task_setup_data->flags;
888 	task->tk_ops = task_setup_data->callback_ops;
889 	task->tk_calldata = task_setup_data->callback_data;
890 	INIT_LIST_HEAD(&task->tk_task);
891 
892 	task->tk_priority = task_setup_data->priority - RPC_PRIORITY_LOW;
893 	task->tk_owner = current->tgid;
894 
895 	/* Initialize workqueue for async tasks */
896 	task->tk_workqueue = task_setup_data->workqueue;
897 
898 	if (task->tk_ops->rpc_call_prepare != NULL)
899 		task->tk_action = rpc_prepare_task;
900 
901 	rpc_init_task_statistics(task);
902 
903 	dprintk("RPC:       new task initialized, procpid %u\n",
904 				task_pid_nr(current));
905 }
906 
907 static struct rpc_task *
908 rpc_alloc_task(void)
909 {
910 	return (struct rpc_task *)mempool_alloc(rpc_task_mempool, GFP_NOIO);
911 }
912 
913 /*
914  * Create a new task for the specified client.
915  */
916 struct rpc_task *rpc_new_task(const struct rpc_task_setup *setup_data)
917 {
918 	struct rpc_task	*task = setup_data->task;
919 	unsigned short flags = 0;
920 
921 	if (task == NULL) {
922 		task = rpc_alloc_task();
923 		if (task == NULL) {
924 			rpc_release_calldata(setup_data->callback_ops,
925 					setup_data->callback_data);
926 			return ERR_PTR(-ENOMEM);
927 		}
928 		flags = RPC_TASK_DYNAMIC;
929 	}
930 
931 	rpc_init_task(task, setup_data);
932 	task->tk_flags |= flags;
933 	dprintk("RPC:       allocated task %p\n", task);
934 	return task;
935 }
936 
937 static void rpc_free_task(struct rpc_task *task)
938 {
939 	const struct rpc_call_ops *tk_ops = task->tk_ops;
940 	void *calldata = task->tk_calldata;
941 
942 	if (task->tk_flags & RPC_TASK_DYNAMIC) {
943 		dprintk("RPC: %5u freeing task\n", task->tk_pid);
944 		mempool_free(task, rpc_task_mempool);
945 	}
946 	rpc_release_calldata(tk_ops, calldata);
947 }
948 
949 static void rpc_async_release(struct work_struct *work)
950 {
951 	rpc_free_task(container_of(work, struct rpc_task, u.tk_work));
952 }
953 
954 static void rpc_release_resources_task(struct rpc_task *task)
955 {
956 	if (task->tk_rqstp)
957 		xprt_release(task);
958 	if (task->tk_msg.rpc_cred) {
959 		put_rpccred(task->tk_msg.rpc_cred);
960 		task->tk_msg.rpc_cred = NULL;
961 	}
962 	rpc_task_release_client(task);
963 }
964 
965 static void rpc_final_put_task(struct rpc_task *task,
966 		struct workqueue_struct *q)
967 {
968 	if (q != NULL) {
969 		INIT_WORK(&task->u.tk_work, rpc_async_release);
970 		queue_work(q, &task->u.tk_work);
971 	} else
972 		rpc_free_task(task);
973 }
974 
975 static void rpc_do_put_task(struct rpc_task *task, struct workqueue_struct *q)
976 {
977 	if (atomic_dec_and_test(&task->tk_count)) {
978 		rpc_release_resources_task(task);
979 		rpc_final_put_task(task, q);
980 	}
981 }
982 
983 void rpc_put_task(struct rpc_task *task)
984 {
985 	rpc_do_put_task(task, NULL);
986 }
987 EXPORT_SYMBOL_GPL(rpc_put_task);
988 
989 void rpc_put_task_async(struct rpc_task *task)
990 {
991 	rpc_do_put_task(task, task->tk_workqueue);
992 }
993 EXPORT_SYMBOL_GPL(rpc_put_task_async);
994 
/*
 * Release a task whose state machine has finished (called at the end of
 * __rpc_execute).
 *
 * Drops the task's resources and one reference, waking anybody waiting
 * for completion when other references appear to be outstanding. The task
 * is freed (via its workqueue, if it has one) only when the reference
 * dropped here was the last one.
 */
static void rpc_release_task(struct rpc_task *task)
{
	dprintk("RPC: %5u release task\n", task->tk_pid);

	WARN_ON_ONCE(RPC_IS_QUEUED(task));

	rpc_release_resources_task(task);

	/*
	 * Note: at this point we have been removed from rpc_clnt->cl_tasks,
	 * so it should be safe to use task->tk_count as a test for whether
	 * or not any other processes still hold references to our rpc_task.
	 */
	/* Expected count with no extra holders: 1 for async, 2 for sync */
	if (atomic_read(&task->tk_count) != 1 + !RPC_IS_ASYNC(task)) {
		/* Wake up anyone who may be waiting for task completion */
		if (!rpc_complete_task(task))
			return;
	} else {
		/* Nobody can be waiting: just drop our reference */
		if (!atomic_dec_and_test(&task->tk_count))
			return;
	}
	rpc_final_put_task(task, task->tk_workqueue);
}
1018 
1019 int rpciod_up(void)
1020 {
1021 	return try_module_get(THIS_MODULE) ? 0 : -EINVAL;
1022 }
1023 
1024 void rpciod_down(void)
1025 {
1026 	module_put(THIS_MODULE);
1027 }
1028 
1029 /*
1030  * Start up the rpciod workqueue.
1031  */
1032 static int rpciod_start(void)
1033 {
1034 	struct workqueue_struct *wq;
1035 
1036 	/*
1037 	 * Create the rpciod thread and wait for it to start.
1038 	 */
1039 	dprintk("RPC:       creating workqueue rpciod\n");
1040 	wq = alloc_workqueue("rpciod", WQ_MEM_RECLAIM, 1);
1041 	rpciod_workqueue = wq;
1042 	return rpciod_workqueue != NULL;
1043 }
1044 
1045 static void rpciod_stop(void)
1046 {
1047 	struct workqueue_struct *wq = NULL;
1048 
1049 	if (rpciod_workqueue == NULL)
1050 		return;
1051 	dprintk("RPC:       destroying workqueue rpciod\n");
1052 
1053 	wq = rpciod_workqueue;
1054 	rpciod_workqueue = NULL;
1055 	destroy_workqueue(wq);
1056 }
1057 
1058 void
1059 rpc_destroy_mempool(void)
1060 {
1061 	rpciod_stop();
1062 	if (rpc_buffer_mempool)
1063 		mempool_destroy(rpc_buffer_mempool);
1064 	if (rpc_task_mempool)
1065 		mempool_destroy(rpc_task_mempool);
1066 	if (rpc_task_slabp)
1067 		kmem_cache_destroy(rpc_task_slabp);
1068 	if (rpc_buffer_slabp)
1069 		kmem_cache_destroy(rpc_buffer_slabp);
1070 	rpc_destroy_wait_queue(&delay_queue);
1071 }
1072 
1073 int
1074 rpc_init_mempool(void)
1075 {
1076 	/*
1077 	 * The following is not strictly a mempool initialisation,
1078 	 * but there is no harm in doing it here
1079 	 */
1080 	rpc_init_wait_queue(&delay_queue, "delayq");
1081 	if (!rpciod_start())
1082 		goto err_nomem;
1083 
1084 	rpc_task_slabp = kmem_cache_create("rpc_tasks",
1085 					     sizeof(struct rpc_task),
1086 					     0, SLAB_HWCACHE_ALIGN,
1087 					     NULL);
1088 	if (!rpc_task_slabp)
1089 		goto err_nomem;
1090 	rpc_buffer_slabp = kmem_cache_create("rpc_buffers",
1091 					     RPC_BUFFER_MAXSIZE,
1092 					     0, SLAB_HWCACHE_ALIGN,
1093 					     NULL);
1094 	if (!rpc_buffer_slabp)
1095 		goto err_nomem;
1096 	rpc_task_mempool = mempool_create_slab_pool(RPC_TASK_POOLSIZE,
1097 						    rpc_task_slabp);
1098 	if (!rpc_task_mempool)
1099 		goto err_nomem;
1100 	rpc_buffer_mempool = mempool_create_slab_pool(RPC_BUFFER_POOLSIZE,
1101 						      rpc_buffer_slabp);
1102 	if (!rpc_buffer_mempool)
1103 		goto err_nomem;
1104 	return 0;
1105 err_nomem:
1106 	rpc_destroy_mempool();
1107 	return -ENOMEM;
1108 }
1109