1 // SPDX-License-Identifier: GPL-2.0-only 2 /* 3 * linux/net/sunrpc/xprt.c 4 * 5 * This is a generic RPC call interface supporting congestion avoidance, 6 * and asynchronous calls. 7 * 8 * The interface works like this: 9 * 10 * - When a process places a call, it allocates a request slot if 11 * one is available. Otherwise, it sleeps on the backlog queue 12 * (xprt_reserve). 13 * - Next, the caller puts together the RPC message, stuffs it into 14 * the request struct, and calls xprt_transmit(). 15 * - xprt_transmit sends the message and installs the caller on the 16 * transport's wait list. At the same time, if a reply is expected, 17 * it installs a timer that is run after the packet's timeout has 18 * expired. 19 * - When a packet arrives, the data_ready handler walks the list of 20 * pending requests for that transport. If a matching XID is found, the 21 * caller is woken up, and the timer removed. 22 * - When no reply arrives within the timeout interval, the timer is 23 * fired by the kernel and runs xprt_timer(). It either adjusts the 24 * timeout values (minor timeout) or wakes up the caller with a status 25 * of -ETIMEDOUT. 26 * - When the caller receives a notification from RPC that a reply arrived, 27 * it should release the RPC slot, and process the reply. 28 * If the call timed out, it may choose to retry the operation by 29 * adjusting the initial timeout value, and simply calling rpc_call 30 * again. 31 * 32 * Support for async RPC is done through a set of RPC-specific scheduling 33 * primitives that `transparently' work for processes as well as async 34 * tasks that rely on callbacks. 35 * 36 * Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de> 37 * 38 * Transport switch API copyright (C) 2005, Chuck Lever <cel@netapp.com> 39 */ 40 41 #include <linux/module.h> 42 43 #include <linux/types.h> 44 #include <linux/interrupt.h> 45 #include <linux/workqueue.h> 46 #include <linux/net.h> 47 #include <linux/ktime.h> 48 49 #include <linux/sunrpc/clnt.h> 50 #include <linux/sunrpc/metrics.h> 51 #include <linux/sunrpc/bc_xprt.h> 52 #include <linux/rcupdate.h> 53 #include <linux/sched/mm.h> 54 55 #include <trace/events/sunrpc.h> 56 57 #include "sunrpc.h" 58 #include "sysfs.h" 59 60 /* 61 * Local variables 62 */ 63 64 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG) 65 # define RPCDBG_FACILITY RPCDBG_XPRT 66 #endif 67 68 /* 69 * Local functions 70 */ 71 static void xprt_init(struct rpc_xprt *xprt, struct net *net); 72 static __be32 xprt_alloc_xid(struct rpc_xprt *xprt); 73 static void xprt_destroy(struct rpc_xprt *xprt); 74 static void xprt_request_init(struct rpc_task *task); 75 76 static DEFINE_SPINLOCK(xprt_list_lock); 77 static LIST_HEAD(xprt_list); 78 79 static unsigned long xprt_request_timeout(const struct rpc_rqst *req) 80 { 81 unsigned long timeout = jiffies + req->rq_timeout; 82 83 if (time_before(timeout, req->rq_majortimeo)) 84 return timeout; 85 return req->rq_majortimeo; 86 } 87 88 /** 89 * xprt_register_transport - register a transport implementation 90 * @transport: transport to register 91 * 92 * If a transport implementation is loaded as a kernel module, it can 93 * call this interface to make itself known to the RPC client. 94 * 95 * Returns: 96 * 0: transport successfully registered 97 * -EEXIST: transport already registered 98 * -EINVAL: transport module being unloaded 99 */ 100 int xprt_register_transport(struct xprt_class *transport) 101 { 102 struct xprt_class *t; 103 int result; 104 105 result = -EEXIST; 106 spin_lock(&xprt_list_lock); 107 list_for_each_entry(t, &xprt_list, list) { 108 /* don't register the same transport class twice */ 109 if (t->ident == transport->ident) 110 goto out; 111 } 112 113 list_add_tail(&transport->list, &xprt_list); 114 printk(KERN_INFO "RPC: Registered %s transport module.\n", 115 transport->name); 116 result = 0; 117 118 out: 119 spin_unlock(&xprt_list_lock); 120 return result; 121 } 122 EXPORT_SYMBOL_GPL(xprt_register_transport); 123 124 /** 125 * xprt_unregister_transport - unregister a transport implementation 126 * @transport: transport to unregister 127 * 128 * Returns: 129 * 0: transport successfully unregistered 130 * -ENOENT: transport never registered 131 */ 132 int xprt_unregister_transport(struct xprt_class *transport) 133 { 134 struct xprt_class *t; 135 int result; 136 137 result = 0; 138 spin_lock(&xprt_list_lock); 139 list_for_each_entry(t, &xprt_list, list) { 140 if (t == transport) { 141 printk(KERN_INFO 142 "RPC: Unregistered %s transport module.\n", 143 transport->name); 144 list_del_init(&transport->list); 145 goto out; 146 } 147 } 148 result = -ENOENT; 149 150 out: 151 spin_unlock(&xprt_list_lock); 152 return result; 153 } 154 EXPORT_SYMBOL_GPL(xprt_unregister_transport); 155 156 static void 157 xprt_class_release(const struct xprt_class *t) 158 { 159 module_put(t->owner); 160 } 161 162 static const struct xprt_class * 163 xprt_class_find_by_ident_locked(int ident) 164 { 165 const struct xprt_class *t; 166 167 list_for_each_entry(t, &xprt_list, list) { 168 if (t->ident != ident) 169 continue; 170 if (!try_module_get(t->owner)) 171 continue; 172 return t; 173 } 174 return NULL; 175 } 176 177 static const struct xprt_class * 178 xprt_class_find_by_ident(int ident) 179 { 180 const struct xprt_class *t; 181 182 spin_lock(&xprt_list_lock); 183 t = xprt_class_find_by_ident_locked(ident); 184 spin_unlock(&xprt_list_lock); 185 return t; 186 } 187 188 static const struct xprt_class * 189 xprt_class_find_by_netid_locked(const char *netid) 190 { 191 const struct xprt_class *t; 192 unsigned int i; 193 194 list_for_each_entry(t, &xprt_list, list) { 195 for (i = 0; t->netid[i][0] != '\0'; i++) { 196 if (strcmp(t->netid[i], netid) != 0) 197 continue; 198 if (!try_module_get(t->owner)) 199 continue; 200 return t; 201 } 202 } 203 return NULL; 204 } 205 206 static const struct xprt_class * 207 xprt_class_find_by_netid(const char *netid) 208 { 209 const struct xprt_class *t; 210 211 spin_lock(&xprt_list_lock); 212 t = xprt_class_find_by_netid_locked(netid); 213 if (!t) { 214 spin_unlock(&xprt_list_lock); 215 request_module("rpc%s", netid); 216 spin_lock(&xprt_list_lock); 217 t = xprt_class_find_by_netid_locked(netid); 218 } 219 spin_unlock(&xprt_list_lock); 220 return t; 221 } 222 223 /** 224 * xprt_find_transport_ident - convert a netid into a transport identifier 225 * @netid: transport to load 226 * 227 * Returns: 228 * > 0: transport identifier 229 * -ENOENT: transport module not available 230 */ 231 int xprt_find_transport_ident(const char *netid) 232 { 233 const struct xprt_class *t; 234 int ret; 235 236 t = xprt_class_find_by_netid(netid); 237 if (!t) 238 return -ENOENT; 239 ret = t->ident; 240 xprt_class_release(t); 241 return ret; 242 } 243 EXPORT_SYMBOL_GPL(xprt_find_transport_ident); 244 245 static void xprt_clear_locked(struct rpc_xprt *xprt) 246 { 247 xprt->snd_task = NULL; 248 if (!test_bit(XPRT_CLOSE_WAIT, &xprt->state)) { 249 smp_mb__before_atomic(); 250 clear_bit(XPRT_LOCKED, &xprt->state); 251 smp_mb__after_atomic(); 252 } else 253 queue_work(xprtiod_workqueue, &xprt->task_cleanup); 254 } 255 256 /** 257 * xprt_reserve_xprt - serialize write access to transports 258 * @task: task that is requesting access to the transport 259 * @xprt: pointer to the target transport 260 * 261 * This prevents mixing the payload of separate requests, and prevents 262 * transport connects from colliding with writes. No congestion control 263 * is provided. 264 */ 265 int xprt_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task) 266 { 267 struct rpc_rqst *req = task->tk_rqstp; 268 269 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) { 270 if (task == xprt->snd_task) 271 goto out_locked; 272 goto out_sleep; 273 } 274 if (test_bit(XPRT_WRITE_SPACE, &xprt->state)) 275 goto out_unlock; 276 xprt->snd_task = task; 277 278 out_locked: 279 trace_xprt_reserve_xprt(xprt, task); 280 return 1; 281 282 out_unlock: 283 xprt_clear_locked(xprt); 284 out_sleep: 285 task->tk_status = -EAGAIN; 286 if (RPC_IS_SOFT(task)) 287 rpc_sleep_on_timeout(&xprt->sending, task, NULL, 288 xprt_request_timeout(req)); 289 else 290 rpc_sleep_on(&xprt->sending, task, NULL); 291 return 0; 292 } 293 EXPORT_SYMBOL_GPL(xprt_reserve_xprt); 294 295 static bool 296 xprt_need_congestion_window_wait(struct rpc_xprt *xprt) 297 { 298 return test_bit(XPRT_CWND_WAIT, &xprt->state); 299 } 300 301 static void 302 xprt_set_congestion_window_wait(struct rpc_xprt *xprt) 303 { 304 if (!list_empty(&xprt->xmit_queue)) { 305 /* Peek at head of queue to see if it can make progress */ 306 if (list_first_entry(&xprt->xmit_queue, struct rpc_rqst, 307 rq_xmit)->rq_cong) 308 return; 309 } 310 set_bit(XPRT_CWND_WAIT, &xprt->state); 311 } 312 313 static void 314 xprt_test_and_clear_congestion_window_wait(struct rpc_xprt *xprt) 315 { 316 if (!RPCXPRT_CONGESTED(xprt)) 317 clear_bit(XPRT_CWND_WAIT, &xprt->state); 318 } 319 320 /* 321 * xprt_reserve_xprt_cong - serialize write access to transports 322 * @task: task that is requesting access to the transport 323 * 324 * Same as xprt_reserve_xprt, but Van Jacobson congestion control is 325 * integrated into the decision of whether a request is allowed to be 326 * woken up and given access to the transport. 327 * Note that the lock is only granted if we know there are free slots. 328 */ 329 int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task) 330 { 331 struct rpc_rqst *req = task->tk_rqstp; 332 333 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) { 334 if (task == xprt->snd_task) 335 goto out_locked; 336 goto out_sleep; 337 } 338 if (req == NULL) { 339 xprt->snd_task = task; 340 goto out_locked; 341 } 342 if (test_bit(XPRT_WRITE_SPACE, &xprt->state)) 343 goto out_unlock; 344 if (!xprt_need_congestion_window_wait(xprt)) { 345 xprt->snd_task = task; 346 goto out_locked; 347 } 348 out_unlock: 349 xprt_clear_locked(xprt); 350 out_sleep: 351 task->tk_status = -EAGAIN; 352 if (RPC_IS_SOFT(task)) 353 rpc_sleep_on_timeout(&xprt->sending, task, NULL, 354 xprt_request_timeout(req)); 355 else 356 rpc_sleep_on(&xprt->sending, task, NULL); 357 return 0; 358 out_locked: 359 trace_xprt_reserve_cong(xprt, task); 360 return 1; 361 } 362 EXPORT_SYMBOL_GPL(xprt_reserve_xprt_cong); 363 364 static inline int xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task) 365 { 366 int retval; 367 368 if (test_bit(XPRT_LOCKED, &xprt->state) && xprt->snd_task == task) 369 return 1; 370 spin_lock(&xprt->transport_lock); 371 retval = xprt->ops->reserve_xprt(xprt, task); 372 spin_unlock(&xprt->transport_lock); 373 return retval; 374 } 375 376 static bool __xprt_lock_write_func(struct rpc_task *task, void *data) 377 { 378 struct rpc_xprt *xprt = data; 379 380 xprt->snd_task = task; 381 return true; 382 } 383 384 static void __xprt_lock_write_next(struct rpc_xprt *xprt) 385 { 386 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) 387 return; 388 if (test_bit(XPRT_WRITE_SPACE, &xprt->state)) 389 goto out_unlock; 390 if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending, 391 __xprt_lock_write_func, xprt)) 392 return; 393 out_unlock: 394 xprt_clear_locked(xprt); 395 } 396 397 static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt) 398 { 399 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) 400 return; 401 if (test_bit(XPRT_WRITE_SPACE, &xprt->state)) 402 goto out_unlock; 403 if (xprt_need_congestion_window_wait(xprt)) 404 goto out_unlock; 405 if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending, 406 __xprt_lock_write_func, xprt)) 407 return; 408 out_unlock: 409 xprt_clear_locked(xprt); 410 } 411 412 /** 413 * xprt_release_xprt - allow other requests to use a transport 414 * @xprt: transport with other tasks potentially waiting 415 * @task: task that is releasing access to the transport 416 * 417 * Note that "task" can be NULL. No congestion control is provided. 418 */ 419 void xprt_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task) 420 { 421 if (xprt->snd_task == task) { 422 xprt_clear_locked(xprt); 423 __xprt_lock_write_next(xprt); 424 } 425 trace_xprt_release_xprt(xprt, task); 426 } 427 EXPORT_SYMBOL_GPL(xprt_release_xprt); 428 429 /** 430 * xprt_release_xprt_cong - allow other requests to use a transport 431 * @xprt: transport with other tasks potentially waiting 432 * @task: task that is releasing access to the transport 433 * 434 * Note that "task" can be NULL. Another task is awoken to use the 435 * transport if the transport's congestion window allows it. 436 */ 437 void xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task) 438 { 439 if (xprt->snd_task == task) { 440 xprt_clear_locked(xprt); 441 __xprt_lock_write_next_cong(xprt); 442 } 443 trace_xprt_release_cong(xprt, task); 444 } 445 EXPORT_SYMBOL_GPL(xprt_release_xprt_cong); 446 447 void xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task) 448 { 449 if (xprt->snd_task != task) 450 return; 451 spin_lock(&xprt->transport_lock); 452 xprt->ops->release_xprt(xprt, task); 453 spin_unlock(&xprt->transport_lock); 454 } 455 456 /* 457 * Van Jacobson congestion avoidance. Check if the congestion window 458 * overflowed. Put the task to sleep if this is the case. 459 */ 460 static int 461 __xprt_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req) 462 { 463 if (req->rq_cong) 464 return 1; 465 trace_xprt_get_cong(xprt, req->rq_task); 466 if (RPCXPRT_CONGESTED(xprt)) { 467 xprt_set_congestion_window_wait(xprt); 468 return 0; 469 } 470 req->rq_cong = 1; 471 xprt->cong += RPC_CWNDSCALE; 472 return 1; 473 } 474 475 /* 476 * Adjust the congestion window, and wake up the next task 477 * that has been sleeping due to congestion 478 */ 479 static void 480 __xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req) 481 { 482 if (!req->rq_cong) 483 return; 484 req->rq_cong = 0; 485 xprt->cong -= RPC_CWNDSCALE; 486 xprt_test_and_clear_congestion_window_wait(xprt); 487 trace_xprt_put_cong(xprt, req->rq_task); 488 __xprt_lock_write_next_cong(xprt); 489 } 490 491 /** 492 * xprt_request_get_cong - Request congestion control credits 493 * @xprt: pointer to transport 494 * @req: pointer to RPC request 495 * 496 * Useful for transports that require congestion control. 497 */ 498 bool 499 xprt_request_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req) 500 { 501 bool ret = false; 502 503 if (req->rq_cong) 504 return true; 505 spin_lock(&xprt->transport_lock); 506 ret = __xprt_get_cong(xprt, req) != 0; 507 spin_unlock(&xprt->transport_lock); 508 return ret; 509 } 510 EXPORT_SYMBOL_GPL(xprt_request_get_cong); 511 512 /** 513 * xprt_release_rqst_cong - housekeeping when request is complete 514 * @task: RPC request that recently completed 515 * 516 * Useful for transports that require congestion control. 517 */ 518 void xprt_release_rqst_cong(struct rpc_task *task) 519 { 520 struct rpc_rqst *req = task->tk_rqstp; 521 522 __xprt_put_cong(req->rq_xprt, req); 523 } 524 EXPORT_SYMBOL_GPL(xprt_release_rqst_cong); 525 526 static void xprt_clear_congestion_window_wait_locked(struct rpc_xprt *xprt) 527 { 528 if (test_and_clear_bit(XPRT_CWND_WAIT, &xprt->state)) 529 __xprt_lock_write_next_cong(xprt); 530 } 531 532 /* 533 * Clear the congestion window wait flag and wake up the next 534 * entry on xprt->sending 535 */ 536 static void 537 xprt_clear_congestion_window_wait(struct rpc_xprt *xprt) 538 { 539 if (test_and_clear_bit(XPRT_CWND_WAIT, &xprt->state)) { 540 spin_lock(&xprt->transport_lock); 541 __xprt_lock_write_next_cong(xprt); 542 spin_unlock(&xprt->transport_lock); 543 } 544 } 545 546 /** 547 * xprt_adjust_cwnd - adjust transport congestion window 548 * @xprt: pointer to xprt 549 * @task: recently completed RPC request used to adjust window 550 * @result: result code of completed RPC request 551 * 552 * The transport code maintains an estimate on the maximum number of out- 553 * standing RPC requests, using a smoothed version of the congestion 554 * avoidance implemented in 44BSD. This is basically the Van Jacobson 555 * congestion algorithm: If a retransmit occurs, the congestion window is 556 * halved; otherwise, it is incremented by 1/cwnd when 557 * 558 * - a reply is received and 559 * - a full number of requests are outstanding and 560 * - the congestion window hasn't been updated recently. 561 */ 562 void xprt_adjust_cwnd(struct rpc_xprt *xprt, struct rpc_task *task, int result) 563 { 564 struct rpc_rqst *req = task->tk_rqstp; 565 unsigned long cwnd = xprt->cwnd; 566 567 if (result >= 0 && cwnd <= xprt->cong) { 568 /* The (cwnd >> 1) term makes sure 569 * the result gets rounded properly. */ 570 cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd; 571 if (cwnd > RPC_MAXCWND(xprt)) 572 cwnd = RPC_MAXCWND(xprt); 573 __xprt_lock_write_next_cong(xprt); 574 } else if (result == -ETIMEDOUT) { 575 cwnd >>= 1; 576 if (cwnd < RPC_CWNDSCALE) 577 cwnd = RPC_CWNDSCALE; 578 } 579 dprintk("RPC: cong %ld, cwnd was %ld, now %ld\n", 580 xprt->cong, xprt->cwnd, cwnd); 581 xprt->cwnd = cwnd; 582 __xprt_put_cong(xprt, req); 583 } 584 EXPORT_SYMBOL_GPL(xprt_adjust_cwnd); 585 586 /** 587 * xprt_wake_pending_tasks - wake all tasks on a transport's pending queue 588 * @xprt: transport with waiting tasks 589 * @status: result code to plant in each task before waking it 590 * 591 */ 592 void xprt_wake_pending_tasks(struct rpc_xprt *xprt, int status) 593 { 594 if (status < 0) 595 rpc_wake_up_status(&xprt->pending, status); 596 else 597 rpc_wake_up(&xprt->pending); 598 } 599 EXPORT_SYMBOL_GPL(xprt_wake_pending_tasks); 600 601 /** 602 * xprt_wait_for_buffer_space - wait for transport output buffer to clear 603 * @xprt: transport 604 * 605 * Note that we only set the timer for the case of RPC_IS_SOFT(), since 606 * we don't in general want to force a socket disconnection due to 607 * an incomplete RPC call transmission. 608 */ 609 void xprt_wait_for_buffer_space(struct rpc_xprt *xprt) 610 { 611 set_bit(XPRT_WRITE_SPACE, &xprt->state); 612 } 613 EXPORT_SYMBOL_GPL(xprt_wait_for_buffer_space); 614 615 static bool 616 xprt_clear_write_space_locked(struct rpc_xprt *xprt) 617 { 618 if (test_and_clear_bit(XPRT_WRITE_SPACE, &xprt->state)) { 619 __xprt_lock_write_next(xprt); 620 dprintk("RPC: write space: waking waiting task on " 621 "xprt %p\n", xprt); 622 return true; 623 } 624 return false; 625 } 626 627 /** 628 * xprt_write_space - wake the task waiting for transport output buffer space 629 * @xprt: transport with waiting tasks 630 * 631 * Can be called in a soft IRQ context, so xprt_write_space never sleeps. 632 */ 633 bool xprt_write_space(struct rpc_xprt *xprt) 634 { 635 bool ret; 636 637 if (!test_bit(XPRT_WRITE_SPACE, &xprt->state)) 638 return false; 639 spin_lock(&xprt->transport_lock); 640 ret = xprt_clear_write_space_locked(xprt); 641 spin_unlock(&xprt->transport_lock); 642 return ret; 643 } 644 EXPORT_SYMBOL_GPL(xprt_write_space); 645 646 static unsigned long xprt_abs_ktime_to_jiffies(ktime_t abstime) 647 { 648 s64 delta = ktime_to_ns(ktime_get() - abstime); 649 return likely(delta >= 0) ? 650 jiffies - nsecs_to_jiffies(delta) : 651 jiffies + nsecs_to_jiffies(-delta); 652 } 653 654 static unsigned long xprt_calc_majortimeo(struct rpc_rqst *req) 655 { 656 const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout; 657 unsigned long majortimeo = req->rq_timeout; 658 659 if (to->to_exponential) 660 majortimeo <<= to->to_retries; 661 else 662 majortimeo += to->to_increment * to->to_retries; 663 if (majortimeo > to->to_maxval || majortimeo == 0) 664 majortimeo = to->to_maxval; 665 return majortimeo; 666 } 667 668 static void xprt_reset_majortimeo(struct rpc_rqst *req) 669 { 670 req->rq_majortimeo += xprt_calc_majortimeo(req); 671 } 672 673 static void xprt_reset_minortimeo(struct rpc_rqst *req) 674 { 675 req->rq_minortimeo += req->rq_timeout; 676 } 677 678 static void xprt_init_majortimeo(struct rpc_task *task, struct rpc_rqst *req) 679 { 680 unsigned long time_init; 681 struct rpc_xprt *xprt = req->rq_xprt; 682 683 if (likely(xprt && xprt_connected(xprt))) 684 time_init = jiffies; 685 else 686 time_init = xprt_abs_ktime_to_jiffies(task->tk_start); 687 req->rq_timeout = task->tk_client->cl_timeout->to_initval; 688 req->rq_majortimeo = time_init + xprt_calc_majortimeo(req); 689 req->rq_minortimeo = time_init + req->rq_timeout; 690 } 691 692 /** 693 * xprt_adjust_timeout - adjust timeout values for next retransmit 694 * @req: RPC request containing parameters to use for the adjustment 695 * 696 */ 697 int xprt_adjust_timeout(struct rpc_rqst *req) 698 { 699 struct rpc_xprt *xprt = req->rq_xprt; 700 const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout; 701 int status = 0; 702 703 if (time_before(jiffies, req->rq_majortimeo)) { 704 if (time_before(jiffies, req->rq_minortimeo)) 705 return status; 706 if (to->to_exponential) 707 req->rq_timeout <<= 1; 708 else 709 req->rq_timeout += to->to_increment; 710 if (to->to_maxval && req->rq_timeout >= to->to_maxval) 711 req->rq_timeout = to->to_maxval; 712 req->rq_retries++; 713 } else { 714 req->rq_timeout = to->to_initval; 715 req->rq_retries = 0; 716 xprt_reset_majortimeo(req); 717 /* Reset the RTT counters == "slow start" */ 718 spin_lock(&xprt->transport_lock); 719 rpc_init_rtt(req->rq_task->tk_client->cl_rtt, to->to_initval); 720 spin_unlock(&xprt->transport_lock); 721 status = -ETIMEDOUT; 722 } 723 xprt_reset_minortimeo(req); 724 725 if (req->rq_timeout == 0) { 726 printk(KERN_WARNING "xprt_adjust_timeout: rq_timeout = 0!\n"); 727 req->rq_timeout = 5 * HZ; 728 } 729 return status; 730 } 731 732 static void xprt_autoclose(struct work_struct *work) 733 { 734 struct rpc_xprt *xprt = 735 container_of(work, struct rpc_xprt, task_cleanup); 736 unsigned int pflags = memalloc_nofs_save(); 737 738 trace_xprt_disconnect_auto(xprt); 739 clear_bit(XPRT_CLOSE_WAIT, &xprt->state); 740 xprt->ops->close(xprt); 741 xprt_release_write(xprt, NULL); 742 wake_up_bit(&xprt->state, XPRT_LOCKED); 743 memalloc_nofs_restore(pflags); 744 } 745 746 /** 747 * xprt_disconnect_done - mark a transport as disconnected 748 * @xprt: transport to flag for disconnect 749 * 750 */ 751 void xprt_disconnect_done(struct rpc_xprt *xprt) 752 { 753 trace_xprt_disconnect_done(xprt); 754 spin_lock(&xprt->transport_lock); 755 xprt_clear_connected(xprt); 756 xprt_clear_write_space_locked(xprt); 757 xprt_clear_congestion_window_wait_locked(xprt); 758 xprt_wake_pending_tasks(xprt, -ENOTCONN); 759 spin_unlock(&xprt->transport_lock); 760 } 761 EXPORT_SYMBOL_GPL(xprt_disconnect_done); 762 763 /** 764 * xprt_force_disconnect - force a transport to disconnect 765 * @xprt: transport to disconnect 766 * 767 */ 768 void xprt_force_disconnect(struct rpc_xprt *xprt) 769 { 770 trace_xprt_disconnect_force(xprt); 771 772 /* Don't race with the test_bit() in xprt_clear_locked() */ 773 spin_lock(&xprt->transport_lock); 774 set_bit(XPRT_CLOSE_WAIT, &xprt->state); 775 /* Try to schedule an autoclose RPC call */ 776 if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0) 777 queue_work(xprtiod_workqueue, &xprt->task_cleanup); 778 else if (xprt->snd_task) 779 rpc_wake_up_queued_task_set_status(&xprt->pending, 780 xprt->snd_task, -ENOTCONN); 781 spin_unlock(&xprt->transport_lock); 782 } 783 EXPORT_SYMBOL_GPL(xprt_force_disconnect); 784 785 static unsigned int 786 xprt_connect_cookie(struct rpc_xprt *xprt) 787 { 788 return READ_ONCE(xprt->connect_cookie); 789 } 790 791 static bool 792 xprt_request_retransmit_after_disconnect(struct rpc_task *task) 793 { 794 struct rpc_rqst *req = task->tk_rqstp; 795 struct rpc_xprt *xprt = req->rq_xprt; 796 797 return req->rq_connect_cookie != xprt_connect_cookie(xprt) || 798 !xprt_connected(xprt); 799 } 800 801 /** 802 * xprt_conditional_disconnect - force a transport to disconnect 803 * @xprt: transport to disconnect 804 * @cookie: 'connection cookie' 805 * 806 * This attempts to break the connection if and only if 'cookie' matches 807 * the current transport 'connection cookie'. It ensures that we don't 808 * try to break the connection more than once when we need to retransmit 809 * a batch of RPC requests. 810 * 811 */ 812 void xprt_conditional_disconnect(struct rpc_xprt *xprt, unsigned int cookie) 813 { 814 /* Don't race with the test_bit() in xprt_clear_locked() */ 815 spin_lock(&xprt->transport_lock); 816 if (cookie != xprt->connect_cookie) 817 goto out; 818 if (test_bit(XPRT_CLOSING, &xprt->state)) 819 goto out; 820 set_bit(XPRT_CLOSE_WAIT, &xprt->state); 821 /* Try to schedule an autoclose RPC call */ 822 if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0) 823 queue_work(xprtiod_workqueue, &xprt->task_cleanup); 824 xprt_wake_pending_tasks(xprt, -EAGAIN); 825 out: 826 spin_unlock(&xprt->transport_lock); 827 } 828 829 static bool 830 xprt_has_timer(const struct rpc_xprt *xprt) 831 { 832 return xprt->idle_timeout != 0; 833 } 834 835 static void 836 xprt_schedule_autodisconnect(struct rpc_xprt *xprt) 837 __must_hold(&xprt->transport_lock) 838 { 839 xprt->last_used = jiffies; 840 if (RB_EMPTY_ROOT(&xprt->recv_queue) && xprt_has_timer(xprt)) 841 mod_timer(&xprt->timer, xprt->last_used + xprt->idle_timeout); 842 } 843 844 static void 845 xprt_init_autodisconnect(struct timer_list *t) 846 { 847 struct rpc_xprt *xprt = from_timer(xprt, t, timer); 848 849 if (!RB_EMPTY_ROOT(&xprt->recv_queue)) 850 return; 851 /* Reset xprt->last_used to avoid connect/autodisconnect cycling */ 852 xprt->last_used = jiffies; 853 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) 854 return; 855 queue_work(xprtiod_workqueue, &xprt->task_cleanup); 856 } 857 858 bool xprt_lock_connect(struct rpc_xprt *xprt, 859 struct rpc_task *task, 860 void *cookie) 861 { 862 bool ret = false; 863 864 spin_lock(&xprt->transport_lock); 865 if (!test_bit(XPRT_LOCKED, &xprt->state)) 866 goto out; 867 if (xprt->snd_task != task) 868 goto out; 869 xprt->snd_task = cookie; 870 ret = true; 871 out: 872 spin_unlock(&xprt->transport_lock); 873 return ret; 874 } 875 876 void xprt_unlock_connect(struct rpc_xprt *xprt, void *cookie) 877 { 878 spin_lock(&xprt->transport_lock); 879 if (xprt->snd_task != cookie) 880 goto out; 881 if (!test_bit(XPRT_LOCKED, &xprt->state)) 882 goto out; 883 xprt->snd_task =NULL; 884 xprt->ops->release_xprt(xprt, NULL); 885 xprt_schedule_autodisconnect(xprt); 886 out: 887 spin_unlock(&xprt->transport_lock); 888 wake_up_bit(&xprt->state, XPRT_LOCKED); 889 } 890 891 /** 892 * xprt_connect - schedule a transport connect operation 893 * @task: RPC task that is requesting the connect 894 * 895 */ 896 void xprt_connect(struct rpc_task *task) 897 { 898 struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt; 899 900 trace_xprt_connect(xprt); 901 902 if (!xprt_bound(xprt)) { 903 task->tk_status = -EAGAIN; 904 return; 905 } 906 if (!xprt_lock_write(xprt, task)) 907 return; 908 909 if (test_and_clear_bit(XPRT_CLOSE_WAIT, &xprt->state)) { 910 trace_xprt_disconnect_cleanup(xprt); 911 xprt->ops->close(xprt); 912 } 913 914 if (!xprt_connected(xprt)) { 915 task->tk_rqstp->rq_connect_cookie = xprt->connect_cookie; 916 rpc_sleep_on_timeout(&xprt->pending, task, NULL, 917 xprt_request_timeout(task->tk_rqstp)); 918 919 if (test_bit(XPRT_CLOSING, &xprt->state)) 920 return; 921 if (xprt_test_and_set_connecting(xprt)) 922 return; 923 /* Race breaker */ 924 if (!xprt_connected(xprt)) { 925 xprt->stat.connect_start = jiffies; 926 xprt->ops->connect(xprt, task); 927 } else { 928 xprt_clear_connecting(xprt); 929 task->tk_status = 0; 930 rpc_wake_up_queued_task(&xprt->pending, task); 931 } 932 } 933 xprt_release_write(xprt, task); 934 } 935 936 /** 937 * xprt_reconnect_delay - compute the wait before scheduling a connect 938 * @xprt: transport instance 939 * 940 */ 941 unsigned long xprt_reconnect_delay(const struct rpc_xprt *xprt) 942 { 943 unsigned long start, now = jiffies; 944 945 start = xprt->stat.connect_start + xprt->reestablish_timeout; 946 if (time_after(start, now)) 947 return start - now; 948 return 0; 949 } 950 EXPORT_SYMBOL_GPL(xprt_reconnect_delay); 951 952 /** 953 * xprt_reconnect_backoff - compute the new re-establish timeout 954 * @xprt: transport instance 955 * @init_to: initial reestablish timeout 956 * 957 */ 958 void xprt_reconnect_backoff(struct rpc_xprt *xprt, unsigned long init_to) 959 { 960 xprt->reestablish_timeout <<= 1; 961 if (xprt->reestablish_timeout > xprt->max_reconnect_timeout) 962 xprt->reestablish_timeout = xprt->max_reconnect_timeout; 963 if (xprt->reestablish_timeout < init_to) 964 xprt->reestablish_timeout = init_to; 965 } 966 EXPORT_SYMBOL_GPL(xprt_reconnect_backoff); 967 968 enum xprt_xid_rb_cmp { 969 XID_RB_EQUAL, 970 XID_RB_LEFT, 971 XID_RB_RIGHT, 972 }; 973 static enum xprt_xid_rb_cmp 974 xprt_xid_cmp(__be32 xid1, __be32 xid2) 975 { 976 if (xid1 == xid2) 977 return XID_RB_EQUAL; 978 if ((__force u32)xid1 < (__force u32)xid2) 979 return XID_RB_LEFT; 980 return XID_RB_RIGHT; 981 } 982 983 static struct rpc_rqst * 984 xprt_request_rb_find(struct rpc_xprt *xprt, __be32 xid) 985 { 986 struct rb_node *n = xprt->recv_queue.rb_node; 987 struct rpc_rqst *req; 988 989 while (n != NULL) { 990 req = rb_entry(n, struct rpc_rqst, rq_recv); 991 switch (xprt_xid_cmp(xid, req->rq_xid)) { 992 case XID_RB_LEFT: 993 n = n->rb_left; 994 break; 995 case XID_RB_RIGHT: 996 n = n->rb_right; 997 break; 998 case XID_RB_EQUAL: 999 return req; 1000 } 1001 } 1002 return NULL; 1003 } 1004 1005 static void 1006 xprt_request_rb_insert(struct rpc_xprt *xprt, struct rpc_rqst *new) 1007 { 1008 struct rb_node **p = &xprt->recv_queue.rb_node; 1009 struct rb_node *n = NULL; 1010 struct rpc_rqst *req; 1011 1012 while (*p != NULL) { 1013 n = *p; 1014 req = rb_entry(n, struct rpc_rqst, rq_recv); 1015 switch(xprt_xid_cmp(new->rq_xid, req->rq_xid)) { 1016 case XID_RB_LEFT: 1017 p = &n->rb_left; 1018 break; 1019 case XID_RB_RIGHT: 1020 p = &n->rb_right; 1021 break; 1022 case XID_RB_EQUAL: 1023 WARN_ON_ONCE(new != req); 1024 return; 1025 } 1026 } 1027 rb_link_node(&new->rq_recv, n, p); 1028 rb_insert_color(&new->rq_recv, &xprt->recv_queue); 1029 } 1030 1031 static void 1032 xprt_request_rb_remove(struct rpc_xprt *xprt, struct rpc_rqst *req) 1033 { 1034 rb_erase(&req->rq_recv, &xprt->recv_queue); 1035 } 1036 1037 /** 1038 * xprt_lookup_rqst - find an RPC request corresponding to an XID 1039 * @xprt: transport on which the original request was transmitted 1040 * @xid: RPC XID of incoming reply 1041 * 1042 * Caller holds xprt->queue_lock. 1043 */ 1044 struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid) 1045 { 1046 struct rpc_rqst *entry; 1047 1048 entry = xprt_request_rb_find(xprt, xid); 1049 if (entry != NULL) { 1050 trace_xprt_lookup_rqst(xprt, xid, 0); 1051 entry->rq_rtt = ktime_sub(ktime_get(), entry->rq_xtime); 1052 return entry; 1053 } 1054 1055 dprintk("RPC: xprt_lookup_rqst did not find xid %08x\n", 1056 ntohl(xid)); 1057 trace_xprt_lookup_rqst(xprt, xid, -ENOENT); 1058 xprt->stat.bad_xids++; 1059 return NULL; 1060 } 1061 EXPORT_SYMBOL_GPL(xprt_lookup_rqst); 1062 1063 static bool 1064 xprt_is_pinned_rqst(struct rpc_rqst *req) 1065 { 1066 return atomic_read(&req->rq_pin) != 0; 1067 } 1068 1069 /** 1070 * xprt_pin_rqst - Pin a request on the transport receive list 1071 * @req: Request to pin 1072 * 1073 * Caller must ensure this is atomic with the call to xprt_lookup_rqst() 1074 * so should be holding xprt->queue_lock. 1075 */ 1076 void xprt_pin_rqst(struct rpc_rqst *req) 1077 { 1078 atomic_inc(&req->rq_pin); 1079 } 1080 EXPORT_SYMBOL_GPL(xprt_pin_rqst); 1081 1082 /** 1083 * xprt_unpin_rqst - Unpin a request on the transport receive list 1084 * @req: Request to pin 1085 * 1086 * Caller should be holding xprt->queue_lock. 1087 */ 1088 void xprt_unpin_rqst(struct rpc_rqst *req) 1089 { 1090 if (!test_bit(RPC_TASK_MSG_PIN_WAIT, &req->rq_task->tk_runstate)) { 1091 atomic_dec(&req->rq_pin); 1092 return; 1093 } 1094 if (atomic_dec_and_test(&req->rq_pin)) 1095 wake_up_var(&req->rq_pin); 1096 } 1097 EXPORT_SYMBOL_GPL(xprt_unpin_rqst); 1098 1099 static void xprt_wait_on_pinned_rqst(struct rpc_rqst *req) 1100 { 1101 wait_var_event(&req->rq_pin, !xprt_is_pinned_rqst(req)); 1102 } 1103 1104 static bool 1105 xprt_request_data_received(struct rpc_task *task) 1106 { 1107 return !test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) && 1108 READ_ONCE(task->tk_rqstp->rq_reply_bytes_recvd) != 0; 1109 } 1110 1111 static bool 1112 xprt_request_need_enqueue_receive(struct rpc_task *task, struct rpc_rqst *req) 1113 { 1114 return !test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) && 1115 READ_ONCE(task->tk_rqstp->rq_reply_bytes_recvd) == 0; 1116 } 1117 1118 /** 1119 * xprt_request_enqueue_receive - Add an request to the receive queue 1120 * @task: RPC task 1121 * 1122 */ 1123 void 1124 xprt_request_enqueue_receive(struct rpc_task *task) 1125 { 1126 struct rpc_rqst *req = task->tk_rqstp; 1127 struct rpc_xprt *xprt = req->rq_xprt; 1128 1129 if (!xprt_request_need_enqueue_receive(task, req)) 1130 return; 1131 1132 xprt_request_prepare(task->tk_rqstp); 1133 spin_lock(&xprt->queue_lock); 1134 1135 /* Update the softirq receive buffer */ 1136 memcpy(&req->rq_private_buf, &req->rq_rcv_buf, 1137 sizeof(req->rq_private_buf)); 1138 1139 /* Add request to the receive list */ 1140 xprt_request_rb_insert(xprt, req); 1141 set_bit(RPC_TASK_NEED_RECV, &task->tk_runstate); 1142 spin_unlock(&xprt->queue_lock); 1143 1144 /* Turn off autodisconnect */ 1145 del_singleshot_timer_sync(&xprt->timer); 1146 } 1147 1148 /** 1149 * xprt_request_dequeue_receive_locked - Remove a request from the receive queue 1150 * @task: RPC task 1151 * 1152 * Caller must hold xprt->queue_lock. 1153 */ 1154 static void 1155 xprt_request_dequeue_receive_locked(struct rpc_task *task) 1156 { 1157 struct rpc_rqst *req = task->tk_rqstp; 1158 1159 if (test_and_clear_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) 1160 xprt_request_rb_remove(req->rq_xprt, req); 1161 } 1162 1163 /** 1164 * xprt_update_rtt - Update RPC RTT statistics 1165 * @task: RPC request that recently completed 1166 * 1167 * Caller holds xprt->queue_lock. 1168 */ 1169 void xprt_update_rtt(struct rpc_task *task) 1170 { 1171 struct rpc_rqst *req = task->tk_rqstp; 1172 struct rpc_rtt *rtt = task->tk_client->cl_rtt; 1173 unsigned int timer = task->tk_msg.rpc_proc->p_timer; 1174 long m = usecs_to_jiffies(ktime_to_us(req->rq_rtt)); 1175 1176 if (timer) { 1177 if (req->rq_ntrans == 1) 1178 rpc_update_rtt(rtt, timer, m); 1179 rpc_set_timeo(rtt, timer, req->rq_ntrans - 1); 1180 } 1181 } 1182 EXPORT_SYMBOL_GPL(xprt_update_rtt); 1183 1184 /** 1185 * xprt_complete_rqst - called when reply processing is complete 1186 * @task: RPC request that recently completed 1187 * @copied: actual number of bytes received from the transport 1188 * 1189 * Caller holds xprt->queue_lock. 1190 */ 1191 void xprt_complete_rqst(struct rpc_task *task, int copied) 1192 { 1193 struct rpc_rqst *req = task->tk_rqstp; 1194 struct rpc_xprt *xprt = req->rq_xprt; 1195 1196 xprt->stat.recvs++; 1197 1198 req->rq_private_buf.len = copied; 1199 /* Ensure all writes are done before we update */ 1200 /* req->rq_reply_bytes_recvd */ 1201 smp_wmb(); 1202 req->rq_reply_bytes_recvd = copied; 1203 xprt_request_dequeue_receive_locked(task); 1204 rpc_wake_up_queued_task(&xprt->pending, task); 1205 } 1206 EXPORT_SYMBOL_GPL(xprt_complete_rqst); 1207 1208 static void xprt_timer(struct rpc_task *task) 1209 { 1210 struct rpc_rqst *req = task->tk_rqstp; 1211 struct rpc_xprt *xprt = req->rq_xprt; 1212 1213 if (task->tk_status != -ETIMEDOUT) 1214 return; 1215 1216 trace_xprt_timer(xprt, req->rq_xid, task->tk_status); 1217 if (!req->rq_reply_bytes_recvd) { 1218 if (xprt->ops->timer) 1219 xprt->ops->timer(xprt, task); 1220 } else 1221 task->tk_status = 0; 1222 } 1223 1224 /** 1225 * xprt_wait_for_reply_request_def - wait for reply 1226 * @task: pointer to rpc_task 1227 * 1228 * Set a request's retransmit timeout based on the transport's 1229 * default timeout parameters. Used by transports that don't adjust 1230 * the retransmit timeout based on round-trip time estimation, 1231 * and put the task to sleep on the pending queue. 1232 */ 1233 void xprt_wait_for_reply_request_def(struct rpc_task *task) 1234 { 1235 struct rpc_rqst *req = task->tk_rqstp; 1236 1237 rpc_sleep_on_timeout(&req->rq_xprt->pending, task, xprt_timer, 1238 xprt_request_timeout(req)); 1239 } 1240 EXPORT_SYMBOL_GPL(xprt_wait_for_reply_request_def); 1241 1242 /** 1243 * xprt_wait_for_reply_request_rtt - wait for reply using RTT estimator 1244 * @task: pointer to rpc_task 1245 * 1246 * Set a request's retransmit timeout using the RTT estimator, 1247 * and put the task to sleep on the pending queue. 1248 */ 1249 void xprt_wait_for_reply_request_rtt(struct rpc_task *task) 1250 { 1251 int timer = task->tk_msg.rpc_proc->p_timer; 1252 struct rpc_clnt *clnt = task->tk_client; 1253 struct rpc_rtt *rtt = clnt->cl_rtt; 1254 struct rpc_rqst *req = task->tk_rqstp; 1255 unsigned long max_timeout = clnt->cl_timeout->to_maxval; 1256 unsigned long timeout; 1257 1258 timeout = rpc_calc_rto(rtt, timer); 1259 timeout <<= rpc_ntimeo(rtt, timer) + req->rq_retries; 1260 if (timeout > max_timeout || timeout == 0) 1261 timeout = max_timeout; 1262 rpc_sleep_on_timeout(&req->rq_xprt->pending, task, xprt_timer, 1263 jiffies + timeout); 1264 } 1265 EXPORT_SYMBOL_GPL(xprt_wait_for_reply_request_rtt); 1266 1267 /** 1268 * xprt_request_wait_receive - wait for the reply to an RPC request 1269 * @task: RPC task about to send a request 1270 * 1271 */ 1272 void xprt_request_wait_receive(struct rpc_task *task) 1273 { 1274 struct rpc_rqst *req = task->tk_rqstp; 1275 struct rpc_xprt *xprt = req->rq_xprt; 1276 1277 if (!test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) 1278 return; 1279 /* 1280 * Sleep on the pending queue if we're expecting a reply. 1281 * The spinlock ensures atomicity between the test of 1282 * req->rq_reply_bytes_recvd, and the call to rpc_sleep_on(). 1283 */ 1284 spin_lock(&xprt->queue_lock); 1285 if (test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) { 1286 xprt->ops->wait_for_reply_request(task); 1287 /* 1288 * Send an extra queue wakeup call if the 1289 * connection was dropped in case the call to 1290 * rpc_sleep_on() raced. 1291 */ 1292 if (xprt_request_retransmit_after_disconnect(task)) 1293 rpc_wake_up_queued_task_set_status(&xprt->pending, 1294 task, -ENOTCONN); 1295 } 1296 spin_unlock(&xprt->queue_lock); 1297 } 1298 1299 static bool 1300 xprt_request_need_enqueue_transmit(struct rpc_task *task, struct rpc_rqst *req) 1301 { 1302 return !test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate); 1303 } 1304 1305 /** 1306 * xprt_request_enqueue_transmit - queue a task for transmission 1307 * @task: pointer to rpc_task 1308 * 1309 * Add a task to the transmission queue. 1310 */ 1311 void 1312 xprt_request_enqueue_transmit(struct rpc_task *task) 1313 { 1314 struct rpc_rqst *pos, *req = task->tk_rqstp; 1315 struct rpc_xprt *xprt = req->rq_xprt; 1316 1317 if (xprt_request_need_enqueue_transmit(task, req)) { 1318 req->rq_bytes_sent = 0; 1319 spin_lock(&xprt->queue_lock); 1320 /* 1321 * Requests that carry congestion control credits are added 1322 * to the head of the list to avoid starvation issues. 1323 */ 1324 if (req->rq_cong) { 1325 xprt_clear_congestion_window_wait(xprt); 1326 list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) { 1327 if (pos->rq_cong) 1328 continue; 1329 /* Note: req is added _before_ pos */ 1330 list_add_tail(&req->rq_xmit, &pos->rq_xmit); 1331 INIT_LIST_HEAD(&req->rq_xmit2); 1332 goto out; 1333 } 1334 } else if (RPC_IS_SWAPPER(task)) { 1335 list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) { 1336 if (pos->rq_cong || pos->rq_bytes_sent) 1337 continue; 1338 if (RPC_IS_SWAPPER(pos->rq_task)) 1339 continue; 1340 /* Note: req is added _before_ pos */ 1341 list_add_tail(&req->rq_xmit, &pos->rq_xmit); 1342 INIT_LIST_HEAD(&req->rq_xmit2); 1343 goto out; 1344 } 1345 } else if (!req->rq_seqno) { 1346 list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) { 1347 if (pos->rq_task->tk_owner != task->tk_owner) 1348 continue; 1349 list_add_tail(&req->rq_xmit2, &pos->rq_xmit2); 1350 INIT_LIST_HEAD(&req->rq_xmit); 1351 goto out; 1352 } 1353 } 1354 list_add_tail(&req->rq_xmit, &xprt->xmit_queue); 1355 INIT_LIST_HEAD(&req->rq_xmit2); 1356 out: 1357 atomic_long_inc(&xprt->xmit_queuelen); 1358 set_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate); 1359 spin_unlock(&xprt->queue_lock); 1360 } 1361 } 1362 1363 /** 1364 * xprt_request_dequeue_transmit_locked - remove a task from the transmission queue 1365 * @task: pointer to rpc_task 1366 * 1367 * Remove a task from the transmission queue 1368 * Caller must hold xprt->queue_lock 1369 */ 1370 static void 1371 xprt_request_dequeue_transmit_locked(struct rpc_task *task) 1372 { 1373 struct rpc_rqst *req = task->tk_rqstp; 1374 1375 if (!test_and_clear_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) 1376 return; 1377 if (!list_empty(&req->rq_xmit)) { 1378 list_del(&req->rq_xmit); 1379 if (!list_empty(&req->rq_xmit2)) { 1380 struct rpc_rqst *next = list_first_entry(&req->rq_xmit2, 1381 struct rpc_rqst, rq_xmit2); 1382 list_del(&req->rq_xmit2); 1383 list_add_tail(&next->rq_xmit, &next->rq_xprt->xmit_queue); 1384 } 1385 } else 1386 list_del(&req->rq_xmit2); 1387 atomic_long_dec(&req->rq_xprt->xmit_queuelen); 1388 } 1389 1390 /** 1391 * xprt_request_dequeue_transmit - remove a task from the transmission queue 1392 * @task: pointer to rpc_task 1393 * 1394 * Remove a task from the transmission queue 1395 */ 1396 static void 1397 xprt_request_dequeue_transmit(struct rpc_task *task) 1398 { 1399 struct rpc_rqst *req = task->tk_rqstp; 1400 struct rpc_xprt *xprt = req->rq_xprt; 1401 1402 spin_lock(&xprt->queue_lock); 1403 xprt_request_dequeue_transmit_locked(task); 1404 spin_unlock(&xprt->queue_lock); 1405 } 1406 1407 /** 1408 * xprt_request_dequeue_xprt - remove a task from the transmit+receive queue 1409 * @task: pointer to rpc_task 1410 * 1411 * Remove a task from the transmit and receive queues, and ensure that 1412 * it is not pinned by the receive work item. 1413 */ 1414 void 1415 xprt_request_dequeue_xprt(struct rpc_task *task) 1416 { 1417 struct rpc_rqst *req = task->tk_rqstp; 1418 struct rpc_xprt *xprt = req->rq_xprt; 1419 1420 if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate) || 1421 test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) || 1422 xprt_is_pinned_rqst(req)) { 1423 spin_lock(&xprt->queue_lock); 1424 xprt_request_dequeue_transmit_locked(task); 1425 xprt_request_dequeue_receive_locked(task); 1426 while (xprt_is_pinned_rqst(req)) { 1427 set_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate); 1428 spin_unlock(&xprt->queue_lock); 1429 xprt_wait_on_pinned_rqst(req); 1430 spin_lock(&xprt->queue_lock); 1431 clear_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate); 1432 } 1433 spin_unlock(&xprt->queue_lock); 1434 } 1435 } 1436 1437 /** 1438 * xprt_request_prepare - prepare an encoded request for transport 1439 * @req: pointer to rpc_rqst 1440 * 1441 * Calls into the transport layer to do whatever is needed to prepare 1442 * the request for transmission or receive. 1443 */ 1444 void 1445 xprt_request_prepare(struct rpc_rqst *req) 1446 { 1447 struct rpc_xprt *xprt = req->rq_xprt; 1448 1449 if (xprt->ops->prepare_request) 1450 xprt->ops->prepare_request(req); 1451 } 1452 1453 /** 1454 * xprt_request_need_retransmit - Test if a task needs retransmission 1455 * @task: pointer to rpc_task 1456 * 1457 * Test for whether a connection breakage requires the task to retransmit 1458 */ 1459 bool 1460 xprt_request_need_retransmit(struct rpc_task *task) 1461 { 1462 return xprt_request_retransmit_after_disconnect(task); 1463 } 1464 1465 /** 1466 * xprt_prepare_transmit - reserve the transport before sending a request 1467 * @task: RPC task about to send a request 1468 * 1469 */ 1470 bool xprt_prepare_transmit(struct rpc_task *task) 1471 { 1472 struct rpc_rqst *req = task->tk_rqstp; 1473 struct rpc_xprt *xprt = req->rq_xprt; 1474 1475 if (!xprt_lock_write(xprt, task)) { 1476 /* Race breaker: someone may have transmitted us */ 1477 if (!test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) 1478 rpc_wake_up_queued_task_set_status(&xprt->sending, 1479 task, 0); 1480 return false; 1481 1482 } 1483 return true; 1484 } 1485 1486 void xprt_end_transmit(struct rpc_task *task) 1487 { 1488 struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt; 1489 1490 xprt_inject_disconnect(xprt); 1491 xprt_release_write(xprt, task); 1492 } 1493 1494 /** 1495 * xprt_request_transmit - send an RPC request on a transport 1496 * @req: pointer to request to transmit 1497 * @snd_task: RPC task that owns the transport lock 1498 * 1499 * This performs the transmission of a single request. 1500 * Note that if the request is not the same as snd_task, then it 1501 * does need to be pinned. 1502 * Returns '0' on success. 1503 */ 1504 static int 1505 xprt_request_transmit(struct rpc_rqst *req, struct rpc_task *snd_task) 1506 { 1507 struct rpc_xprt *xprt = req->rq_xprt; 1508 struct rpc_task *task = req->rq_task; 1509 unsigned int connect_cookie; 1510 int is_retrans = RPC_WAS_SENT(task); 1511 int status; 1512 1513 if (!req->rq_bytes_sent) { 1514 if (xprt_request_data_received(task)) { 1515 status = 0; 1516 goto out_dequeue; 1517 } 1518 /* Verify that our message lies in the RPCSEC_GSS window */ 1519 if (rpcauth_xmit_need_reencode(task)) { 1520 status = -EBADMSG; 1521 goto out_dequeue; 1522 } 1523 if (RPC_SIGNALLED(task)) { 1524 status = -ERESTARTSYS; 1525 goto out_dequeue; 1526 } 1527 } 1528 1529 /* 1530 * Update req->rq_ntrans before transmitting to avoid races with 1531 * xprt_update_rtt(), which needs to know that it is recording a 1532 * reply to the first transmission. 1533 */ 1534 req->rq_ntrans++; 1535 1536 trace_rpc_xdr_sendto(task, &req->rq_snd_buf); 1537 connect_cookie = xprt->connect_cookie; 1538 status = xprt->ops->send_request(req); 1539 if (status != 0) { 1540 req->rq_ntrans--; 1541 trace_xprt_transmit(req, status); 1542 return status; 1543 } 1544 1545 if (is_retrans) { 1546 task->tk_client->cl_stats->rpcretrans++; 1547 trace_xprt_retransmit(req); 1548 } 1549 1550 xprt_inject_disconnect(xprt); 1551 1552 task->tk_flags |= RPC_TASK_SENT; 1553 spin_lock(&xprt->transport_lock); 1554 1555 xprt->stat.sends++; 1556 xprt->stat.req_u += xprt->stat.sends - xprt->stat.recvs; 1557 xprt->stat.bklog_u += xprt->backlog.qlen; 1558 xprt->stat.sending_u += xprt->sending.qlen; 1559 xprt->stat.pending_u += xprt->pending.qlen; 1560 spin_unlock(&xprt->transport_lock); 1561 1562 req->rq_connect_cookie = connect_cookie; 1563 out_dequeue: 1564 trace_xprt_transmit(req, status); 1565 xprt_request_dequeue_transmit(task); 1566 rpc_wake_up_queued_task_set_status(&xprt->sending, task, status); 1567 return status; 1568 } 1569 1570 /** 1571 * xprt_transmit - send an RPC request on a transport 1572 * @task: controlling RPC task 1573 * 1574 * Attempts to drain the transmit queue. On exit, either the transport 1575 * signalled an error that needs to be handled before transmission can 1576 * resume, or @task finished transmitting, and detected that it already 1577 * received a reply. 1578 */ 1579 void 1580 xprt_transmit(struct rpc_task *task) 1581 { 1582 struct rpc_rqst *next, *req = task->tk_rqstp; 1583 struct rpc_xprt *xprt = req->rq_xprt; 1584 int counter, status; 1585 1586 spin_lock(&xprt->queue_lock); 1587 counter = 0; 1588 while (!list_empty(&xprt->xmit_queue)) { 1589 if (++counter == 20) 1590 break; 1591 next = list_first_entry(&xprt->xmit_queue, 1592 struct rpc_rqst, rq_xmit); 1593 xprt_pin_rqst(next); 1594 spin_unlock(&xprt->queue_lock); 1595 status = xprt_request_transmit(next, task); 1596 if (status == -EBADMSG && next != req) 1597 status = 0; 1598 spin_lock(&xprt->queue_lock); 1599 xprt_unpin_rqst(next); 1600 if (status == 0) { 1601 if (!xprt_request_data_received(task) || 1602 test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) 1603 continue; 1604 } else if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) 1605 task->tk_status = status; 1606 break; 1607 } 1608 spin_unlock(&xprt->queue_lock); 1609 } 1610 1611 static void xprt_complete_request_init(struct rpc_task *task) 1612 { 1613 if (task->tk_rqstp) 1614 xprt_request_init(task); 1615 } 1616 1617 void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task) 1618 { 1619 set_bit(XPRT_CONGESTED, &xprt->state); 1620 rpc_sleep_on(&xprt->backlog, task, xprt_complete_request_init); 1621 } 1622 EXPORT_SYMBOL_GPL(xprt_add_backlog); 1623 1624 static bool __xprt_set_rq(struct rpc_task *task, void *data) 1625 { 1626 struct rpc_rqst *req = data; 1627 1628 if (task->tk_rqstp == NULL) { 1629 memset(req, 0, sizeof(*req)); /* mark unused */ 1630 task->tk_rqstp = req; 1631 return true; 1632 } 1633 return false; 1634 } 1635 1636 bool xprt_wake_up_backlog(struct rpc_xprt *xprt, struct rpc_rqst *req) 1637 { 1638 if (rpc_wake_up_first(&xprt->backlog, __xprt_set_rq, req) == NULL) { 1639 clear_bit(XPRT_CONGESTED, &xprt->state); 1640 return false; 1641 } 1642 return true; 1643 } 1644 EXPORT_SYMBOL_GPL(xprt_wake_up_backlog); 1645 1646 static bool xprt_throttle_congested(struct rpc_xprt *xprt, struct rpc_task *task) 1647 { 1648 bool ret = false; 1649 1650 if (!test_bit(XPRT_CONGESTED, &xprt->state)) 1651 goto out; 1652 spin_lock(&xprt->reserve_lock); 1653 if (test_bit(XPRT_CONGESTED, &xprt->state)) { 1654 xprt_add_backlog(xprt, task); 1655 ret = true; 1656 } 1657 spin_unlock(&xprt->reserve_lock); 1658 out: 1659 return ret; 1660 } 1661 1662 static struct rpc_rqst *xprt_dynamic_alloc_slot(struct rpc_xprt *xprt) 1663 { 1664 struct rpc_rqst *req = ERR_PTR(-EAGAIN); 1665 1666 if (xprt->num_reqs >= xprt->max_reqs) 1667 goto out; 1668 ++xprt->num_reqs; 1669 spin_unlock(&xprt->reserve_lock); 1670 req = kzalloc(sizeof(struct rpc_rqst), GFP_NOFS); 1671 spin_lock(&xprt->reserve_lock); 1672 if (req != NULL) 1673 goto out; 1674 --xprt->num_reqs; 1675 req = ERR_PTR(-ENOMEM); 1676 out: 1677 return req; 1678 } 1679 1680 static bool xprt_dynamic_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req) 1681 { 1682 if (xprt->num_reqs > xprt->min_reqs) { 1683 --xprt->num_reqs; 1684 kfree(req); 1685 return true; 1686 } 1687 return false; 1688 } 1689 1690 void xprt_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task) 1691 { 1692 struct rpc_rqst *req; 1693 1694 spin_lock(&xprt->reserve_lock); 1695 if (!list_empty(&xprt->free)) { 1696 req = list_entry(xprt->free.next, struct rpc_rqst, rq_list); 1697 list_del(&req->rq_list); 1698 goto out_init_req; 1699 } 1700 req = xprt_dynamic_alloc_slot(xprt); 1701 if (!IS_ERR(req)) 1702 goto out_init_req; 1703 switch (PTR_ERR(req)) { 1704 case -ENOMEM: 1705 dprintk("RPC: dynamic allocation of request slot " 1706 "failed! Retrying\n"); 1707 task->tk_status = -ENOMEM; 1708 break; 1709 case -EAGAIN: 1710 xprt_add_backlog(xprt, task); 1711 dprintk("RPC: waiting for request slot\n"); 1712 fallthrough; 1713 default: 1714 task->tk_status = -EAGAIN; 1715 } 1716 spin_unlock(&xprt->reserve_lock); 1717 return; 1718 out_init_req: 1719 xprt->stat.max_slots = max_t(unsigned int, xprt->stat.max_slots, 1720 xprt->num_reqs); 1721 spin_unlock(&xprt->reserve_lock); 1722 1723 task->tk_status = 0; 1724 task->tk_rqstp = req; 1725 } 1726 EXPORT_SYMBOL_GPL(xprt_alloc_slot); 1727 1728 void xprt_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req) 1729 { 1730 spin_lock(&xprt->reserve_lock); 1731 if (!xprt_wake_up_backlog(xprt, req) && 1732 !xprt_dynamic_free_slot(xprt, req)) { 1733 memset(req, 0, sizeof(*req)); /* mark unused */ 1734 list_add(&req->rq_list, &xprt->free); 1735 } 1736 spin_unlock(&xprt->reserve_lock); 1737 } 1738 EXPORT_SYMBOL_GPL(xprt_free_slot); 1739 1740 static void xprt_free_all_slots(struct rpc_xprt *xprt) 1741 { 1742 struct rpc_rqst *req; 1743 while (!list_empty(&xprt->free)) { 1744 req = list_first_entry(&xprt->free, struct rpc_rqst, rq_list); 1745 list_del(&req->rq_list); 1746 kfree(req); 1747 } 1748 } 1749 1750 static DEFINE_IDA(rpc_xprt_ids); 1751 1752 void xprt_cleanup_ids(void) 1753 { 1754 ida_destroy(&rpc_xprt_ids); 1755 } 1756 1757 static int xprt_alloc_id(struct rpc_xprt *xprt) 1758 { 1759 int id; 1760 1761 id = ida_simple_get(&rpc_xprt_ids, 0, 0, GFP_KERNEL); 1762 if (id < 0) 1763 return id; 1764 1765 xprt->id = id; 1766 return 0; 1767 } 1768 1769 static void xprt_free_id(struct rpc_xprt *xprt) 1770 { 1771 ida_simple_remove(&rpc_xprt_ids, xprt->id); 1772 } 1773 1774 struct rpc_xprt *xprt_alloc(struct net *net, size_t size, 1775 unsigned int num_prealloc, 1776 unsigned int max_alloc) 1777 { 1778 struct rpc_xprt *xprt; 1779 struct rpc_rqst *req; 1780 int i; 1781 1782 xprt = kzalloc(size, GFP_KERNEL); 1783 if (xprt == NULL) 1784 goto out; 1785 1786 xprt_alloc_id(xprt); 1787 xprt_init(xprt, net); 1788 1789 for (i = 0; i < num_prealloc; i++) { 1790 req = kzalloc(sizeof(struct rpc_rqst), GFP_KERNEL); 1791 if (!req) 1792 goto out_free; 1793 list_add(&req->rq_list, &xprt->free); 1794 } 1795 if (max_alloc > num_prealloc) 1796 xprt->max_reqs = max_alloc; 1797 else 1798 xprt->max_reqs = num_prealloc; 1799 xprt->min_reqs = num_prealloc; 1800 xprt->num_reqs = num_prealloc; 1801 1802 return xprt; 1803 1804 out_free: 1805 xprt_free(xprt); 1806 out: 1807 return NULL; 1808 } 1809 EXPORT_SYMBOL_GPL(xprt_alloc); 1810 1811 void xprt_free(struct rpc_xprt *xprt) 1812 { 1813 put_net(xprt->xprt_net); 1814 xprt_free_all_slots(xprt); 1815 xprt_free_id(xprt); 1816 rpc_sysfs_xprt_destroy(xprt); 1817 kfree_rcu(xprt, rcu); 1818 } 1819 EXPORT_SYMBOL_GPL(xprt_free); 1820 1821 static void 1822 xprt_init_connect_cookie(struct rpc_rqst *req, struct rpc_xprt *xprt) 1823 { 1824 req->rq_connect_cookie = xprt_connect_cookie(xprt) - 1; 1825 } 1826 1827 static __be32 1828 xprt_alloc_xid(struct rpc_xprt *xprt) 1829 { 1830 __be32 xid; 1831 1832 spin_lock(&xprt->reserve_lock); 1833 xid = (__force __be32)xprt->xid++; 1834 spin_unlock(&xprt->reserve_lock); 1835 return xid; 1836 } 1837 1838 static void 1839 xprt_init_xid(struct rpc_xprt *xprt) 1840 { 1841 xprt->xid = prandom_u32(); 1842 } 1843 1844 static void 1845 xprt_request_init(struct rpc_task *task) 1846 { 1847 struct rpc_xprt *xprt = task->tk_xprt; 1848 struct rpc_rqst *req = task->tk_rqstp; 1849 1850 req->rq_task = task; 1851 req->rq_xprt = xprt; 1852 req->rq_buffer = NULL; 1853 req->rq_xid = xprt_alloc_xid(xprt); 1854 xprt_init_connect_cookie(req, xprt); 1855 req->rq_snd_buf.len = 0; 1856 req->rq_snd_buf.buflen = 0; 1857 req->rq_rcv_buf.len = 0; 1858 req->rq_rcv_buf.buflen = 0; 1859 req->rq_snd_buf.bvec = NULL; 1860 req->rq_rcv_buf.bvec = NULL; 1861 req->rq_release_snd_buf = NULL; 1862 xprt_init_majortimeo(task, req); 1863 1864 trace_xprt_reserve(req); 1865 } 1866 1867 static void 1868 xprt_do_reserve(struct rpc_xprt *xprt, struct rpc_task *task) 1869 { 1870 xprt->ops->alloc_slot(xprt, task); 1871 if (task->tk_rqstp != NULL) 1872 xprt_request_init(task); 1873 } 1874 1875 /** 1876 * xprt_reserve - allocate an RPC request slot 1877 * @task: RPC task requesting a slot allocation 1878 * 1879 * If the transport is marked as being congested, or if no more 1880 * slots are available, place the task on the transport's 1881 * backlog queue. 1882 */ 1883 void xprt_reserve(struct rpc_task *task) 1884 { 1885 struct rpc_xprt *xprt = task->tk_xprt; 1886 1887 task->tk_status = 0; 1888 if (task->tk_rqstp != NULL) 1889 return; 1890 1891 task->tk_status = -EAGAIN; 1892 if (!xprt_throttle_congested(xprt, task)) 1893 xprt_do_reserve(xprt, task); 1894 } 1895 1896 /** 1897 * xprt_retry_reserve - allocate an RPC request slot 1898 * @task: RPC task requesting a slot allocation 1899 * 1900 * If no more slots are available, place the task on the transport's 1901 * backlog queue. 1902 * Note that the only difference with xprt_reserve is that we now 1903 * ignore the value of the XPRT_CONGESTED flag. 1904 */ 1905 void xprt_retry_reserve(struct rpc_task *task) 1906 { 1907 struct rpc_xprt *xprt = task->tk_xprt; 1908 1909 task->tk_status = 0; 1910 if (task->tk_rqstp != NULL) 1911 return; 1912 1913 task->tk_status = -EAGAIN; 1914 xprt_do_reserve(xprt, task); 1915 } 1916 1917 /** 1918 * xprt_release - release an RPC request slot 1919 * @task: task which is finished with the slot 1920 * 1921 */ 1922 void xprt_release(struct rpc_task *task) 1923 { 1924 struct rpc_xprt *xprt; 1925 struct rpc_rqst *req = task->tk_rqstp; 1926 1927 if (req == NULL) { 1928 if (task->tk_client) { 1929 xprt = task->tk_xprt; 1930 xprt_release_write(xprt, task); 1931 } 1932 return; 1933 } 1934 1935 xprt = req->rq_xprt; 1936 xprt_request_dequeue_xprt(task); 1937 spin_lock(&xprt->transport_lock); 1938 xprt->ops->release_xprt(xprt, task); 1939 if (xprt->ops->release_request) 1940 xprt->ops->release_request(task); 1941 xprt_schedule_autodisconnect(xprt); 1942 spin_unlock(&xprt->transport_lock); 1943 if (req->rq_buffer) 1944 xprt->ops->buf_free(task); 1945 xdr_free_bvec(&req->rq_rcv_buf); 1946 xdr_free_bvec(&req->rq_snd_buf); 1947 if (req->rq_cred != NULL) 1948 put_rpccred(req->rq_cred); 1949 if (req->rq_release_snd_buf) 1950 req->rq_release_snd_buf(req); 1951 1952 task->tk_rqstp = NULL; 1953 if (likely(!bc_prealloc(req))) 1954 xprt->ops->free_slot(xprt, req); 1955 else 1956 xprt_free_bc_request(req); 1957 } 1958 1959 #ifdef CONFIG_SUNRPC_BACKCHANNEL 1960 void 1961 xprt_init_bc_request(struct rpc_rqst *req, struct rpc_task *task) 1962 { 1963 struct xdr_buf *xbufp = &req->rq_snd_buf; 1964 1965 task->tk_rqstp = req; 1966 req->rq_task = task; 1967 xprt_init_connect_cookie(req, req->rq_xprt); 1968 /* 1969 * Set up the xdr_buf length. 1970 * This also indicates that the buffer is XDR encoded already. 1971 */ 1972 xbufp->len = xbufp->head[0].iov_len + xbufp->page_len + 1973 xbufp->tail[0].iov_len; 1974 } 1975 #endif 1976 1977 static void xprt_init(struct rpc_xprt *xprt, struct net *net) 1978 { 1979 kref_init(&xprt->kref); 1980 1981 spin_lock_init(&xprt->transport_lock); 1982 spin_lock_init(&xprt->reserve_lock); 1983 spin_lock_init(&xprt->queue_lock); 1984 1985 INIT_LIST_HEAD(&xprt->free); 1986 xprt->recv_queue = RB_ROOT; 1987 INIT_LIST_HEAD(&xprt->xmit_queue); 1988 #if defined(CONFIG_SUNRPC_BACKCHANNEL) 1989 spin_lock_init(&xprt->bc_pa_lock); 1990 INIT_LIST_HEAD(&xprt->bc_pa_list); 1991 #endif /* CONFIG_SUNRPC_BACKCHANNEL */ 1992 INIT_LIST_HEAD(&xprt->xprt_switch); 1993 1994 xprt->last_used = jiffies; 1995 xprt->cwnd = RPC_INITCWND; 1996 xprt->bind_index = 0; 1997 1998 rpc_init_wait_queue(&xprt->binding, "xprt_binding"); 1999 rpc_init_wait_queue(&xprt->pending, "xprt_pending"); 2000 rpc_init_wait_queue(&xprt->sending, "xprt_sending"); 2001 rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog"); 2002 2003 xprt_init_xid(xprt); 2004 2005 xprt->xprt_net = get_net(net); 2006 } 2007 2008 /** 2009 * xprt_create_transport - create an RPC transport 2010 * @args: rpc transport creation arguments 2011 * 2012 */ 2013 struct rpc_xprt *xprt_create_transport(struct xprt_create *args) 2014 { 2015 struct rpc_xprt *xprt; 2016 const struct xprt_class *t; 2017 2018 t = xprt_class_find_by_ident(args->ident); 2019 if (!t) { 2020 dprintk("RPC: transport (%d) not supported\n", args->ident); 2021 return ERR_PTR(-EIO); 2022 } 2023 2024 xprt = t->setup(args); 2025 xprt_class_release(t); 2026 2027 if (IS_ERR(xprt)) 2028 goto out; 2029 if (args->flags & XPRT_CREATE_NO_IDLE_TIMEOUT) 2030 xprt->idle_timeout = 0; 2031 INIT_WORK(&xprt->task_cleanup, xprt_autoclose); 2032 if (xprt_has_timer(xprt)) 2033 timer_setup(&xprt->timer, xprt_init_autodisconnect, 0); 2034 else 2035 timer_setup(&xprt->timer, NULL, 0); 2036 2037 if (strlen(args->servername) > RPC_MAXNETNAMELEN) { 2038 xprt_destroy(xprt); 2039 return ERR_PTR(-EINVAL); 2040 } 2041 xprt->servername = kstrdup(args->servername, GFP_KERNEL); 2042 if (xprt->servername == NULL) { 2043 xprt_destroy(xprt); 2044 return ERR_PTR(-ENOMEM); 2045 } 2046 2047 rpc_xprt_debugfs_register(xprt); 2048 2049 trace_xprt_create(xprt); 2050 out: 2051 return xprt; 2052 } 2053 2054 static void xprt_destroy_cb(struct work_struct *work) 2055 { 2056 struct rpc_xprt *xprt = 2057 container_of(work, struct rpc_xprt, task_cleanup); 2058 2059 trace_xprt_destroy(xprt); 2060 2061 rpc_xprt_debugfs_unregister(xprt); 2062 rpc_destroy_wait_queue(&xprt->binding); 2063 rpc_destroy_wait_queue(&xprt->pending); 2064 rpc_destroy_wait_queue(&xprt->sending); 2065 rpc_destroy_wait_queue(&xprt->backlog); 2066 kfree(xprt->servername); 2067 /* 2068 * Destroy any existing back channel 2069 */ 2070 xprt_destroy_backchannel(xprt, UINT_MAX); 2071 2072 /* 2073 * Tear down transport state and free the rpc_xprt 2074 */ 2075 xprt->ops->destroy(xprt); 2076 } 2077 2078 /** 2079 * xprt_destroy - destroy an RPC transport, killing off all requests. 2080 * @xprt: transport to destroy 2081 * 2082 */ 2083 static void xprt_destroy(struct rpc_xprt *xprt) 2084 { 2085 /* 2086 * Exclude transport connect/disconnect handlers and autoclose 2087 */ 2088 wait_on_bit_lock(&xprt->state, XPRT_LOCKED, TASK_UNINTERRUPTIBLE); 2089 2090 del_timer_sync(&xprt->timer); 2091 2092 /* 2093 * Destroy sockets etc from the system workqueue so they can 2094 * safely flush receive work running on rpciod. 2095 */ 2096 INIT_WORK(&xprt->task_cleanup, xprt_destroy_cb); 2097 schedule_work(&xprt->task_cleanup); 2098 } 2099 2100 static void xprt_destroy_kref(struct kref *kref) 2101 { 2102 xprt_destroy(container_of(kref, struct rpc_xprt, kref)); 2103 } 2104 2105 /** 2106 * xprt_get - return a reference to an RPC transport. 2107 * @xprt: pointer to the transport 2108 * 2109 */ 2110 struct rpc_xprt *xprt_get(struct rpc_xprt *xprt) 2111 { 2112 if (xprt != NULL && kref_get_unless_zero(&xprt->kref)) 2113 return xprt; 2114 return NULL; 2115 } 2116 EXPORT_SYMBOL_GPL(xprt_get); 2117 2118 /** 2119 * xprt_put - release a reference to an RPC transport. 2120 * @xprt: pointer to the transport 2121 * 2122 */ 2123 void xprt_put(struct rpc_xprt *xprt) 2124 { 2125 if (xprt != NULL) 2126 kref_put(&xprt->kref, xprt_destroy_kref); 2127 } 2128 EXPORT_SYMBOL_GPL(xprt_put); 2129