/*
 * linux/net/sunrpc/sched.c
 *
 * Scheduling for synchronous and asynchronous RPC requests.
 *
 * Copyright (C) 1996 Olaf Kirch, <okir@monad.swb.de>
 *
 * TCP NFS related read + write fixes
 * (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
 */

#include <linux/module.h>

#include <linux/sched.h>
#include <linux/interrupt.h>
#include <linux/slab.h>
#include <linux/mempool.h>
#include <linux/smp.h>
#include <linux/smp_lock.h>
#include <linux/spinlock.h>
#include <linux/mutex.h>

#include <linux/sunrpc/clnt.h>

#ifdef RPC_DEBUG
#define RPCDBG_FACILITY		RPCDBG_SCHED
#define RPC_TASK_MAGIC_ID	0xf00baa
static int			rpc_task_id;
#endif

/*
 * RPC slabs and memory pools
 */
#define RPC_BUFFER_MAXSIZE	(2048)
#define RPC_BUFFER_POOLSIZE	(8)
#define RPC_TASK_POOLSIZE	(8)
static struct kmem_cache	*rpc_task_slabp __read_mostly;
static struct kmem_cache	*rpc_buffer_slabp __read_mostly;
static mempool_t		*rpc_task_mempool __read_mostly;
static mempool_t		*rpc_buffer_mempool __read_mostly;

static void			__rpc_default_timer(struct rpc_task *task);
static void			rpciod_killall(void);
static void			rpc_async_schedule(struct work_struct *);
static void			rpc_release_task(struct rpc_task *task);

/*
 * RPC tasks sit here while waiting for conditions to improve.
 */
static RPC_WAITQ(delay_queue, "delayq");

/*
 * All RPC tasks are linked into this list
 */
static LIST_HEAD(all_tasks);

/*
 * rpciod-related stuff
 */
static DEFINE_MUTEX(rpciod_mutex);
static unsigned int		rpciod_users;
struct workqueue_struct *rpciod_workqueue;

/*
 * Spinlock for other critical sections of code.
 */
static DEFINE_SPINLOCK(rpc_sched_lock);

/*
 * Disable the timer for a given RPC task. Should be called with
 * queue->lock and bh_disabled in order to avoid races within
 * rpc_run_timer().
 */
static inline void
__rpc_disable_timer(struct rpc_task *task)
{
	dprintk("RPC: %5u disabling timer\n", task->tk_pid);
	task->tk_timeout_fn = NULL;
	task->tk_timeout = 0;
}

/*
 * Run a timeout function.
 * We use the callback in order to allow __rpc_wake_up_task()
 * and friends to disable the timer synchronously on SMP systems
 * without calling del_timer_sync(). The latter could cause a
 * deadlock if called while we're holding spinlocks...
 */
static void rpc_run_timer(struct rpc_task *task)
{
	void (*callback)(struct rpc_task *);

	callback = task->tk_timeout_fn;
	task->tk_timeout_fn = NULL;
	if (callback && RPC_IS_QUEUED(task)) {
		dprintk("RPC: %5u running timer\n", task->tk_pid);
		callback(task);
	}
	smp_mb__before_clear_bit();
	clear_bit(RPC_TASK_HAS_TIMER, &task->tk_runstate);
	smp_mb__after_clear_bit();
}

/*
 * Set up a timer for the current task.
 */
static inline void
__rpc_add_timer(struct rpc_task *task, rpc_action timer)
{
	if (!task->tk_timeout)
		return;

	dprintk("RPC: %5u setting alarm for %lu ms\n",
			task->tk_pid, task->tk_timeout * 1000 / HZ);

	if (timer)
		task->tk_timeout_fn = timer;
	else
		task->tk_timeout_fn = __rpc_default_timer;
	set_bit(RPC_TASK_HAS_TIMER, &task->tk_runstate);
	mod_timer(&task->tk_timer, jiffies + task->tk_timeout);
}

/*
 * Delete any timer for the current task. Because we use del_timer_sync(),
 * this function should never be called while holding queue->lock.
 */
static void
rpc_delete_timer(struct rpc_task *task)
{
	if (RPC_IS_QUEUED(task))
		return;
	if (test_and_clear_bit(RPC_TASK_HAS_TIMER, &task->tk_runstate)) {
		del_singleshot_timer_sync(&task->tk_timer);
		dprintk("RPC: %5u deleting timer\n", task->tk_pid);
	}
}

/*
 * Add new request to a priority queue.
 */
static void __rpc_add_wait_queue_priority(struct rpc_wait_queue *queue, struct rpc_task *task)
{
	struct list_head *q;
	struct rpc_task *t;

	INIT_LIST_HEAD(&task->u.tk_wait.links);
	q = &queue->tasks[task->tk_priority];
	if (unlikely(task->tk_priority > queue->maxpriority))
		q = &queue->tasks[queue->maxpriority];
	list_for_each_entry(t, q, u.tk_wait.list) {
		if (t->tk_cookie == task->tk_cookie) {
			list_add_tail(&task->u.tk_wait.list, &t->u.tk_wait.links);
			return;
		}
	}
	list_add_tail(&task->u.tk_wait.list, q);
}

/*
 * Add new request to wait queue.
 *
 * Swapper tasks always get inserted at the head of the queue.
 * This should avoid many nasty memory deadlocks and hopefully
 * improve overall performance.
 * Everyone else gets appended to the queue to ensure proper FIFO behavior.
 */
static void __rpc_add_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task)
{
	BUG_ON (RPC_IS_QUEUED(task));

	if (RPC_IS_PRIORITY(queue))
		__rpc_add_wait_queue_priority(queue, task);
	else if (RPC_IS_SWAPPER(task))
		list_add(&task->u.tk_wait.list, &queue->tasks[0]);
	else
		list_add_tail(&task->u.tk_wait.list, &queue->tasks[0]);
	task->u.tk_wait.rpc_waitq = queue;
	queue->qlen++;
	rpc_set_queued(task);

	dprintk("RPC: %5u added to queue %p \"%s\"\n",
			task->tk_pid, queue, rpc_qname(queue));
}

/*
 * Remove request from a priority queue.
 */
static void __rpc_remove_wait_queue_priority(struct rpc_task *task)
{
	struct rpc_task *t;

	if (!list_empty(&task->u.tk_wait.links)) {
		t = list_entry(task->u.tk_wait.links.next, struct rpc_task, u.tk_wait.list);
		list_move(&t->u.tk_wait.list, &task->u.tk_wait.list);
		list_splice_init(&task->u.tk_wait.links, &t->u.tk_wait.links);
	}
	list_del(&task->u.tk_wait.list);
}
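/*
 * Illustrative sketch (not part of the original file): the layout that the
 * two priority-queue helpers above maintain.  Each queue->tasks[prio] list
 * holds one "leader" task per cookie; tasks sharing that leader's
 * tk_cookie hang off the leader's u.tk_wait.links list instead of sitting
 * on the queue directly:
 *
 *	queue->tasks[prio] --> leader A --> leader B --> ...
 *	                          |
 *	                          +-- links: A2 -- A3   (same tk_cookie as A)
 *
 * Removing leader A promotes A2 into A's place on the queue and splices the
 * remaining group members onto A2's links list.
 */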
/*
 * Remove request from queue.
 * Note: must be called with spin lock held.
 */
static void __rpc_remove_wait_queue(struct rpc_task *task)
{
	struct rpc_wait_queue *queue;
	queue = task->u.tk_wait.rpc_waitq;

	if (RPC_IS_PRIORITY(queue))
		__rpc_remove_wait_queue_priority(task);
	else
		list_del(&task->u.tk_wait.list);
	queue->qlen--;
	dprintk("RPC: %5u removed from queue %p \"%s\"\n",
			task->tk_pid, queue, rpc_qname(queue));
}

static inline void rpc_set_waitqueue_priority(struct rpc_wait_queue *queue, int priority)
{
	queue->priority = priority;
	queue->count = 1 << (priority * 2);
}

static inline void rpc_set_waitqueue_cookie(struct rpc_wait_queue *queue, unsigned long cookie)
{
	queue->cookie = cookie;
	queue->nr = RPC_BATCH_COUNT;
}

static inline void rpc_reset_waitqueue_priority(struct rpc_wait_queue *queue)
{
	rpc_set_waitqueue_priority(queue, queue->maxpriority);
	rpc_set_waitqueue_cookie(queue, 0);
}

static void __rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const char *qname, int maxprio)
{
	int i;

	spin_lock_init(&queue->lock);
	for (i = 0; i < ARRAY_SIZE(queue->tasks); i++)
		INIT_LIST_HEAD(&queue->tasks[i]);
	queue->maxpriority = maxprio;
	rpc_reset_waitqueue_priority(queue);
#ifdef RPC_DEBUG
	queue->name = qname;
#endif
}

void rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const char *qname)
{
	__rpc_init_priority_wait_queue(queue, qname, RPC_PRIORITY_HIGH);
}

void rpc_init_wait_queue(struct rpc_wait_queue *queue, const char *qname)
{
	__rpc_init_priority_wait_queue(queue, qname, 0);
}
EXPORT_SYMBOL(rpc_init_wait_queue);

static int rpc_wait_bit_interruptible(void *word)
{
	if (signal_pending(current))
		return -ERESTARTSYS;
	schedule();
	return 0;
}

static void rpc_set_active(struct rpc_task *task)
{
	if (test_and_set_bit(RPC_TASK_ACTIVE, &task->tk_runstate) != 0)
		return;
	spin_lock(&rpc_sched_lock);
#ifdef RPC_DEBUG
	task->tk_magic = RPC_TASK_MAGIC_ID;
	task->tk_pid = rpc_task_id++;
#endif
	/* Add to global list of all tasks */
	list_add_tail(&task->tk_task, &all_tasks);
	spin_unlock(&rpc_sched_lock);
}

/*
 * Mark an RPC call as having completed by clearing the 'active' bit
 */
static void rpc_mark_complete_task(struct rpc_task *task)
{
	smp_mb__before_clear_bit();
	clear_bit(RPC_TASK_ACTIVE, &task->tk_runstate);
	smp_mb__after_clear_bit();
	wake_up_bit(&task->tk_runstate, RPC_TASK_ACTIVE);
}

/*
 * Allow callers to wait for completion of an RPC call
 */
int __rpc_wait_for_completion_task(struct rpc_task *task, int (*action)(void *))
{
	if (action == NULL)
		action = rpc_wait_bit_interruptible;
	return wait_on_bit(&task->tk_runstate, RPC_TASK_ACTIVE,
			action, TASK_INTERRUPTIBLE);
}
EXPORT_SYMBOL(__rpc_wait_for_completion_task);
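/*
 * Illustrative sketch (not part of the original file): a caller that keeps
 * its own reference on an async task can block until the RPC_TASK_ACTIVE
 * bit is cleared.  This assumes the rpc_wait_for_completion_task() wrapper
 * from the sched.h header, which passes a NULL action and so selects the
 * interruptible wait above; clnt, my_ops and my_data are caller-supplied,
 * made-up names:
 *
 *	struct rpc_task *task;
 *
 *	task = rpc_run_task(clnt, RPC_TASK_ASYNC, &my_ops, my_data);
 *	if (IS_ERR(task))
 *		return PTR_ERR(task);
 *	if (rpc_wait_for_completion_task(task) < 0)
 *		... the wait was interrupted by a signal ...
 *	rpc_put_task(task);
 */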
/*
 * Make an RPC task runnable.
 *
 * Note: If the task is ASYNC, this must be called with
 * the spinlock held to protect the wait queue operation.
 */
static void rpc_make_runnable(struct rpc_task *task)
{
	BUG_ON(task->tk_timeout_fn);
	rpc_clear_queued(task);
	if (rpc_test_and_set_running(task))
		return;
	/* We might have raced */
	if (RPC_IS_QUEUED(task)) {
		rpc_clear_running(task);
		return;
	}
	if (RPC_IS_ASYNC(task)) {
		int status;

		INIT_WORK(&task->u.tk_work, rpc_async_schedule);
		status = queue_work(task->tk_workqueue, &task->u.tk_work);
		if (status < 0) {
			printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status);
			task->tk_status = status;
			return;
		}
	} else
		wake_up_bit(&task->tk_runstate, RPC_TASK_QUEUED);
}

/*
 * Prepare for sleeping on a wait queue.
 * By always appending tasks to the list we ensure FIFO behavior.
 * NB: An RPC task will only receive interrupt-driven events as long
 * as it's on a wait queue.
 */
static void __rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
			rpc_action action, rpc_action timer)
{
	dprintk("RPC: %5u sleep_on(queue \"%s\" time %lu)\n",
			task->tk_pid, rpc_qname(q), jiffies);

	if (!RPC_IS_ASYNC(task) && !RPC_IS_ACTIVATED(task)) {
		printk(KERN_ERR "RPC: Inactive synchronous task put to sleep!\n");
		return;
	}

	__rpc_add_wait_queue(q, task);

	BUG_ON(task->tk_callback != NULL);
	task->tk_callback = action;
	__rpc_add_timer(task, timer);
}

void rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
				rpc_action action, rpc_action timer)
{
	/* Mark the task as being activated if so needed */
	rpc_set_active(task);

	/*
	 * Protect the queue operations.
	 */
	spin_lock_bh(&q->lock);
	__rpc_sleep_on(q, task, action, timer);
	spin_unlock_bh(&q->lock);
}

/**
 * __rpc_do_wake_up_task - wake up a single rpc_task
 * @task: task to be woken up
 *
 * Caller must hold queue->lock, and have cleared the task queued flag.
 */
static void __rpc_do_wake_up_task(struct rpc_task *task)
{
	dprintk("RPC: %5u __rpc_wake_up_task (now %lu)\n",
			task->tk_pid, jiffies);

#ifdef RPC_DEBUG
	BUG_ON(task->tk_magic != RPC_TASK_MAGIC_ID);
#endif
	/* Has the task been executed yet? If not, we cannot wake it up! */
	if (!RPC_IS_ACTIVATED(task)) {
		printk(KERN_ERR "RPC: Inactive task (%p) being woken up!\n", task);
		return;
	}

	__rpc_disable_timer(task);
	__rpc_remove_wait_queue(task);

	rpc_make_runnable(task);

	dprintk("RPC: __rpc_wake_up_task done\n");
}

/*
 * Wake up the specified task
 */
static void __rpc_wake_up_task(struct rpc_task *task)
{
	if (rpc_start_wakeup(task)) {
		if (RPC_IS_QUEUED(task))
			__rpc_do_wake_up_task(task);
		rpc_finish_wakeup(task);
	}
}

/*
 * Default timeout handler if none specified by user
 */
static void
__rpc_default_timer(struct rpc_task *task)
{
	dprintk("RPC: %5u timeout (default timer)\n", task->tk_pid);
	task->tk_status = -ETIMEDOUT;
	rpc_wake_up_task(task);
}

/*
 * Wake up the specified task
 */
void rpc_wake_up_task(struct rpc_task *task)
{
	rcu_read_lock_bh();
	if (rpc_start_wakeup(task)) {
		if (RPC_IS_QUEUED(task)) {
			struct rpc_wait_queue *queue = task->u.tk_wait.rpc_waitq;

			/* Note: we're already in a bh-safe context */
			spin_lock(&queue->lock);
			__rpc_do_wake_up_task(task);
			spin_unlock(&queue->lock);
		}
		rpc_finish_wakeup(task);
	}
	rcu_read_unlock_bh();
}

/*
 * Wake up the next task on a priority queue.
 */
static struct rpc_task * __rpc_wake_up_next_priority(struct rpc_wait_queue *queue)
{
	struct list_head *q;
	struct rpc_task *task;

	/*
	 * Service a batch of tasks from a single cookie.
	 */
	q = &queue->tasks[queue->priority];
	if (!list_empty(q)) {
		task = list_entry(q->next, struct rpc_task, u.tk_wait.list);
		if (queue->cookie == task->tk_cookie) {
			if (--queue->nr)
				goto out;
			list_move_tail(&task->u.tk_wait.list, q);
		}
		/*
		 * Check if we need to switch queues.
		 */
		if (--queue->count)
			goto new_cookie;
	}

	/*
	 * Service the next queue.
	 */
	do {
		if (q == &queue->tasks[0])
			q = &queue->tasks[queue->maxpriority];
		else
			q = q - 1;
		if (!list_empty(q)) {
			task = list_entry(q->next, struct rpc_task, u.tk_wait.list);
			goto new_queue;
		}
	} while (q != &queue->tasks[queue->priority]);

	rpc_reset_waitqueue_priority(queue);
	return NULL;

new_queue:
	rpc_set_waitqueue_priority(queue, (unsigned int)(q - &queue->tasks[0]));
new_cookie:
	rpc_set_waitqueue_cookie(queue, task->tk_cookie);
out:
	__rpc_wake_up_task(task);
	return task;
}
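/*
 * Illustrative sketch (not part of the original file): how rpc_sleep_on()
 * and the wake-up helpers are typically paired.  A transport, for example,
 * might park tasks on a private wait queue while a resource is busy and
 * release exactly one waiter when it frees up (the queue name below is
 * made up):
 *
 *	static RPC_WAITQ(backlog, "backlogq");
 *
 *	... in the request path, with no spinlocks held ...
 *	task->tk_timeout = 0;
 *	rpc_sleep_on(&backlog, task, NULL, NULL);
 *
 *	... later, when the resource becomes available ...
 *	rpc_wake_up_next(&backlog);
 */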
/*
 * Wake up the next task on the wait queue.
 */
struct rpc_task * rpc_wake_up_next(struct rpc_wait_queue *queue)
{
	struct rpc_task *task = NULL;

	dprintk("RPC: wake_up_next(%p \"%s\")\n",
			queue, rpc_qname(queue));
	rcu_read_lock_bh();
	spin_lock(&queue->lock);
	if (RPC_IS_PRIORITY(queue))
		task = __rpc_wake_up_next_priority(queue);
	else {
		task_for_first(task, &queue->tasks[0])
			__rpc_wake_up_task(task);
	}
	spin_unlock(&queue->lock);
	rcu_read_unlock_bh();

	return task;
}

/**
 * rpc_wake_up - wake up all rpc_tasks
 * @queue: rpc_wait_queue on which the tasks are sleeping
 *
 * Grabs queue->lock
 */
void rpc_wake_up(struct rpc_wait_queue *queue)
{
	struct rpc_task *task, *next;
	struct list_head *head;

	rcu_read_lock_bh();
	spin_lock(&queue->lock);
	head = &queue->tasks[queue->maxpriority];
	for (;;) {
		list_for_each_entry_safe(task, next, head, u.tk_wait.list)
			__rpc_wake_up_task(task);
		if (head == &queue->tasks[0])
			break;
		head--;
	}
	spin_unlock(&queue->lock);
	rcu_read_unlock_bh();
}

/**
 * rpc_wake_up_status - wake up all rpc_tasks and set their status value.
 * @queue: rpc_wait_queue on which the tasks are sleeping
 * @status: status value to set
 *
 * Grabs queue->lock
 */
void rpc_wake_up_status(struct rpc_wait_queue *queue, int status)
{
	struct rpc_task *task, *next;
	struct list_head *head;

	rcu_read_lock_bh();
	spin_lock(&queue->lock);
	head = &queue->tasks[queue->maxpriority];
	for (;;) {
		list_for_each_entry_safe(task, next, head, u.tk_wait.list) {
			task->tk_status = status;
			__rpc_wake_up_task(task);
		}
		if (head == &queue->tasks[0])
			break;
		head--;
	}
	spin_unlock(&queue->lock);
	rcu_read_unlock_bh();
}

static void __rpc_atrun(struct rpc_task *task)
{
	rpc_wake_up_task(task);
}

/*
 * Run a task at a later time
 */
void rpc_delay(struct rpc_task *task, unsigned long delay)
{
	task->tk_timeout = delay;
	rpc_sleep_on(&delay_queue, task, NULL, __rpc_atrun);
}

/*
 * Helper to call task->tk_ops->rpc_call_prepare
 */
static void rpc_prepare_task(struct rpc_task *task)
{
	lock_kernel();
	task->tk_ops->rpc_call_prepare(task, task->tk_calldata);
	unlock_kernel();
}

/*
 * Helper that calls task->tk_ops->rpc_call_done if it exists
 */
void rpc_exit_task(struct rpc_task *task)
{
	task->tk_action = NULL;
	if (task->tk_ops->rpc_call_done != NULL) {
		lock_kernel();
		task->tk_ops->rpc_call_done(task, task->tk_calldata);
		unlock_kernel();
		if (task->tk_action != NULL) {
			WARN_ON(RPC_ASSASSINATED(task));
			/* Always release the RPC slot and buffer memory */
			xprt_release(task);
		}
	}
}
EXPORT_SYMBOL(rpc_exit_task);

void rpc_release_calldata(const struct rpc_call_ops *ops, void *calldata)
{
	if (ops->rpc_release != NULL) {
		lock_kernel();
		ops->rpc_release(calldata);
		unlock_kernel();
	}
}

/*
 * This is the RPC `scheduler' (or rather, the finite state machine).
 */
static void __rpc_execute(struct rpc_task *task)
{
	int status = 0;

	dprintk("RPC: %5u __rpc_execute flags=0x%x\n",
			task->tk_pid, task->tk_flags);

	BUG_ON(RPC_IS_QUEUED(task));

	for (;;) {
		/*
		 * Garbage collection of pending timers...
		 */
		rpc_delete_timer(task);

		/*
		 * Execute any pending callback.
		 */
		if (RPC_DO_CALLBACK(task)) {
			/* Define a callback save pointer */
			void (*save_callback)(struct rpc_task *);

			/*
			 * If a callback exists, save it, reset it,
			 * call it.
			 * The save is needed to stop from resetting
			 * another callback set within the callback handler
			 *   - Dave
			 */
			save_callback=task->tk_callback;
			task->tk_callback=NULL;
			save_callback(task);
		}

		/*
		 * Perform the next FSM step.
		 * tk_action may be NULL when the task has been killed
		 * by someone else.
		 */
		if (!RPC_IS_QUEUED(task)) {
			if (task->tk_action == NULL)
				break;
			task->tk_action(task);
		}

		/*
		 * Lockless check for whether task is sleeping or not.
		 */
		if (!RPC_IS_QUEUED(task))
			continue;
		rpc_clear_running(task);
		if (RPC_IS_ASYNC(task)) {
			/* Careful! we may have raced... */
			if (RPC_IS_QUEUED(task))
				return;
			if (rpc_test_and_set_running(task))
				return;
			continue;
		}

		/* sync task: sleep here */
		dprintk("RPC: %5u sync task going to sleep\n", task->tk_pid);
		/* Note: Caller should be using rpc_clnt_sigmask() */
		status = out_of_line_wait_on_bit(&task->tk_runstate,
				RPC_TASK_QUEUED, rpc_wait_bit_interruptible,
				TASK_INTERRUPTIBLE);
		if (status == -ERESTARTSYS) {
			/*
			 * When a sync task receives a signal, it exits with
			 * -ERESTARTSYS. In order to catch any callbacks that
			 * clean up after sleeping on some queue, we don't
			 * break the loop here, but go around once more.
			 */
			dprintk("RPC: %5u got signal\n", task->tk_pid);
			task->tk_flags |= RPC_TASK_KILLED;
			rpc_exit(task, -ERESTARTSYS);
			rpc_wake_up_task(task);
		}
		rpc_set_running(task);
		dprintk("RPC: %5u sync task resuming\n", task->tk_pid);
	}

	dprintk("RPC: %5u return %d, status %d\n", task->tk_pid, status,
			task->tk_status);
	/* Release all resources associated with the task */
	rpc_release_task(task);
}

/*
 * User-visible entry point to the scheduler.
 *
 * This may be called recursively if e.g. an async NFS task updates
 * the attributes and finds that dirty pages must be flushed.
 * NOTE: Upon exit of this function the task is guaranteed to be
 *	 released. In particular note that tk_release() will have
 *	 been called, so your task memory may have been freed.
 */
void rpc_execute(struct rpc_task *task)
{
	rpc_set_active(task);
	rpc_set_running(task);
	__rpc_execute(task);
}

static void rpc_async_schedule(struct work_struct *work)
{
	__rpc_execute(container_of(work, struct rpc_task, u.tk_work));
}
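/*
 * Illustrative sketch (not part of the original file): the state machine
 * above is driven by the callbacks a caller supplies in struct
 * rpc_call_ops.  A user of this scheduler might define something like the
 * following (all names hypothetical) and hand it to rpc_run_task() or
 * rpc_init_task():
 *
 *	static void my_prepare(struct rpc_task *task, void *calldata)
 *	{
 *		... fill in task->tk_msg and point task->tk_action at
 *		    the next state to run ...
 *	}
 *
 *	static void my_done(struct rpc_task *task, void *calldata)
 *	{
 *		... inspect task->tk_status ...
 *	}
 *
 *	static void my_release(void *calldata)
 *	{
 *		kfree(calldata);
 *	}
 *
 *	static const struct rpc_call_ops my_ops = {
 *		.rpc_call_prepare	= my_prepare,
 *		.rpc_call_done		= my_done,
 *		.rpc_release		= my_release,
 *	};
 */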
/**
 * rpc_malloc - allocate an RPC buffer
 * @task: RPC task that will use this buffer
 * @size: requested byte size
 *
 * We try to ensure that some NFS reads and writes can always proceed
 * by using a mempool when allocating 'small' buffers.
 * In order to avoid memory starvation triggering more writebacks of
 * NFS requests, we use GFP_NOFS rather than GFP_KERNEL.
 */
void * rpc_malloc(struct rpc_task *task, size_t size)
{
	struct rpc_rqst *req = task->tk_rqstp;
	gfp_t gfp;

	if (task->tk_flags & RPC_TASK_SWAPPER)
		gfp = GFP_ATOMIC;
	else
		gfp = GFP_NOFS;

	if (size > RPC_BUFFER_MAXSIZE) {
		req->rq_buffer = kmalloc(size, gfp);
		if (req->rq_buffer)
			req->rq_bufsize = size;
	} else {
		req->rq_buffer = mempool_alloc(rpc_buffer_mempool, gfp);
		if (req->rq_buffer)
			req->rq_bufsize = RPC_BUFFER_MAXSIZE;
	}
	return req->rq_buffer;
}

/**
 * rpc_free - free buffer allocated via rpc_malloc
 * @task: RPC task with a buffer to be freed
 *
 */
void rpc_free(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;

	if (req->rq_buffer) {
		if (req->rq_bufsize == RPC_BUFFER_MAXSIZE)
			mempool_free(req->rq_buffer, rpc_buffer_mempool);
		else
			kfree(req->rq_buffer);
		req->rq_buffer = NULL;
		req->rq_bufsize = 0;
	}
}

/*
 * Creation and deletion of RPC task structures
 */
void rpc_init_task(struct rpc_task *task, struct rpc_clnt *clnt, int flags, const struct rpc_call_ops *tk_ops, void *calldata)
{
	memset(task, 0, sizeof(*task));
	init_timer(&task->tk_timer);
	task->tk_timer.data     = (unsigned long) task;
	task->tk_timer.function = (void (*)(unsigned long)) rpc_run_timer;
	atomic_set(&task->tk_count, 1);
	task->tk_client = clnt;
	task->tk_flags  = flags;
	task->tk_ops = tk_ops;
	if (tk_ops->rpc_call_prepare != NULL)
		task->tk_action = rpc_prepare_task;
	task->tk_calldata = calldata;

	/* Initialize retry counters */
	task->tk_garb_retry = 2;
	task->tk_cred_retry = 2;

	task->tk_priority = RPC_PRIORITY_NORMAL;
	task->tk_cookie = (unsigned long)current;

	/* Initialize workqueue for async tasks */
	task->tk_workqueue = rpciod_workqueue;

	if (clnt) {
		atomic_inc(&clnt->cl_users);
		if (clnt->cl_softrtry)
			task->tk_flags |= RPC_TASK_SOFT;
		if (!clnt->cl_intr)
			task->tk_flags |= RPC_TASK_NOINTR;
	}

	BUG_ON(task->tk_ops == NULL);

	/* starting timestamp */
	task->tk_start = jiffies;

	dprintk("RPC: new task initialized, procpid %u\n",
			current->pid);
}

static struct rpc_task *
rpc_alloc_task(void)
{
	return (struct rpc_task *)mempool_alloc(rpc_task_mempool, GFP_NOFS);
}

static void rpc_free_task(struct rcu_head *rcu)
{
	struct rpc_task *task = container_of(rcu, struct rpc_task, u.tk_rcu);
	dprintk("RPC: %5u freeing task\n", task->tk_pid);
	mempool_free(task, rpc_task_mempool);
}

/*
 * Create a new task for the specified client.  We have to
 * clean up after an allocation failure, as the client may
 * have specified "oneshot".
 */
struct rpc_task *rpc_new_task(struct rpc_clnt *clnt, int flags, const struct rpc_call_ops *tk_ops, void *calldata)
{
	struct rpc_task *task;

	task = rpc_alloc_task();
	if (!task)
		goto cleanup;

	rpc_init_task(task, clnt, flags, tk_ops, calldata);

	dprintk("RPC: allocated task %p\n", task);
	task->tk_flags |= RPC_TASK_DYNAMIC;
out:
	return task;

cleanup:
	/* Check whether to release the client */
	if (clnt) {
		printk("rpc_new_task: failed, users=%d, oneshot=%d\n",
			atomic_read(&clnt->cl_users), clnt->cl_oneshot);
		atomic_inc(&clnt->cl_users); /* pretend we were used ... */
		rpc_release_client(clnt);
	}
	goto out;
}


void rpc_put_task(struct rpc_task *task)
{
	const struct rpc_call_ops *tk_ops = task->tk_ops;
	void *calldata = task->tk_calldata;

	if (!atomic_dec_and_test(&task->tk_count))
		return;
	/* Release resources */
	if (task->tk_rqstp)
		xprt_release(task);
	if (task->tk_msg.rpc_cred)
		rpcauth_unbindcred(task);
	if (task->tk_client) {
		rpc_release_client(task->tk_client);
		task->tk_client = NULL;
	}
	if (task->tk_flags & RPC_TASK_DYNAMIC)
		call_rcu_bh(&task->u.tk_rcu, rpc_free_task);
	rpc_release_calldata(tk_ops, calldata);
}
EXPORT_SYMBOL(rpc_put_task);

static void rpc_release_task(struct rpc_task *task)
{
#ifdef RPC_DEBUG
	BUG_ON(task->tk_magic != RPC_TASK_MAGIC_ID);
#endif
	dprintk("RPC: %5u release task\n", task->tk_pid);

	/* Remove from global task list */
	spin_lock(&rpc_sched_lock);
	list_del(&task->tk_task);
	spin_unlock(&rpc_sched_lock);

	BUG_ON (RPC_IS_QUEUED(task));

	/* Synchronously delete any running timer */
	rpc_delete_timer(task);

#ifdef RPC_DEBUG
	task->tk_magic = 0;
#endif
	/* Wake up anyone who is waiting for task completion */
	rpc_mark_complete_task(task);

	rpc_put_task(task);
}

/**
 * rpc_run_task - Allocate a new RPC task, then run rpc_execute against it
 * @clnt: pointer to RPC client
 * @flags: RPC flags
 * @ops: RPC call ops
 * @data: user call data
 */
struct rpc_task *rpc_run_task(struct rpc_clnt *clnt, int flags,
					const struct rpc_call_ops *ops,
					void *data)
{
	struct rpc_task *task;
	task = rpc_new_task(clnt, flags, ops, data);
	if (task == NULL) {
		rpc_release_calldata(ops, data);
		return ERR_PTR(-ENOMEM);
	}
	atomic_inc(&task->tk_count);
	rpc_execute(task);
	return task;
}
EXPORT_SYMBOL(rpc_run_task);

/*
 * Kill all tasks for the given client.
 * XXX: kill their descendants as well?
 */
void rpc_killall_tasks(struct rpc_clnt *clnt)
{
	struct rpc_task *rovr;
	struct list_head *le;

	dprintk("RPC: killing all tasks for client %p\n", clnt);

	/*
	 * Spin lock all_tasks to prevent changes...
	 */
	spin_lock(&rpc_sched_lock);
	alltask_for_each(rovr, le, &all_tasks) {
		if (! RPC_IS_ACTIVATED(rovr))
			continue;
		if (!clnt || rovr->tk_client == clnt) {
			rovr->tk_flags |= RPC_TASK_KILLED;
			rpc_exit(rovr, -EIO);
			rpc_wake_up_task(rovr);
		}
	}
	spin_unlock(&rpc_sched_lock);
}

static DECLARE_MUTEX_LOCKED(rpciod_running);

static void rpciod_killall(void)
{
	unsigned long flags;

	while (!list_empty(&all_tasks)) {
		clear_thread_flag(TIF_SIGPENDING);
		rpc_killall_tasks(NULL);
		flush_workqueue(rpciod_workqueue);
		if (!list_empty(&all_tasks)) {
			dprintk("RPC: rpciod_killall: waiting for tasks "
				"to exit\n");
			yield();
		}
	}

	spin_lock_irqsave(&current->sighand->siglock, flags);
	recalc_sigpending();
	spin_unlock_irqrestore(&current->sighand->siglock, flags);
}
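/*
 * Illustrative sketch (not part of the original file): users of the shared
 * rpciod workqueue are expected to bracket their lifetime with
 * rpciod_up()/rpciod_down(), roughly as the RPC client setup and teardown
 * paths do:
 *
 *	error = rpciod_up();
 *	if (error < 0)
 *		goto out_fail;	(could not start rpciod)
 *	...
 *	rpciod_down();		(last user tears the workqueue down)
 */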
/*
 * Start up the rpciod process if it's not already running.
 */
int
rpciod_up(void)
{
	struct workqueue_struct *wq;
	int error = 0;

	mutex_lock(&rpciod_mutex);
	dprintk("RPC: rpciod_up: users %u\n", rpciod_users);
	rpciod_users++;
	if (rpciod_workqueue)
		goto out;
	/*
	 * If there's no pid, we should be the first user.
	 */
	if (rpciod_users > 1)
		printk(KERN_WARNING "rpciod_up: no workqueue, %u users??\n", rpciod_users);
	/*
	 * Create the rpciod thread and wait for it to start.
	 */
	error = -ENOMEM;
	wq = create_workqueue("rpciod");
	if (wq == NULL) {
		printk(KERN_WARNING "rpciod_up: create workqueue failed, error=%d\n", error);
		rpciod_users--;
		goto out;
	}
	rpciod_workqueue = wq;
	error = 0;
out:
	mutex_unlock(&rpciod_mutex);
	return error;
}

void
rpciod_down(void)
{
	mutex_lock(&rpciod_mutex);
	dprintk("RPC: rpciod_down sema %u\n", rpciod_users);
	if (rpciod_users) {
		if (--rpciod_users)
			goto out;
	} else
		printk(KERN_WARNING "rpciod_down: no users??\n");

	if (!rpciod_workqueue) {
		dprintk("RPC: rpciod_down: Nothing to do!\n");
		goto out;
	}
	rpciod_killall();

	destroy_workqueue(rpciod_workqueue);
	rpciod_workqueue = NULL;
out:
	mutex_unlock(&rpciod_mutex);
}

#ifdef RPC_DEBUG
void rpc_show_tasks(void)
{
	struct list_head *le;
	struct rpc_task *t;

	spin_lock(&rpc_sched_lock);
	if (list_empty(&all_tasks)) {
		spin_unlock(&rpc_sched_lock);
		return;
	}
	printk("-pid- proc flgs status -client- -prog- --rqstp- -timeout "
		"-rpcwait -action- ---ops--\n");
	alltask_for_each(t, le, &all_tasks) {
		const char *rpc_waitq = "none";

		if (RPC_IS_QUEUED(t))
			rpc_waitq = rpc_qname(t->u.tk_wait.rpc_waitq);

		printk("%5u %04d %04x %6d %8p %6d %8p %8ld %8s %8p %8p\n",
			t->tk_pid,
			(t->tk_msg.rpc_proc ? t->tk_msg.rpc_proc->p_proc : -1),
			t->tk_flags, t->tk_status,
			t->tk_client,
			(t->tk_client ? t->tk_client->cl_prog : 0),
			t->tk_rqstp, t->tk_timeout,
			rpc_waitq,
			t->tk_action, t->tk_ops);
	}
	spin_unlock(&rpc_sched_lock);
}
#endif

void
rpc_destroy_mempool(void)
{
	if (rpc_buffer_mempool)
		mempool_destroy(rpc_buffer_mempool);
	if (rpc_task_mempool)
		mempool_destroy(rpc_task_mempool);
	if (rpc_task_slabp)
		kmem_cache_destroy(rpc_task_slabp);
	if (rpc_buffer_slabp)
		kmem_cache_destroy(rpc_buffer_slabp);
}

int
rpc_init_mempool(void)
{
	rpc_task_slabp = kmem_cache_create("rpc_tasks",
					     sizeof(struct rpc_task),
					     0, SLAB_HWCACHE_ALIGN,
					     NULL, NULL);
	if (!rpc_task_slabp)
		goto err_nomem;
	rpc_buffer_slabp = kmem_cache_create("rpc_buffers",
					     RPC_BUFFER_MAXSIZE,
					     0, SLAB_HWCACHE_ALIGN,
					     NULL, NULL);
	if (!rpc_buffer_slabp)
		goto err_nomem;
	rpc_task_mempool = mempool_create_slab_pool(RPC_TASK_POOLSIZE,
						    rpc_task_slabp);
	if (!rpc_task_mempool)
		goto err_nomem;
	rpc_buffer_mempool = mempool_create_slab_pool(RPC_BUFFER_POOLSIZE,
						      rpc_buffer_slabp);
	if (!rpc_buffer_mempool)
		goto err_nomem;
	return 0;
err_nomem:
	rpc_destroy_mempool();
	return -ENOMEM;
}
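/*
 * Illustrative sketch (not part of the original file): the slab caches and
 * mempools above are meant to be created once at module load and torn down
 * at unload, roughly as the sunrpc module init code is assumed to do:
 *
 *	static int __init init_example(void)
 *	{
 *		if (rpc_init_mempool() != 0)
 *			return -ENOMEM;
 *		...
 *		return 0;
 *	}
 *
 *	static void __exit cleanup_example(void)
 *	{
 *		...
 *		rpc_destroy_mempool();
 *	}
 */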