// SPDX-License-Identifier: GPL-2.0-only
/*
 *  linux/net/sunrpc/xprt.c
 *
 *  This is a generic RPC call interface supporting congestion avoidance,
 *  and asynchronous calls.
 *
 *  The interface works like this:
 *
 *  -	When a process places a call, it allocates a request slot if
 *	one is available. Otherwise, it sleeps on the backlog queue
 *	(xprt_reserve).
 *  -	Next, the caller puts together the RPC message, stuffs it into
 *	the request struct, and calls xprt_transmit().
 *  -	xprt_transmit sends the message and installs the caller on the
 *	transport's wait list. At the same time, if a reply is expected,
 *	it installs a timer that is run after the packet's timeout has
 *	expired.
 *  -	When a packet arrives, the data_ready handler walks the list of
 *	pending requests for that transport. If a matching XID is found, the
 *	caller is woken up, and the timer removed.
 *  -	When no reply arrives within the timeout interval, the timer is
 *	fired by the kernel and runs xprt_timer(). It either adjusts the
 *	timeout values (minor timeout) or wakes up the caller with a status
 *	of -ETIMEDOUT.
 *  -	When the caller receives a notification from RPC that a reply arrived,
 *	it should release the RPC slot, and process the reply.
 *	If the call timed out, it may choose to retry the operation by
 *	adjusting the initial timeout value, and simply calling rpc_call
 *	again.
 *
 *  Support for async RPC is done through a set of RPC-specific scheduling
 *  primitives that `transparently' work for processes as well as async
 *  tasks that rely on callbacks.
 *
 *  Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de>
 *
 *  Transport switch API copyright (C) 2005, Chuck Lever <cel@netapp.com>
 */

#include <linux/module.h>

#include <linux/types.h>
#include <linux/interrupt.h>
#include <linux/workqueue.h>
#include <linux/net.h>
#include <linux/ktime.h>

#include <linux/sunrpc/clnt.h>
#include <linux/sunrpc/metrics.h>
#include <linux/sunrpc/bc_xprt.h>
#include <linux/rcupdate.h>
#include <linux/sched/mm.h>

#include <trace/events/sunrpc.h>

#include "sunrpc.h"
#include "sysfs.h"
#include "fail.h"

/*
 * Local variables
 */

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY	RPCDBG_XPRT
#endif

/*
 * Local functions
 */
static void	xprt_init(struct rpc_xprt *xprt, struct net *net);
static __be32	xprt_alloc_xid(struct rpc_xprt *xprt);
static void	xprt_destroy(struct rpc_xprt *xprt);
static void	xprt_request_init(struct rpc_task *task);
static int	xprt_request_prepare(struct rpc_rqst *req, struct xdr_buf *buf);

static DEFINE_SPINLOCK(xprt_list_lock);
static LIST_HEAD(xprt_list);

static unsigned long xprt_request_timeout(const struct rpc_rqst *req)
{
	unsigned long timeout = jiffies + req->rq_timeout;

	if (time_before(timeout, req->rq_majortimeo))
		return timeout;
	return req->rq_majortimeo;
}

/**
 * xprt_register_transport - register a transport implementation
 * @transport: transport to register
 *
 * If a transport implementation is loaded as a kernel module, it can
 * call this interface to make itself known to the RPC client.
 *
 * Returns:
 * 0:		transport successfully registered
 * -EEXIST:	transport already registered
 * -EINVAL:	transport module being unloaded
 */
int xprt_register_transport(struct xprt_class *transport)
{
	struct xprt_class *t;
	int result;

	result = -EEXIST;
	spin_lock(&xprt_list_lock);
	list_for_each_entry(t, &xprt_list, list) {
		/* don't register the same transport class twice */
		if (t->ident == transport->ident)
			goto out;
	}

	list_add_tail(&transport->list, &xprt_list);
	printk(KERN_INFO "RPC: Registered %s transport module.\n",
	       transport->name);
	result = 0;

out:
	spin_unlock(&xprt_list_lock);
	return result;
}
EXPORT_SYMBOL_GPL(xprt_register_transport);

/**
 * xprt_unregister_transport - unregister a transport implementation
 * @transport: transport to unregister
 *
 * Returns:
 * 0:		transport successfully unregistered
 * -ENOENT:	transport never registered
 */
int xprt_unregister_transport(struct xprt_class *transport)
{
	struct xprt_class *t;
	int result;

	result = 0;
	spin_lock(&xprt_list_lock);
	list_for_each_entry(t, &xprt_list, list) {
		if (t == transport) {
			printk(KERN_INFO
				"RPC: Unregistered %s transport module.\n",
				transport->name);
			list_del_init(&transport->list);
			goto out;
		}
	}
	result = -ENOENT;

out:
	spin_unlock(&xprt_list_lock);
	return result;
}
EXPORT_SYMBOL_GPL(xprt_unregister_transport);

static void
xprt_class_release(const struct xprt_class *t)
{
	module_put(t->owner);
}

static const struct xprt_class *
xprt_class_find_by_ident_locked(int ident)
{
	const struct xprt_class *t;

	list_for_each_entry(t, &xprt_list, list) {
		if (t->ident != ident)
			continue;
		if (!try_module_get(t->owner))
			continue;
		return t;
	}
	return NULL;
}

static const struct xprt_class *
xprt_class_find_by_ident(int ident)
{
	const struct xprt_class *t;

	spin_lock(&xprt_list_lock);
	t = xprt_class_find_by_ident_locked(ident);
	spin_unlock(&xprt_list_lock);
	return t;
}

static const struct xprt_class *
xprt_class_find_by_netid_locked(const char *netid)
{
	const struct xprt_class *t;
	unsigned int i;

	list_for_each_entry(t, &xprt_list, list) {
		for (i = 0; t->netid[i][0] != '\0'; i++) {
			if (strcmp(t->netid[i], netid) != 0)
				continue;
			if (!try_module_get(t->owner))
				continue;
			return t;
		}
	}
	return NULL;
}

static const struct xprt_class *
xprt_class_find_by_netid(const char *netid)
{
	const struct xprt_class *t;

	spin_lock(&xprt_list_lock);
	t = xprt_class_find_by_netid_locked(netid);
	if (!t) {
		spin_unlock(&xprt_list_lock);
		request_module("rpc%s", netid);
		spin_lock(&xprt_list_lock);
		t = xprt_class_find_by_netid_locked(netid);
	}
	spin_unlock(&xprt_list_lock);
	return t;
}

/**
 * xprt_find_transport_ident - convert a netid into a transport identifier
 * @netid: transport to load
 *
 * Returns:
 * > 0:		transport identifier
 * -ENOENT:	transport module not available
 */
int xprt_find_transport_ident(const char *netid)
{
	const struct xprt_class *t;
	int ret;

	t = xprt_class_find_by_netid(netid);
	if (!t)
		return -ENOENT;
	ret = t->ident;
	xprt_class_release(t);
	return ret;
}
EXPORT_SYMBOL_GPL(xprt_find_transport_ident);
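
/*
 * A transport implementation typically registers its xprt_class from its
 * module init path and unregisters it on exit. A minimal sketch with
 * hypothetical names (compare the real transport classes in
 * net/sunrpc/xprtsock.c); a real module would define its own identifier
 * rather than reuse an existing one:
 *
 *	static struct xprt_class example_transport = {
 *		.list	= LIST_HEAD_INIT(example_transport.list),
 *		.name	= "example",
 *		.owner	= THIS_MODULE,
 *		.ident	= XPRT_TRANSPORT_TCP,
 *		.setup	= example_setup,
 *		.netid	= { "example", "" },
 *	};
 *
 *	static int __init example_init(void)
 *	{
 *		return xprt_register_transport(&example_transport);
 *	}
 *
 *	static void __exit example_exit(void)
 *	{
 *		xprt_unregister_transport(&example_transport);
 *	}
 */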

static void xprt_clear_locked(struct rpc_xprt *xprt)
{
	xprt->snd_task = NULL;
	if (!test_bit(XPRT_CLOSE_WAIT, &xprt->state))
		clear_bit_unlock(XPRT_LOCKED, &xprt->state);
	else
		queue_work(xprtiod_workqueue, &xprt->task_cleanup);
}

/**
 * xprt_reserve_xprt - serialize write access to transports
 * @task: task that is requesting access to the transport
 * @xprt: pointer to the target transport
 *
 * This prevents mixing the payload of separate requests, and prevents
 * transport connects from colliding with writes. No congestion control
 * is provided.
 */
int xprt_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;

	if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
		if (task == xprt->snd_task)
			goto out_locked;
		goto out_sleep;
	}
	if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
		goto out_unlock;
	xprt->snd_task = task;

out_locked:
	trace_xprt_reserve_xprt(xprt, task);
	return 1;

out_unlock:
	xprt_clear_locked(xprt);
out_sleep:
	task->tk_status = -EAGAIN;
	if (RPC_IS_SOFT(task) || RPC_IS_SOFTCONN(task))
		rpc_sleep_on_timeout(&xprt->sending, task, NULL,
				xprt_request_timeout(req));
	else
		rpc_sleep_on(&xprt->sending, task, NULL);
	return 0;
}
EXPORT_SYMBOL_GPL(xprt_reserve_xprt);

static bool
xprt_need_congestion_window_wait(struct rpc_xprt *xprt)
{
	return test_bit(XPRT_CWND_WAIT, &xprt->state);
}

static void
xprt_set_congestion_window_wait(struct rpc_xprt *xprt)
{
	if (!list_empty(&xprt->xmit_queue)) {
		/* Peek at head of queue to see if it can make progress */
		if (list_first_entry(&xprt->xmit_queue, struct rpc_rqst,
					rq_xmit)->rq_cong)
			return;
	}
	set_bit(XPRT_CWND_WAIT, &xprt->state);
}

static void
xprt_test_and_clear_congestion_window_wait(struct rpc_xprt *xprt)
{
	if (!RPCXPRT_CONGESTED(xprt))
		clear_bit(XPRT_CWND_WAIT, &xprt->state);
}

/*
 * xprt_reserve_xprt_cong - serialize write access to transports
 * @task: task that is requesting access to the transport
 *
 * Same as xprt_reserve_xprt, but Van Jacobson congestion control is
 * integrated into the decision of whether a request is allowed to be
 * woken up and given access to the transport.
 * Note that the lock is only granted if we know there are free slots.
 */
int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;

	if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
		if (task == xprt->snd_task)
			goto out_locked;
		goto out_sleep;
	}
	if (req == NULL) {
		xprt->snd_task = task;
		goto out_locked;
	}
	if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
		goto out_unlock;
	if (!xprt_need_congestion_window_wait(xprt)) {
		xprt->snd_task = task;
		goto out_locked;
	}
out_unlock:
	xprt_clear_locked(xprt);
out_sleep:
	task->tk_status = -EAGAIN;
	if (RPC_IS_SOFT(task) || RPC_IS_SOFTCONN(task))
		rpc_sleep_on_timeout(&xprt->sending, task, NULL,
				xprt_request_timeout(req));
	else
		rpc_sleep_on(&xprt->sending, task, NULL);
	return 0;
out_locked:
	trace_xprt_reserve_cong(xprt, task);
	return 1;
}
EXPORT_SYMBOL_GPL(xprt_reserve_xprt_cong);

static inline int xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
	int retval;

	if (test_bit(XPRT_LOCKED, &xprt->state) && xprt->snd_task == task)
		return 1;
	spin_lock(&xprt->transport_lock);
	retval = xprt->ops->reserve_xprt(xprt, task);
	spin_unlock(&xprt->transport_lock);
	return retval;
}

static bool __xprt_lock_write_func(struct rpc_task *task, void *data)
{
	struct rpc_xprt *xprt = data;

	xprt->snd_task = task;
	return true;
}

static void __xprt_lock_write_next(struct rpc_xprt *xprt)
{
	if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
		return;
	if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
		goto out_unlock;
	if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending,
				__xprt_lock_write_func, xprt))
		return;
out_unlock:
	xprt_clear_locked(xprt);
}

static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt)
{
	if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
		return;
	if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
		goto out_unlock;
	if (xprt_need_congestion_window_wait(xprt))
		goto out_unlock;
	if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending,
				__xprt_lock_write_func, xprt))
		return;
out_unlock:
	xprt_clear_locked(xprt);
}

/**
 * xprt_release_xprt - allow other requests to use a transport
 * @xprt: transport with other tasks potentially waiting
 * @task: task that is releasing access to the transport
 *
 * Note that "task" can be NULL. No congestion control is provided.
 */
void xprt_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
{
	if (xprt->snd_task == task) {
		xprt_clear_locked(xprt);
		__xprt_lock_write_next(xprt);
	}
	trace_xprt_release_xprt(xprt, task);
}
EXPORT_SYMBOL_GPL(xprt_release_xprt);

/**
 * xprt_release_xprt_cong - allow other requests to use a transport
 * @xprt: transport with other tasks potentially waiting
 * @task: task that is releasing access to the transport
 *
 * Note that "task" can be NULL. Another task is awoken to use the
 * transport if the transport's congestion window allows it.
 */
void xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
{
	if (xprt->snd_task == task) {
		xprt_clear_locked(xprt);
		__xprt_lock_write_next_cong(xprt);
	}
	trace_xprt_release_cong(xprt, task);
}
EXPORT_SYMBOL_GPL(xprt_release_xprt_cong);

void xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
	if (xprt->snd_task != task)
		return;
	spin_lock(&xprt->transport_lock);
	xprt->ops->release_xprt(xprt, task);
	spin_unlock(&xprt->transport_lock);
}

/*
 * Van Jacobson congestion avoidance. Check if the congestion window
 * overflowed. Put the task to sleep if this is the case.
 */
static int
__xprt_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
	if (req->rq_cong)
		return 1;
	trace_xprt_get_cong(xprt, req->rq_task);
	if (RPCXPRT_CONGESTED(xprt)) {
		xprt_set_congestion_window_wait(xprt);
		return 0;
	}
	req->rq_cong = 1;
	xprt->cong += RPC_CWNDSCALE;
	return 1;
}

/*
 * Adjust the congestion window, and wake up the next task
 * that has been sleeping due to congestion
 */
static void
__xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
	if (!req->rq_cong)
		return;
	req->rq_cong = 0;
	xprt->cong -= RPC_CWNDSCALE;
	xprt_test_and_clear_congestion_window_wait(xprt);
	trace_xprt_put_cong(xprt, req->rq_task);
	__xprt_lock_write_next_cong(xprt);
}

/**
 * xprt_request_get_cong - Request congestion control credits
 * @xprt: pointer to transport
 * @req: pointer to RPC request
 *
 * Useful for transports that require congestion control.
 */
bool
xprt_request_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
	bool ret = false;

	if (req->rq_cong)
		return true;
	spin_lock(&xprt->transport_lock);
	ret = __xprt_get_cong(xprt, req) != 0;
	spin_unlock(&xprt->transport_lock);
	return ret;
}
EXPORT_SYMBOL_GPL(xprt_request_get_cong);

/**
 * xprt_release_rqst_cong - housekeeping when request is complete
 * @task: RPC request that recently completed
 *
 * Useful for transports that require congestion control.
 */
void xprt_release_rqst_cong(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;

	__xprt_put_cong(req->rq_xprt, req);
}
EXPORT_SYMBOL_GPL(xprt_release_rqst_cong);

static void xprt_clear_congestion_window_wait_locked(struct rpc_xprt *xprt)
{
	if (test_and_clear_bit(XPRT_CWND_WAIT, &xprt->state))
		__xprt_lock_write_next_cong(xprt);
}

/*
 * Clear the congestion window wait flag and wake up the next
 * entry on xprt->sending
 */
static void
xprt_clear_congestion_window_wait(struct rpc_xprt *xprt)
{
	if (test_and_clear_bit(XPRT_CWND_WAIT, &xprt->state)) {
		spin_lock(&xprt->transport_lock);
		__xprt_lock_write_next_cong(xprt);
		spin_unlock(&xprt->transport_lock);
	}
}

/**
 * xprt_adjust_cwnd - adjust transport congestion window
 * @xprt: pointer to xprt
 * @task: recently completed RPC request used to adjust window
 * @result: result code of completed RPC request
 *
 * The transport code maintains an estimate on the maximum number of out-
 * standing RPC requests, using a smoothed version of the congestion
 * avoidance implemented in 44BSD. This is basically the Van Jacobson
 * congestion algorithm: If a retransmit occurs, the congestion window is
 * halved; otherwise, it is incremented by 1/cwnd when
 *
 *	- a reply is received and
 *	- a full number of requests are outstanding and
 *	- the congestion window hasn't been updated recently.
 */
void xprt_adjust_cwnd(struct rpc_xprt *xprt, struct rpc_task *task, int result)
{
	struct rpc_rqst *req = task->tk_rqstp;
	unsigned long cwnd = xprt->cwnd;

	if (result >= 0 && cwnd <= xprt->cong) {
		/* The (cwnd >> 1) term makes sure
		 * the result gets rounded properly. */
		cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd;
		if (cwnd > RPC_MAXCWND(xprt))
			cwnd = RPC_MAXCWND(xprt);
		__xprt_lock_write_next_cong(xprt);
	} else if (result == -ETIMEDOUT) {
		cwnd >>= 1;
		if (cwnd < RPC_CWNDSCALE)
			cwnd = RPC_CWNDSCALE;
	}
	dprintk("RPC: cong %ld, cwnd was %ld, now %ld\n",
			xprt->cong, xprt->cwnd, cwnd);
	xprt->cwnd = cwnd;
	__xprt_put_cong(xprt, req);
}
EXPORT_SYMBOL_GPL(xprt_adjust_cwnd);
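
/*
 * A worked example, assuming the usual RPC_CWNDSCALE of 256: a congestion
 * window of 2048 corresponds to eight outstanding requests. A reply that
 * arrives while the window is full grows it by
 * (256 * 256 + 1024) / 2048 = 32, i.e. by 1/8 of a request slot, while a
 * major timeout halves it to 1024 (four slots). The window never drops
 * below one slot (RPC_CWNDSCALE) and never exceeds RPC_MAXCWND().
 */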

/**
 * xprt_wake_pending_tasks - wake all tasks on a transport's pending queue
 * @xprt: transport with waiting tasks
 * @status: result code to plant in each task before waking it
 *
 */
void xprt_wake_pending_tasks(struct rpc_xprt *xprt, int status)
{
	if (status < 0)
		rpc_wake_up_status(&xprt->pending, status);
	else
		rpc_wake_up(&xprt->pending);
}
EXPORT_SYMBOL_GPL(xprt_wake_pending_tasks);

/**
 * xprt_wait_for_buffer_space - wait for transport output buffer to clear
 * @xprt: transport
 *
 * Note that we only set the timer for the case of RPC_IS_SOFT(), since
 * we don't in general want to force a socket disconnection due to
 * an incomplete RPC call transmission.
 */
void xprt_wait_for_buffer_space(struct rpc_xprt *xprt)
{
	set_bit(XPRT_WRITE_SPACE, &xprt->state);
}
EXPORT_SYMBOL_GPL(xprt_wait_for_buffer_space);

static bool
xprt_clear_write_space_locked(struct rpc_xprt *xprt)
{
	if (test_and_clear_bit(XPRT_WRITE_SPACE, &xprt->state)) {
		__xprt_lock_write_next(xprt);
		dprintk("RPC: write space: waking waiting task on "
				"xprt %p\n", xprt);
		return true;
	}
	return false;
}

/**
 * xprt_write_space - wake the task waiting for transport output buffer space
 * @xprt: transport with waiting tasks
 *
 * Can be called in a soft IRQ context, so xprt_write_space never sleeps.
 */
bool xprt_write_space(struct rpc_xprt *xprt)
{
	bool ret;

	if (!test_bit(XPRT_WRITE_SPACE, &xprt->state))
		return false;
	spin_lock(&xprt->transport_lock);
	ret = xprt_clear_write_space_locked(xprt);
	spin_unlock(&xprt->transport_lock);
	return ret;
}
EXPORT_SYMBOL_GPL(xprt_write_space);

static unsigned long xprt_abs_ktime_to_jiffies(ktime_t abstime)
{
	s64 delta = ktime_to_ns(ktime_get() - abstime);
	return likely(delta >= 0) ?
		jiffies - nsecs_to_jiffies(delta) :
		jiffies + nsecs_to_jiffies(-delta);
}

static unsigned long xprt_calc_majortimeo(struct rpc_rqst *req,
		const struct rpc_timeout *to)
{
	unsigned long majortimeo = req->rq_timeout;

	if (to->to_exponential)
		majortimeo <<= to->to_retries;
	else
		majortimeo += to->to_increment * to->to_retries;
	if (majortimeo > to->to_maxval || majortimeo == 0)
		majortimeo = to->to_maxval;
	return majortimeo;
}

static void xprt_reset_majortimeo(struct rpc_rqst *req,
		const struct rpc_timeout *to)
{
	req->rq_majortimeo += xprt_calc_majortimeo(req, to);
}

static void xprt_reset_minortimeo(struct rpc_rqst *req)
{
	req->rq_minortimeo += req->rq_timeout;
}

static void xprt_init_majortimeo(struct rpc_task *task, struct rpc_rqst *req,
		const struct rpc_timeout *to)
{
	unsigned long time_init;
	struct rpc_xprt *xprt = req->rq_xprt;

	if (likely(xprt && xprt_connected(xprt)))
		time_init = jiffies;
	else
		time_init = xprt_abs_ktime_to_jiffies(task->tk_start);

	req->rq_timeout = to->to_initval;
	req->rq_majortimeo = time_init + xprt_calc_majortimeo(req, to);
	req->rq_minortimeo = time_init + req->rq_timeout;
}

/**
 * xprt_adjust_timeout - adjust timeout values for next retransmit
 * @req: RPC request containing parameters to use for the adjustment
 *
 */
int xprt_adjust_timeout(struct rpc_rqst *req)
{
	struct rpc_xprt *xprt = req->rq_xprt;
	const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout;
	int status = 0;

	if (time_before(jiffies, req->rq_majortimeo)) {
		if (time_before(jiffies, req->rq_minortimeo))
			return status;
		if (to->to_exponential)
			req->rq_timeout <<= 1;
		else
			req->rq_timeout += to->to_increment;
		if (to->to_maxval && req->rq_timeout >= to->to_maxval)
			req->rq_timeout = to->to_maxval;
		req->rq_retries++;
	} else {
		req->rq_timeout = to->to_initval;
		req->rq_retries = 0;
		xprt_reset_majortimeo(req, to);
		/* Reset the RTT counters == "slow start" */
		spin_lock(&xprt->transport_lock);
		rpc_init_rtt(req->rq_task->tk_client->cl_rtt, to->to_initval);
		spin_unlock(&xprt->transport_lock);
		status = -ETIMEDOUT;
	}
	xprt_reset_minortimeo(req);

	if (req->rq_timeout == 0) {
		printk(KERN_WARNING "xprt_adjust_timeout: rq_timeout = 0!\n");
		req->rq_timeout = 5 * HZ;
	}
	return status;
}
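
/*
 * Timeout arithmetic, as a worked example: for a timeout template with
 * to_initval = 10s, to_retries = 3 and to_exponential set, the major
 * timeout computed above is 10s << 3 = 80s (capped at to_maxval); with
 * to_exponential clear and to_increment = 5s it is 10 + 3 * 5 = 25s.
 * Each minor timeout grows the per-retransmit rq_timeout in the same way,
 * while crossing the major timeout resets the timeouts, restarts the RTT
 * estimator and returns -ETIMEDOUT to the caller.
 */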

static void xprt_autoclose(struct work_struct *work)
{
	struct rpc_xprt *xprt =
		container_of(work, struct rpc_xprt, task_cleanup);
	unsigned int pflags = memalloc_nofs_save();

	trace_xprt_disconnect_auto(xprt);
	xprt->connect_cookie++;
	smp_mb__before_atomic();
	clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
	xprt->ops->close(xprt);
	xprt_release_write(xprt, NULL);
	wake_up_bit(&xprt->state, XPRT_LOCKED);
	memalloc_nofs_restore(pflags);
}

/**
 * xprt_disconnect_done - mark a transport as disconnected
 * @xprt: transport to flag for disconnect
 *
 */
void xprt_disconnect_done(struct rpc_xprt *xprt)
{
	trace_xprt_disconnect_done(xprt);
	spin_lock(&xprt->transport_lock);
	xprt_clear_connected(xprt);
	xprt_clear_write_space_locked(xprt);
	xprt_clear_congestion_window_wait_locked(xprt);
	xprt_wake_pending_tasks(xprt, -ENOTCONN);
	spin_unlock(&xprt->transport_lock);
}
EXPORT_SYMBOL_GPL(xprt_disconnect_done);

/**
 * xprt_schedule_autoclose_locked - Try to schedule an autoclose RPC call
 * @xprt: transport to disconnect
 */
static void xprt_schedule_autoclose_locked(struct rpc_xprt *xprt)
{
	if (test_and_set_bit(XPRT_CLOSE_WAIT, &xprt->state))
		return;
	if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
		queue_work(xprtiod_workqueue, &xprt->task_cleanup);
	else if (xprt->snd_task && !test_bit(XPRT_SND_IS_COOKIE, &xprt->state))
		rpc_wake_up_queued_task_set_status(&xprt->pending,
				xprt->snd_task, -ENOTCONN);
}

/**
 * xprt_force_disconnect - force a transport to disconnect
 * @xprt: transport to disconnect
 *
 */
void xprt_force_disconnect(struct rpc_xprt *xprt)
{
	trace_xprt_disconnect_force(xprt);

	/* Don't race with the test_bit() in xprt_clear_locked() */
	spin_lock(&xprt->transport_lock);
	xprt_schedule_autoclose_locked(xprt);
	spin_unlock(&xprt->transport_lock);
}
EXPORT_SYMBOL_GPL(xprt_force_disconnect);

static unsigned int
xprt_connect_cookie(struct rpc_xprt *xprt)
{
	return READ_ONCE(xprt->connect_cookie);
}

static bool
xprt_request_retransmit_after_disconnect(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;

	return req->rq_connect_cookie != xprt_connect_cookie(xprt) ||
		!xprt_connected(xprt);
}

/**
 * xprt_conditional_disconnect - force a transport to disconnect
 * @xprt: transport to disconnect
 * @cookie: 'connection cookie'
 *
 * This attempts to break the connection if and only if 'cookie' matches
 * the current transport 'connection cookie'. It ensures that we don't
 * try to break the connection more than once when we need to retransmit
 * a batch of RPC requests.
 *
 */
void xprt_conditional_disconnect(struct rpc_xprt *xprt, unsigned int cookie)
{
	/* Don't race with the test_bit() in xprt_clear_locked() */
	spin_lock(&xprt->transport_lock);
	if (cookie != xprt->connect_cookie)
		goto out;
	if (test_bit(XPRT_CLOSING, &xprt->state))
		goto out;
	xprt_schedule_autoclose_locked(xprt);
out:
	spin_unlock(&xprt->transport_lock);
}

static bool
xprt_has_timer(const struct rpc_xprt *xprt)
{
	return xprt->idle_timeout != 0;
}

static void
xprt_schedule_autodisconnect(struct rpc_xprt *xprt)
	__must_hold(&xprt->transport_lock)
{
	xprt->last_used = jiffies;
	if (RB_EMPTY_ROOT(&xprt->recv_queue) && xprt_has_timer(xprt))
		mod_timer(&xprt->timer, xprt->last_used + xprt->idle_timeout);
}

static void
xprt_init_autodisconnect(struct timer_list *t)
{
	struct rpc_xprt *xprt = from_timer(xprt, t, timer);

	if (!RB_EMPTY_ROOT(&xprt->recv_queue))
		return;
	/* Reset xprt->last_used to avoid connect/autodisconnect cycling */
	xprt->last_used = jiffies;
	if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
		return;
	queue_work(xprtiod_workqueue, &xprt->task_cleanup);
}

#if IS_ENABLED(CONFIG_FAIL_SUNRPC)
static void xprt_inject_disconnect(struct rpc_xprt *xprt)
{
	if (!fail_sunrpc.ignore_client_disconnect &&
	    should_fail(&fail_sunrpc.attr, 1))
		xprt->ops->inject_disconnect(xprt);
}
#else
static inline void xprt_inject_disconnect(struct rpc_xprt *xprt)
{
}
#endif

bool xprt_lock_connect(struct rpc_xprt *xprt,
		struct rpc_task *task,
		void *cookie)
{
	bool ret = false;

	spin_lock(&xprt->transport_lock);
	if (!test_bit(XPRT_LOCKED, &xprt->state))
		goto out;
	if (xprt->snd_task != task)
		goto out;
	set_bit(XPRT_SND_IS_COOKIE, &xprt->state);
	xprt->snd_task = cookie;
	ret = true;
out:
	spin_unlock(&xprt->transport_lock);
	return ret;
}
EXPORT_SYMBOL_GPL(xprt_lock_connect);

void xprt_unlock_connect(struct rpc_xprt *xprt, void *cookie)
{
	spin_lock(&xprt->transport_lock);
	if (xprt->snd_task != cookie)
		goto out;
	if (!test_bit(XPRT_LOCKED, &xprt->state))
		goto out;
	xprt->snd_task = NULL;
	clear_bit(XPRT_SND_IS_COOKIE, &xprt->state);
	xprt->ops->release_xprt(xprt, NULL);
	xprt_schedule_autodisconnect(xprt);
out:
	spin_unlock(&xprt->transport_lock);
	wake_up_bit(&xprt->state, XPRT_LOCKED);
}
EXPORT_SYMBOL_GPL(xprt_unlock_connect);

/**
 * xprt_connect - schedule a transport connect operation
 * @task: RPC task that is requesting the connect
 *
 */
void xprt_connect(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;

	trace_xprt_connect(xprt);

	if (!xprt_bound(xprt)) {
		task->tk_status = -EAGAIN;
		return;
	}
	if (!xprt_lock_write(xprt, task))
		return;

	if (!xprt_connected(xprt) && !test_bit(XPRT_CLOSE_WAIT, &xprt->state)) {
		task->tk_rqstp->rq_connect_cookie = xprt->connect_cookie;
		rpc_sleep_on_timeout(&xprt->pending, task, NULL,
				xprt_request_timeout(task->tk_rqstp));

		if (test_bit(XPRT_CLOSING, &xprt->state))
			return;
		if (xprt_test_and_set_connecting(xprt))
			return;
		/* Race breaker */
		if (!xprt_connected(xprt)) {
			xprt->stat.connect_start = jiffies;
			xprt->ops->connect(xprt, task);
		} else {
			xprt_clear_connecting(xprt);
			task->tk_status = 0;
			rpc_wake_up_queued_task(&xprt->pending, task);
		}
	}
	xprt_release_write(xprt, task);
}

/**
 * xprt_reconnect_delay - compute the wait before scheduling a connect
 * @xprt: transport instance
 *
 */
unsigned long xprt_reconnect_delay(const struct rpc_xprt *xprt)
{
	unsigned long start, now = jiffies;

	start = xprt->stat.connect_start + xprt->reestablish_timeout;
	if (time_after(start, now))
		return start - now;
	return 0;
}
EXPORT_SYMBOL_GPL(xprt_reconnect_delay);

/**
 * xprt_reconnect_backoff - compute the new re-establish timeout
 * @xprt: transport instance
 * @init_to: initial reestablish timeout
 *
 */
void xprt_reconnect_backoff(struct rpc_xprt *xprt, unsigned long init_to)
{
	xprt->reestablish_timeout <<= 1;
	if (xprt->reestablish_timeout > xprt->max_reconnect_timeout)
		xprt->reestablish_timeout = xprt->max_reconnect_timeout;
	if (xprt->reestablish_timeout < init_to)
		xprt->reestablish_timeout = init_to;
}
EXPORT_SYMBOL_GPL(xprt_reconnect_backoff);
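
/*
 * For example, starting from a reestablish_timeout worth 5 seconds with a
 * 30 second max_reconnect_timeout, successive backoff calls yield 10, 20,
 * 30, 30, ... seconds (in jiffies) before the next connect is scheduled,
 * and the value is never allowed to fall below @init_to.
 */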

enum xprt_xid_rb_cmp {
	XID_RB_EQUAL,
	XID_RB_LEFT,
	XID_RB_RIGHT,
};
static enum xprt_xid_rb_cmp
xprt_xid_cmp(__be32 xid1, __be32 xid2)
{
	if (xid1 == xid2)
		return XID_RB_EQUAL;
	if ((__force u32)xid1 < (__force u32)xid2)
		return XID_RB_LEFT;
	return XID_RB_RIGHT;
}

static struct rpc_rqst *
xprt_request_rb_find(struct rpc_xprt *xprt, __be32 xid)
{
	struct rb_node *n = xprt->recv_queue.rb_node;
	struct rpc_rqst *req;

	while (n != NULL) {
		req = rb_entry(n, struct rpc_rqst, rq_recv);
		switch (xprt_xid_cmp(xid, req->rq_xid)) {
		case XID_RB_LEFT:
			n = n->rb_left;
			break;
		case XID_RB_RIGHT:
			n = n->rb_right;
			break;
		case XID_RB_EQUAL:
			return req;
		}
	}
	return NULL;
}

static void
xprt_request_rb_insert(struct rpc_xprt *xprt, struct rpc_rqst *new)
{
	struct rb_node **p = &xprt->recv_queue.rb_node;
	struct rb_node *n = NULL;
	struct rpc_rqst *req;

	while (*p != NULL) {
		n = *p;
		req = rb_entry(n, struct rpc_rqst, rq_recv);
		switch (xprt_xid_cmp(new->rq_xid, req->rq_xid)) {
		case XID_RB_LEFT:
			p = &n->rb_left;
			break;
		case XID_RB_RIGHT:
			p = &n->rb_right;
			break;
		case XID_RB_EQUAL:
			WARN_ON_ONCE(new != req);
			return;
		}
	}
	rb_link_node(&new->rq_recv, n, p);
	rb_insert_color(&new->rq_recv, &xprt->recv_queue);
}

static void
xprt_request_rb_remove(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
	rb_erase(&req->rq_recv, &xprt->recv_queue);
}

/**
 * xprt_lookup_rqst - find an RPC request corresponding to an XID
 * @xprt: transport on which the original request was transmitted
 * @xid: RPC XID of incoming reply
 *
 * Caller holds xprt->queue_lock.
 */
struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid)
{
	struct rpc_rqst *entry;

	entry = xprt_request_rb_find(xprt, xid);
	if (entry != NULL) {
		trace_xprt_lookup_rqst(xprt, xid, 0);
		entry->rq_rtt = ktime_sub(ktime_get(), entry->rq_xtime);
		return entry;
	}

	dprintk("RPC: xprt_lookup_rqst did not find xid %08x\n",
			ntohl(xid));
	trace_xprt_lookup_rqst(xprt, xid, -ENOENT);
	xprt->stat.bad_xids++;
	return NULL;
}
EXPORT_SYMBOL_GPL(xprt_lookup_rqst);

static bool
xprt_is_pinned_rqst(struct rpc_rqst *req)
{
	return atomic_read(&req->rq_pin) != 0;
}

/**
 * xprt_pin_rqst - Pin a request on the transport receive list
 * @req: Request to pin
 *
 * Caller must ensure this is atomic with the call to xprt_lookup_rqst()
 * so should be holding xprt->queue_lock.
 */
void xprt_pin_rqst(struct rpc_rqst *req)
{
	atomic_inc(&req->rq_pin);
}
EXPORT_SYMBOL_GPL(xprt_pin_rqst);

/**
 * xprt_unpin_rqst - Unpin a request on the transport receive list
 * @req: Request to unpin
 *
 * Caller should be holding xprt->queue_lock.
 */
void xprt_unpin_rqst(struct rpc_rqst *req)
{
	if (!test_bit(RPC_TASK_MSG_PIN_WAIT, &req->rq_task->tk_runstate)) {
		atomic_dec(&req->rq_pin);
		return;
	}
	if (atomic_dec_and_test(&req->rq_pin))
		wake_up_var(&req->rq_pin);
}
EXPORT_SYMBOL_GPL(xprt_unpin_rqst);

static void xprt_wait_on_pinned_rqst(struct rpc_rqst *req)
{
	wait_var_event(&req->rq_pin, !xprt_is_pinned_rqst(req));
}
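
/*
 * A reply handler typically combines the lookup, pin and complete calls
 * along these lines (a sketch only; the in-tree socket transports follow
 * this pattern, and copy_reply_data_into() stands in for the
 * transport-specific data copy):
 *
 *	spin_lock(&xprt->queue_lock);
 *	req = xprt_lookup_rqst(xprt, xid);
 *	if (!req) {
 *		spin_unlock(&xprt->queue_lock);
 *		return;
 *	}
 *	xprt_pin_rqst(req);
 *	spin_unlock(&xprt->queue_lock);
 *
 *	copied = copy_reply_data_into(req);
 *
 *	spin_lock(&xprt->queue_lock);
 *	if (copied > 0)
 *		xprt_complete_rqst(req->rq_task, copied);
 *	xprt_unpin_rqst(req);
 *	spin_unlock(&xprt->queue_lock);
 */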

static bool
xprt_request_data_received(struct rpc_task *task)
{
	return !test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) &&
		READ_ONCE(task->tk_rqstp->rq_reply_bytes_recvd) != 0;
}

static bool
xprt_request_need_enqueue_receive(struct rpc_task *task, struct rpc_rqst *req)
{
	return !test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) &&
		READ_ONCE(task->tk_rqstp->rq_reply_bytes_recvd) == 0;
}

/**
 * xprt_request_enqueue_receive - Add a request to the receive queue
 * @task: RPC task
 *
 */
int
xprt_request_enqueue_receive(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	int ret;

	if (!xprt_request_need_enqueue_receive(task, req))
		return 0;

	ret = xprt_request_prepare(task->tk_rqstp, &req->rq_rcv_buf);
	if (ret)
		return ret;
	spin_lock(&xprt->queue_lock);

	/* Update the softirq receive buffer */
	memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
			sizeof(req->rq_private_buf));

	/* Add request to the receive list */
	xprt_request_rb_insert(xprt, req);
	set_bit(RPC_TASK_NEED_RECV, &task->tk_runstate);
	spin_unlock(&xprt->queue_lock);

	/* Turn off autodisconnect */
	del_timer_sync(&xprt->timer);
	return 0;
}

/**
 * xprt_request_dequeue_receive_locked - Remove a request from the receive queue
 * @task: RPC task
 *
 * Caller must hold xprt->queue_lock.
 */
static void
xprt_request_dequeue_receive_locked(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;

	if (test_and_clear_bit(RPC_TASK_NEED_RECV, &task->tk_runstate))
		xprt_request_rb_remove(req->rq_xprt, req);
}

/**
 * xprt_update_rtt - Update RPC RTT statistics
 * @task: RPC request that recently completed
 *
 * Caller holds xprt->queue_lock.
 */
void xprt_update_rtt(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_rtt *rtt = task->tk_client->cl_rtt;
	unsigned int timer = task->tk_msg.rpc_proc->p_timer;
	long m = usecs_to_jiffies(ktime_to_us(req->rq_rtt));

	if (timer) {
		if (req->rq_ntrans == 1)
			rpc_update_rtt(rtt, timer, m);
		rpc_set_timeo(rtt, timer, req->rq_ntrans - 1);
	}
}
EXPORT_SYMBOL_GPL(xprt_update_rtt);

/**
 * xprt_complete_rqst - called when reply processing is complete
 * @task: RPC request that recently completed
 * @copied: actual number of bytes received from the transport
 *
 * Caller holds xprt->queue_lock.
 */
void xprt_complete_rqst(struct rpc_task *task, int copied)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;

	xprt->stat.recvs++;

	xdr_free_bvec(&req->rq_rcv_buf);
	req->rq_private_buf.bvec = NULL;
	req->rq_private_buf.len = copied;
	/* Ensure all writes are done before we update */
	/* req->rq_reply_bytes_recvd */
	smp_wmb();
	req->rq_reply_bytes_recvd = copied;
	xprt_request_dequeue_receive_locked(task);
	rpc_wake_up_queued_task(&xprt->pending, task);
}
EXPORT_SYMBOL_GPL(xprt_complete_rqst);

static void xprt_timer(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;

	if (task->tk_status != -ETIMEDOUT)
		return;

	trace_xprt_timer(xprt, req->rq_xid, task->tk_status);
	if (!req->rq_reply_bytes_recvd) {
		if (xprt->ops->timer)
			xprt->ops->timer(xprt, task);
	} else
		task->tk_status = 0;
}

/**
 * xprt_wait_for_reply_request_def - wait for reply
 * @task: pointer to rpc_task
 *
 * Set a request's retransmit timeout based on the transport's
 * default timeout parameters. Used by transports that don't adjust
 * the retransmit timeout based on round-trip time estimation,
 * and put the task to sleep on the pending queue.
 */
void xprt_wait_for_reply_request_def(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;

	rpc_sleep_on_timeout(&req->rq_xprt->pending, task, xprt_timer,
			xprt_request_timeout(req));
}
EXPORT_SYMBOL_GPL(xprt_wait_for_reply_request_def);

/**
 * xprt_wait_for_reply_request_rtt - wait for reply using RTT estimator
 * @task: pointer to rpc_task
 *
 * Set a request's retransmit timeout using the RTT estimator,
 * and put the task to sleep on the pending queue.
 */
void xprt_wait_for_reply_request_rtt(struct rpc_task *task)
{
	int timer = task->tk_msg.rpc_proc->p_timer;
	struct rpc_clnt *clnt = task->tk_client;
	struct rpc_rtt *rtt = clnt->cl_rtt;
	struct rpc_rqst *req = task->tk_rqstp;
	unsigned long max_timeout = clnt->cl_timeout->to_maxval;
	unsigned long timeout;

	timeout = rpc_calc_rto(rtt, timer);
	timeout <<= rpc_ntimeo(rtt, timer) + req->rq_retries;
	if (timeout > max_timeout || timeout == 0)
		timeout = max_timeout;
	rpc_sleep_on_timeout(&req->rq_xprt->pending, task, xprt_timer,
			jiffies + timeout);
}
EXPORT_SYMBOL_GPL(xprt_wait_for_reply_request_rtt);

/**
 * xprt_request_wait_receive - wait for the reply to an RPC request
 * @task: RPC task about to send a request
 *
 */
void xprt_request_wait_receive(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;

	if (!test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate))
		return;
	/*
	 * Sleep on the pending queue if we're expecting a reply.
	 * The spinlock ensures atomicity between the test of
	 * req->rq_reply_bytes_recvd, and the call to rpc_sleep_on().
	 */
	spin_lock(&xprt->queue_lock);
	if (test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) {
		xprt->ops->wait_for_reply_request(task);
		/*
		 * Send an extra queue wakeup call if the
		 * connection was dropped in case the call to
		 * rpc_sleep_on() raced.
		 */
		if (xprt_request_retransmit_after_disconnect(task))
			rpc_wake_up_queued_task_set_status(&xprt->pending,
					task, -ENOTCONN);
	}
	spin_unlock(&xprt->queue_lock);
}

static bool
xprt_request_need_enqueue_transmit(struct rpc_task *task, struct rpc_rqst *req)
{
	return !test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
}

/**
 * xprt_request_enqueue_transmit - queue a task for transmission
 * @task: pointer to rpc_task
 *
 * Add a task to the transmission queue.
 */
void
xprt_request_enqueue_transmit(struct rpc_task *task)
{
	struct rpc_rqst *pos, *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	int ret;

	if (xprt_request_need_enqueue_transmit(task, req)) {
		ret = xprt_request_prepare(task->tk_rqstp, &req->rq_snd_buf);
		if (ret) {
			task->tk_status = ret;
			return;
		}
		req->rq_bytes_sent = 0;
		spin_lock(&xprt->queue_lock);
		/*
		 * Requests that carry congestion control credits are added
		 * to the head of the list to avoid starvation issues.
		 */
		if (req->rq_cong) {
			xprt_clear_congestion_window_wait(xprt);
			list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) {
				if (pos->rq_cong)
					continue;
				/* Note: req is added _before_ pos */
				list_add_tail(&req->rq_xmit, &pos->rq_xmit);
				INIT_LIST_HEAD(&req->rq_xmit2);
				goto out;
			}
		} else if (!req->rq_seqno) {
			list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) {
				if (pos->rq_task->tk_owner != task->tk_owner)
					continue;
				list_add_tail(&req->rq_xmit2, &pos->rq_xmit2);
				INIT_LIST_HEAD(&req->rq_xmit);
				goto out;
			}
		}
		list_add_tail(&req->rq_xmit, &xprt->xmit_queue);
		INIT_LIST_HEAD(&req->rq_xmit2);
out:
		atomic_long_inc(&xprt->xmit_queuelen);
		set_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
		spin_unlock(&xprt->queue_lock);
	}
}

/**
 * xprt_request_dequeue_transmit_locked - remove a task from the transmission queue
 * @task: pointer to rpc_task
 *
 * Remove a task from the transmission queue
 * Caller must hold xprt->queue_lock
 */
static void
xprt_request_dequeue_transmit_locked(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;

	if (!test_and_clear_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
		return;
	if (!list_empty(&req->rq_xmit)) {
		struct rpc_xprt *xprt = req->rq_xprt;

		if (list_is_first(&req->rq_xmit, &xprt->xmit_queue) &&
		    xprt->ops->abort_send_request)
			xprt->ops->abort_send_request(req);

		list_del(&req->rq_xmit);
		if (!list_empty(&req->rq_xmit2)) {
			struct rpc_rqst *next = list_first_entry(&req->rq_xmit2,
					struct rpc_rqst, rq_xmit2);
			list_del(&req->rq_xmit2);
			list_add_tail(&next->rq_xmit, &next->rq_xprt->xmit_queue);
		}
	} else
		list_del(&req->rq_xmit2);
	atomic_long_dec(&req->rq_xprt->xmit_queuelen);
	xdr_free_bvec(&req->rq_snd_buf);
}

/**
 * xprt_request_dequeue_transmit - remove a task from the transmission queue
 * @task: pointer to rpc_task
 *
 * Remove a task from the transmission queue
 */
static void
xprt_request_dequeue_transmit(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;

	spin_lock(&xprt->queue_lock);
	xprt_request_dequeue_transmit_locked(task);
	spin_unlock(&xprt->queue_lock);
}

/**
 * xprt_request_dequeue_xprt - remove a task from the transmit+receive queue
 * @task: pointer to rpc_task
 *
 * Remove a task from the transmit and receive queues, and ensure that
 * it is not pinned by the receive work item.
 */
void
xprt_request_dequeue_xprt(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;

	if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate) ||
	    test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) ||
	    xprt_is_pinned_rqst(req)) {
		spin_lock(&xprt->queue_lock);
		while (xprt_is_pinned_rqst(req)) {
			set_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate);
			spin_unlock(&xprt->queue_lock);
			xprt_wait_on_pinned_rqst(req);
			spin_lock(&xprt->queue_lock);
			clear_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate);
		}
		xprt_request_dequeue_transmit_locked(task);
		xprt_request_dequeue_receive_locked(task);
		spin_unlock(&xprt->queue_lock);
		xdr_free_bvec(&req->rq_rcv_buf);
	}
}

/**
 * xprt_request_prepare - prepare an encoded request for transport
 * @req: pointer to rpc_rqst
 * @buf: pointer to send/rcv xdr_buf
 *
 * Calls into the transport layer to do whatever is needed to prepare
 * the request for transmission or receive.
 * Returns error, or zero.
 */
static int
xprt_request_prepare(struct rpc_rqst *req, struct xdr_buf *buf)
{
	struct rpc_xprt *xprt = req->rq_xprt;

	if (xprt->ops->prepare_request)
		return xprt->ops->prepare_request(req, buf);
	return 0;
}

/**
 * xprt_request_need_retransmit - Test if a task needs retransmission
 * @task: pointer to rpc_task
 *
 * Test for whether a connection breakage requires the task to retransmit
 */
bool
xprt_request_need_retransmit(struct rpc_task *task)
{
	return xprt_request_retransmit_after_disconnect(task);
}

/**
 * xprt_prepare_transmit - reserve the transport before sending a request
 * @task: RPC task about to send a request
 *
 */
bool xprt_prepare_transmit(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;

	if (!xprt_lock_write(xprt, task)) {
		/* Race breaker: someone may have transmitted us */
		if (!test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
			rpc_wake_up_queued_task_set_status(&xprt->sending,
					task, 0);
		return false;

	}
	if (atomic_read(&xprt->swapper))
		/* This will be cleared in __rpc_execute */
		current->flags |= PF_MEMALLOC;
	return true;
}

void xprt_end_transmit(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;

	xprt_inject_disconnect(xprt);
	xprt_release_write(xprt, task);
}

/**
 * xprt_request_transmit - send an RPC request on a transport
 * @req: pointer to request to transmit
 * @snd_task: RPC task that owns the transport lock
 *
 * This performs the transmission of a single request.
 * Note that if the request is not the same as snd_task, then it
 * does need to be pinned.
 * Returns '0' on success.
 */
static int
xprt_request_transmit(struct rpc_rqst *req, struct rpc_task *snd_task)
{
	struct rpc_xprt *xprt = req->rq_xprt;
	struct rpc_task *task = req->rq_task;
	unsigned int connect_cookie;
	int is_retrans = RPC_WAS_SENT(task);
	int status;

	if (test_bit(XPRT_CLOSE_WAIT, &xprt->state))
		return -ENOTCONN;

	if (!req->rq_bytes_sent) {
		if (xprt_request_data_received(task)) {
			status = 0;
			goto out_dequeue;
		}
		/* Verify that our message lies in the RPCSEC_GSS window */
		if (rpcauth_xmit_need_reencode(task)) {
			status = -EBADMSG;
			goto out_dequeue;
		}
		if (RPC_SIGNALLED(task)) {
			status = -ERESTARTSYS;
			goto out_dequeue;
		}
	}

	/*
	 * Update req->rq_ntrans before transmitting to avoid races with
	 * xprt_update_rtt(), which needs to know that it is recording a
	 * reply to the first transmission.
	 */
	req->rq_ntrans++;

	trace_rpc_xdr_sendto(task, &req->rq_snd_buf);
	connect_cookie = xprt->connect_cookie;
	status = xprt->ops->send_request(req);
	if (status != 0) {
		req->rq_ntrans--;
		trace_xprt_transmit(req, status);
		return status;
	}

	if (is_retrans) {
		task->tk_client->cl_stats->rpcretrans++;
		trace_xprt_retransmit(req);
	}

	xprt_inject_disconnect(xprt);

	task->tk_flags |= RPC_TASK_SENT;
	spin_lock(&xprt->transport_lock);

	xprt->stat.sends++;
	xprt->stat.req_u += xprt->stat.sends - xprt->stat.recvs;
	xprt->stat.bklog_u += xprt->backlog.qlen;
	xprt->stat.sending_u += xprt->sending.qlen;
	xprt->stat.pending_u += xprt->pending.qlen;
	spin_unlock(&xprt->transport_lock);

	req->rq_connect_cookie = connect_cookie;
out_dequeue:
	trace_xprt_transmit(req, status);
	xprt_request_dequeue_transmit(task);
	rpc_wake_up_queued_task_set_status(&xprt->sending, task, status);
	return status;
}

/**
 * xprt_transmit - send an RPC request on a transport
 * @task: controlling RPC task
 *
 * Attempts to drain the transmit queue. On exit, either the transport
 * signalled an error that needs to be handled before transmission can
 * resume, or @task finished transmitting, and detected that it already
 * received a reply.
 */
void
xprt_transmit(struct rpc_task *task)
{
	struct rpc_rqst *next, *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	int status;

	spin_lock(&xprt->queue_lock);
	for (;;) {
		next = list_first_entry_or_null(&xprt->xmit_queue,
				struct rpc_rqst, rq_xmit);
		if (!next)
			break;
		xprt_pin_rqst(next);
		spin_unlock(&xprt->queue_lock);
		status = xprt_request_transmit(next, task);
		if (status == -EBADMSG && next != req)
			status = 0;
		spin_lock(&xprt->queue_lock);
		xprt_unpin_rqst(next);
		if (status < 0) {
			if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
				task->tk_status = status;
			break;
		}
		/* Was @task transmitted, and has it received a reply? */
		if (xprt_request_data_received(task) &&
		    !test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
			break;
		cond_resched_lock(&xprt->queue_lock);
	}
	spin_unlock(&xprt->queue_lock);
}

static void xprt_complete_request_init(struct rpc_task *task)
{
	if (task->tk_rqstp)
		xprt_request_init(task);
}

void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task)
{
	set_bit(XPRT_CONGESTED, &xprt->state);
	rpc_sleep_on(&xprt->backlog, task, xprt_complete_request_init);
}
EXPORT_SYMBOL_GPL(xprt_add_backlog);

static bool __xprt_set_rq(struct rpc_task *task, void *data)
{
	struct rpc_rqst *req = data;

	if (task->tk_rqstp == NULL) {
		memset(req, 0, sizeof(*req));	/* mark unused */
		task->tk_rqstp = req;
		return true;
	}
	return false;
}

bool xprt_wake_up_backlog(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
	if (rpc_wake_up_first(&xprt->backlog, __xprt_set_rq, req) == NULL) {
		clear_bit(XPRT_CONGESTED, &xprt->state);
		return false;
	}
	return true;
}
EXPORT_SYMBOL_GPL(xprt_wake_up_backlog);

static bool xprt_throttle_congested(struct rpc_xprt *xprt, struct rpc_task *task)
{
	bool ret = false;

	if (!test_bit(XPRT_CONGESTED, &xprt->state))
		goto out;
	spin_lock(&xprt->reserve_lock);
	if (test_bit(XPRT_CONGESTED, &xprt->state)) {
		xprt_add_backlog(xprt, task);
		ret = true;
	}
	spin_unlock(&xprt->reserve_lock);
out:
	return ret;
}

static struct rpc_rqst *xprt_dynamic_alloc_slot(struct rpc_xprt *xprt)
{
	struct rpc_rqst *req = ERR_PTR(-EAGAIN);

	if (xprt->num_reqs >= xprt->max_reqs)
		goto out;
	++xprt->num_reqs;
	spin_unlock(&xprt->reserve_lock);
	req = kzalloc(sizeof(*req), rpc_task_gfp_mask());
	spin_lock(&xprt->reserve_lock);
	if (req != NULL)
		goto out;
	--xprt->num_reqs;
	req = ERR_PTR(-ENOMEM);
out:
	return req;
}

static bool xprt_dynamic_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
	if (xprt->num_reqs > xprt->min_reqs) {
		--xprt->num_reqs;
		kfree(req);
		return true;
	}
	return false;
}

void xprt_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task)
{
	struct rpc_rqst *req;

	spin_lock(&xprt->reserve_lock);
	if (!list_empty(&xprt->free)) {
		req = list_entry(xprt->free.next, struct rpc_rqst, rq_list);
		list_del(&req->rq_list);
		goto out_init_req;
	}
	req = xprt_dynamic_alloc_slot(xprt);
	if (!IS_ERR(req))
		goto out_init_req;
	switch (PTR_ERR(req)) {
	case -ENOMEM:
		dprintk("RPC: dynamic allocation of request slot "
				"failed! Retrying\n");
		task->tk_status = -ENOMEM;
		break;
	case -EAGAIN:
		xprt_add_backlog(xprt, task);
		dprintk("RPC: waiting for request slot\n");
		fallthrough;
	default:
		task->tk_status = -EAGAIN;
	}
	spin_unlock(&xprt->reserve_lock);
	return;
out_init_req:
	xprt->stat.max_slots = max_t(unsigned int, xprt->stat.max_slots,
			xprt->num_reqs);
	spin_unlock(&xprt->reserve_lock);

	task->tk_status = 0;
	task->tk_rqstp = req;
}
EXPORT_SYMBOL_GPL(xprt_alloc_slot);

void xprt_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
	spin_lock(&xprt->reserve_lock);
	if (!xprt_wake_up_backlog(xprt, req) &&
	    !xprt_dynamic_free_slot(xprt, req)) {
		memset(req, 0, sizeof(*req));	/* mark unused */
		list_add(&req->rq_list, &xprt->free);
	}
	spin_unlock(&xprt->reserve_lock);
}
EXPORT_SYMBOL_GPL(xprt_free_slot);

static void xprt_free_all_slots(struct rpc_xprt *xprt)
{
	struct rpc_rqst *req;
	while (!list_empty(&xprt->free)) {
		req = list_first_entry(&xprt->free, struct rpc_rqst, rq_list);
		list_del(&req->rq_list);
		kfree(req);
	}
}

static DEFINE_IDA(rpc_xprt_ids);

void xprt_cleanup_ids(void)
{
	ida_destroy(&rpc_xprt_ids);
}

static int xprt_alloc_id(struct rpc_xprt *xprt)
{
	int id;

	id = ida_alloc(&rpc_xprt_ids, GFP_KERNEL);
	if (id < 0)
		return id;

	xprt->id = id;
	return 0;
}

static void xprt_free_id(struct rpc_xprt *xprt)
{
	ida_free(&rpc_xprt_ids, xprt->id);
}

struct rpc_xprt *xprt_alloc(struct net *net, size_t size,
		unsigned int num_prealloc,
		unsigned int max_alloc)
{
	struct rpc_xprt *xprt;
	struct rpc_rqst *req;
	int i;

	xprt = kzalloc(size, GFP_KERNEL);
	if (xprt == NULL)
		goto out;

	xprt_alloc_id(xprt);
	xprt_init(xprt, net);

	for (i = 0; i < num_prealloc; i++) {
		req = kzalloc(sizeof(struct rpc_rqst), GFP_KERNEL);
		if (!req)
			goto out_free;
		list_add(&req->rq_list, &xprt->free);
	}
	xprt->max_reqs = max_t(unsigned int, max_alloc, num_prealloc);
	xprt->min_reqs = num_prealloc;
	xprt->num_reqs = num_prealloc;

	return xprt;

out_free:
	xprt_free(xprt);
out:
	return NULL;
}
EXPORT_SYMBOL_GPL(xprt_alloc);

void xprt_free(struct rpc_xprt *xprt)
{
	put_net_track(xprt->xprt_net, &xprt->ns_tracker);
	xprt_free_all_slots(xprt);
	xprt_free_id(xprt);
	rpc_sysfs_xprt_destroy(xprt);
	kfree_rcu(xprt, rcu);
}
EXPORT_SYMBOL_GPL(xprt_free);
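
/*
 * A transport's setup() method typically sizes the slot table with
 * xprt_alloc() and releases it again with xprt_free() on its error paths.
 * A sketch with hypothetical names (the in-tree transports embed
 * struct rpc_xprt in a larger per-transport structure and pass its size):
 *
 *	static struct rpc_xprt *example_setup(struct xprt_create *args)
 *	{
 *		struct rpc_xprt *xprt;
 *
 *		xprt = xprt_alloc(args->net, sizeof(*xprt),
 *				example_slot_table_entries,
 *				example_max_slot_table_entries);
 *		if (!xprt)
 *			return ERR_PTR(-ENOMEM);
 *		...
 *		return xprt;
 *	}
 */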

static void
xprt_init_connect_cookie(struct rpc_rqst *req, struct rpc_xprt *xprt)
{
	req->rq_connect_cookie = xprt_connect_cookie(xprt) - 1;
}

static __be32
xprt_alloc_xid(struct rpc_xprt *xprt)
{
	__be32 xid;

	spin_lock(&xprt->reserve_lock);
	xid = (__force __be32)xprt->xid++;
	spin_unlock(&xprt->reserve_lock);
	return xid;
}

static void
xprt_init_xid(struct rpc_xprt *xprt)
{
	xprt->xid = get_random_u32();
}

static void
xprt_request_init(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_xprt;
	struct rpc_rqst *req = task->tk_rqstp;

	req->rq_task = task;
	req->rq_xprt = xprt;
	req->rq_buffer = NULL;
	req->rq_xid = xprt_alloc_xid(xprt);
	xprt_init_connect_cookie(req, xprt);
	req->rq_snd_buf.len = 0;
	req->rq_snd_buf.buflen = 0;
	req->rq_rcv_buf.len = 0;
	req->rq_rcv_buf.buflen = 0;
	req->rq_snd_buf.bvec = NULL;
	req->rq_rcv_buf.bvec = NULL;
	req->rq_release_snd_buf = NULL;
	xprt_init_majortimeo(task, req, task->tk_client->cl_timeout);

	trace_xprt_reserve(req);
}

static void
xprt_do_reserve(struct rpc_xprt *xprt, struct rpc_task *task)
{
	xprt->ops->alloc_slot(xprt, task);
	if (task->tk_rqstp != NULL)
		xprt_request_init(task);
}

/**
 * xprt_reserve - allocate an RPC request slot
 * @task: RPC task requesting a slot allocation
 *
 * If the transport is marked as being congested, or if no more
 * slots are available, place the task on the transport's
 * backlog queue.
 */
void xprt_reserve(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_xprt;

	task->tk_status = 0;
	if (task->tk_rqstp != NULL)
		return;

	task->tk_status = -EAGAIN;
	if (!xprt_throttle_congested(xprt, task))
		xprt_do_reserve(xprt, task);
}

/**
 * xprt_retry_reserve - allocate an RPC request slot
 * @task: RPC task requesting a slot allocation
 *
 * If no more slots are available, place the task on the transport's
 * backlog queue.
 * Note that the only difference with xprt_reserve is that we now
 * ignore the value of the XPRT_CONGESTED flag.
 */
void xprt_retry_reserve(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_xprt;

	task->tk_status = 0;
	if (task->tk_rqstp != NULL)
		return;

	task->tk_status = -EAGAIN;
	xprt_do_reserve(xprt, task);
}

/**
 * xprt_release - release an RPC request slot
 * @task: task which is finished with the slot
 *
 */
void xprt_release(struct rpc_task *task)
{
	struct rpc_xprt *xprt;
	struct rpc_rqst *req = task->tk_rqstp;

	if (req == NULL) {
		if (task->tk_client) {
			xprt = task->tk_xprt;
			xprt_release_write(xprt, task);
		}
		return;
	}

	xprt = req->rq_xprt;
	xprt_request_dequeue_xprt(task);
	spin_lock(&xprt->transport_lock);
	xprt->ops->release_xprt(xprt, task);
	if (xprt->ops->release_request)
		xprt->ops->release_request(task);
	xprt_schedule_autodisconnect(xprt);
	spin_unlock(&xprt->transport_lock);
	if (req->rq_buffer)
		xprt->ops->buf_free(task);
	if (req->rq_cred != NULL)
		put_rpccred(req->rq_cred);
	if (req->rq_release_snd_buf)
		req->rq_release_snd_buf(req);

	task->tk_rqstp = NULL;
	if (likely(!bc_prealloc(req)))
		xprt->ops->free_slot(xprt, req);
	else
		xprt_free_bc_request(req);
}

#ifdef CONFIG_SUNRPC_BACKCHANNEL
void
xprt_init_bc_request(struct rpc_rqst *req, struct rpc_task *task,
		const struct rpc_timeout *to)
{
	struct xdr_buf *xbufp = &req->rq_snd_buf;

	task->tk_rqstp = req;
	req->rq_task = task;
	xprt_init_connect_cookie(req, req->rq_xprt);
	/*
	 * Set up the xdr_buf length.
	 * This also indicates that the buffer is XDR encoded already.
	 */
	xbufp->len = xbufp->head[0].iov_len + xbufp->page_len +
		xbufp->tail[0].iov_len;
	/*
	 * Backchannel Replies are sent with !RPC_TASK_SOFT and
	 * RPC_TASK_NO_RETRANS_TIMEOUT. The major timeout setting
#ifdef CONFIG_SUNRPC_BACKCHANNEL
void
xprt_init_bc_request(struct rpc_rqst *req, struct rpc_task *task,
                const struct rpc_timeout *to)
{
        struct xdr_buf *xbufp = &req->rq_snd_buf;

        task->tk_rqstp = req;
        req->rq_task = task;
        xprt_init_connect_cookie(req, req->rq_xprt);
        /*
         * Set up the xdr_buf length.
         * This also indicates that the buffer is XDR encoded already.
         */
        xbufp->len = xbufp->head[0].iov_len + xbufp->page_len +
                xbufp->tail[0].iov_len;
        /*
         * Backchannel Replies are sent with !RPC_TASK_SOFT and
         * RPC_TASK_NO_RETRANS_TIMEOUT. The major timeout setting
         * affects only how long each Reply waits to be sent when
         * a transport connection cannot be established.
         */
        xprt_init_majortimeo(task, req, to);
}
#endif

static void xprt_init(struct rpc_xprt *xprt, struct net *net)
{
        kref_init(&xprt->kref);

        spin_lock_init(&xprt->transport_lock);
        spin_lock_init(&xprt->reserve_lock);
        spin_lock_init(&xprt->queue_lock);

        INIT_LIST_HEAD(&xprt->free);
        xprt->recv_queue = RB_ROOT;
        INIT_LIST_HEAD(&xprt->xmit_queue);
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
        spin_lock_init(&xprt->bc_pa_lock);
        INIT_LIST_HEAD(&xprt->bc_pa_list);
#endif /* CONFIG_SUNRPC_BACKCHANNEL */
        INIT_LIST_HEAD(&xprt->xprt_switch);

        xprt->last_used = jiffies;
        xprt->cwnd = RPC_INITCWND;
        xprt->bind_index = 0;

        rpc_init_wait_queue(&xprt->binding, "xprt_binding");
        rpc_init_wait_queue(&xprt->pending, "xprt_pending");
        rpc_init_wait_queue(&xprt->sending, "xprt_sending");
        rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog");

        xprt_init_xid(xprt);

        xprt->xprt_net = get_net_track(net, &xprt->ns_tracker, GFP_KERNEL);
}

/**
 * xprt_create_transport - create an RPC transport
 * @args: rpc transport creation arguments
 *
 */
struct rpc_xprt *xprt_create_transport(struct xprt_create *args)
{
        struct rpc_xprt *xprt;
        const struct xprt_class *t;

        t = xprt_class_find_by_ident(args->ident);
        if (!t) {
                dprintk("RPC: transport (%d) not supported\n", args->ident);
                return ERR_PTR(-EIO);
        }

        xprt = t->setup(args);
        xprt_class_release(t);

        if (IS_ERR(xprt))
                goto out;
        if (args->flags & XPRT_CREATE_NO_IDLE_TIMEOUT)
                xprt->idle_timeout = 0;
        INIT_WORK(&xprt->task_cleanup, xprt_autoclose);
        if (xprt_has_timer(xprt))
                timer_setup(&xprt->timer, xprt_init_autodisconnect, 0);
        else
                timer_setup(&xprt->timer, NULL, 0);

        if (strlen(args->servername) > RPC_MAXNETNAMELEN) {
                xprt_destroy(xprt);
                return ERR_PTR(-EINVAL);
        }
        xprt->servername = kstrdup(args->servername, GFP_KERNEL);
        if (xprt->servername == NULL) {
                xprt_destroy(xprt);
                return ERR_PTR(-ENOMEM);
        }

        rpc_xprt_debugfs_register(xprt);

        trace_xprt_create(xprt);
out:
        return xprt;
}
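/*
 * Illustrative sketch only: a minimal caller of xprt_create_transport(),
 * under the assumption that the usual xprt_create fields (net, dstaddr,
 * addrlen, servername) and the XPRT_TRANSPORT_TCP identifier from the
 * transport header are available. It is not a copy of the real caller in
 * clnt.c; error handling is reduced to the ERR_PTR()/IS_ERR() convention
 * that xprt_create_transport() itself uses.
 */
static inline struct rpc_xprt *example_create_tcp_transport(struct net *net,
                                struct sockaddr *addr, size_t addrlen,
                                const char *servername)
{
        struct xprt_create args = {
                .ident          = XPRT_TRANSPORT_TCP,
                .net            = net,
                .dstaddr        = addr,
                .addrlen        = addrlen,
                .servername     = servername,
        };

        /* Returns a referenced transport, or an ERR_PTR() on failure. */
        return xprt_create_transport(&args);
}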
static void xprt_destroy_cb(struct work_struct *work)
{
        struct rpc_xprt *xprt =
                container_of(work, struct rpc_xprt, task_cleanup);

        trace_xprt_destroy(xprt);

        rpc_xprt_debugfs_unregister(xprt);
        rpc_destroy_wait_queue(&xprt->binding);
        rpc_destroy_wait_queue(&xprt->pending);
        rpc_destroy_wait_queue(&xprt->sending);
        rpc_destroy_wait_queue(&xprt->backlog);
        kfree(xprt->servername);
        /*
         * Destroy any existing back channel
         */
        xprt_destroy_backchannel(xprt, UINT_MAX);

        /*
         * Tear down transport state and free the rpc_xprt
         */
        xprt->ops->destroy(xprt);
}

/**
 * xprt_destroy - destroy an RPC transport, killing off all requests.
 * @xprt: transport to destroy
 *
 */
static void xprt_destroy(struct rpc_xprt *xprt)
{
        /*
         * Exclude transport connect/disconnect handlers and autoclose
         */
        wait_on_bit_lock(&xprt->state, XPRT_LOCKED, TASK_UNINTERRUPTIBLE);

        /*
         * xprt_schedule_autodisconnect() can run after XPRT_LOCKED
         * is cleared. We use ->transport_lock to ensure the mod_timer()
         * can only run *before* del_timer_sync(), never after.
         */
        spin_lock(&xprt->transport_lock);
        del_timer_sync(&xprt->timer);
        spin_unlock(&xprt->transport_lock);

        /*
         * Destroy sockets etc from the system workqueue so they can
         * safely flush receive work running on rpciod.
         */
        INIT_WORK(&xprt->task_cleanup, xprt_destroy_cb);
        schedule_work(&xprt->task_cleanup);
}

static void xprt_destroy_kref(struct kref *kref)
{
        xprt_destroy(container_of(kref, struct rpc_xprt, kref));
}

/**
 * xprt_get - return a reference to an RPC transport.
 * @xprt: pointer to the transport
 *
 */
struct rpc_xprt *xprt_get(struct rpc_xprt *xprt)
{
        if (xprt != NULL && kref_get_unless_zero(&xprt->kref))
                return xprt;
        return NULL;
}
EXPORT_SYMBOL_GPL(xprt_get);

/**
 * xprt_put - release a reference to an RPC transport.
 * @xprt: pointer to the transport
 *
 */
void xprt_put(struct rpc_xprt *xprt)
{
        if (xprt != NULL)
                kref_put(&xprt->kref, xprt_destroy_kref);
}
EXPORT_SYMBOL_GPL(xprt_put);

void xprt_set_offline_locked(struct rpc_xprt *xprt, struct rpc_xprt_switch *xps)
{
        if (!test_and_set_bit(XPRT_OFFLINE, &xprt->state)) {
                spin_lock(&xps->xps_lock);
                xps->xps_nactive--;
                spin_unlock(&xps->xps_lock);
        }
}

void xprt_set_online_locked(struct rpc_xprt *xprt, struct rpc_xprt_switch *xps)
{
        if (test_and_clear_bit(XPRT_OFFLINE, &xprt->state)) {
                spin_lock(&xps->xps_lock);
                xps->xps_nactive++;
                spin_unlock(&xps->xps_lock);
        }
}

void xprt_delete_locked(struct rpc_xprt *xprt, struct rpc_xprt_switch *xps)
{
        if (test_and_set_bit(XPRT_REMOVE, &xprt->state))
                return;

        xprt_force_disconnect(xprt);
        if (!test_bit(XPRT_CONNECTED, &xprt->state))
                return;

        if (!xprt->sending.qlen && !xprt->pending.qlen &&
            !xprt->backlog.qlen && !atomic_long_read(&xprt->queuelen))
                rpc_xprt_switch_remove_xprt(xps, xprt, true);
}
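/*
 * Illustrative sketch only, not called anywhere: the reference-counting
 * pattern callers are expected to follow around xprt_get()/xprt_put().
 * The helper name is invented for the example.
 */
static inline void example_pin_xprt(struct rpc_xprt *candidate)
{
        struct rpc_xprt *xprt = xprt_get(candidate);

        if (!xprt)
                return; /* candidate was NULL or already being torn down */

        /* ... the transport is safe to use while the reference is held ... */

        /* Dropping the last reference ends up in xprt_destroy(). */
        xprt_put(xprt);
}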