/*
 * linux/net/sunrpc/sched.c
 *
 * Scheduling for synchronous and asynchronous RPC requests.
 *
 * Copyright (C) 1996 Olaf Kirch, <okir@monad.swb.de>
 *
 * TCP NFS related read + write fixes
 * (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
 */

#include <linux/module.h>

#include <linux/sched.h>
#include <linux/interrupt.h>
#include <linux/slab.h>
#include <linux/mempool.h>
#include <linux/smp.h>
#include <linux/smp_lock.h>
#include <linux/spinlock.h>
#include <linux/mutex.h>

#include <linux/sunrpc/clnt.h>

#ifdef RPC_DEBUG
#define RPCDBG_FACILITY		RPCDBG_SCHED
/*
 * Stamped into tk_magic by rpc_set_active(), checked with BUG_ON() before
 * a task is woken or released, and cleared again on release.
 */
#define RPC_TASK_MAGIC_ID	0xf00baa
/* Next debug task id; handed out under rpc_sched_lock in rpc_set_active(). */
static int			rpc_task_id;
#endif

/*
 * RPC slabs and memory pools
 *
 * rpc_malloc() serves requests of up to RPC_BUFFER_MAXSIZE bytes from
 * rpc_buffer_mempool and uses kmalloc() for anything larger.
 * rpc_task_mempool backs the tasks allocated by rpc_new_task().
 * The *_POOLSIZE values are the pool sizes passed to
 * mempool_create_slab_pool() in rpc_init_mempool().
 */
#define RPC_BUFFER_MAXSIZE	(2048)
#define RPC_BUFFER_POOLSIZE	(8)
#define RPC_TASK_POOLSIZE	(8)
static struct kmem_cache	*rpc_task_slabp __read_mostly;
static struct kmem_cache	*rpc_buffer_slabp __read_mostly;
static mempool_t	*rpc_task_mempool __read_mostly;
static mempool_t	*rpc_buffer_mempool __read_mostly;

/* Forward declarations for functions defined later in this file. */
static void			__rpc_default_timer(struct rpc_task *task);
static void			rpciod_killall(void);
static void			rpc_async_schedule(struct work_struct *);
static void			rpc_release_task(struct rpc_task *task);

/*
 * RPC tasks sit here while waiting for conditions to improve.
 * rpc_delay() parks tasks on this queue until their timer fires.
 */
static RPC_WAITQ(delay_queue, "delayq");

/*
 * All RPC tasks are linked into this list (via tk_task) from
 * rpc_set_active() until rpc_release_task(). Protected by rpc_sched_lock.
 */
static LIST_HEAD(all_tasks);

/*
 * rpciod-related stuff
 *
 * rpciod_users counts rpciod_up() callers. The first of them creates
 * rpciod_workqueue, and the last rpciod_down() destroys it. Both are
 * protected by rpciod_mutex. rpciod_workqueue is not static; presumably
 * it is referenced from other sunrpc files (TODO confirm). rpc_init_task()
 * copies it into each task's tk_workqueue.
 */
static DEFINE_MUTEX(rpciod_mutex);
static unsigned int		rpciod_users;
struct workqueue_struct *rpciod_workqueue;

/*
 * Spinlock for other critical sections of code: it protects all_tasks
 * and, under RPC_DEBUG, rpc_task_id.
 */
static DEFINE_SPINLOCK(rpc_sched_lock);

/*
 * Disable the timer for a given RPC task. Should be called with
 * queue->lock and bh_disabled in order to avoid races within
 * rpc_run_timer().
 */
static inline void
__rpc_disable_timer(struct rpc_task *task)
{
	dprintk("RPC: %4d disabling timer\n", task->tk_pid);
	/*
	 * Clearing tk_timeout_fn makes a concurrently firing rpc_run_timer()
	 * skip the callback. The kernel timer itself is not deleted here;
	 * rpc_delete_timer() reaps it later from __rpc_execute().
	 */
	task->tk_timeout_fn = NULL;
	task->tk_timeout = 0;
}

/*
 * Run a timeout function.
 * We use the callback in order to allow __rpc_wake_up_task()
 * and friends to disable the timer synchronously on SMP systems
 * without calling del_timer_sync(). The latter could cause a
 * deadlock if called while we're holding spinlocks...
 *
 * This is the tk_timer handler installed by rpc_init_task(). The callback
 * is consumed (set to NULL) and only run if the task is still queued.
 * RPC_TASK_HAS_TIMER is cleared last, bracketed by memory barriers, to
 * signal that the handler is done with the task.
 */
static void rpc_run_timer(struct rpc_task *task)
{
	void (*callback)(struct rpc_task *);

	callback = task->tk_timeout_fn;
	task->tk_timeout_fn = NULL;
	if (callback && RPC_IS_QUEUED(task)) {
		dprintk("RPC: %4d running timer\n", task->tk_pid);
		callback(task);
	}
	smp_mb__before_clear_bit();
	clear_bit(RPC_TASK_HAS_TIMER, &task->tk_runstate);
	smp_mb__after_clear_bit();
}

/*
 * Set up a timer for the current task.
 *
 * Arms tk_timer to fire task->tk_timeout jiffies from now. When it fires,
 * it runs @timer, or __rpc_default_timer() if @timer is NULL. Does nothing
 * when tk_timeout is zero.
 */
