/*
 * linux/net/sunrpc/sched.c
 *
 * Scheduling for synchronous and asynchronous RPC requests.
 *
 * Copyright (C) 1996 Olaf Kirch, <okir@monad.swb.de>
 *
 * TCP NFS related read + write fixes
 * (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
 */

#include <linux/module.h>

#include <linux/sched.h>
#include <linux/interrupt.h>
#include <linux/slab.h>
#include <linux/mempool.h>
#include <linux/smp.h>
#include <linux/smp_lock.h>
#include <linux/spinlock.h>
#include <linux/mutex.h>

#include <linux/sunrpc/clnt.h>

#ifdef RPC_DEBUG
#define RPCDBG_FACILITY		RPCDBG_SCHED
#define RPC_TASK_MAGIC_ID	0xf00baa
static int rpc_task_id;
#endif

/*
 * RPC slabs and memory pools
 */
#define RPC_BUFFER_MAXSIZE	(2048)
#define RPC_BUFFER_POOLSIZE	(8)
#define RPC_TASK_POOLSIZE	(8)
static struct kmem_cache *rpc_task_slabp __read_mostly;
static struct kmem_cache *rpc_buffer_slabp __read_mostly;
static mempool_t *rpc_task_mempool __read_mostly;
static mempool_t *rpc_buffer_mempool __read_mostly;

static void __rpc_default_timer(struct rpc_task *task);
static void rpciod_killall(void);
static void rpc_async_schedule(struct work_struct *);
static void rpc_release_task(struct rpc_task *task);

/*
 * RPC tasks sit here while waiting for conditions to improve.
 */
static RPC_WAITQ(delay_queue, "delayq");

/*
 * All RPC tasks are linked into this list
 */
static LIST_HEAD(all_tasks);

/*
 * rpciod-related stuff
 */
static DEFINE_MUTEX(rpciod_mutex);
static unsigned int rpciod_users;
struct workqueue_struct *rpciod_workqueue;

/*
 * Spinlock for other critical sections of code.
 */
static DEFINE_SPINLOCK(rpc_sched_lock);

/*
 * Disable the timer for a given RPC task. Should be called with
 * queue->lock and bh_disabled in order to avoid races within
 * rpc_run_timer().
 */
static inline void
__rpc_disable_timer(struct rpc_task *task)
{
        dprintk("RPC: %5u disabling timer\n", task->tk_pid);
        task->tk_timeout_fn = NULL;
        task->tk_timeout = 0;
}

/*
 * Run a timeout function.
 * We use the callback in order to allow __rpc_wake_up_task()
 * and friends to disable the timer synchronously on SMP systems
 * without calling del_timer_sync(). The latter could cause a
 * deadlock if called while we're holding spinlocks...
 */
static void rpc_run_timer(struct rpc_task *task)
{
        void (*callback)(struct rpc_task *);

        callback = task->tk_timeout_fn;
        task->tk_timeout_fn = NULL;
        if (callback && RPC_IS_QUEUED(task)) {
                dprintk("RPC: %5u running timer\n", task->tk_pid);
                callback(task);
        }
        smp_mb__before_clear_bit();
        clear_bit(RPC_TASK_HAS_TIMER, &task->tk_runstate);
        smp_mb__after_clear_bit();
}

/*
 * Set up a timer for the current task.
 */
static inline void
__rpc_add_timer(struct rpc_task *task, rpc_action timer)
{
        if (!task->tk_timeout)
                return;

        dprintk("RPC: %5u setting alarm for %lu ms\n",
                        task->tk_pid, task->tk_timeout * 1000 / HZ);

        if (timer)
                task->tk_timeout_fn = timer;
        else
                task->tk_timeout_fn = __rpc_default_timer;
        set_bit(RPC_TASK_HAS_TIMER, &task->tk_runstate);
        mod_timer(&task->tk_timer, jiffies + task->tk_timeout);
}

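/*
 * Typical use of the timer helpers above (illustrative sketch only; the
 * queue, action and timer names are placeholders, not taken from this file):
 *
 *	task->tk_timeout = SOME_TIMEOUT;
 *	rpc_sleep_on(&some_waitq, task, some_action, some_timer_fn);
 *
 * rpc_sleep_on() ends up in __rpc_add_timer(); the wake-up path calls
 * __rpc_disable_timer() under queue->lock, and rpc_run_timer() checks
 * RPC_IS_QUEUED() so a callback that was cleared in the meantime is
 * simply skipped.
 */
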
/*
 * Delete any timer for the current task. Because we use del_timer_sync(),
 * this function should never be called while holding queue->lock.
 */
static void
rpc_delete_timer(struct rpc_task *task)
{
        if (RPC_IS_QUEUED(task))
                return;
        if (test_and_clear_bit(RPC_TASK_HAS_TIMER, &task->tk_runstate)) {
                del_singleshot_timer_sync(&task->tk_timer);
                dprintk("RPC: %5u deleting timer\n", task->tk_pid);
        }
}

/*
 * Add new request to a priority queue.
 */
static void __rpc_add_wait_queue_priority(struct rpc_wait_queue *queue, struct rpc_task *task)
{
        struct list_head *q;
        struct rpc_task *t;

        INIT_LIST_HEAD(&task->u.tk_wait.links);
        q = &queue->tasks[task->tk_priority];
        if (unlikely(task->tk_priority > queue->maxpriority))
                q = &queue->tasks[queue->maxpriority];
        list_for_each_entry(t, q, u.tk_wait.list) {
                if (t->tk_cookie == task->tk_cookie) {
                        list_add_tail(&task->u.tk_wait.list, &t->u.tk_wait.links);
                        return;
                }
        }
        list_add_tail(&task->u.tk_wait.list, q);
}

/*
 * Add new request to wait queue.
 *
 * Swapper tasks always get inserted at the head of the queue.
 * This should avoid many nasty memory deadlocks and hopefully
 * improve overall performance.
 * Everyone else gets appended to the queue to ensure proper FIFO behavior.
 */
static void __rpc_add_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task)
{
        BUG_ON (RPC_IS_QUEUED(task));

        if (RPC_IS_PRIORITY(queue))
                __rpc_add_wait_queue_priority(queue, task);
        else if (RPC_IS_SWAPPER(task))
                list_add(&task->u.tk_wait.list, &queue->tasks[0]);
        else
                list_add_tail(&task->u.tk_wait.list, &queue->tasks[0]);
        task->u.tk_wait.rpc_waitq = queue;
        queue->qlen++;
        rpc_set_queued(task);

        dprintk("RPC: %5u added to queue %p \"%s\"\n",
                        task->tk_pid, queue, rpc_qname(queue));
}

/*
 * Remove request from a priority queue.
 */
static void __rpc_remove_wait_queue_priority(struct rpc_task *task)
{
        struct rpc_task *t;

        if (!list_empty(&task->u.tk_wait.links)) {
                t = list_entry(task->u.tk_wait.links.next, struct rpc_task, u.tk_wait.list);
                list_move(&t->u.tk_wait.list, &task->u.tk_wait.list);
                list_splice_init(&task->u.tk_wait.links, &t->u.tk_wait.links);
        }
        list_del(&task->u.tk_wait.list);
}

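/*
 * Queue layout assumed by the two priority-queue helpers above (sketch):
 *
 *	queue->tasks[prio]: T1 <-> T4 <-> ...        (u.tk_wait.list)
 *	                    |
 *	                    T2, T3                   (u.tk_wait.links: tasks
 *	                                              sharing T1's tk_cookie)
 *
 * Only the first task of each cookie batch sits on the per-priority list;
 * later tasks with the same cookie hang off it via tk_wait.links.
 * Removing T1 promotes T2 into its place and hands it the remaining
 * links, so the batch keeps its position in the queue.
 */
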
/*
 * Remove request from queue.
 * Note: must be called with spin lock held.
 */
static void __rpc_remove_wait_queue(struct rpc_task *task)
{
        struct rpc_wait_queue *queue;
        queue = task->u.tk_wait.rpc_waitq;

        if (RPC_IS_PRIORITY(queue))
                __rpc_remove_wait_queue_priority(task);
        else
                list_del(&task->u.tk_wait.list);
        queue->qlen--;
        dprintk("RPC: %5u removed from queue %p \"%s\"\n",
                        task->tk_pid, queue, rpc_qname(queue));
}

static inline void rpc_set_waitqueue_priority(struct rpc_wait_queue *queue, int priority)
{
        queue->priority = priority;
        queue->count = 1 << (priority * 2);
}

static inline void rpc_set_waitqueue_cookie(struct rpc_wait_queue *queue, unsigned long cookie)
{
        queue->cookie = cookie;
        queue->nr = RPC_BATCH_COUNT;
}

static inline void rpc_reset_waitqueue_priority(struct rpc_wait_queue *queue)
{
        rpc_set_waitqueue_priority(queue, queue->maxpriority);
        rpc_set_waitqueue_cookie(queue, 0);
}

static void __rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const char *qname, int maxprio)
{
        int i;

        spin_lock_init(&queue->lock);
        for (i = 0; i < ARRAY_SIZE(queue->tasks); i++)
                INIT_LIST_HEAD(&queue->tasks[i]);
        queue->maxpriority = maxprio;
        rpc_reset_waitqueue_priority(queue);
#ifdef RPC_DEBUG
        queue->name = qname;
#endif
}

void rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const char *qname)
{
        __rpc_init_priority_wait_queue(queue, qname, RPC_PRIORITY_HIGH);
}

void rpc_init_wait_queue(struct rpc_wait_queue *queue, const char *qname)
{
        __rpc_init_priority_wait_queue(queue, qname, 0);
}
EXPORT_SYMBOL(rpc_init_wait_queue);

static int rpc_wait_bit_interruptible(void *word)
{
        if (signal_pending(current))
                return -ERESTARTSYS;
        schedule();
        return 0;
}

static void rpc_set_active(struct rpc_task *task)
{
        if (test_and_set_bit(RPC_TASK_ACTIVE, &task->tk_runstate) != 0)
                return;
        spin_lock(&rpc_sched_lock);
#ifdef RPC_DEBUG
        task->tk_magic = RPC_TASK_MAGIC_ID;
        task->tk_pid = rpc_task_id++;
#endif
        /* Add to global list of all tasks */
        list_add_tail(&task->tk_task, &all_tasks);
        spin_unlock(&rpc_sched_lock);
}

/*
 * Mark an RPC call as having completed by clearing the 'active' bit
 */
static void rpc_mark_complete_task(struct rpc_task *task)
{
        smp_mb__before_clear_bit();
        clear_bit(RPC_TASK_ACTIVE, &task->tk_runstate);
        smp_mb__after_clear_bit();
        wake_up_bit(&task->tk_runstate, RPC_TASK_ACTIVE);
}

/*
 * Allow callers to wait for completion of an RPC call
 */
int __rpc_wait_for_completion_task(struct rpc_task *task, int (*action)(void *))
{
        if (action == NULL)
                action = rpc_wait_bit_interruptible;
        return wait_on_bit(&task->tk_runstate, RPC_TASK_ACTIVE,
                        action, TASK_INTERRUPTIBLE);
}
EXPORT_SYMBOL(__rpc_wait_for_completion_task);

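/*
 * A caller that starts a task asynchronously can sleep on the
 * RPC_TASK_ACTIVE bit, which rpc_mark_complete_task() clears when the
 * state machine is done.  Minimal sketch (the rpc_wait_for_completion_task()
 * wrapper is assumed to be provided by the sched.h header; error handling
 * and the clnt/ops/data values are the caller's):
 *
 *	struct rpc_task *task;
 *
 *	task = rpc_run_task(clnt, RPC_TASK_ASYNC, &ops, data);
 *	if (!IS_ERR(task)) {
 *		rpc_wait_for_completion_task(task);
 *		rpc_put_task(task);
 *	}
 */
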
/*
 * Make an RPC task runnable.
 *
 * Note: If the task is ASYNC, this must be called with
 * the spinlock held to protect the wait queue operation.
 */
static void rpc_make_runnable(struct rpc_task *task)
{
        BUG_ON(task->tk_timeout_fn);
        rpc_clear_queued(task);
        if (rpc_test_and_set_running(task))
                return;
        /* We might have raced */
        if (RPC_IS_QUEUED(task)) {
                rpc_clear_running(task);
                return;
        }
        if (RPC_IS_ASYNC(task)) {
                int status;

                INIT_WORK(&task->u.tk_work, rpc_async_schedule);
                status = queue_work(task->tk_workqueue, &task->u.tk_work);
                if (status < 0) {
                        printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status);
                        task->tk_status = status;
                        return;
                }
        } else
                wake_up_bit(&task->tk_runstate, RPC_TASK_QUEUED);
}

/*
 * Prepare for sleeping on a wait queue.
 * By always appending tasks to the list we ensure FIFO behavior.
 * NB: An RPC task will only receive interrupt-driven events as long
 * as it's on a wait queue.
 */
static void __rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
                        rpc_action action, rpc_action timer)
{
        dprintk("RPC: %5u sleep_on(queue \"%s\" time %lu)\n",
                        task->tk_pid, rpc_qname(q), jiffies);

        if (!RPC_IS_ASYNC(task) && !RPC_IS_ACTIVATED(task)) {
                printk(KERN_ERR "RPC: Inactive synchronous task put to sleep!\n");
                return;
        }

        __rpc_add_wait_queue(q, task);

        BUG_ON(task->tk_callback != NULL);
        task->tk_callback = action;
        __rpc_add_timer(task, timer);
}

void rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
                        rpc_action action, rpc_action timer)
{
        /* Mark the task as being activated if so needed */
        rpc_set_active(task);

        /*
         * Protect the queue operations.
         */
        spin_lock_bh(&q->lock);
        __rpc_sleep_on(q, task, action, timer);
        spin_unlock_bh(&q->lock);
}

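/*
 * rpc_sleep_on() is normally called from inside a tk_action routine: the
 * state machine in __rpc_execute() notices RPC_TASK_QUEUED afterwards and
 * parks the task.  Sketch of a typical action pair (names are illustrative
 * placeholders, not functions in this file):
 *
 *	static void example_reserve(struct rpc_task *task)
 *	{
 *		task->tk_action = example_reserve_result;
 *		task->tk_timeout = 0;
 *		rpc_sleep_on(&some_backlog_queue, task, NULL, NULL);
 *	}
 *
 * A later rpc_wake_up_next(&some_backlog_queue) makes the task runnable
 * again, and the state machine resumes at example_reserve_result().
 */
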
/**
 * __rpc_do_wake_up_task - wake up a single rpc_task
 * @task: task to be woken up
 *
 * Caller must hold queue->lock, and have cleared the task queued flag.
 */
static void __rpc_do_wake_up_task(struct rpc_task *task)
{
        dprintk("RPC: %5u __rpc_wake_up_task (now %lu)\n",
                        task->tk_pid, jiffies);

#ifdef RPC_DEBUG
        BUG_ON(task->tk_magic != RPC_TASK_MAGIC_ID);
#endif
        /* Has the task been executed yet? If not, we cannot wake it up! */
        if (!RPC_IS_ACTIVATED(task)) {
                printk(KERN_ERR "RPC: Inactive task (%p) being woken up!\n", task);
                return;
        }

        __rpc_disable_timer(task);
        __rpc_remove_wait_queue(task);

        rpc_make_runnable(task);

        dprintk("RPC: __rpc_wake_up_task done\n");
}

/*
 * Wake up the specified task
 */
static void __rpc_wake_up_task(struct rpc_task *task)
{
        if (rpc_start_wakeup(task)) {
                if (RPC_IS_QUEUED(task))
                        __rpc_do_wake_up_task(task);
                rpc_finish_wakeup(task);
        }
}

/*
 * Default timeout handler if none specified by user
 */
static void
__rpc_default_timer(struct rpc_task *task)
{
        dprintk("RPC: %5u timeout (default timer)\n", task->tk_pid);
        task->tk_status = -ETIMEDOUT;
        rpc_wake_up_task(task);
}

/*
 * Wake up the specified task
 */
void rpc_wake_up_task(struct rpc_task *task)
{
        rcu_read_lock_bh();
        if (rpc_start_wakeup(task)) {
                if (RPC_IS_QUEUED(task)) {
                        struct rpc_wait_queue *queue = task->u.tk_wait.rpc_waitq;

                        /* Note: we're already in a bh-safe context */
                        spin_lock(&queue->lock);
                        __rpc_do_wake_up_task(task);
                        spin_unlock(&queue->lock);
                }
                rpc_finish_wakeup(task);
        }
        rcu_read_unlock_bh();
}

/*
 * Wake up the next task on a priority queue.
 */
static struct rpc_task * __rpc_wake_up_next_priority(struct rpc_wait_queue *queue)
{
        struct list_head *q;
        struct rpc_task *task;

        /*
         * Service a batch of tasks from a single cookie.
         */
        q = &queue->tasks[queue->priority];
        if (!list_empty(q)) {
                task = list_entry(q->next, struct rpc_task, u.tk_wait.list);
                if (queue->cookie == task->tk_cookie) {
                        if (--queue->nr)
                                goto out;
                        list_move_tail(&task->u.tk_wait.list, q);
                }
                /*
                 * Check if we need to switch queues.
                 */
                if (--queue->count)
                        goto new_cookie;
        }

        /*
         * Service the next queue.
         */
        do {
                if (q == &queue->tasks[0])
                        q = &queue->tasks[queue->maxpriority];
                else
                        q = q - 1;
                if (!list_empty(q)) {
                        task = list_entry(q->next, struct rpc_task, u.tk_wait.list);
                        goto new_queue;
                }
        } while (q != &queue->tasks[queue->priority]);

        rpc_reset_waitqueue_priority(queue);
        return NULL;

new_queue:
        rpc_set_waitqueue_priority(queue, (unsigned int)(q - &queue->tasks[0]));
new_cookie:
        rpc_set_waitqueue_cookie(queue, task->tk_cookie);
out:
        __rpc_wake_up_task(task);
        return task;
}

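/*
 * Summary of the fairness counters used above: queue->nr bounds how many
 * consecutive tasks are served from one cookie (RPC_BATCH_COUNT per batch,
 * see rpc_set_waitqueue_cookie()), while queue->count, set to
 * 1 << (priority * 2) in rpc_set_waitqueue_priority(), bounds how many
 * cookie batches are taken from the current priority list before the
 * do/while scan picks the next non-empty list, moving toward tasks[0] and
 * wrapping back to tasks[maxpriority].
 */
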
/*
 * Wake up the next task on the wait queue.
 */
struct rpc_task * rpc_wake_up_next(struct rpc_wait_queue *queue)
{
        struct rpc_task *task = NULL;

        dprintk("RPC: wake_up_next(%p \"%s\")\n",
                        queue, rpc_qname(queue));
        rcu_read_lock_bh();
        spin_lock(&queue->lock);
        if (RPC_IS_PRIORITY(queue))
                task = __rpc_wake_up_next_priority(queue);
        else {
                task_for_first(task, &queue->tasks[0])
                        __rpc_wake_up_task(task);
        }
        spin_unlock(&queue->lock);
        rcu_read_unlock_bh();

        return task;
}

/**
 * rpc_wake_up - wake up all rpc_tasks
 * @queue: rpc_wait_queue on which the tasks are sleeping
 *
 * Grabs queue->lock
 */
void rpc_wake_up(struct rpc_wait_queue *queue)
{
        struct rpc_task *task, *next;
        struct list_head *head;

        rcu_read_lock_bh();
        spin_lock(&queue->lock);
        head = &queue->tasks[queue->maxpriority];
        for (;;) {
                list_for_each_entry_safe(task, next, head, u.tk_wait.list)
                        __rpc_wake_up_task(task);
                if (head == &queue->tasks[0])
                        break;
                head--;
        }
        spin_unlock(&queue->lock);
        rcu_read_unlock_bh();
}

/**
 * rpc_wake_up_status - wake up all rpc_tasks and set their status value.
 * @queue: rpc_wait_queue on which the tasks are sleeping
 * @status: status value to set
 *
 * Grabs queue->lock
 */
void rpc_wake_up_status(struct rpc_wait_queue *queue, int status)
{
        struct rpc_task *task, *next;
        struct list_head *head;

        rcu_read_lock_bh();
        spin_lock(&queue->lock);
        head = &queue->tasks[queue->maxpriority];
        for (;;) {
                list_for_each_entry_safe(task, next, head, u.tk_wait.list) {
                        task->tk_status = status;
                        __rpc_wake_up_task(task);
                }
                if (head == &queue->tasks[0])
                        break;
                head--;
        }
        spin_unlock(&queue->lock);
        rcu_read_unlock_bh();
}

static void __rpc_atrun(struct rpc_task *task)
{
        rpc_wake_up_task(task);
}

/*
 * Run a task at a later time
 */
void rpc_delay(struct rpc_task *task, unsigned long delay)
{
        task->tk_timeout = delay;
        rpc_sleep_on(&delay_queue, task, NULL, __rpc_atrun);
}

/*
 * Helper to call task->tk_ops->rpc_call_prepare
 */
static void rpc_prepare_task(struct rpc_task *task)
{
        lock_kernel();
        task->tk_ops->rpc_call_prepare(task, task->tk_calldata);
        unlock_kernel();
}

/*
 * Helper that calls task->tk_ops->rpc_call_done if it exists
 */
void rpc_exit_task(struct rpc_task *task)
{
        task->tk_action = NULL;
        if (task->tk_ops->rpc_call_done != NULL) {
                lock_kernel();
                task->tk_ops->rpc_call_done(task, task->tk_calldata);
                unlock_kernel();
                if (task->tk_action != NULL) {
                        WARN_ON(RPC_ASSASSINATED(task));
                        /* Always release the RPC slot and buffer memory */
                        xprt_release(task);
                }
        }
}
EXPORT_SYMBOL(rpc_exit_task);

void rpc_release_calldata(const struct rpc_call_ops *ops, void *calldata)
{
        if (ops->rpc_release != NULL) {
                lock_kernel();
                ops->rpc_release(calldata);
                unlock_kernel();
        }
}

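/*
 * The three helpers above call into the struct rpc_call_ops supplied by
 * the task's creator.  Minimal sketch of such an ops table (illustrative
 * only; the callback names are placeholders, and rpc_restart_call() lives
 * in clnt.c):
 *
 *	static void example_prepare(struct rpc_task *task, void *calldata)
 *	{
 *		// set up task->tk_msg before the call goes out
 *	}
 *
 *	static void example_done(struct rpc_task *task, void *calldata)
 *	{
 *		if (task->tk_status == -EAGAIN) {
 *			rpc_restart_call(task);	// re-run the call...
 *			rpc_delay(task, HZ);	// ...after a short delay
 *		}
 *	}
 *
 *	static const struct rpc_call_ops example_ops = {
 *		.rpc_call_prepare = example_prepare,
 *		.rpc_call_done    = example_done,
 *	};
 */
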
/*
 * This is the RPC `scheduler' (or rather, the finite state machine).
 */
static void __rpc_execute(struct rpc_task *task)
{
        int status = 0;

        dprintk("RPC: %5u __rpc_execute flags=0x%x\n",
                        task->tk_pid, task->tk_flags);

        BUG_ON(RPC_IS_QUEUED(task));

        for (;;) {
                /*
                 * Garbage collection of pending timers...
                 */
                rpc_delete_timer(task);

                /*
                 * Execute any pending callback.
                 */
                if (RPC_DO_CALLBACK(task)) {
                        /* Define a callback save pointer */
                        void (*save_callback)(struct rpc_task *);

                        /*
                         * If a callback exists, save it, reset it,
                         * call it.
                         * The save is needed to stop from resetting
                         * another callback set within the callback handler
                         *   - Dave
                         */
                        save_callback = task->tk_callback;
                        task->tk_callback = NULL;
                        save_callback(task);
                }

                /*
                 * Perform the next FSM step.
                 * tk_action may be NULL when the task has been killed
                 * by someone else.
                 */
                if (!RPC_IS_QUEUED(task)) {
                        if (task->tk_action == NULL)
                                break;
                        task->tk_action(task);
                }

                /*
                 * Lockless check for whether task is sleeping or not.
                 */
                if (!RPC_IS_QUEUED(task))
                        continue;
                rpc_clear_running(task);
                if (RPC_IS_ASYNC(task)) {
                        /* Careful! we may have raced... */
                        if (RPC_IS_QUEUED(task))
                                return;
                        if (rpc_test_and_set_running(task))
                                return;
                        continue;
                }

                /* sync task: sleep here */
                dprintk("RPC: %5u sync task going to sleep\n", task->tk_pid);
                /* Note: Caller should be using rpc_clnt_sigmask() */
                status = out_of_line_wait_on_bit(&task->tk_runstate,
                                RPC_TASK_QUEUED, rpc_wait_bit_interruptible,
                                TASK_INTERRUPTIBLE);
                if (status == -ERESTARTSYS) {
                        /*
                         * When a sync task receives a signal, it exits with
                         * -ERESTARTSYS. In order to catch any callbacks that
                         * clean up after sleeping on some queue, we don't
                         * break the loop here, but go around once more.
                         */
                        dprintk("RPC: %5u got signal\n", task->tk_pid);
                        task->tk_flags |= RPC_TASK_KILLED;
                        rpc_exit(task, -ERESTARTSYS);
                        rpc_wake_up_task(task);
                }
                rpc_set_running(task);
                dprintk("RPC: %5u sync task resuming\n", task->tk_pid);
        }

        dprintk("RPC: %5u return %d, status %d\n", task->tk_pid, status,
                        task->tk_status);
        /* Release all resources associated with the task */
        rpc_release_task(task);
}

/*
 * User-visible entry point to the scheduler.
 *
 * This may be called recursively if e.g. an async NFS task updates
 * the attributes and finds that dirty pages must be flushed.
 * NOTE: Upon exit of this function the task is guaranteed to be
 *       released. In particular note that tk_release() will have
 *       been called, so your task memory may have been freed.
 */
void rpc_execute(struct rpc_task *task)
{
        rpc_set_active(task);
        rpc_set_running(task);
        __rpc_execute(task);
}

static void rpc_async_schedule(struct work_struct *work)
{
        __rpc_execute(container_of(work, struct rpc_task, u.tk_work));
}

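/*
 * Note on the two execution paths: a synchronous task runs __rpc_execute()
 * directly from rpc_execute() and parks itself in
 * out_of_line_wait_on_bit(RPC_TASK_QUEUED, ...); rpc_make_runnable() wakes
 * it with wake_up_bit().  An asynchronous task instead returns from
 * __rpc_execute() whenever it is queued, and rpc_make_runnable() resubmits
 * u.tk_work to task->tk_workqueue, so the state machine continues in
 * rpc_async_schedule() on rpciod.
 */
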
/**
 * rpc_malloc - allocate an RPC buffer
 * @task: RPC task that will use this buffer
 * @size: requested byte size
 *
 * To prevent rpciod from hanging, this allocator never sleeps,
 * returning NULL if the request cannot be serviced immediately.
 * The caller can arrange to sleep in a way that is safe for rpciod.
 *
 * Most requests are 'small' (under 2KiB) and can be serviced from a
 * mempool, ensuring that NFS reads and writes can always proceed,
 * and that there is good locality of reference for these buffers.
 *
 * In order to avoid memory starvation triggering more writebacks of
 * NFS requests, we avoid using GFP_KERNEL.
 */
void *rpc_malloc(struct rpc_task *task, size_t size)
{
        size_t *buf;
        gfp_t gfp = RPC_IS_SWAPPER(task) ? GFP_ATOMIC : GFP_NOWAIT;

        size += sizeof(size_t);
        if (size <= RPC_BUFFER_MAXSIZE)
                buf = mempool_alloc(rpc_buffer_mempool, gfp);
        else
                buf = kmalloc(size, gfp);
        if (!buf)
                return NULL;
        *buf = size;
        dprintk("RPC: %5u allocated buffer of size %zu at %p\n",
                        task->tk_pid, size, buf);
        return (void *) ++buf;
}

/**
 * rpc_free - free buffer allocated via rpc_malloc
 * @buffer: buffer to free
 *
 */
void rpc_free(void *buffer)
{
        size_t size, *buf = (size_t *) buffer;

        if (!buffer)
                return;
        buf--;
        /* the size was stored immediately below the pointer we handed out */
        size = *buf;

        dprintk("RPC: freeing buffer of size %zu at %p\n",
                        size, buf);
        if (size <= RPC_BUFFER_MAXSIZE)
                mempool_free(buf, rpc_buffer_mempool);
        else
                kfree(buf);
}

/*
 * Creation and deletion of RPC task structures
 */
void rpc_init_task(struct rpc_task *task, struct rpc_clnt *clnt, int flags, const struct rpc_call_ops *tk_ops, void *calldata)
{
        memset(task, 0, sizeof(*task));
        init_timer(&task->tk_timer);
        task->tk_timer.data = (unsigned long) task;
        task->tk_timer.function = (void (*)(unsigned long)) rpc_run_timer;
        atomic_set(&task->tk_count, 1);
        task->tk_client = clnt;
        task->tk_flags = flags;
        task->tk_ops = tk_ops;
        if (tk_ops->rpc_call_prepare != NULL)
                task->tk_action = rpc_prepare_task;
        task->tk_calldata = calldata;

        /* Initialize retry counters */
        task->tk_garb_retry = 2;
        task->tk_cred_retry = 2;

        task->tk_priority = RPC_PRIORITY_NORMAL;
        task->tk_cookie = (unsigned long)current;

        /* Initialize workqueue for async tasks */
        task->tk_workqueue = rpciod_workqueue;

        if (clnt) {
                atomic_inc(&clnt->cl_users);
                if (clnt->cl_softrtry)
                        task->tk_flags |= RPC_TASK_SOFT;
                if (!clnt->cl_intr)
                        task->tk_flags |= RPC_TASK_NOINTR;
        }

        BUG_ON(task->tk_ops == NULL);

        /* starting timestamp */
        task->tk_start = jiffies;

        dprintk("RPC: new task initialized, procpid %u\n",
                        current->pid);
}

static struct rpc_task *
rpc_alloc_task(void)
{
        return (struct rpc_task *)mempool_alloc(rpc_task_mempool, GFP_NOFS);
}

static void rpc_free_task(struct rcu_head *rcu)
{
        struct rpc_task *task = container_of(rcu, struct rpc_task, u.tk_rcu);
        dprintk("RPC: %5u freeing task\n", task->tk_pid);
        mempool_free(task, rpc_task_mempool);
}

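/*
 * Task lifetime in a nutshell: rpc_init_task() starts every task with
 * tk_count == 1; rpc_release_task() drops that reference when the state
 * machine finishes, and callers that keep their own pointer (for example
 * via rpc_run_task(), which bumps tk_count) must balance it with
 * rpc_put_task().  Dynamically allocated tasks are freed through
 * call_rcu_bh()/rpc_free_task() above, which is why rpc_wake_up_task()
 * runs under rcu_read_lock_bh().
 */
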
/*
 * Create a new task for the specified client. We have to
 * clean up after an allocation failure, as the client may
 * have specified "oneshot".
 */
struct rpc_task *rpc_new_task(struct rpc_clnt *clnt, int flags, const struct rpc_call_ops *tk_ops, void *calldata)
{
        struct rpc_task *task;

        task = rpc_alloc_task();
        if (!task)
                goto cleanup;

        rpc_init_task(task, clnt, flags, tk_ops, calldata);

        dprintk("RPC: allocated task %p\n", task);
        task->tk_flags |= RPC_TASK_DYNAMIC;
out:
        return task;

cleanup:
        /* Check whether to release the client */
        if (clnt) {
                printk("rpc_new_task: failed, users=%d, oneshot=%d\n",
                        atomic_read(&clnt->cl_users), clnt->cl_oneshot);
                atomic_inc(&clnt->cl_users); /* pretend we were used ... */
                rpc_release_client(clnt);
        }
        goto out;
}


void rpc_put_task(struct rpc_task *task)
{
        const struct rpc_call_ops *tk_ops = task->tk_ops;
        void *calldata = task->tk_calldata;

        if (!atomic_dec_and_test(&task->tk_count))
                return;
        /* Release resources */
        if (task->tk_rqstp)
                xprt_release(task);
        if (task->tk_msg.rpc_cred)
                rpcauth_unbindcred(task);
        if (task->tk_client) {
                rpc_release_client(task->tk_client);
                task->tk_client = NULL;
        }
        if (task->tk_flags & RPC_TASK_DYNAMIC)
                call_rcu_bh(&task->u.tk_rcu, rpc_free_task);
        rpc_release_calldata(tk_ops, calldata);
}
EXPORT_SYMBOL(rpc_put_task);

static void rpc_release_task(struct rpc_task *task)
{
#ifdef RPC_DEBUG
        BUG_ON(task->tk_magic != RPC_TASK_MAGIC_ID);
#endif
        dprintk("RPC: %5u release task\n", task->tk_pid);

        /* Remove from global task list */
        spin_lock(&rpc_sched_lock);
        list_del(&task->tk_task);
        spin_unlock(&rpc_sched_lock);

        BUG_ON (RPC_IS_QUEUED(task));

        /* Synchronously delete any running timer */
        rpc_delete_timer(task);

#ifdef RPC_DEBUG
        task->tk_magic = 0;
#endif
        /* Wake up anyone who is waiting for task completion */
        rpc_mark_complete_task(task);

        rpc_put_task(task);
}

/**
 * rpc_run_task - Allocate a new RPC task, then run rpc_execute against it
 * @clnt: pointer to RPC client
 * @flags: RPC flags
 * @ops: RPC call ops
 * @data: user call data
 */
struct rpc_task *rpc_run_task(struct rpc_clnt *clnt, int flags,
                                        const struct rpc_call_ops *ops,
                                        void *data)
{
        struct rpc_task *task;
        task = rpc_new_task(clnt, flags, ops, data);
        if (task == NULL) {
                rpc_release_calldata(ops, data);
                return ERR_PTR(-ENOMEM);
        }
        atomic_inc(&task->tk_count);
        rpc_execute(task);
        return task;
}
EXPORT_SYMBOL(rpc_run_task);

/*
 * Kill all tasks for the given client.
 * XXX: kill their descendants as well?
 */
void rpc_killall_tasks(struct rpc_clnt *clnt)
{
        struct rpc_task *rovr;
        struct list_head *le;

        dprintk("RPC: killing all tasks for client %p\n", clnt);

        /*
         * Spin lock all_tasks to prevent changes...
         */
        spin_lock(&rpc_sched_lock);
        alltask_for_each(rovr, le, &all_tasks) {
                if (! RPC_IS_ACTIVATED(rovr))
                        continue;
                if (!clnt || rovr->tk_client == clnt) {
                        rovr->tk_flags |= RPC_TASK_KILLED;
                        rpc_exit(rovr, -EIO);
                        rpc_wake_up_task(rovr);
                }
        }
        spin_unlock(&rpc_sched_lock);
}

static DECLARE_MUTEX_LOCKED(rpciod_running);

static void rpciod_killall(void)
{
        unsigned long flags;

        while (!list_empty(&all_tasks)) {
                clear_thread_flag(TIF_SIGPENDING);
                rpc_killall_tasks(NULL);
                flush_workqueue(rpciod_workqueue);
                if (!list_empty(&all_tasks)) {
                        dprintk("RPC: rpciod_killall: waiting for tasks "
                                        "to exit\n");
                        yield();
                }
        }

        spin_lock_irqsave(&current->sighand->siglock, flags);
        recalc_sigpending();
        spin_unlock_irqrestore(&current->sighand->siglock, flags);
}

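/*
 * rpciod_up()/rpciod_down() below are reference counted: every user of the
 * rpciod workqueue takes a reference before queueing work and drops it when
 * done.  Sketch of the expected pairing (the surrounding code is the
 * caller's, shown for illustration only):
 *
 *	error = rpciod_up();
 *	if (error < 0)
 *		goto out_fail;
 *	...
 *	rpciod_down();
 */
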
/*
 * Start up the rpciod process if it's not already running.
 */
int
rpciod_up(void)
{
        struct workqueue_struct *wq;
        int error = 0;

        mutex_lock(&rpciod_mutex);
        dprintk("RPC: rpciod_up: users %u\n", rpciod_users);
        rpciod_users++;
        if (rpciod_workqueue)
                goto out;
        /*
         * If there's no pid, we should be the first user.
         */
        if (rpciod_users > 1)
                printk(KERN_WARNING "rpciod_up: no workqueue, %u users??\n", rpciod_users);
        /*
         * Create the rpciod thread and wait for it to start.
         */
        error = -ENOMEM;
        wq = create_workqueue("rpciod");
        if (wq == NULL) {
                printk(KERN_WARNING "rpciod_up: create workqueue failed, error=%d\n", error);
                rpciod_users--;
                goto out;
        }
        rpciod_workqueue = wq;
        error = 0;
out:
        mutex_unlock(&rpciod_mutex);
        return error;
}

void
rpciod_down(void)
{
        mutex_lock(&rpciod_mutex);
        dprintk("RPC: rpciod_down sema %u\n", rpciod_users);
        if (rpciod_users) {
                if (--rpciod_users)
                        goto out;
        } else
                printk(KERN_WARNING "rpciod_down: no users??\n");

        if (!rpciod_workqueue) {
                dprintk("RPC: rpciod_down: Nothing to do!\n");
                goto out;
        }
        rpciod_killall();

        destroy_workqueue(rpciod_workqueue);
        rpciod_workqueue = NULL;
out:
        mutex_unlock(&rpciod_mutex);
}

#ifdef RPC_DEBUG
void rpc_show_tasks(void)
{
        struct list_head *le;
        struct rpc_task *t;

        spin_lock(&rpc_sched_lock);
        if (list_empty(&all_tasks)) {
                spin_unlock(&rpc_sched_lock);
                return;
        }
        printk("-pid- proc flgs status -client- -prog- --rqstp- -timeout "
                "-rpcwait -action- ---ops--\n");
        alltask_for_each(t, le, &all_tasks) {
                const char *rpc_waitq = "none";

                if (RPC_IS_QUEUED(t))
                        rpc_waitq = rpc_qname(t->u.tk_wait.rpc_waitq);

                printk("%5u %04d %04x %6d %8p %6d %8p %8ld %8s %8p %8p\n",
                        t->tk_pid,
                        (t->tk_msg.rpc_proc ? t->tk_msg.rpc_proc->p_proc : -1),
                        t->tk_flags, t->tk_status,
                        t->tk_client,
                        (t->tk_client ? t->tk_client->cl_prog : 0),
                        t->tk_rqstp, t->tk_timeout,
                        rpc_waitq,
                        t->tk_action, t->tk_ops);
        }
        spin_unlock(&rpc_sched_lock);
}
#endif

void
rpc_destroy_mempool(void)
{
        if (rpc_buffer_mempool)
                mempool_destroy(rpc_buffer_mempool);
        if (rpc_task_mempool)
                mempool_destroy(rpc_task_mempool);
        if (rpc_task_slabp)
                kmem_cache_destroy(rpc_task_slabp);
        if (rpc_buffer_slabp)
                kmem_cache_destroy(rpc_buffer_slabp);
}

int
rpc_init_mempool(void)
{
        rpc_task_slabp = kmem_cache_create("rpc_tasks",
                        sizeof(struct rpc_task),
                        0, SLAB_HWCACHE_ALIGN,
                        NULL, NULL);
        if (!rpc_task_slabp)
                goto err_nomem;
        rpc_buffer_slabp = kmem_cache_create("rpc_buffers",
                        RPC_BUFFER_MAXSIZE,
                        0, SLAB_HWCACHE_ALIGN,
                        NULL, NULL);
        if (!rpc_buffer_slabp)
                goto err_nomem;
        rpc_task_mempool = mempool_create_slab_pool(RPC_TASK_POOLSIZE,
                        rpc_task_slabp);
        if (!rpc_task_mempool)
                goto err_nomem;
        rpc_buffer_mempool = mempool_create_slab_pool(RPC_BUFFER_POOLSIZE,
                        rpc_buffer_slabp);
        if (!rpc_buffer_mempool)
                goto err_nomem;
        return 0;
err_nomem:
        rpc_destroy_mempool();
        return -ENOMEM;
}
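
/*
 * rpc_init_mempool() and rpc_destroy_mempool() are expected to be called
 * once from the sunrpc module init/exit code.  A failed rpc_init_mempool()
 * cleans up after itself, so the caller only needs the usual pairing
 * (sketch; the surrounding function names are illustrative):
 *
 *	static int __init init_sunrpc_example(void)
 *	{
 *		int err = rpc_init_mempool();
 *		if (err)
 *			return err;
 *		...
 *		return 0;
 *	}
 *
 *	static void __exit cleanup_sunrpc_example(void)
 *	{
 *		...
 *		rpc_destroy_mempool();
 *	}
 */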