1 // SPDX-License-Identifier: GPL-2.0-only 2 /* 3 * linux/net/sunrpc/xprt.c 4 * 5 * This is a generic RPC call interface supporting congestion avoidance, 6 * and asynchronous calls. 7 * 8 * The interface works like this: 9 * 10 * - When a process places a call, it allocates a request slot if 11 * one is available. Otherwise, it sleeps on the backlog queue 12 * (xprt_reserve). 13 * - Next, the caller puts together the RPC message, stuffs it into 14 * the request struct, and calls xprt_transmit(). 15 * - xprt_transmit sends the message and installs the caller on the 16 * transport's wait list. At the same time, if a reply is expected, 17 * it installs a timer that is run after the packet's timeout has 18 * expired. 19 * - When a packet arrives, the data_ready handler walks the list of 20 * pending requests for that transport. If a matching XID is found, the 21 * caller is woken up, and the timer removed. 22 * - When no reply arrives within the timeout interval, the timer is 23 * fired by the kernel and runs xprt_timer(). It either adjusts the 24 * timeout values (minor timeout) or wakes up the caller with a status 25 * of -ETIMEDOUT. 26 * - When the caller receives a notification from RPC that a reply arrived, 27 * it should release the RPC slot, and process the reply. 28 * If the call timed out, it may choose to retry the operation by 29 * adjusting the initial timeout value, and simply calling rpc_call 30 * again. 31 * 32 * Support for async RPC is done through a set of RPC-specific scheduling 33 * primitives that `transparently' work for processes as well as async 34 * tasks that rely on callbacks. 35 * 36 * Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de> 37 * 38 * Transport switch API copyright (C) 2005, Chuck Lever <cel@netapp.com> 39 */ 40 41 #include <linux/module.h> 42 43 #include <linux/types.h> 44 #include <linux/interrupt.h> 45 #include <linux/workqueue.h> 46 #include <linux/net.h> 47 #include <linux/ktime.h> 48 49 #include <linux/sunrpc/clnt.h> 50 #include <linux/sunrpc/metrics.h> 51 #include <linux/sunrpc/bc_xprt.h> 52 #include <linux/rcupdate.h> 53 #include <linux/sched/mm.h> 54 55 #include <trace/events/sunrpc.h> 56 57 #include "sunrpc.h" 58 59 /* 60 * Local variables 61 */ 62 63 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG) 64 # define RPCDBG_FACILITY RPCDBG_XPRT 65 #endif 66 67 /* 68 * Local functions 69 */ 70 static void xprt_init(struct rpc_xprt *xprt, struct net *net); 71 static __be32 xprt_alloc_xid(struct rpc_xprt *xprt); 72 static void xprt_destroy(struct rpc_xprt *xprt); 73 74 static DEFINE_SPINLOCK(xprt_list_lock); 75 static LIST_HEAD(xprt_list); 76 77 static unsigned long xprt_request_timeout(const struct rpc_rqst *req) 78 { 79 unsigned long timeout = jiffies + req->rq_timeout; 80 81 if (time_before(timeout, req->rq_majortimeo)) 82 return timeout; 83 return req->rq_majortimeo; 84 } 85 86 /** 87 * xprt_register_transport - register a transport implementation 88 * @transport: transport to register 89 * 90 * If a transport implementation is loaded as a kernel module, it can 91 * call this interface to make itself known to the RPC client. 92 * 93 * Returns: 94 * 0: transport successfully registered 95 * -EEXIST: transport already registered 96 * -EINVAL: transport module being unloaded 97 */ 98 int xprt_register_transport(struct xprt_class *transport) 99 { 100 struct xprt_class *t; 101 int result; 102 103 result = -EEXIST; 104 spin_lock(&xprt_list_lock); 105 list_for_each_entry(t, &xprt_list, list) { 106 /* don't register the same transport class twice */ 107 if (t->ident == transport->ident) 108 goto out; 109 } 110 111 list_add_tail(&transport->list, &xprt_list); 112 printk(KERN_INFO "RPC: Registered %s transport module.\n", 113 transport->name); 114 result = 0; 115 116 out: 117 spin_unlock(&xprt_list_lock); 118 return result; 119 } 120 EXPORT_SYMBOL_GPL(xprt_register_transport); 121 122 /** 123 * xprt_unregister_transport - unregister a transport implementation 124 * @transport: transport to unregister 125 * 126 * Returns: 127 * 0: transport successfully unregistered 128 * -ENOENT: transport never registered 129 */ 130 int xprt_unregister_transport(struct xprt_class *transport) 131 { 132 struct xprt_class *t; 133 int result; 134 135 result = 0; 136 spin_lock(&xprt_list_lock); 137 list_for_each_entry(t, &xprt_list, list) { 138 if (t == transport) { 139 printk(KERN_INFO 140 "RPC: Unregistered %s transport module.\n", 141 transport->name); 142 list_del_init(&transport->list); 143 goto out; 144 } 145 } 146 result = -ENOENT; 147 148 out: 149 spin_unlock(&xprt_list_lock); 150 return result; 151 } 152 EXPORT_SYMBOL_GPL(xprt_unregister_transport); 153 154 /** 155 * xprt_load_transport - load a transport implementation 156 * @transport_name: transport to load 157 * 158 * Returns: 159 * 0: transport successfully loaded 160 * -ENOENT: transport module not available 161 */ 162 int xprt_load_transport(const char *transport_name) 163 { 164 struct xprt_class *t; 165 int result; 166 167 result = 0; 168 spin_lock(&xprt_list_lock); 169 list_for_each_entry(t, &xprt_list, list) { 170 if (strcmp(t->name, transport_name) == 0) { 171 spin_unlock(&xprt_list_lock); 172 goto out; 173 } 174 } 175 spin_unlock(&xprt_list_lock); 176 result = request_module("xprt%s", transport_name); 177 out: 178 return result; 179 } 180 EXPORT_SYMBOL_GPL(xprt_load_transport); 181 182 static void xprt_clear_locked(struct rpc_xprt *xprt) 183 { 184 xprt->snd_task = NULL; 185 if (!test_bit(XPRT_CLOSE_WAIT, &xprt->state)) { 186 smp_mb__before_atomic(); 187 clear_bit(XPRT_LOCKED, &xprt->state); 188 smp_mb__after_atomic(); 189 } else 190 queue_work(xprtiod_workqueue, &xprt->task_cleanup); 191 } 192 193 /** 194 * xprt_reserve_xprt - serialize write access to transports 195 * @task: task that is requesting access to the transport 196 * @xprt: pointer to the target transport 197 * 198 * This prevents mixing the payload of separate requests, and prevents 199 * transport connects from colliding with writes. No congestion control 200 * is provided. 201 */ 202 int xprt_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task) 203 { 204 struct rpc_rqst *req = task->tk_rqstp; 205 206 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) { 207 if (task == xprt->snd_task) 208 return 1; 209 goto out_sleep; 210 } 211 if (test_bit(XPRT_WRITE_SPACE, &xprt->state)) 212 goto out_unlock; 213 xprt->snd_task = task; 214 215 return 1; 216 217 out_unlock: 218 xprt_clear_locked(xprt); 219 out_sleep: 220 dprintk("RPC: %5u failed to lock transport %p\n", 221 task->tk_pid, xprt); 222 task->tk_status = -EAGAIN; 223 if (RPC_IS_SOFT(task)) 224 rpc_sleep_on_timeout(&xprt->sending, task, NULL, 225 xprt_request_timeout(req)); 226 else 227 rpc_sleep_on(&xprt->sending, task, NULL); 228 return 0; 229 } 230 EXPORT_SYMBOL_GPL(xprt_reserve_xprt); 231 232 static bool 233 xprt_need_congestion_window_wait(struct rpc_xprt *xprt) 234 { 235 return test_bit(XPRT_CWND_WAIT, &xprt->state); 236 } 237 238 static void 239 xprt_set_congestion_window_wait(struct rpc_xprt *xprt) 240 { 241 if (!list_empty(&xprt->xmit_queue)) { 242 /* Peek at head of queue to see if it can make progress */ 243 if (list_first_entry(&xprt->xmit_queue, struct rpc_rqst, 244 rq_xmit)->rq_cong) 245 return; 246 } 247 set_bit(XPRT_CWND_WAIT, &xprt->state); 248 } 249 250 static void 251 xprt_test_and_clear_congestion_window_wait(struct rpc_xprt *xprt) 252 { 253 if (!RPCXPRT_CONGESTED(xprt)) 254 clear_bit(XPRT_CWND_WAIT, &xprt->state); 255 } 256 257 /* 258 * xprt_reserve_xprt_cong - serialize write access to transports 259 * @task: task that is requesting access to the transport 260 * 261 * Same as xprt_reserve_xprt, but Van Jacobson congestion control is 262 * integrated into the decision of whether a request is allowed to be 263 * woken up and given access to the transport. 264 * Note that the lock is only granted if we know there are free slots. 265 */ 266 int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task) 267 { 268 struct rpc_rqst *req = task->tk_rqstp; 269 270 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) { 271 if (task == xprt->snd_task) 272 return 1; 273 goto out_sleep; 274 } 275 if (req == NULL) { 276 xprt->snd_task = task; 277 return 1; 278 } 279 if (test_bit(XPRT_WRITE_SPACE, &xprt->state)) 280 goto out_unlock; 281 if (!xprt_need_congestion_window_wait(xprt)) { 282 xprt->snd_task = task; 283 return 1; 284 } 285 out_unlock: 286 xprt_clear_locked(xprt); 287 out_sleep: 288 dprintk("RPC: %5u failed to lock transport %p\n", task->tk_pid, xprt); 289 task->tk_status = -EAGAIN; 290 if (RPC_IS_SOFT(task)) 291 rpc_sleep_on_timeout(&xprt->sending, task, NULL, 292 xprt_request_timeout(req)); 293 else 294 rpc_sleep_on(&xprt->sending, task, NULL); 295 return 0; 296 } 297 EXPORT_SYMBOL_GPL(xprt_reserve_xprt_cong); 298 299 static inline int xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task) 300 { 301 int retval; 302 303 if (test_bit(XPRT_LOCKED, &xprt->state) && xprt->snd_task == task) 304 return 1; 305 spin_lock_bh(&xprt->transport_lock); 306 retval = xprt->ops->reserve_xprt(xprt, task); 307 spin_unlock_bh(&xprt->transport_lock); 308 return retval; 309 } 310 311 static bool __xprt_lock_write_func(struct rpc_task *task, void *data) 312 { 313 struct rpc_xprt *xprt = data; 314 315 xprt->snd_task = task; 316 return true; 317 } 318 319 static void __xprt_lock_write_next(struct rpc_xprt *xprt) 320 { 321 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) 322 return; 323 if (test_bit(XPRT_WRITE_SPACE, &xprt->state)) 324 goto out_unlock; 325 if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending, 326 __xprt_lock_write_func, xprt)) 327 return; 328 out_unlock: 329 xprt_clear_locked(xprt); 330 } 331 332 static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt) 333 { 334 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) 335 return; 336 if (test_bit(XPRT_WRITE_SPACE, &xprt->state)) 337 goto out_unlock; 338 if (xprt_need_congestion_window_wait(xprt)) 339 goto out_unlock; 340 if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending, 341 __xprt_lock_write_func, xprt)) 342 return; 343 out_unlock: 344 xprt_clear_locked(xprt); 345 } 346 347 /** 348 * xprt_release_xprt - allow other requests to use a transport 349 * @xprt: transport with other tasks potentially waiting 350 * @task: task that is releasing access to the transport 351 * 352 * Note that "task" can be NULL. No congestion control is provided. 353 */ 354 void xprt_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task) 355 { 356 if (xprt->snd_task == task) { 357 xprt_clear_locked(xprt); 358 __xprt_lock_write_next(xprt); 359 } 360 } 361 EXPORT_SYMBOL_GPL(xprt_release_xprt); 362 363 /** 364 * xprt_release_xprt_cong - allow other requests to use a transport 365 * @xprt: transport with other tasks potentially waiting 366 * @task: task that is releasing access to the transport 367 * 368 * Note that "task" can be NULL. Another task is awoken to use the 369 * transport if the transport's congestion window allows it. 370 */ 371 void xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task) 372 { 373 if (xprt->snd_task == task) { 374 xprt_clear_locked(xprt); 375 __xprt_lock_write_next_cong(xprt); 376 } 377 } 378 EXPORT_SYMBOL_GPL(xprt_release_xprt_cong); 379 380 static inline void xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task) 381 { 382 if (xprt->snd_task != task) 383 return; 384 spin_lock_bh(&xprt->transport_lock); 385 xprt->ops->release_xprt(xprt, task); 386 spin_unlock_bh(&xprt->transport_lock); 387 } 388 389 /* 390 * Van Jacobson congestion avoidance. Check if the congestion window 391 * overflowed. Put the task to sleep if this is the case. 392 */ 393 static int 394 __xprt_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req) 395 { 396 if (req->rq_cong) 397 return 1; 398 dprintk("RPC: %5u xprt_cwnd_limited cong = %lu cwnd = %lu\n", 399 req->rq_task->tk_pid, xprt->cong, xprt->cwnd); 400 if (RPCXPRT_CONGESTED(xprt)) { 401 xprt_set_congestion_window_wait(xprt); 402 return 0; 403 } 404 req->rq_cong = 1; 405 xprt->cong += RPC_CWNDSCALE; 406 return 1; 407 } 408 409 /* 410 * Adjust the congestion window, and wake up the next task 411 * that has been sleeping due to congestion 412 */ 413 static void 414 __xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req) 415 { 416 if (!req->rq_cong) 417 return; 418 req->rq_cong = 0; 419 xprt->cong -= RPC_CWNDSCALE; 420 xprt_test_and_clear_congestion_window_wait(xprt); 421 __xprt_lock_write_next_cong(xprt); 422 } 423 424 /** 425 * xprt_request_get_cong - Request congestion control credits 426 * @xprt: pointer to transport 427 * @req: pointer to RPC request 428 * 429 * Useful for transports that require congestion control. 430 */ 431 bool 432 xprt_request_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req) 433 { 434 bool ret = false; 435 436 if (req->rq_cong) 437 return true; 438 spin_lock_bh(&xprt->transport_lock); 439 ret = __xprt_get_cong(xprt, req) != 0; 440 spin_unlock_bh(&xprt->transport_lock); 441 return ret; 442 } 443 EXPORT_SYMBOL_GPL(xprt_request_get_cong); 444 445 /** 446 * xprt_release_rqst_cong - housekeeping when request is complete 447 * @task: RPC request that recently completed 448 * 449 * Useful for transports that require congestion control. 450 */ 451 void xprt_release_rqst_cong(struct rpc_task *task) 452 { 453 struct rpc_rqst *req = task->tk_rqstp; 454 455 __xprt_put_cong(req->rq_xprt, req); 456 } 457 EXPORT_SYMBOL_GPL(xprt_release_rqst_cong); 458 459 /* 460 * Clear the congestion window wait flag and wake up the next 461 * entry on xprt->sending 462 */ 463 static void 464 xprt_clear_congestion_window_wait(struct rpc_xprt *xprt) 465 { 466 if (test_and_clear_bit(XPRT_CWND_WAIT, &xprt->state)) { 467 spin_lock_bh(&xprt->transport_lock); 468 __xprt_lock_write_next_cong(xprt); 469 spin_unlock_bh(&xprt->transport_lock); 470 } 471 } 472 473 /** 474 * xprt_adjust_cwnd - adjust transport congestion window 475 * @xprt: pointer to xprt 476 * @task: recently completed RPC request used to adjust window 477 * @result: result code of completed RPC request 478 * 479 * The transport code maintains an estimate on the maximum number of out- 480 * standing RPC requests, using a smoothed version of the congestion 481 * avoidance implemented in 44BSD. This is basically the Van Jacobson 482 * congestion algorithm: If a retransmit occurs, the congestion window is 483 * halved; otherwise, it is incremented by 1/cwnd when 484 * 485 * - a reply is received and 486 * - a full number of requests are outstanding and 487 * - the congestion window hasn't been updated recently. 488 */ 489 void xprt_adjust_cwnd(struct rpc_xprt *xprt, struct rpc_task *task, int result) 490 { 491 struct rpc_rqst *req = task->tk_rqstp; 492 unsigned long cwnd = xprt->cwnd; 493 494 if (result >= 0 && cwnd <= xprt->cong) { 495 /* The (cwnd >> 1) term makes sure 496 * the result gets rounded properly. */ 497 cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd; 498 if (cwnd > RPC_MAXCWND(xprt)) 499 cwnd = RPC_MAXCWND(xprt); 500 __xprt_lock_write_next_cong(xprt); 501 } else if (result == -ETIMEDOUT) { 502 cwnd >>= 1; 503 if (cwnd < RPC_CWNDSCALE) 504 cwnd = RPC_CWNDSCALE; 505 } 506 dprintk("RPC: cong %ld, cwnd was %ld, now %ld\n", 507 xprt->cong, xprt->cwnd, cwnd); 508 xprt->cwnd = cwnd; 509 __xprt_put_cong(xprt, req); 510 } 511 EXPORT_SYMBOL_GPL(xprt_adjust_cwnd); 512 513 /** 514 * xprt_wake_pending_tasks - wake all tasks on a transport's pending queue 515 * @xprt: transport with waiting tasks 516 * @status: result code to plant in each task before waking it 517 * 518 */ 519 void xprt_wake_pending_tasks(struct rpc_xprt *xprt, int status) 520 { 521 if (status < 0) 522 rpc_wake_up_status(&xprt->pending, status); 523 else 524 rpc_wake_up(&xprt->pending); 525 } 526 EXPORT_SYMBOL_GPL(xprt_wake_pending_tasks); 527 528 /** 529 * xprt_wait_for_buffer_space - wait for transport output buffer to clear 530 * @xprt: transport 531 * 532 * Note that we only set the timer for the case of RPC_IS_SOFT(), since 533 * we don't in general want to force a socket disconnection due to 534 * an incomplete RPC call transmission. 535 */ 536 void xprt_wait_for_buffer_space(struct rpc_xprt *xprt) 537 { 538 set_bit(XPRT_WRITE_SPACE, &xprt->state); 539 } 540 EXPORT_SYMBOL_GPL(xprt_wait_for_buffer_space); 541 542 static bool 543 xprt_clear_write_space_locked(struct rpc_xprt *xprt) 544 { 545 if (test_and_clear_bit(XPRT_WRITE_SPACE, &xprt->state)) { 546 __xprt_lock_write_next(xprt); 547 dprintk("RPC: write space: waking waiting task on " 548 "xprt %p\n", xprt); 549 return true; 550 } 551 return false; 552 } 553 554 /** 555 * xprt_write_space - wake the task waiting for transport output buffer space 556 * @xprt: transport with waiting tasks 557 * 558 * Can be called in a soft IRQ context, so xprt_write_space never sleeps. 559 */ 560 bool xprt_write_space(struct rpc_xprt *xprt) 561 { 562 bool ret; 563 564 if (!test_bit(XPRT_WRITE_SPACE, &xprt->state)) 565 return false; 566 spin_lock_bh(&xprt->transport_lock); 567 ret = xprt_clear_write_space_locked(xprt); 568 spin_unlock_bh(&xprt->transport_lock); 569 return ret; 570 } 571 EXPORT_SYMBOL_GPL(xprt_write_space); 572 573 static unsigned long xprt_abs_ktime_to_jiffies(ktime_t abstime) 574 { 575 s64 delta = ktime_to_ns(ktime_get() - abstime); 576 return likely(delta >= 0) ? 577 jiffies - nsecs_to_jiffies(delta) : 578 jiffies + nsecs_to_jiffies(-delta); 579 } 580 581 static unsigned long xprt_calc_majortimeo(struct rpc_rqst *req) 582 { 583 const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout; 584 unsigned long majortimeo = req->rq_timeout; 585 586 if (to->to_exponential) 587 majortimeo <<= to->to_retries; 588 else 589 majortimeo += to->to_increment * to->to_retries; 590 if (majortimeo > to->to_maxval || majortimeo == 0) 591 majortimeo = to->to_maxval; 592 return majortimeo; 593 } 594 595 static void xprt_reset_majortimeo(struct rpc_rqst *req) 596 { 597 req->rq_majortimeo += xprt_calc_majortimeo(req); 598 } 599 600 static void xprt_init_majortimeo(struct rpc_task *task, struct rpc_rqst *req) 601 { 602 unsigned long time_init; 603 struct rpc_xprt *xprt = req->rq_xprt; 604 605 if (likely(xprt && xprt_connected(xprt))) 606 time_init = jiffies; 607 else 608 time_init = xprt_abs_ktime_to_jiffies(task->tk_start); 609 req->rq_timeout = task->tk_client->cl_timeout->to_initval; 610 req->rq_majortimeo = time_init + xprt_calc_majortimeo(req); 611 } 612 613 /** 614 * xprt_adjust_timeout - adjust timeout values for next retransmit 615 * @req: RPC request containing parameters to use for the adjustment 616 * 617 */ 618 int xprt_adjust_timeout(struct rpc_rqst *req) 619 { 620 struct rpc_xprt *xprt = req->rq_xprt; 621 const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout; 622 int status = 0; 623 624 if (time_before(jiffies, req->rq_majortimeo)) { 625 if (to->to_exponential) 626 req->rq_timeout <<= 1; 627 else 628 req->rq_timeout += to->to_increment; 629 if (to->to_maxval && req->rq_timeout >= to->to_maxval) 630 req->rq_timeout = to->to_maxval; 631 req->rq_retries++; 632 } else { 633 req->rq_timeout = to->to_initval; 634 req->rq_retries = 0; 635 xprt_reset_majortimeo(req); 636 /* Reset the RTT counters == "slow start" */ 637 spin_lock_bh(&xprt->transport_lock); 638 rpc_init_rtt(req->rq_task->tk_client->cl_rtt, to->to_initval); 639 spin_unlock_bh(&xprt->transport_lock); 640 status = -ETIMEDOUT; 641 } 642 643 if (req->rq_timeout == 0) { 644 printk(KERN_WARNING "xprt_adjust_timeout: rq_timeout = 0!\n"); 645 req->rq_timeout = 5 * HZ; 646 } 647 return status; 648 } 649 650 static void xprt_autoclose(struct work_struct *work) 651 { 652 struct rpc_xprt *xprt = 653 container_of(work, struct rpc_xprt, task_cleanup); 654 unsigned int pflags = memalloc_nofs_save(); 655 656 clear_bit(XPRT_CLOSE_WAIT, &xprt->state); 657 xprt->ops->close(xprt); 658 xprt_release_write(xprt, NULL); 659 wake_up_bit(&xprt->state, XPRT_LOCKED); 660 memalloc_nofs_restore(pflags); 661 } 662 663 /** 664 * xprt_disconnect_done - mark a transport as disconnected 665 * @xprt: transport to flag for disconnect 666 * 667 */ 668 void xprt_disconnect_done(struct rpc_xprt *xprt) 669 { 670 dprintk("RPC: disconnected transport %p\n", xprt); 671 spin_lock_bh(&xprt->transport_lock); 672 xprt_clear_connected(xprt); 673 xprt_clear_write_space_locked(xprt); 674 xprt_wake_pending_tasks(xprt, -ENOTCONN); 675 spin_unlock_bh(&xprt->transport_lock); 676 } 677 EXPORT_SYMBOL_GPL(xprt_disconnect_done); 678 679 /** 680 * xprt_force_disconnect - force a transport to disconnect 681 * @xprt: transport to disconnect 682 * 683 */ 684 void xprt_force_disconnect(struct rpc_xprt *xprt) 685 { 686 /* Don't race with the test_bit() in xprt_clear_locked() */ 687 spin_lock_bh(&xprt->transport_lock); 688 set_bit(XPRT_CLOSE_WAIT, &xprt->state); 689 /* Try to schedule an autoclose RPC call */ 690 if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0) 691 queue_work(xprtiod_workqueue, &xprt->task_cleanup); 692 else if (xprt->snd_task) 693 rpc_wake_up_queued_task_set_status(&xprt->pending, 694 xprt->snd_task, -ENOTCONN); 695 spin_unlock_bh(&xprt->transport_lock); 696 } 697 EXPORT_SYMBOL_GPL(xprt_force_disconnect); 698 699 static unsigned int 700 xprt_connect_cookie(struct rpc_xprt *xprt) 701 { 702 return READ_ONCE(xprt->connect_cookie); 703 } 704 705 static bool 706 xprt_request_retransmit_after_disconnect(struct rpc_task *task) 707 { 708 struct rpc_rqst *req = task->tk_rqstp; 709 struct rpc_xprt *xprt = req->rq_xprt; 710 711 return req->rq_connect_cookie != xprt_connect_cookie(xprt) || 712 !xprt_connected(xprt); 713 } 714 715 /** 716 * xprt_conditional_disconnect - force a transport to disconnect 717 * @xprt: transport to disconnect 718 * @cookie: 'connection cookie' 719 * 720 * This attempts to break the connection if and only if 'cookie' matches 721 * the current transport 'connection cookie'. It ensures that we don't 722 * try to break the connection more than once when we need to retransmit 723 * a batch of RPC requests. 724 * 725 */ 726 void xprt_conditional_disconnect(struct rpc_xprt *xprt, unsigned int cookie) 727 { 728 /* Don't race with the test_bit() in xprt_clear_locked() */ 729 spin_lock_bh(&xprt->transport_lock); 730 if (cookie != xprt->connect_cookie) 731 goto out; 732 if (test_bit(XPRT_CLOSING, &xprt->state)) 733 goto out; 734 set_bit(XPRT_CLOSE_WAIT, &xprt->state); 735 /* Try to schedule an autoclose RPC call */ 736 if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0) 737 queue_work(xprtiod_workqueue, &xprt->task_cleanup); 738 xprt_wake_pending_tasks(xprt, -EAGAIN); 739 out: 740 spin_unlock_bh(&xprt->transport_lock); 741 } 742 743 static bool 744 xprt_has_timer(const struct rpc_xprt *xprt) 745 { 746 return xprt->idle_timeout != 0; 747 } 748 749 static void 750 xprt_schedule_autodisconnect(struct rpc_xprt *xprt) 751 __must_hold(&xprt->transport_lock) 752 { 753 if (RB_EMPTY_ROOT(&xprt->recv_queue) && xprt_has_timer(xprt)) 754 mod_timer(&xprt->timer, xprt->last_used + xprt->idle_timeout); 755 } 756 757 static void 758 xprt_init_autodisconnect(struct timer_list *t) 759 { 760 struct rpc_xprt *xprt = from_timer(xprt, t, timer); 761 762 spin_lock(&xprt->transport_lock); 763 if (!RB_EMPTY_ROOT(&xprt->recv_queue)) 764 goto out_abort; 765 /* Reset xprt->last_used to avoid connect/autodisconnect cycling */ 766 xprt->last_used = jiffies; 767 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) 768 goto out_abort; 769 spin_unlock(&xprt->transport_lock); 770 queue_work(xprtiod_workqueue, &xprt->task_cleanup); 771 return; 772 out_abort: 773 spin_unlock(&xprt->transport_lock); 774 } 775 776 bool xprt_lock_connect(struct rpc_xprt *xprt, 777 struct rpc_task *task, 778 void *cookie) 779 { 780 bool ret = false; 781 782 spin_lock_bh(&xprt->transport_lock); 783 if (!test_bit(XPRT_LOCKED, &xprt->state)) 784 goto out; 785 if (xprt->snd_task != task) 786 goto out; 787 xprt->snd_task = cookie; 788 ret = true; 789 out: 790 spin_unlock_bh(&xprt->transport_lock); 791 return ret; 792 } 793 794 void xprt_unlock_connect(struct rpc_xprt *xprt, void *cookie) 795 { 796 spin_lock_bh(&xprt->transport_lock); 797 if (xprt->snd_task != cookie) 798 goto out; 799 if (!test_bit(XPRT_LOCKED, &xprt->state)) 800 goto out; 801 xprt->snd_task =NULL; 802 xprt->ops->release_xprt(xprt, NULL); 803 xprt_schedule_autodisconnect(xprt); 804 out: 805 spin_unlock_bh(&xprt->transport_lock); 806 wake_up_bit(&xprt->state, XPRT_LOCKED); 807 } 808 809 /** 810 * xprt_connect - schedule a transport connect operation 811 * @task: RPC task that is requesting the connect 812 * 813 */ 814 void xprt_connect(struct rpc_task *task) 815 { 816 struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt; 817 818 dprintk("RPC: %5u xprt_connect xprt %p %s connected\n", task->tk_pid, 819 xprt, (xprt_connected(xprt) ? "is" : "is not")); 820 821 if (!xprt_bound(xprt)) { 822 task->tk_status = -EAGAIN; 823 return; 824 } 825 if (!xprt_lock_write(xprt, task)) 826 return; 827 828 if (test_and_clear_bit(XPRT_CLOSE_WAIT, &xprt->state)) 829 xprt->ops->close(xprt); 830 831 if (!xprt_connected(xprt)) { 832 task->tk_rqstp->rq_connect_cookie = xprt->connect_cookie; 833 rpc_sleep_on_timeout(&xprt->pending, task, NULL, 834 xprt_request_timeout(task->tk_rqstp)); 835 836 if (test_bit(XPRT_CLOSING, &xprt->state)) 837 return; 838 if (xprt_test_and_set_connecting(xprt)) 839 return; 840 /* Race breaker */ 841 if (!xprt_connected(xprt)) { 842 xprt->stat.connect_start = jiffies; 843 xprt->ops->connect(xprt, task); 844 } else { 845 xprt_clear_connecting(xprt); 846 task->tk_status = 0; 847 rpc_wake_up_queued_task(&xprt->pending, task); 848 } 849 } 850 xprt_release_write(xprt, task); 851 } 852 853 enum xprt_xid_rb_cmp { 854 XID_RB_EQUAL, 855 XID_RB_LEFT, 856 XID_RB_RIGHT, 857 }; 858 static enum xprt_xid_rb_cmp 859 xprt_xid_cmp(__be32 xid1, __be32 xid2) 860 { 861 if (xid1 == xid2) 862 return XID_RB_EQUAL; 863 if ((__force u32)xid1 < (__force u32)xid2) 864 return XID_RB_LEFT; 865 return XID_RB_RIGHT; 866 } 867 868 static struct rpc_rqst * 869 xprt_request_rb_find(struct rpc_xprt *xprt, __be32 xid) 870 { 871 struct rb_node *n = xprt->recv_queue.rb_node; 872 struct rpc_rqst *req; 873 874 while (n != NULL) { 875 req = rb_entry(n, struct rpc_rqst, rq_recv); 876 switch (xprt_xid_cmp(xid, req->rq_xid)) { 877 case XID_RB_LEFT: 878 n = n->rb_left; 879 break; 880 case XID_RB_RIGHT: 881 n = n->rb_right; 882 break; 883 case XID_RB_EQUAL: 884 return req; 885 } 886 } 887 return NULL; 888 } 889 890 static void 891 xprt_request_rb_insert(struct rpc_xprt *xprt, struct rpc_rqst *new) 892 { 893 struct rb_node **p = &xprt->recv_queue.rb_node; 894 struct rb_node *n = NULL; 895 struct rpc_rqst *req; 896 897 while (*p != NULL) { 898 n = *p; 899 req = rb_entry(n, struct rpc_rqst, rq_recv); 900 switch(xprt_xid_cmp(new->rq_xid, req->rq_xid)) { 901 case XID_RB_LEFT: 902 p = &n->rb_left; 903 break; 904 case XID_RB_RIGHT: 905 p = &n->rb_right; 906 break; 907 case XID_RB_EQUAL: 908 WARN_ON_ONCE(new != req); 909 return; 910 } 911 } 912 rb_link_node(&new->rq_recv, n, p); 913 rb_insert_color(&new->rq_recv, &xprt->recv_queue); 914 } 915 916 static void 917 xprt_request_rb_remove(struct rpc_xprt *xprt, struct rpc_rqst *req) 918 { 919 rb_erase(&req->rq_recv, &xprt->recv_queue); 920 } 921 922 /** 923 * xprt_lookup_rqst - find an RPC request corresponding to an XID 924 * @xprt: transport on which the original request was transmitted 925 * @xid: RPC XID of incoming reply 926 * 927 * Caller holds xprt->queue_lock. 928 */ 929 struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid) 930 { 931 struct rpc_rqst *entry; 932 933 entry = xprt_request_rb_find(xprt, xid); 934 if (entry != NULL) { 935 trace_xprt_lookup_rqst(xprt, xid, 0); 936 entry->rq_rtt = ktime_sub(ktime_get(), entry->rq_xtime); 937 return entry; 938 } 939 940 dprintk("RPC: xprt_lookup_rqst did not find xid %08x\n", 941 ntohl(xid)); 942 trace_xprt_lookup_rqst(xprt, xid, -ENOENT); 943 xprt->stat.bad_xids++; 944 return NULL; 945 } 946 EXPORT_SYMBOL_GPL(xprt_lookup_rqst); 947 948 static bool 949 xprt_is_pinned_rqst(struct rpc_rqst *req) 950 { 951 return atomic_read(&req->rq_pin) != 0; 952 } 953 954 /** 955 * xprt_pin_rqst - Pin a request on the transport receive list 956 * @req: Request to pin 957 * 958 * Caller must ensure this is atomic with the call to xprt_lookup_rqst() 959 * so should be holding xprt->queue_lock. 960 */ 961 void xprt_pin_rqst(struct rpc_rqst *req) 962 { 963 atomic_inc(&req->rq_pin); 964 } 965 EXPORT_SYMBOL_GPL(xprt_pin_rqst); 966 967 /** 968 * xprt_unpin_rqst - Unpin a request on the transport receive list 969 * @req: Request to pin 970 * 971 * Caller should be holding xprt->queue_lock. 972 */ 973 void xprt_unpin_rqst(struct rpc_rqst *req) 974 { 975 if (!test_bit(RPC_TASK_MSG_PIN_WAIT, &req->rq_task->tk_runstate)) { 976 atomic_dec(&req->rq_pin); 977 return; 978 } 979 if (atomic_dec_and_test(&req->rq_pin)) 980 wake_up_var(&req->rq_pin); 981 } 982 EXPORT_SYMBOL_GPL(xprt_unpin_rqst); 983 984 static void xprt_wait_on_pinned_rqst(struct rpc_rqst *req) 985 { 986 wait_var_event(&req->rq_pin, !xprt_is_pinned_rqst(req)); 987 } 988 989 static bool 990 xprt_request_data_received(struct rpc_task *task) 991 { 992 return !test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) && 993 READ_ONCE(task->tk_rqstp->rq_reply_bytes_recvd) != 0; 994 } 995 996 static bool 997 xprt_request_need_enqueue_receive(struct rpc_task *task, struct rpc_rqst *req) 998 { 999 return !test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) && 1000 READ_ONCE(task->tk_rqstp->rq_reply_bytes_recvd) == 0; 1001 } 1002 1003 /** 1004 * xprt_request_enqueue_receive - Add an request to the receive queue 1005 * @task: RPC task 1006 * 1007 */ 1008 void 1009 xprt_request_enqueue_receive(struct rpc_task *task) 1010 { 1011 struct rpc_rqst *req = task->tk_rqstp; 1012 struct rpc_xprt *xprt = req->rq_xprt; 1013 1014 if (!xprt_request_need_enqueue_receive(task, req)) 1015 return; 1016 spin_lock(&xprt->queue_lock); 1017 1018 /* Update the softirq receive buffer */ 1019 memcpy(&req->rq_private_buf, &req->rq_rcv_buf, 1020 sizeof(req->rq_private_buf)); 1021 1022 /* Add request to the receive list */ 1023 xprt_request_rb_insert(xprt, req); 1024 set_bit(RPC_TASK_NEED_RECV, &task->tk_runstate); 1025 spin_unlock(&xprt->queue_lock); 1026 1027 /* Turn off autodisconnect */ 1028 del_singleshot_timer_sync(&xprt->timer); 1029 } 1030 1031 /** 1032 * xprt_request_dequeue_receive_locked - Remove a request from the receive queue 1033 * @task: RPC task 1034 * 1035 * Caller must hold xprt->queue_lock. 1036 */ 1037 static void 1038 xprt_request_dequeue_receive_locked(struct rpc_task *task) 1039 { 1040 struct rpc_rqst *req = task->tk_rqstp; 1041 1042 if (test_and_clear_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) 1043 xprt_request_rb_remove(req->rq_xprt, req); 1044 } 1045 1046 /** 1047 * xprt_update_rtt - Update RPC RTT statistics 1048 * @task: RPC request that recently completed 1049 * 1050 * Caller holds xprt->queue_lock. 1051 */ 1052 void xprt_update_rtt(struct rpc_task *task) 1053 { 1054 struct rpc_rqst *req = task->tk_rqstp; 1055 struct rpc_rtt *rtt = task->tk_client->cl_rtt; 1056 unsigned int timer = task->tk_msg.rpc_proc->p_timer; 1057 long m = usecs_to_jiffies(ktime_to_us(req->rq_rtt)); 1058 1059 if (timer) { 1060 if (req->rq_ntrans == 1) 1061 rpc_update_rtt(rtt, timer, m); 1062 rpc_set_timeo(rtt, timer, req->rq_ntrans - 1); 1063 } 1064 } 1065 EXPORT_SYMBOL_GPL(xprt_update_rtt); 1066 1067 /** 1068 * xprt_complete_rqst - called when reply processing is complete 1069 * @task: RPC request that recently completed 1070 * @copied: actual number of bytes received from the transport 1071 * 1072 * Caller holds xprt->queue_lock. 1073 */ 1074 void xprt_complete_rqst(struct rpc_task *task, int copied) 1075 { 1076 struct rpc_rqst *req = task->tk_rqstp; 1077 struct rpc_xprt *xprt = req->rq_xprt; 1078 1079 dprintk("RPC: %5u xid %08x complete (%d bytes received)\n", 1080 task->tk_pid, ntohl(req->rq_xid), copied); 1081 trace_xprt_complete_rqst(xprt, req->rq_xid, copied); 1082 1083 xprt->stat.recvs++; 1084 1085 req->rq_private_buf.len = copied; 1086 /* Ensure all writes are done before we update */ 1087 /* req->rq_reply_bytes_recvd */ 1088 smp_wmb(); 1089 req->rq_reply_bytes_recvd = copied; 1090 xprt_request_dequeue_receive_locked(task); 1091 rpc_wake_up_queued_task(&xprt->pending, task); 1092 } 1093 EXPORT_SYMBOL_GPL(xprt_complete_rqst); 1094 1095 static void xprt_timer(struct rpc_task *task) 1096 { 1097 struct rpc_rqst *req = task->tk_rqstp; 1098 struct rpc_xprt *xprt = req->rq_xprt; 1099 1100 if (task->tk_status != -ETIMEDOUT) 1101 return; 1102 1103 trace_xprt_timer(xprt, req->rq_xid, task->tk_status); 1104 if (!req->rq_reply_bytes_recvd) { 1105 if (xprt->ops->timer) 1106 xprt->ops->timer(xprt, task); 1107 } else 1108 task->tk_status = 0; 1109 } 1110 1111 /** 1112 * xprt_wait_for_reply_request_def - wait for reply 1113 * @task: pointer to rpc_task 1114 * 1115 * Set a request's retransmit timeout based on the transport's 1116 * default timeout parameters. Used by transports that don't adjust 1117 * the retransmit timeout based on round-trip time estimation, 1118 * and put the task to sleep on the pending queue. 1119 */ 1120 void xprt_wait_for_reply_request_def(struct rpc_task *task) 1121 { 1122 struct rpc_rqst *req = task->tk_rqstp; 1123 1124 rpc_sleep_on_timeout(&req->rq_xprt->pending, task, xprt_timer, 1125 xprt_request_timeout(req)); 1126 } 1127 EXPORT_SYMBOL_GPL(xprt_wait_for_reply_request_def); 1128 1129 /** 1130 * xprt_wait_for_reply_request_rtt - wait for reply using RTT estimator 1131 * @task: pointer to rpc_task 1132 * 1133 * Set a request's retransmit timeout using the RTT estimator, 1134 * and put the task to sleep on the pending queue. 1135 */ 1136 void xprt_wait_for_reply_request_rtt(struct rpc_task *task) 1137 { 1138 int timer = task->tk_msg.rpc_proc->p_timer; 1139 struct rpc_clnt *clnt = task->tk_client; 1140 struct rpc_rtt *rtt = clnt->cl_rtt; 1141 struct rpc_rqst *req = task->tk_rqstp; 1142 unsigned long max_timeout = clnt->cl_timeout->to_maxval; 1143 unsigned long timeout; 1144 1145 timeout = rpc_calc_rto(rtt, timer); 1146 timeout <<= rpc_ntimeo(rtt, timer) + req->rq_retries; 1147 if (timeout > max_timeout || timeout == 0) 1148 timeout = max_timeout; 1149 rpc_sleep_on_timeout(&req->rq_xprt->pending, task, xprt_timer, 1150 jiffies + timeout); 1151 } 1152 EXPORT_SYMBOL_GPL(xprt_wait_for_reply_request_rtt); 1153 1154 /** 1155 * xprt_request_wait_receive - wait for the reply to an RPC request 1156 * @task: RPC task about to send a request 1157 * 1158 */ 1159 void xprt_request_wait_receive(struct rpc_task *task) 1160 { 1161 struct rpc_rqst *req = task->tk_rqstp; 1162 struct rpc_xprt *xprt = req->rq_xprt; 1163 1164 if (!test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) 1165 return; 1166 /* 1167 * Sleep on the pending queue if we're expecting a reply. 1168 * The spinlock ensures atomicity between the test of 1169 * req->rq_reply_bytes_recvd, and the call to rpc_sleep_on(). 1170 */ 1171 spin_lock(&xprt->queue_lock); 1172 if (test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) { 1173 xprt->ops->wait_for_reply_request(task); 1174 /* 1175 * Send an extra queue wakeup call if the 1176 * connection was dropped in case the call to 1177 * rpc_sleep_on() raced. 1178 */ 1179 if (xprt_request_retransmit_after_disconnect(task)) 1180 rpc_wake_up_queued_task_set_status(&xprt->pending, 1181 task, -ENOTCONN); 1182 } 1183 spin_unlock(&xprt->queue_lock); 1184 } 1185 1186 static bool 1187 xprt_request_need_enqueue_transmit(struct rpc_task *task, struct rpc_rqst *req) 1188 { 1189 return !test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate); 1190 } 1191 1192 /** 1193 * xprt_request_enqueue_transmit - queue a task for transmission 1194 * @task: pointer to rpc_task 1195 * 1196 * Add a task to the transmission queue. 1197 */ 1198 void 1199 xprt_request_enqueue_transmit(struct rpc_task *task) 1200 { 1201 struct rpc_rqst *pos, *req = task->tk_rqstp; 1202 struct rpc_xprt *xprt = req->rq_xprt; 1203 1204 if (xprt_request_need_enqueue_transmit(task, req)) { 1205 req->rq_bytes_sent = 0; 1206 spin_lock(&xprt->queue_lock); 1207 /* 1208 * Requests that carry congestion control credits are added 1209 * to the head of the list to avoid starvation issues. 1210 */ 1211 if (req->rq_cong) { 1212 xprt_clear_congestion_window_wait(xprt); 1213 list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) { 1214 if (pos->rq_cong) 1215 continue; 1216 /* Note: req is added _before_ pos */ 1217 list_add_tail(&req->rq_xmit, &pos->rq_xmit); 1218 INIT_LIST_HEAD(&req->rq_xmit2); 1219 trace_xprt_enq_xmit(task, 1); 1220 goto out; 1221 } 1222 } else if (RPC_IS_SWAPPER(task)) { 1223 list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) { 1224 if (pos->rq_cong || pos->rq_bytes_sent) 1225 continue; 1226 if (RPC_IS_SWAPPER(pos->rq_task)) 1227 continue; 1228 /* Note: req is added _before_ pos */ 1229 list_add_tail(&req->rq_xmit, &pos->rq_xmit); 1230 INIT_LIST_HEAD(&req->rq_xmit2); 1231 trace_xprt_enq_xmit(task, 2); 1232 goto out; 1233 } 1234 } else if (!req->rq_seqno) { 1235 list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) { 1236 if (pos->rq_task->tk_owner != task->tk_owner) 1237 continue; 1238 list_add_tail(&req->rq_xmit2, &pos->rq_xmit2); 1239 INIT_LIST_HEAD(&req->rq_xmit); 1240 trace_xprt_enq_xmit(task, 3); 1241 goto out; 1242 } 1243 } 1244 list_add_tail(&req->rq_xmit, &xprt->xmit_queue); 1245 INIT_LIST_HEAD(&req->rq_xmit2); 1246 trace_xprt_enq_xmit(task, 4); 1247 out: 1248 set_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate); 1249 spin_unlock(&xprt->queue_lock); 1250 } 1251 } 1252 1253 /** 1254 * xprt_request_dequeue_transmit_locked - remove a task from the transmission queue 1255 * @task: pointer to rpc_task 1256 * 1257 * Remove a task from the transmission queue 1258 * Caller must hold xprt->queue_lock 1259 */ 1260 static void 1261 xprt_request_dequeue_transmit_locked(struct rpc_task *task) 1262 { 1263 struct rpc_rqst *req = task->tk_rqstp; 1264 1265 if (!test_and_clear_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) 1266 return; 1267 if (!list_empty(&req->rq_xmit)) { 1268 list_del(&req->rq_xmit); 1269 if (!list_empty(&req->rq_xmit2)) { 1270 struct rpc_rqst *next = list_first_entry(&req->rq_xmit2, 1271 struct rpc_rqst, rq_xmit2); 1272 list_del(&req->rq_xmit2); 1273 list_add_tail(&next->rq_xmit, &next->rq_xprt->xmit_queue); 1274 } 1275 } else 1276 list_del(&req->rq_xmit2); 1277 } 1278 1279 /** 1280 * xprt_request_dequeue_transmit - remove a task from the transmission queue 1281 * @task: pointer to rpc_task 1282 * 1283 * Remove a task from the transmission queue 1284 */ 1285 static void 1286 xprt_request_dequeue_transmit(struct rpc_task *task) 1287 { 1288 struct rpc_rqst *req = task->tk_rqstp; 1289 struct rpc_xprt *xprt = req->rq_xprt; 1290 1291 spin_lock(&xprt->queue_lock); 1292 xprt_request_dequeue_transmit_locked(task); 1293 spin_unlock(&xprt->queue_lock); 1294 } 1295 1296 /** 1297 * xprt_request_prepare - prepare an encoded request for transport 1298 * @req: pointer to rpc_rqst 1299 * 1300 * Calls into the transport layer to do whatever is needed to prepare 1301 * the request for transmission or receive. 1302 */ 1303 void 1304 xprt_request_prepare(struct rpc_rqst *req) 1305 { 1306 struct rpc_xprt *xprt = req->rq_xprt; 1307 1308 if (xprt->ops->prepare_request) 1309 xprt->ops->prepare_request(req); 1310 } 1311 1312 /** 1313 * xprt_request_need_retransmit - Test if a task needs retransmission 1314 * @task: pointer to rpc_task 1315 * 1316 * Test for whether a connection breakage requires the task to retransmit 1317 */ 1318 bool 1319 xprt_request_need_retransmit(struct rpc_task *task) 1320 { 1321 return xprt_request_retransmit_after_disconnect(task); 1322 } 1323 1324 /** 1325 * xprt_prepare_transmit - reserve the transport before sending a request 1326 * @task: RPC task about to send a request 1327 * 1328 */ 1329 bool xprt_prepare_transmit(struct rpc_task *task) 1330 { 1331 struct rpc_rqst *req = task->tk_rqstp; 1332 struct rpc_xprt *xprt = req->rq_xprt; 1333 1334 dprintk("RPC: %5u xprt_prepare_transmit\n", task->tk_pid); 1335 1336 if (!xprt_lock_write(xprt, task)) { 1337 /* Race breaker: someone may have transmitted us */ 1338 if (!test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) 1339 rpc_wake_up_queued_task_set_status(&xprt->sending, 1340 task, 0); 1341 return false; 1342 1343 } 1344 return true; 1345 } 1346 1347 void xprt_end_transmit(struct rpc_task *task) 1348 { 1349 xprt_release_write(task->tk_rqstp->rq_xprt, task); 1350 } 1351 1352 /** 1353 * xprt_request_transmit - send an RPC request on a transport 1354 * @req: pointer to request to transmit 1355 * @snd_task: RPC task that owns the transport lock 1356 * 1357 * This performs the transmission of a single request. 1358 * Note that if the request is not the same as snd_task, then it 1359 * does need to be pinned. 1360 * Returns '0' on success. 1361 */ 1362 static int 1363 xprt_request_transmit(struct rpc_rqst *req, struct rpc_task *snd_task) 1364 { 1365 struct rpc_xprt *xprt = req->rq_xprt; 1366 struct rpc_task *task = req->rq_task; 1367 unsigned int connect_cookie; 1368 int is_retrans = RPC_WAS_SENT(task); 1369 int status; 1370 1371 if (!req->rq_bytes_sent) { 1372 if (xprt_request_data_received(task)) { 1373 status = 0; 1374 goto out_dequeue; 1375 } 1376 /* Verify that our message lies in the RPCSEC_GSS window */ 1377 if (rpcauth_xmit_need_reencode(task)) { 1378 status = -EBADMSG; 1379 goto out_dequeue; 1380 } 1381 if (task->tk_ops->rpc_call_prepare_transmit) { 1382 task->tk_ops->rpc_call_prepare_transmit(task, 1383 task->tk_calldata); 1384 status = task->tk_status; 1385 if (status < 0) 1386 goto out_dequeue; 1387 } 1388 if (RPC_SIGNALLED(task)) { 1389 status = -ERESTARTSYS; 1390 goto out_dequeue; 1391 } 1392 } 1393 1394 /* 1395 * Update req->rq_ntrans before transmitting to avoid races with 1396 * xprt_update_rtt(), which needs to know that it is recording a 1397 * reply to the first transmission. 1398 */ 1399 req->rq_ntrans++; 1400 1401 connect_cookie = xprt->connect_cookie; 1402 status = xprt->ops->send_request(req); 1403 if (status != 0) { 1404 req->rq_ntrans--; 1405 trace_xprt_transmit(req, status); 1406 return status; 1407 } 1408 1409 if (is_retrans) 1410 task->tk_client->cl_stats->rpcretrans++; 1411 1412 xprt_inject_disconnect(xprt); 1413 1414 task->tk_flags |= RPC_TASK_SENT; 1415 spin_lock_bh(&xprt->transport_lock); 1416 1417 xprt->stat.sends++; 1418 xprt->stat.req_u += xprt->stat.sends - xprt->stat.recvs; 1419 xprt->stat.bklog_u += xprt->backlog.qlen; 1420 xprt->stat.sending_u += xprt->sending.qlen; 1421 xprt->stat.pending_u += xprt->pending.qlen; 1422 spin_unlock_bh(&xprt->transport_lock); 1423 1424 req->rq_connect_cookie = connect_cookie; 1425 out_dequeue: 1426 trace_xprt_transmit(req, status); 1427 xprt_request_dequeue_transmit(task); 1428 rpc_wake_up_queued_task_set_status(&xprt->sending, task, status); 1429 return status; 1430 } 1431 1432 /** 1433 * xprt_transmit - send an RPC request on a transport 1434 * @task: controlling RPC task 1435 * 1436 * Attempts to drain the transmit queue. On exit, either the transport 1437 * signalled an error that needs to be handled before transmission can 1438 * resume, or @task finished transmitting, and detected that it already 1439 * received a reply. 1440 */ 1441 void 1442 xprt_transmit(struct rpc_task *task) 1443 { 1444 struct rpc_rqst *next, *req = task->tk_rqstp; 1445 struct rpc_xprt *xprt = req->rq_xprt; 1446 int status; 1447 1448 spin_lock(&xprt->queue_lock); 1449 while (!list_empty(&xprt->xmit_queue)) { 1450 next = list_first_entry(&xprt->xmit_queue, 1451 struct rpc_rqst, rq_xmit); 1452 xprt_pin_rqst(next); 1453 spin_unlock(&xprt->queue_lock); 1454 status = xprt_request_transmit(next, task); 1455 if (status == -EBADMSG && next != req) 1456 status = 0; 1457 cond_resched(); 1458 spin_lock(&xprt->queue_lock); 1459 xprt_unpin_rqst(next); 1460 if (status == 0) { 1461 if (!xprt_request_data_received(task) || 1462 test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) 1463 continue; 1464 } else if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) 1465 task->tk_status = status; 1466 break; 1467 } 1468 spin_unlock(&xprt->queue_lock); 1469 } 1470 1471 static void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task) 1472 { 1473 set_bit(XPRT_CONGESTED, &xprt->state); 1474 rpc_sleep_on(&xprt->backlog, task, NULL); 1475 } 1476 1477 static void xprt_wake_up_backlog(struct rpc_xprt *xprt) 1478 { 1479 if (rpc_wake_up_next(&xprt->backlog) == NULL) 1480 clear_bit(XPRT_CONGESTED, &xprt->state); 1481 } 1482 1483 static bool xprt_throttle_congested(struct rpc_xprt *xprt, struct rpc_task *task) 1484 { 1485 bool ret = false; 1486 1487 if (!test_bit(XPRT_CONGESTED, &xprt->state)) 1488 goto out; 1489 spin_lock(&xprt->reserve_lock); 1490 if (test_bit(XPRT_CONGESTED, &xprt->state)) { 1491 rpc_sleep_on(&xprt->backlog, task, NULL); 1492 ret = true; 1493 } 1494 spin_unlock(&xprt->reserve_lock); 1495 out: 1496 return ret; 1497 } 1498 1499 static struct rpc_rqst *xprt_dynamic_alloc_slot(struct rpc_xprt *xprt) 1500 { 1501 struct rpc_rqst *req = ERR_PTR(-EAGAIN); 1502 1503 if (xprt->num_reqs >= xprt->max_reqs) 1504 goto out; 1505 ++xprt->num_reqs; 1506 spin_unlock(&xprt->reserve_lock); 1507 req = kzalloc(sizeof(struct rpc_rqst), GFP_NOFS); 1508 spin_lock(&xprt->reserve_lock); 1509 if (req != NULL) 1510 goto out; 1511 --xprt->num_reqs; 1512 req = ERR_PTR(-ENOMEM); 1513 out: 1514 return req; 1515 } 1516 1517 static bool xprt_dynamic_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req) 1518 { 1519 if (xprt->num_reqs > xprt->min_reqs) { 1520 --xprt->num_reqs; 1521 kfree(req); 1522 return true; 1523 } 1524 return false; 1525 } 1526 1527 void xprt_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task) 1528 { 1529 struct rpc_rqst *req; 1530 1531 spin_lock(&xprt->reserve_lock); 1532 if (!list_empty(&xprt->free)) { 1533 req = list_entry(xprt->free.next, struct rpc_rqst, rq_list); 1534 list_del(&req->rq_list); 1535 goto out_init_req; 1536 } 1537 req = xprt_dynamic_alloc_slot(xprt); 1538 if (!IS_ERR(req)) 1539 goto out_init_req; 1540 switch (PTR_ERR(req)) { 1541 case -ENOMEM: 1542 dprintk("RPC: dynamic allocation of request slot " 1543 "failed! Retrying\n"); 1544 task->tk_status = -ENOMEM; 1545 break; 1546 case -EAGAIN: 1547 xprt_add_backlog(xprt, task); 1548 dprintk("RPC: waiting for request slot\n"); 1549 /* fall through */ 1550 default: 1551 task->tk_status = -EAGAIN; 1552 } 1553 spin_unlock(&xprt->reserve_lock); 1554 return; 1555 out_init_req: 1556 xprt->stat.max_slots = max_t(unsigned int, xprt->stat.max_slots, 1557 xprt->num_reqs); 1558 spin_unlock(&xprt->reserve_lock); 1559 1560 task->tk_status = 0; 1561 task->tk_rqstp = req; 1562 } 1563 EXPORT_SYMBOL_GPL(xprt_alloc_slot); 1564 1565 void xprt_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req) 1566 { 1567 spin_lock(&xprt->reserve_lock); 1568 if (!xprt_dynamic_free_slot(xprt, req)) { 1569 memset(req, 0, sizeof(*req)); /* mark unused */ 1570 list_add(&req->rq_list, &xprt->free); 1571 } 1572 xprt_wake_up_backlog(xprt); 1573 spin_unlock(&xprt->reserve_lock); 1574 } 1575 EXPORT_SYMBOL_GPL(xprt_free_slot); 1576 1577 static void xprt_free_all_slots(struct rpc_xprt *xprt) 1578 { 1579 struct rpc_rqst *req; 1580 while (!list_empty(&xprt->free)) { 1581 req = list_first_entry(&xprt->free, struct rpc_rqst, rq_list); 1582 list_del(&req->rq_list); 1583 kfree(req); 1584 } 1585 } 1586 1587 struct rpc_xprt *xprt_alloc(struct net *net, size_t size, 1588 unsigned int num_prealloc, 1589 unsigned int max_alloc) 1590 { 1591 struct rpc_xprt *xprt; 1592 struct rpc_rqst *req; 1593 int i; 1594 1595 xprt = kzalloc(size, GFP_KERNEL); 1596 if (xprt == NULL) 1597 goto out; 1598 1599 xprt_init(xprt, net); 1600 1601 for (i = 0; i < num_prealloc; i++) { 1602 req = kzalloc(sizeof(struct rpc_rqst), GFP_KERNEL); 1603 if (!req) 1604 goto out_free; 1605 list_add(&req->rq_list, &xprt->free); 1606 } 1607 if (max_alloc > num_prealloc) 1608 xprt->max_reqs = max_alloc; 1609 else 1610 xprt->max_reqs = num_prealloc; 1611 xprt->min_reqs = num_prealloc; 1612 xprt->num_reqs = num_prealloc; 1613 1614 return xprt; 1615 1616 out_free: 1617 xprt_free(xprt); 1618 out: 1619 return NULL; 1620 } 1621 EXPORT_SYMBOL_GPL(xprt_alloc); 1622 1623 void xprt_free(struct rpc_xprt *xprt) 1624 { 1625 put_net(xprt->xprt_net); 1626 xprt_free_all_slots(xprt); 1627 kfree_rcu(xprt, rcu); 1628 } 1629 EXPORT_SYMBOL_GPL(xprt_free); 1630 1631 static void 1632 xprt_init_connect_cookie(struct rpc_rqst *req, struct rpc_xprt *xprt) 1633 { 1634 req->rq_connect_cookie = xprt_connect_cookie(xprt) - 1; 1635 } 1636 1637 static __be32 1638 xprt_alloc_xid(struct rpc_xprt *xprt) 1639 { 1640 __be32 xid; 1641 1642 spin_lock(&xprt->reserve_lock); 1643 xid = (__force __be32)xprt->xid++; 1644 spin_unlock(&xprt->reserve_lock); 1645 return xid; 1646 } 1647 1648 static void 1649 xprt_init_xid(struct rpc_xprt *xprt) 1650 { 1651 xprt->xid = prandom_u32(); 1652 } 1653 1654 static void 1655 xprt_request_init(struct rpc_task *task) 1656 { 1657 struct rpc_xprt *xprt = task->tk_xprt; 1658 struct rpc_rqst *req = task->tk_rqstp; 1659 1660 req->rq_task = task; 1661 req->rq_xprt = xprt; 1662 req->rq_buffer = NULL; 1663 req->rq_xid = xprt_alloc_xid(xprt); 1664 xprt_init_connect_cookie(req, xprt); 1665 req->rq_snd_buf.len = 0; 1666 req->rq_snd_buf.buflen = 0; 1667 req->rq_rcv_buf.len = 0; 1668 req->rq_rcv_buf.buflen = 0; 1669 req->rq_snd_buf.bvec = NULL; 1670 req->rq_rcv_buf.bvec = NULL; 1671 req->rq_release_snd_buf = NULL; 1672 xprt_init_majortimeo(task, req); 1673 dprintk("RPC: %5u reserved req %p xid %08x\n", task->tk_pid, 1674 req, ntohl(req->rq_xid)); 1675 } 1676 1677 static void 1678 xprt_do_reserve(struct rpc_xprt *xprt, struct rpc_task *task) 1679 { 1680 xprt->ops->alloc_slot(xprt, task); 1681 if (task->tk_rqstp != NULL) 1682 xprt_request_init(task); 1683 } 1684 1685 /** 1686 * xprt_reserve - allocate an RPC request slot 1687 * @task: RPC task requesting a slot allocation 1688 * 1689 * If the transport is marked as being congested, or if no more 1690 * slots are available, place the task on the transport's 1691 * backlog queue. 1692 */ 1693 void xprt_reserve(struct rpc_task *task) 1694 { 1695 struct rpc_xprt *xprt = task->tk_xprt; 1696 1697 task->tk_status = 0; 1698 if (task->tk_rqstp != NULL) 1699 return; 1700 1701 task->tk_status = -EAGAIN; 1702 if (!xprt_throttle_congested(xprt, task)) 1703 xprt_do_reserve(xprt, task); 1704 } 1705 1706 /** 1707 * xprt_retry_reserve - allocate an RPC request slot 1708 * @task: RPC task requesting a slot allocation 1709 * 1710 * If no more slots are available, place the task on the transport's 1711 * backlog queue. 1712 * Note that the only difference with xprt_reserve is that we now 1713 * ignore the value of the XPRT_CONGESTED flag. 1714 */ 1715 void xprt_retry_reserve(struct rpc_task *task) 1716 { 1717 struct rpc_xprt *xprt = task->tk_xprt; 1718 1719 task->tk_status = 0; 1720 if (task->tk_rqstp != NULL) 1721 return; 1722 1723 task->tk_status = -EAGAIN; 1724 xprt_do_reserve(xprt, task); 1725 } 1726 1727 static void 1728 xprt_request_dequeue_all(struct rpc_task *task, struct rpc_rqst *req) 1729 { 1730 struct rpc_xprt *xprt = req->rq_xprt; 1731 1732 if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate) || 1733 test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) || 1734 xprt_is_pinned_rqst(req)) { 1735 spin_lock(&xprt->queue_lock); 1736 xprt_request_dequeue_transmit_locked(task); 1737 xprt_request_dequeue_receive_locked(task); 1738 while (xprt_is_pinned_rqst(req)) { 1739 set_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate); 1740 spin_unlock(&xprt->queue_lock); 1741 xprt_wait_on_pinned_rqst(req); 1742 spin_lock(&xprt->queue_lock); 1743 clear_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate); 1744 } 1745 spin_unlock(&xprt->queue_lock); 1746 } 1747 } 1748 1749 /** 1750 * xprt_release - release an RPC request slot 1751 * @task: task which is finished with the slot 1752 * 1753 */ 1754 void xprt_release(struct rpc_task *task) 1755 { 1756 struct rpc_xprt *xprt; 1757 struct rpc_rqst *req = task->tk_rqstp; 1758 1759 if (req == NULL) { 1760 if (task->tk_client) { 1761 xprt = task->tk_xprt; 1762 xprt_release_write(xprt, task); 1763 } 1764 return; 1765 } 1766 1767 xprt = req->rq_xprt; 1768 if (task->tk_ops->rpc_count_stats != NULL) 1769 task->tk_ops->rpc_count_stats(task, task->tk_calldata); 1770 else if (task->tk_client) 1771 rpc_count_iostats(task, task->tk_client->cl_metrics); 1772 xprt_request_dequeue_all(task, req); 1773 spin_lock_bh(&xprt->transport_lock); 1774 xprt->ops->release_xprt(xprt, task); 1775 if (xprt->ops->release_request) 1776 xprt->ops->release_request(task); 1777 xprt->last_used = jiffies; 1778 xprt_schedule_autodisconnect(xprt); 1779 spin_unlock_bh(&xprt->transport_lock); 1780 if (req->rq_buffer) 1781 xprt->ops->buf_free(task); 1782 xprt_inject_disconnect(xprt); 1783 xdr_free_bvec(&req->rq_rcv_buf); 1784 xdr_free_bvec(&req->rq_snd_buf); 1785 if (req->rq_cred != NULL) 1786 put_rpccred(req->rq_cred); 1787 task->tk_rqstp = NULL; 1788 if (req->rq_release_snd_buf) 1789 req->rq_release_snd_buf(req); 1790 1791 dprintk("RPC: %5u release request %p\n", task->tk_pid, req); 1792 if (likely(!bc_prealloc(req))) 1793 xprt->ops->free_slot(xprt, req); 1794 else 1795 xprt_free_bc_request(req); 1796 } 1797 1798 #ifdef CONFIG_SUNRPC_BACKCHANNEL 1799 void 1800 xprt_init_bc_request(struct rpc_rqst *req, struct rpc_task *task) 1801 { 1802 struct xdr_buf *xbufp = &req->rq_snd_buf; 1803 1804 task->tk_rqstp = req; 1805 req->rq_task = task; 1806 xprt_init_connect_cookie(req, req->rq_xprt); 1807 /* 1808 * Set up the xdr_buf length. 1809 * This also indicates that the buffer is XDR encoded already. 1810 */ 1811 xbufp->len = xbufp->head[0].iov_len + xbufp->page_len + 1812 xbufp->tail[0].iov_len; 1813 } 1814 #endif 1815 1816 static void xprt_init(struct rpc_xprt *xprt, struct net *net) 1817 { 1818 kref_init(&xprt->kref); 1819 1820 spin_lock_init(&xprt->transport_lock); 1821 spin_lock_init(&xprt->reserve_lock); 1822 spin_lock_init(&xprt->queue_lock); 1823 1824 INIT_LIST_HEAD(&xprt->free); 1825 xprt->recv_queue = RB_ROOT; 1826 INIT_LIST_HEAD(&xprt->xmit_queue); 1827 #if defined(CONFIG_SUNRPC_BACKCHANNEL) 1828 spin_lock_init(&xprt->bc_pa_lock); 1829 INIT_LIST_HEAD(&xprt->bc_pa_list); 1830 #endif /* CONFIG_SUNRPC_BACKCHANNEL */ 1831 INIT_LIST_HEAD(&xprt->xprt_switch); 1832 1833 xprt->last_used = jiffies; 1834 xprt->cwnd = RPC_INITCWND; 1835 xprt->bind_index = 0; 1836 1837 rpc_init_wait_queue(&xprt->binding, "xprt_binding"); 1838 rpc_init_wait_queue(&xprt->pending, "xprt_pending"); 1839 rpc_init_wait_queue(&xprt->sending, "xprt_sending"); 1840 rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog"); 1841 1842 xprt_init_xid(xprt); 1843 1844 xprt->xprt_net = get_net(net); 1845 } 1846 1847 /** 1848 * xprt_create_transport - create an RPC transport 1849 * @args: rpc transport creation arguments 1850 * 1851 */ 1852 struct rpc_xprt *xprt_create_transport(struct xprt_create *args) 1853 { 1854 struct rpc_xprt *xprt; 1855 struct xprt_class *t; 1856 1857 spin_lock(&xprt_list_lock); 1858 list_for_each_entry(t, &xprt_list, list) { 1859 if (t->ident == args->ident) { 1860 spin_unlock(&xprt_list_lock); 1861 goto found; 1862 } 1863 } 1864 spin_unlock(&xprt_list_lock); 1865 dprintk("RPC: transport (%d) not supported\n", args->ident); 1866 return ERR_PTR(-EIO); 1867 1868 found: 1869 xprt = t->setup(args); 1870 if (IS_ERR(xprt)) { 1871 dprintk("RPC: xprt_create_transport: failed, %ld\n", 1872 -PTR_ERR(xprt)); 1873 goto out; 1874 } 1875 if (args->flags & XPRT_CREATE_NO_IDLE_TIMEOUT) 1876 xprt->idle_timeout = 0; 1877 INIT_WORK(&xprt->task_cleanup, xprt_autoclose); 1878 if (xprt_has_timer(xprt)) 1879 timer_setup(&xprt->timer, xprt_init_autodisconnect, 0); 1880 else 1881 timer_setup(&xprt->timer, NULL, 0); 1882 1883 if (strlen(args->servername) > RPC_MAXNETNAMELEN) { 1884 xprt_destroy(xprt); 1885 return ERR_PTR(-EINVAL); 1886 } 1887 xprt->servername = kstrdup(args->servername, GFP_KERNEL); 1888 if (xprt->servername == NULL) { 1889 xprt_destroy(xprt); 1890 return ERR_PTR(-ENOMEM); 1891 } 1892 1893 rpc_xprt_debugfs_register(xprt); 1894 1895 dprintk("RPC: created transport %p with %u slots\n", xprt, 1896 xprt->max_reqs); 1897 out: 1898 return xprt; 1899 } 1900 1901 static void xprt_destroy_cb(struct work_struct *work) 1902 { 1903 struct rpc_xprt *xprt = 1904 container_of(work, struct rpc_xprt, task_cleanup); 1905 1906 rpc_xprt_debugfs_unregister(xprt); 1907 rpc_destroy_wait_queue(&xprt->binding); 1908 rpc_destroy_wait_queue(&xprt->pending); 1909 rpc_destroy_wait_queue(&xprt->sending); 1910 rpc_destroy_wait_queue(&xprt->backlog); 1911 kfree(xprt->servername); 1912 /* 1913 * Tear down transport state and free the rpc_xprt 1914 */ 1915 xprt->ops->destroy(xprt); 1916 } 1917 1918 /** 1919 * xprt_destroy - destroy an RPC transport, killing off all requests. 1920 * @xprt: transport to destroy 1921 * 1922 */ 1923 static void xprt_destroy(struct rpc_xprt *xprt) 1924 { 1925 dprintk("RPC: destroying transport %p\n", xprt); 1926 1927 /* 1928 * Exclude transport connect/disconnect handlers and autoclose 1929 */ 1930 wait_on_bit_lock(&xprt->state, XPRT_LOCKED, TASK_UNINTERRUPTIBLE); 1931 1932 del_timer_sync(&xprt->timer); 1933 1934 /* 1935 * Destroy sockets etc from the system workqueue so they can 1936 * safely flush receive work running on rpciod. 1937 */ 1938 INIT_WORK(&xprt->task_cleanup, xprt_destroy_cb); 1939 schedule_work(&xprt->task_cleanup); 1940 } 1941 1942 static void xprt_destroy_kref(struct kref *kref) 1943 { 1944 xprt_destroy(container_of(kref, struct rpc_xprt, kref)); 1945 } 1946 1947 /** 1948 * xprt_get - return a reference to an RPC transport. 1949 * @xprt: pointer to the transport 1950 * 1951 */ 1952 struct rpc_xprt *xprt_get(struct rpc_xprt *xprt) 1953 { 1954 if (xprt != NULL && kref_get_unless_zero(&xprt->kref)) 1955 return xprt; 1956 return NULL; 1957 } 1958 EXPORT_SYMBOL_GPL(xprt_get); 1959 1960 /** 1961 * xprt_put - release a reference to an RPC transport. 1962 * @xprt: pointer to the transport 1963 * 1964 */ 1965 void xprt_put(struct rpc_xprt *xprt) 1966 { 1967 if (xprt != NULL) 1968 kref_put(&xprt->kref, xprt_destroy_kref); 1969 } 1970 EXPORT_SYMBOL_GPL(xprt_put); 1971