static inline void
__rpc_add_timer(struct rpc_task *task, rpc_action timer)
{
	if (!task->tk_timeout)
		return;

	dprintk("RPC: %4d setting alarm for %lu ms\n",
			task->tk_pid, task->tk_timeout * 1000 / HZ);

	if (timer)
		task->tk_timeout_fn = timer;
	else
		task->tk_timeout_fn = __rpc_default_timer;
	/* Tells rpc_delete_timer() there is a timer to reap. */
	set_bit(RPC_TASK_HAS_TIMER, &task->tk_runstate);
	mod_timer(&task->tk_timer, jiffies + task->tk_timeout);
}

/*
 * Delete any timer for the current task. The deletion is synchronous
 * (del_singleshot_timer_sync()), so this function must never be called
 * while holding queue->lock.
 *
 * Does nothing while the task is still queued; presumably a sleeping task
 * still needs its timer. Only the caller that wins test_and_clear_bit()
 * on RPC_TASK_HAS_TIMER deletes the timer.
 */
static void
rpc_delete_timer(struct rpc_task *task)
{
	if (RPC_IS_QUEUED(task))
		return;
	if (test_and_clear_bit(RPC_TASK_HAS_TIMER, &task->tk_runstate)) {
		del_singleshot_timer_sync(&task->tk_timer);
		dprintk("RPC: %4d deleting timer\n", task->tk_pid);
	}
}

/*
 * Add new request to a priority queue.
141 */ 142 static void __rpc_add_wait_queue_priority(struct rpc_wait_queue *queue, struct rpc_task *task) 143 { 144 struct list_head *q; 145 struct rpc_task *t; 146 147 INIT_LIST_HEAD(&task->u.tk_wait.links); 148 q = &queue->tasks[task->tk_priority]; 149 if (unlikely(task->tk_priority > queue->maxpriority)) 150 q = &queue->tasks[queue->maxpriority]; 151 list_for_each_entry(t, q, u.tk_wait.list) { 152 if (t->tk_cookie == task->tk_cookie) { 153 list_add_tail(&task->u.tk_wait.list, &t->u.tk_wait.links); 154 return; 155 } 156 } 157 list_add_tail(&task->u.tk_wait.list, q); 158 } 159 160 /* 161 * Add new request to wait queue. 162 * 163 * Swapper tasks always get inserted at the head of the queue. 164 * This should avoid many nasty memory deadlocks and hopefully 165 * improve overall performance. 166 * Everyone else gets appended to the queue to ensure proper FIFO behavior. 167 */ 168 static void __rpc_add_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task) 169 { 170 BUG_ON (RPC_IS_QUEUED(task)); 171 172 if (RPC_IS_PRIORITY(queue)) 173 __rpc_add_wait_queue_priority(queue, task); 174 else if (RPC_IS_SWAPPER(task)) 175 list_add(&task->u.tk_wait.list, &queue->tasks[0]); 176 else 177 list_add_tail(&task->u.tk_wait.list, &queue->tasks[0]); 178 task->u.tk_wait.rpc_waitq = queue; 179 queue->qlen++; 180 rpc_set_queued(task); 181 182 dprintk("RPC: %4d added to queue %p \"%s\"\n", 183 task->tk_pid, queue, rpc_qname(queue)); 184 } 185 186 /* 187 * Remove request from a priority queue. 188 */ 189 static void __rpc_remove_wait_queue_priority(struct rpc_task *task) 190 { 191 struct rpc_task *t; 192 193 if (!list_empty(&task->u.tk_wait.links)) { 194 t = list_entry(task->u.tk_wait.links.next, struct rpc_task, u.tk_wait.list); 195 list_move(&t->u.tk_wait.list, &task->u.tk_wait.list); 196 list_splice_init(&task->u.tk_wait.links, &t->u.tk_wait.links); 197 } 198 list_del(&task->u.tk_wait.list); 199 } 200 201 /* 202 * Remove request from queue. 
203 * Note: must be called with spin lock held. 204 */ 205 static void __rpc_remove_wait_queue(struct rpc_task *task) 206 { 207 struct rpc_wait_queue *queue; 208 queue = task->u.tk_wait.rpc_waitq; 209 210 if (RPC_IS_PRIORITY(queue)) 211 __rpc_remove_wait_queue_priority(task); 212 else 213 list_del(&task->u.tk_wait.list); 214 queue->qlen--; 215 dprintk("RPC: %4d removed from queue %p \"%s\"\n", 216 task->tk_pid, queue, rpc_qname(queue)); 217 } 218 219 static inline void rpc_set_waitqueue_priority(struct rpc_wait_queue *queue, int priority) 220 { 221 queue->priority = priority; 222 queue->count = 1 << (priority * 2); 223 } 224 225 static inline void rpc_set_waitqueue_cookie(struct rpc_wait_queue *queue, unsigned long cookie) 226 { 227 queue->cookie = cookie; 228 queue->nr = RPC_BATCH_COUNT; 229 } 230 231 static inline void rpc_reset_waitqueue_priority(struct rpc_wait_queue *queue) 232 { 233 rpc_set_waitqueue_priority(queue, queue->maxpriority); 234 rpc_set_waitqueue_cookie(queue, 0); 235 } 236 237 static void __rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const char *qname, int maxprio) 238 { 239 int i; 240 241 spin_lock_init(&queue->lock); 242 for (i = 0; i < ARRAY_SIZE(queue->tasks); i++) 243 INIT_LIST_HEAD(&queue->tasks[i]); 244 queue->maxpriority = maxprio; 245 rpc_reset_waitqueue_priority(queue); 246 #ifdef RPC_DEBUG 247 queue->name = qname; 248 #endif 249 } 250 251 void rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const char *qname) 252 { 253 __rpc_init_priority_wait_queue(queue, qname, RPC_PRIORITY_HIGH); 254 } 255 256 void rpc_init_wait_queue(struct rpc_wait_queue *queue, const char *qname) 257 { 258 __rpc_init_priority_wait_queue(queue, qname, 0); 259 } 260 EXPORT_SYMBOL(rpc_init_wait_queue); 261 262 static int rpc_wait_bit_interruptible(void *word) 263 { 264 if (signal_pending(current)) 265 return -ERESTARTSYS; 266 schedule(); 267 return 0; 268 } 269 270 static void rpc_set_active(struct rpc_task *task) 271 { 272 if 
(test_and_set_bit(RPC_TASK_ACTIVE, &task->tk_runstate) != 0) 273 return; 274 spin_lock(&rpc_sched_lock); 275 #ifdef RPC_DEBUG 276 task->tk_magic = RPC_TASK_MAGIC_ID; 277 task->tk_pid = rpc_task_id++; 278 #endif 279 /* Add to global list of all tasks */ 280 list_add_tail(&task->tk_task, &all_tasks); 281 spin_unlock(&rpc_sched_lock); 282 } 283 284 /* 285 * Mark an RPC call as having completed by clearing the 'active' bit 286 */ 287 static void rpc_mark_complete_task(struct rpc_task *task) 288 { 289 smp_mb__before_clear_bit(); 290 clear_bit(RPC_TASK_ACTIVE, &task->tk_runstate); 291 smp_mb__after_clear_bit(); 292 wake_up_bit(&task->tk_runstate, RPC_TASK_ACTIVE); 293 } 294 295 /* 296 * Allow callers to wait for completion of an RPC call 297 */ 298 int __rpc_wait_for_completion_task(struct rpc_task *task, int (*action)(void *)) 299 { 300 if (action == NULL) 301 action = rpc_wait_bit_interruptible; 302 return wait_on_bit(&task->tk_runstate, RPC_TASK_ACTIVE, 303 action, TASK_INTERRUPTIBLE); 304 } 305 EXPORT_SYMBOL(__rpc_wait_for_completion_task); 306 307 /* 308 * Make an RPC task runnable. 309 * 310 * Note: If the task is ASYNC, this must be called with 311 * the spinlock held to protect the wait queue operation. 312 */ 313 static void rpc_make_runnable(struct rpc_task *task) 314 { 315 BUG_ON(task->tk_timeout_fn); 316 rpc_clear_queued(task); 317 if (rpc_test_and_set_running(task)) 318 return; 319 /* We might have raced */ 320 if (RPC_IS_QUEUED(task)) { 321 rpc_clear_running(task); 322 return; 323 } 324 if (RPC_IS_ASYNC(task)) { 325 int status; 326 327 INIT_WORK(&task->u.tk_work, rpc_async_schedule); 328 status = queue_work(task->tk_workqueue, &task->u.tk_work); 329 if (status < 0) { 330 printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status); 331 task->tk_status = status; 332 return; 333 } 334 } else 335 wake_up_bit(&task->tk_runstate, RPC_TASK_QUEUED); 336 } 337 338 /* 339 * Prepare for sleeping on a wait queue. 
 * By always appending tasks to the list we ensure FIFO behavior.
 * (Exceptions: on a non-priority queue, swapper tasks are inserted at
 * the head. Priority queues order tasks by priority level and tk_cookie;
 * see __rpc_add_wait_queue().)
 * NB: An RPC task will only receive interrupt-driven events as long
 * as it's on a wait queue.
 *
 * Caller must hold q->lock. Queues @task on @q, records @action as the
 * tk_callback that __rpc_execute() runs after the task is woken, and arms
 * the timer via __rpc_add_timer(). A synchronous task that has not been
 * activated is rejected with an error message and left unqueued.
 */
static void __rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
			rpc_action action, rpc_action timer)
{
	dprintk("RPC: %4d sleep_on(queue \"%s\" time %ld)\n", task->tk_pid,
				rpc_qname(q), jiffies);

	if (!RPC_IS_ASYNC(task) && !RPC_IS_ACTIVATED(task)) {
		printk(KERN_ERR "RPC: Inactive synchronous task put to sleep!\n");
		return;
	}

	__rpc_add_wait_queue(q, task);

	/* Only one wakeup callback may be pending at a time. */
	BUG_ON(task->tk_callback != NULL);
	task->tk_callback = action;
	__rpc_add_timer(task, timer);
}

/*
 * Put @task to sleep on @q.
 * @action: callback run by __rpc_execute() after wakeup (may be NULL)
 * @timer: timeout handler; NULL selects __rpc_default_timer(). The timer
 *         is only armed if task->tk_timeout is non-zero.
 */
void rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
				rpc_action action, rpc_action timer)
{
	/* Mark the task as being activated if so needed */
	rpc_set_active(task);

	/*
	 * Protect the queue operations.
	 */
	spin_lock_bh(&q->lock);
	__rpc_sleep_on(q, task, action, timer);
	spin_unlock_bh(&q->lock);
}

/**
 * __rpc_do_wake_up_task - wake up a single rpc_task
 * @task: task to be woken up
 *
 * Caller must hold queue->lock and have won rpc_start_wakeup(). The task
 * is still flagged as queued on entry. It is disarmed and unlinked from
 * its wait queue, and rpc_make_runnable() then clears the queued flag.
 */
static void __rpc_do_wake_up_task(struct rpc_task *task)
{
	dprintk("RPC: %4d __rpc_wake_up_task (now %ld)\n", task->tk_pid, jiffies);

#ifdef RPC_DEBUG
	BUG_ON(task->tk_magic != RPC_TASK_MAGIC_ID);
#endif
	/* Has the task been executed yet? If not, we cannot wake it up! */
	if (!RPC_IS_ACTIVATED(task)) {
		printk(KERN_ERR "RPC: Inactive task (%p) being woken up!\n", task);
		return;
	}

	__rpc_disable_timer(task);
	__rpc_remove_wait_queue(task);

	rpc_make_runnable(task);

	dprintk("RPC: __rpc_wake_up_task done\n");
}

/*
 * Wake up the specified task if it is still queued. The caller holds the
 * lock of the queue the task sleeps on. rpc_start_wakeup() and
 * rpc_finish_wakeup() bracket the wakeup; presumably they serialize
 * competing wakers (TODO confirm in sched.h).
 */
static void __rpc_wake_up_task(struct rpc_task *task)
{
	if (rpc_start_wakeup(task)) {
		if (RPC_IS_QUEUED(task))
			__rpc_do_wake_up_task(task);
		rpc_finish_wakeup(task);
	}
}

/*
 * Default timeout handler if none specified by user: fail the current
 * step with -ETIMEDOUT and wake the task.
 */
static void
__rpc_default_timer(struct rpc_task *task)
{
	dprintk("RPC: %d timeout (default timer)\n", task->tk_pid);
	task->tk_status = -ETIMEDOUT;
	rpc_wake_up_task(task);
}

/*
 * Wake up the specified task, taking the lock of whichever queue it
 * sleeps on. rcu_read_lock_bh() disables bottom halves (hence the plain
 * spin_lock()). It presumably also keeps the task from being freed
 * underneath us, since rpc_put_task() frees tasks via call_rcu_bh().
 */
void rpc_wake_up_task(struct rpc_task *task)
{
	rcu_read_lock_bh();
	if (rpc_start_wakeup(task)) {
		if (RPC_IS_QUEUED(task)) {
			struct rpc_wait_queue *queue = task->u.tk_wait.rpc_waitq;

			/* Note: we're already in a bh-safe context */
			spin_lock(&queue->lock);
			__rpc_do_wake_up_task(task);
			spin_unlock(&queue->lock);
		}
		rpc_finish_wakeup(task);
	}
	rcu_read_unlock_bh();
}

/*
 * Wake up the next task on a priority queue.
 *
 * Caller must hold queue->lock. The queue carries round-robin state:
 *  - queue->priority: the level being serviced
 *  - queue->count: how many more picks that level gets
 *  - queue->cookie: the cookie of the current batch
 *  - queue->nr: how many more tasks that batch may wake
 * See rpc_set_waitqueue_priority() and rpc_set_waitqueue_cookie().
 *
 * Returns the woken task, or NULL if every level is empty; in that case
 * the state is reset to the highest level.
 */
static struct rpc_task * __rpc_wake_up_next_priority(struct rpc_wait_queue *queue)
{
	struct list_head *q;
	struct rpc_task *task;

	/*
	 * Service a batch of tasks from a single cookie.
	 */
	q = &queue->tasks[queue->priority];
	if (!list_empty(q)) {
		task = list_entry(q->next, struct rpc_task, u.tk_wait.list);
		if (queue->cookie == task->tk_cookie) {
			if (--queue->nr)
				goto out;
			/*
			 * Batch exhausted: rotate this cookie group to the back
			 * of the level.
			 * NOTE(review): @task still points at the rotated entry,
			 * so the paths below wake it anyway and restart the batch
			 * with the same cookie - confirm this is intended.
			 */
			list_move_tail(&task->u.tk_wait.list, q);
		}
		/*
		 * Check if we need to switch queues.
		 */
		if (--queue->count)
			goto new_cookie;
	}

	/*
	 * Service the next queue.
	 * Scan downwards from the current level, wrapping from tasks[0] to
	 * tasks[maxpriority], and finish back at the current level.
	 */
	do {
		if (q == &queue->tasks[0])
			q = &queue->tasks[queue->maxpriority];
		else
			q = q - 1;
		if (!list_empty(q)) {
			task = list_entry(q->next, struct rpc_task, u.tk_wait.list);
			goto new_queue;
		}
	} while (q != &queue->tasks[queue->priority]);

	rpc_reset_waitqueue_priority(queue);
	return NULL;

new_queue:
	/* Switch the serviced level to the one @task was found on. */
	rpc_set_waitqueue_priority(queue, (unsigned int)(q - &queue->tasks[0]));
new_cookie:
	/* Start a fresh batch for @task's cookie. */
	rpc_set_waitqueue_cookie(queue, task->tk_cookie);
out:
	__rpc_wake_up_task(task);
	return task;
}

/*
 * Wake up the next task on the wait queue.
 * Returns the task chosen for wakeup, or NULL if none was found.
 */
struct rpc_task * rpc_wake_up_next(struct rpc_wait_queue *queue)
{
	struct rpc_task	*task = NULL;

	dprintk("RPC: wake_up_next(%p \"%s\")\n", queue, rpc_qname(queue));
	rcu_read_lock_bh();
	spin_lock(&queue->lock);
	if (RPC_IS_PRIORITY(queue))
		task = __rpc_wake_up_next_priority(queue);
	else {
		/* Plain queue: wake the head of tasks[0], if any. */
		task_for_first(task, &queue->tasks[0])
			__rpc_wake_up_task(task);
	}
	spin_unlock(&queue->lock);
	rcu_read_unlock_bh();

	return task;
}

/**
 * rpc_wake_up - wake up all rpc_tasks
 * @queue: rpc_wait_queue on which the tasks are sleeping
 *
 * Walks every bucket from queue->maxpriority down to tasks[0]. The _safe
 * iterator is used because each wakeup unlinks the task from the list.
 *
 * Grabs queue->lock
 */
void rpc_wake_up(struct rpc_wait_queue *queue)
{
	struct rpc_task *task, *next;
	struct list_head *head;

	rcu_read_lock_bh();
	spin_lock(&queue->lock);
	head = &queue->tasks[queue->maxpriority];
	for (;;) {
		list_for_each_entry_safe(task, next, head, u.tk_wait.list)
			__rpc_wake_up_task(task);
		if (head == &queue->tasks[0])
			break;
		head--;
	}
	spin_unlock(&queue->lock);
	rcu_read_unlock_bh();
}

/**
 * rpc_wake_up_status - wake up all rpc_tasks and set their status value.
547 * @queue: rpc_wait_queue on which the tasks are sleeping 548 * @status: status value to set 549 * 550 * Grabs queue->lock 551 */ 552 void rpc_wake_up_status(struct rpc_wait_queue *queue, int status) 553 { 554 struct rpc_task *task, *next; 555 struct list_head *head; 556 557 rcu_read_lock_bh(); 558 spin_lock(&queue->lock); 559 head = &queue->tasks[queue->maxpriority]; 560 for (;;) { 561 list_for_each_entry_safe(task, next, head, u.tk_wait.list) { 562 task->tk_status = status; 563 __rpc_wake_up_task(task); 564 } 565 if (head == &queue->tasks[0]) 566 break; 567 head--; 568 } 569 spin_unlock(&queue->lock); 570 rcu_read_unlock_bh(); 571 } 572 573 static void __rpc_atrun(struct rpc_task *task) 574 { 575 rpc_wake_up_task(task); 576 } 577 578 /* 579 * Run a task at a later time 580 */ 581 void rpc_delay(struct rpc_task *task, unsigned long delay) 582 { 583 task->tk_timeout = delay; 584 rpc_sleep_on(&delay_queue, task, NULL, __rpc_atrun); 585 } 586 587 /* 588 * Helper to call task->tk_ops->rpc_call_prepare 589 */ 590 static void rpc_prepare_task(struct rpc_task *task) 591 { 592 lock_kernel(); 593 task->tk_ops->rpc_call_prepare(task, task->tk_calldata); 594 unlock_kernel(); 595 } 596 597 /* 598 * Helper that calls task->tk_ops->rpc_call_done if it exists 599 */ 600 void rpc_exit_task(struct rpc_task *task) 601 { 602 task->tk_action = NULL; 603 if (task->tk_ops->rpc_call_done != NULL) { 604 lock_kernel(); 605 task->tk_ops->rpc_call_done(task, task->tk_calldata); 606 unlock_kernel(); 607 if (task->tk_action != NULL) { 608 WARN_ON(RPC_ASSASSINATED(task)); 609 /* Always release the RPC slot and buffer memory */ 610 xprt_release(task); 611 } 612 } 613 } 614 EXPORT_SYMBOL(rpc_exit_task); 615 616 void rpc_release_calldata(const struct rpc_call_ops *ops, void *calldata) 617 { 618 if (ops->rpc_release != NULL) { 619 lock_kernel(); 620 ops->rpc_release(calldata); 621 unlock_kernel(); 622 } 623 } 624 625 /* 626 * This is the RPC `scheduler' (or rather, the finite state machine). 
 *
 * Each pass of the loop:
 *  - reaps a stale timer;
 *  - runs any pending tk_callback;
 *  - if the task is not queued, runs the next tk_action step.
 * The loop ends, and the task is released, once tk_action is NULL. If a
 * step put the task to sleep, one of two things happens:
 *  - an async task returns 0 and is resumed later via rpc_async_schedule();
 *  - a sync task waits here for RPC_TASK_QUEUED to clear.
 *
 * Returns the result of the last synchronous wait (0, or -ERESTARTSYS if
 * it was interrupted by a signal), or 0 if no such wait happened.
 */
static int __rpc_execute(struct rpc_task *task)
{
	int		status = 0;

	dprintk("RPC: %4d rpc_execute flgs %x\n",
				task->tk_pid, task->tk_flags);

	BUG_ON(RPC_IS_QUEUED(task));

	for (;;) {
		/*
		 * Garbage collection of pending timers...
		 */
		rpc_delete_timer(task);

		/*
		 * Execute any pending callback.
		 */
		if (RPC_DO_CALLBACK(task)) {
			/* Define a callback save pointer */
			void (*save_callback)(struct rpc_task *);

			/*
			 * If a callback exists, save it, reset it,
			 * call it.
			 * The save is needed to stop from resetting
			 * another callback set within the callback handler
			 * - Dave
			 */
			save_callback=task->tk_callback;
			task->tk_callback=NULL;
			save_callback(task);
		}

		/*
		 * Perform the next FSM step.
		 * tk_action may be NULL when the task has been killed
		 * by someone else.
		 */
		if (!RPC_IS_QUEUED(task)) {
			if (task->tk_action == NULL)
				break;
			task->tk_action(task);
		}

		/*
		 * Lockless check for whether task is sleeping or not.
		 */
		if (!RPC_IS_QUEUED(task))
			continue;
		rpc_clear_running(task);
		if (RPC_IS_ASYNC(task)) {
			/*
			 * Careful! we may have raced...
			 * A wakeup may have happened after the queued check
			 * above. If the task is still queued, or a waker
			 * (rpc_make_runnable()) already owns the running bit,
			 * leave the task to them. Otherwise we reclaimed it and
			 * keep executing.
			 */
			if (RPC_IS_QUEUED(task))
				return 0;
			if (rpc_test_and_set_running(task))
				return 0;
			continue;
		}

		/* sync task: sleep here */
		dprintk("RPC: %4d sync task going to sleep\n", task->tk_pid);
		/* Note: Caller should be using rpc_clnt_sigmask() */
		status = out_of_line_wait_on_bit(&task->tk_runstate,
				RPC_TASK_QUEUED, rpc_wait_bit_interruptible,
				TASK_INTERRUPTIBLE);
		if (status == -ERESTARTSYS) {
			/*
			 * When a sync task receives a signal, it exits with
			 * -ERESTARTSYS. In order to catch any callbacks that
			 * clean up after sleeping on some queue, we don't
			 * break the loop here, but go around once more.
			 */
			dprintk("RPC: %4d got signal\n", task->tk_pid);
			task->tk_flags |= RPC_TASK_KILLED;
			rpc_exit(task, -ERESTARTSYS);
			rpc_wake_up_task(task);
		}
		rpc_set_running(task);
		dprintk("RPC: %4d sync task resuming\n", task->tk_pid);
	}

	dprintk("RPC: %4d, return %d, status %d\n", task->tk_pid, status, task->tk_status);
	/* Release all resources associated with the task */
	rpc_release_task(task);
	return status;
}

/*
 * User-visible entry point to the scheduler.
 *
 * This may be called recursively if e.g. an async NFS task updates
 * the attributes and finds that dirty pages must be flushed.
 * NOTE: A task that runs to completion here is passed to
 *	 rpc_release_task() before this returns. If that drops the last
 *	 reference, rpc_put_task() calls the rpc_release callback, and the
 *	 task memory may have been freed. An async task that goes to sleep
 *	 returns early instead and is finished later from rpciod. Callers
 *	 that need the task afterwards must hold their own reference, as
 *	 rpc_run_task() does.
 */
int
rpc_execute(struct rpc_task *task)
{
	rpc_set_active(task);
	rpc_set_running(task);
	return __rpc_execute(task);
}

/* Workqueue handler that rpc_make_runnable() queues for async tasks. */
static void rpc_async_schedule(struct work_struct *work)
{
	__rpc_execute(container_of(work, struct rpc_task, u.tk_work));
}

/**
 * rpc_malloc - allocate an RPC buffer
 * @task: RPC task that will use this buffer
 * @size: requested byte size
 *
 * We try to ensure that some NFS reads and writes can always proceed
 * by using a mempool when allocating 'small' buffers.
 * In order to avoid memory starvation triggering more writebacks of
 * NFS requests, we use GFP_NOFS rather than GFP_KERNEL.
747 */ 748 void * rpc_malloc(struct rpc_task *task, size_t size) 749 { 750 struct rpc_rqst *req = task->tk_rqstp; 751 gfp_t gfp; 752 753 if (task->tk_flags & RPC_TASK_SWAPPER) 754 gfp = GFP_ATOMIC; 755 else 756 gfp = GFP_NOFS; 757 758 if (size > RPC_BUFFER_MAXSIZE) { 759 req->rq_buffer = kmalloc(size, gfp); 760 if (req->rq_buffer) 761 req->rq_bufsize = size; 762 } else { 763 req->rq_buffer = mempool_alloc(rpc_buffer_mempool, gfp); 764 if (req->rq_buffer) 765 req->rq_bufsize = RPC_BUFFER_MAXSIZE; 766 } 767 return req->rq_buffer; 768 } 769 770 /** 771 * rpc_free - free buffer allocated via rpc_malloc 772 * @task: RPC task with a buffer to be freed 773 * 774 */ 775 void rpc_free(struct rpc_task *task) 776 { 777 struct rpc_rqst *req = task->tk_rqstp; 778 779 if (req->rq_buffer) { 780 if (req->rq_bufsize == RPC_BUFFER_MAXSIZE) 781 mempool_free(req->rq_buffer, rpc_buffer_mempool); 782 else 783 kfree(req->rq_buffer); 784 req->rq_buffer = NULL; 785 req->rq_bufsize = 0; 786 } 787 } 788 789 /* 790 * Creation and deletion of RPC task structures 791 */ 792 void rpc_init_task(struct rpc_task *task, struct rpc_clnt *clnt, int flags, const struct rpc_call_ops *tk_ops, void *calldata) 793 { 794 memset(task, 0, sizeof(*task)); 795 init_timer(&task->tk_timer); 796 task->tk_timer.data = (unsigned long) task; 797 task->tk_timer.function = (void (*)(unsigned long)) rpc_run_timer; 798 atomic_set(&task->tk_count, 1); 799 task->tk_client = clnt; 800 task->tk_flags = flags; 801 task->tk_ops = tk_ops; 802 if (tk_ops->rpc_call_prepare != NULL) 803 task->tk_action = rpc_prepare_task; 804 task->tk_calldata = calldata; 805 806 /* Initialize retry counters */ 807 task->tk_garb_retry = 2; 808 task->tk_cred_retry = 2; 809 810 task->tk_priority = RPC_PRIORITY_NORMAL; 811 task->tk_cookie = (unsigned long)current; 812 813 /* Initialize workqueue for async tasks */ 814 task->tk_workqueue = rpciod_workqueue; 815 816 if (clnt) { 817 atomic_inc(&clnt->cl_users); 818 if (clnt->cl_softrtry) 819 
task->tk_flags |= RPC_TASK_SOFT; 820 if (!clnt->cl_intr) 821 task->tk_flags |= RPC_TASK_NOINTR; 822 } 823 824 BUG_ON(task->tk_ops == NULL); 825 826 /* starting timestamp */ 827 task->tk_start = jiffies; 828 829 dprintk("RPC: %4d new task procpid %d\n", task->tk_pid, 830 current->pid); 831 } 832 833 static struct rpc_task * 834 rpc_alloc_task(void) 835 { 836 return (struct rpc_task *)mempool_alloc(rpc_task_mempool, GFP_NOFS); 837 } 838 839 static void rpc_free_task(struct rcu_head *rcu) 840 { 841 struct rpc_task *task = container_of(rcu, struct rpc_task, u.tk_rcu); 842 dprintk("RPC: %4d freeing task\n", task->tk_pid); 843 mempool_free(task, rpc_task_mempool); 844 } 845 846 /* 847 * Create a new task for the specified client. We have to 848 * clean up after an allocation failure, as the client may 849 * have specified "oneshot". 850 */ 851 struct rpc_task *rpc_new_task(struct rpc_clnt *clnt, int flags, const struct rpc_call_ops *tk_ops, void *calldata) 852 { 853 struct rpc_task *task; 854 855 task = rpc_alloc_task(); 856 if (!task) 857 goto cleanup; 858 859 rpc_init_task(task, clnt, flags, tk_ops, calldata); 860 861 dprintk("RPC: %4d allocated task\n", task->tk_pid); 862 task->tk_flags |= RPC_TASK_DYNAMIC; 863 out: 864 return task; 865 866 cleanup: 867 /* Check whether to release the client */ 868 if (clnt) { 869 printk("rpc_new_task: failed, users=%d, oneshot=%d\n", 870 atomic_read(&clnt->cl_users), clnt->cl_oneshot); 871 atomic_inc(&clnt->cl_users); /* pretend we were used ... 
*/ 872 rpc_release_client(clnt); 873 } 874 goto out; 875 } 876 877 878 void rpc_put_task(struct rpc_task *task) 879 { 880 const struct rpc_call_ops *tk_ops = task->tk_ops; 881 void *calldata = task->tk_calldata; 882 883 if (!atomic_dec_and_test(&task->tk_count)) 884 return; 885 /* Release resources */ 886 if (task->tk_rqstp) 887 xprt_release(task); 888 if (task->tk_msg.rpc_cred) 889 rpcauth_unbindcred(task); 890 if (task->tk_client) { 891 rpc_release_client(task->tk_client); 892 task->tk_client = NULL; 893 } 894 if (task->tk_flags & RPC_TASK_DYNAMIC) 895 call_rcu_bh(&task->u.tk_rcu, rpc_free_task); 896 rpc_release_calldata(tk_ops, calldata); 897 } 898 EXPORT_SYMBOL(rpc_put_task); 899 900 static void rpc_release_task(struct rpc_task *task) 901 { 902 #ifdef RPC_DEBUG 903 BUG_ON(task->tk_magic != RPC_TASK_MAGIC_ID); 904 #endif 905 dprintk("RPC: %4d release task\n", task->tk_pid); 906 907 /* Remove from global task list */ 908 spin_lock(&rpc_sched_lock); 909 list_del(&task->tk_task); 910 spin_unlock(&rpc_sched_lock); 911 912 BUG_ON (RPC_IS_QUEUED(task)); 913 914 /* Synchronously delete any running timer */ 915 rpc_delete_timer(task); 916 917 #ifdef RPC_DEBUG 918 task->tk_magic = 0; 919 #endif 920 /* Wake up anyone who is waiting for task completion */ 921 rpc_mark_complete_task(task); 922 923 rpc_put_task(task); 924 } 925 926 /** 927 * rpc_run_task - Allocate a new RPC task, then run rpc_execute against it 928 * @clnt: pointer to RPC client 929 * @flags: RPC flags 930 * @ops: RPC call ops 931 * @data: user call data 932 */ 933 struct rpc_task *rpc_run_task(struct rpc_clnt *clnt, int flags, 934 const struct rpc_call_ops *ops, 935 void *data) 936 { 937 struct rpc_task *task; 938 task = rpc_new_task(clnt, flags, ops, data); 939 if (task == NULL) { 940 rpc_release_calldata(ops, data); 941 return ERR_PTR(-ENOMEM); 942 } 943 atomic_inc(&task->tk_count); 944 rpc_execute(task); 945 return task; 946 } 947 EXPORT_SYMBOL(rpc_run_task); 948 949 /* 950 * Kill all tasks for the 
given client. 951 * XXX: kill their descendants as well? 952 */ 953 void rpc_killall_tasks(struct rpc_clnt *clnt) 954 { 955 struct rpc_task *rovr; 956 struct list_head *le; 957 958 dprintk("RPC: killing all tasks for client %p\n", clnt); 959 960 /* 961 * Spin lock all_tasks to prevent changes... 962 */ 963 spin_lock(&rpc_sched_lock); 964 alltask_for_each(rovr, le, &all_tasks) { 965 if (! RPC_IS_ACTIVATED(rovr)) 966 continue; 967 if (!clnt || rovr->tk_client == clnt) { 968 rovr->tk_flags |= RPC_TASK_KILLED; 969 rpc_exit(rovr, -EIO); 970 rpc_wake_up_task(rovr); 971 } 972 } 973 spin_unlock(&rpc_sched_lock); 974 } 975 976 static DECLARE_MUTEX_LOCKED(rpciod_running); 977 978 static void rpciod_killall(void) 979 { 980 unsigned long flags; 981 982 while (!list_empty(&all_tasks)) { 983 clear_thread_flag(TIF_SIGPENDING); 984 rpc_killall_tasks(NULL); 985 flush_workqueue(rpciod_workqueue); 986 if (!list_empty(&all_tasks)) { 987 dprintk("rpciod_killall: waiting for tasks to exit\n"); 988 yield(); 989 } 990 } 991 992 spin_lock_irqsave(¤t->sighand->siglock, flags); 993 recalc_sigpending(); 994 spin_unlock_irqrestore(¤t->sighand->siglock, flags); 995 } 996 997 /* 998 * Start up the rpciod process if it's not already running. 999 */ 1000 int 1001 rpciod_up(void) 1002 { 1003 struct workqueue_struct *wq; 1004 int error = 0; 1005 1006 mutex_lock(&rpciod_mutex); 1007 dprintk("rpciod_up: users %d\n", rpciod_users); 1008 rpciod_users++; 1009 if (rpciod_workqueue) 1010 goto out; 1011 /* 1012 * If there's no pid, we should be the first user. 1013 */ 1014 if (rpciod_users > 1) 1015 printk(KERN_WARNING "rpciod_up: no workqueue, %d users??\n", rpciod_users); 1016 /* 1017 * Create the rpciod thread and wait for it to start. 
1018 */ 1019 error = -ENOMEM; 1020 wq = create_workqueue("rpciod"); 1021 if (wq == NULL) { 1022 printk(KERN_WARNING "rpciod_up: create workqueue failed, error=%d\n", error); 1023 rpciod_users--; 1024 goto out; 1025 } 1026 rpciod_workqueue = wq; 1027 error = 0; 1028 out: 1029 mutex_unlock(&rpciod_mutex); 1030 return error; 1031 } 1032 1033 void 1034 rpciod_down(void) 1035 { 1036 mutex_lock(&rpciod_mutex); 1037 dprintk("rpciod_down sema %d\n", rpciod_users); 1038 if (rpciod_users) { 1039 if (--rpciod_users) 1040 goto out; 1041 } else 1042 printk(KERN_WARNING "rpciod_down: no users??\n"); 1043 1044 if (!rpciod_workqueue) { 1045 dprintk("rpciod_down: Nothing to do!\n"); 1046 goto out; 1047 } 1048 rpciod_killall(); 1049 1050 destroy_workqueue(rpciod_workqueue); 1051 rpciod_workqueue = NULL; 1052 out: 1053 mutex_unlock(&rpciod_mutex); 1054 } 1055 1056 #ifdef RPC_DEBUG 1057 void rpc_show_tasks(void) 1058 { 1059 struct list_head *le; 1060 struct rpc_task *t; 1061 1062 spin_lock(&rpc_sched_lock); 1063 if (list_empty(&all_tasks)) { 1064 spin_unlock(&rpc_sched_lock); 1065 return; 1066 } 1067 printk("-pid- proc flgs status -client- -prog- --rqstp- -timeout " 1068 "-rpcwait -action- ---ops--\n"); 1069 alltask_for_each(t, le, &all_tasks) { 1070 const char *rpc_waitq = "none"; 1071 1072 if (RPC_IS_QUEUED(t)) 1073 rpc_waitq = rpc_qname(t->u.tk_wait.rpc_waitq); 1074 1075 printk("%05d %04d %04x %06d %8p %6d %8p %08ld %8s %8p %8p\n", 1076 t->tk_pid, 1077 (t->tk_msg.rpc_proc ? t->tk_msg.rpc_proc->p_proc : -1), 1078 t->tk_flags, t->tk_status, 1079 t->tk_client, 1080 (t->tk_client ? 
t->tk_client->cl_prog : 0), 1081 t->tk_rqstp, t->tk_timeout, 1082 rpc_waitq, 1083 t->tk_action, t->tk_ops); 1084 } 1085 spin_unlock(&rpc_sched_lock); 1086 } 1087 #endif 1088 1089 void 1090 rpc_destroy_mempool(void) 1091 { 1092 if (rpc_buffer_mempool) 1093 mempool_destroy(rpc_buffer_mempool); 1094 if (rpc_task_mempool) 1095 mempool_destroy(rpc_task_mempool); 1096 if (rpc_task_slabp) 1097 kmem_cache_destroy(rpc_task_slabp); 1098 if (rpc_buffer_slabp) 1099 kmem_cache_destroy(rpc_buffer_slabp); 1100 } 1101 1102 int 1103 rpc_init_mempool(void) 1104 { 1105 rpc_task_slabp = kmem_cache_create("rpc_tasks", 1106 sizeof(struct rpc_task), 1107 0, SLAB_HWCACHE_ALIGN, 1108 NULL, NULL); 1109 if (!rpc_task_slabp) 1110 goto err_nomem; 1111 rpc_buffer_slabp = kmem_cache_create("rpc_buffers", 1112 RPC_BUFFER_MAXSIZE, 1113 0, SLAB_HWCACHE_ALIGN, 1114 NULL, NULL); 1115 if (!rpc_buffer_slabp) 1116 goto err_nomem; 1117 rpc_task_mempool = mempool_create_slab_pool(RPC_TASK_POOLSIZE, 1118 rpc_task_slabp); 1119 if (!rpc_task_mempool) 1120 goto err_nomem; 1121 rpc_buffer_mempool = mempool_create_slab_pool(RPC_BUFFER_POOLSIZE, 1122 rpc_buffer_slabp); 1123 if (!rpc_buffer_mempool) 1124 goto err_nomem; 1125 return 0; 1126 err_nomem: 1127 rpc_destroy_mempool(); 1128 return -ENOMEM; 1129 } 1130