1 // SPDX-License-Identifier: GPL-2.0-only 2 /* 3 * linux/net/sunrpc/xprt.c 4 * 5 * This is a generic RPC call interface supporting congestion avoidance, 6 * and asynchronous calls. 7 * 8 * The interface works like this: 9 * 10 * - When a process places a call, it allocates a request slot if 11 * one is available. Otherwise, it sleeps on the backlog queue 12 * (xprt_reserve). 13 * - Next, the caller puts together the RPC message, stuffs it into 14 * the request struct, and calls xprt_transmit(). 15 * - xprt_transmit sends the message and installs the caller on the 16 * transport's wait list. At the same time, if a reply is expected, 17 * it installs a timer that is run after the packet's timeout has 18 * expired. 19 * - When a packet arrives, the data_ready handler walks the list of 20 * pending requests for that transport. If a matching XID is found, the 21 * caller is woken up, and the timer removed. 22 * - When no reply arrives within the timeout interval, the timer is 23 * fired by the kernel and runs xprt_timer(). It either adjusts the 24 * timeout values (minor timeout) or wakes up the caller with a status 25 * of -ETIMEDOUT. 26 * - When the caller receives a notification from RPC that a reply arrived, 27 * it should release the RPC slot, and process the reply. 28 * If the call timed out, it may choose to retry the operation by 29 * adjusting the initial timeout value, and simply calling rpc_call 30 * again. 31 * 32 * Support for async RPC is done through a set of RPC-specific scheduling 33 * primitives that `transparently' work for processes as well as async 34 * tasks that rely on callbacks. 35 * 36 * Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de> 37 * 38 * Transport switch API copyright (C) 2005, Chuck Lever <cel@netapp.com> 39 */ 40 41 #include <linux/module.h> 42 43 #include <linux/types.h> 44 #include <linux/interrupt.h> 45 #include <linux/workqueue.h> 46 #include <linux/net.h> 47 #include <linux/ktime.h> 48 49 #include <linux/sunrpc/clnt.h> 50 #include <linux/sunrpc/metrics.h> 51 #include <linux/sunrpc/bc_xprt.h> 52 #include <linux/rcupdate.h> 53 #include <linux/sched/mm.h> 54 55 #include <trace/events/sunrpc.h> 56 57 #include "sunrpc.h" 58 59 /* 60 * Local variables 61 */ 62 63 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG) 64 # define RPCDBG_FACILITY RPCDBG_XPRT 65 #endif 66 67 /* 68 * Local functions 69 */ 70 static void xprt_init(struct rpc_xprt *xprt, struct net *net); 71 static __be32 xprt_alloc_xid(struct rpc_xprt *xprt); 72 static void xprt_destroy(struct rpc_xprt *xprt); 73 74 static DEFINE_SPINLOCK(xprt_list_lock); 75 static LIST_HEAD(xprt_list); 76 77 static unsigned long xprt_request_timeout(const struct rpc_rqst *req) 78 { 79 unsigned long timeout = jiffies + req->rq_timeout; 80 81 if (time_before(timeout, req->rq_majortimeo)) 82 return timeout; 83 return req->rq_majortimeo; 84 } 85 86 /** 87 * xprt_register_transport - register a transport implementation 88 * @transport: transport to register 89 * 90 * If a transport implementation is loaded as a kernel module, it can 91 * call this interface to make itself known to the RPC client. 92 * 93 * Returns: 94 * 0: transport successfully registered 95 * -EEXIST: transport already registered 96 * -EINVAL: transport module being unloaded 97 */ 98 int xprt_register_transport(struct xprt_class *transport) 99 { 100 struct xprt_class *t; 101 int result; 102 103 result = -EEXIST; 104 spin_lock(&xprt_list_lock); 105 list_for_each_entry(t, &xprt_list, list) { 106 /* don't register the same transport class twice */ 107 if (t->ident == transport->ident) 108 goto out; 109 } 110 111 list_add_tail(&transport->list, &xprt_list); 112 printk(KERN_INFO "RPC: Registered %s transport module.\n", 113 transport->name); 114 result = 0; 115 116 out: 117 spin_unlock(&xprt_list_lock); 118 return result; 119 } 120 EXPORT_SYMBOL_GPL(xprt_register_transport); 121 122 /** 123 * xprt_unregister_transport - unregister a transport implementation 124 * @transport: transport to unregister 125 * 126 * Returns: 127 * 0: transport successfully unregistered 128 * -ENOENT: transport never registered 129 */ 130 int xprt_unregister_transport(struct xprt_class *transport) 131 { 132 struct xprt_class *t; 133 int result; 134 135 result = 0; 136 spin_lock(&xprt_list_lock); 137 list_for_each_entry(t, &xprt_list, list) { 138 if (t == transport) { 139 printk(KERN_INFO 140 "RPC: Unregistered %s transport module.\n", 141 transport->name); 142 list_del_init(&transport->list); 143 goto out; 144 } 145 } 146 result = -ENOENT; 147 148 out: 149 spin_unlock(&xprt_list_lock); 150 return result; 151 } 152 EXPORT_SYMBOL_GPL(xprt_unregister_transport); 153 154 /** 155 * xprt_load_transport - load a transport implementation 156 * @transport_name: transport to load 157 * 158 * Returns: 159 * 0: transport successfully loaded 160 * -ENOENT: transport module not available 161 */ 162 int xprt_load_transport(const char *transport_name) 163 { 164 struct xprt_class *t; 165 int result; 166 167 result = 0; 168 spin_lock(&xprt_list_lock); 169 list_for_each_entry(t, &xprt_list, list) { 170 if (strcmp(t->name, transport_name) == 0) { 171 spin_unlock(&xprt_list_lock); 172 goto out; 173 } 174 } 175 spin_unlock(&xprt_list_lock); 176 result = request_module("xprt%s", transport_name); 177 out: 178 return result; 179 } 180 EXPORT_SYMBOL_GPL(xprt_load_transport); 181 182 static void xprt_clear_locked(struct rpc_xprt *xprt) 183 { 184 xprt->snd_task = NULL; 185 if (!test_bit(XPRT_CLOSE_WAIT, &xprt->state)) { 186 smp_mb__before_atomic(); 187 clear_bit(XPRT_LOCKED, &xprt->state); 188 smp_mb__after_atomic(); 189 } else 190 queue_work(xprtiod_workqueue, &xprt->task_cleanup); 191 } 192 193 /** 194 * xprt_reserve_xprt - serialize write access to transports 195 * @task: task that is requesting access to the transport 196 * @xprt: pointer to the target transport 197 * 198 * This prevents mixing the payload of separate requests, and prevents 199 * transport connects from colliding with writes. No congestion control 200 * is provided. 201 */ 202 int xprt_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task) 203 { 204 struct rpc_rqst *req = task->tk_rqstp; 205 206 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) { 207 if (task == xprt->snd_task) 208 return 1; 209 goto out_sleep; 210 } 211 if (test_bit(XPRT_WRITE_SPACE, &xprt->state)) 212 goto out_unlock; 213 xprt->snd_task = task; 214 215 return 1; 216 217 out_unlock: 218 xprt_clear_locked(xprt); 219 out_sleep: 220 dprintk("RPC: %5u failed to lock transport %p\n", 221 task->tk_pid, xprt); 222 task->tk_status = -EAGAIN; 223 if (RPC_IS_SOFT(task)) 224 rpc_sleep_on_timeout(&xprt->sending, task, NULL, 225 xprt_request_timeout(req)); 226 else 227 rpc_sleep_on(&xprt->sending, task, NULL); 228 return 0; 229 } 230 EXPORT_SYMBOL_GPL(xprt_reserve_xprt); 231 232 static bool 233 xprt_need_congestion_window_wait(struct rpc_xprt *xprt) 234 { 235 return test_bit(XPRT_CWND_WAIT, &xprt->state); 236 } 237 238 static void 239 xprt_set_congestion_window_wait(struct rpc_xprt *xprt) 240 { 241 if (!list_empty(&xprt->xmit_queue)) { 242 /* Peek at head of queue to see if it can make progress */ 243 if (list_first_entry(&xprt->xmit_queue, struct rpc_rqst, 244 rq_xmit)->rq_cong) 245 return; 246 } 247 set_bit(XPRT_CWND_WAIT, &xprt->state); 248 } 249 250 static void 251 xprt_test_and_clear_congestion_window_wait(struct rpc_xprt *xprt) 252 { 253 if (!RPCXPRT_CONGESTED(xprt)) 254 clear_bit(XPRT_CWND_WAIT, &xprt->state); 255 } 256 257 /* 258 * xprt_reserve_xprt_cong - serialize write access to transports 259 * @task: task that is requesting access to the transport 260 * 261 * Same as xprt_reserve_xprt, but Van Jacobson congestion control is 262 * integrated into the decision of whether a request is allowed to be 263 * woken up and given access to the transport. 264 * Note that the lock is only granted if we know there are free slots. 265 */ 266 int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task) 267 { 268 struct rpc_rqst *req = task->tk_rqstp; 269 270 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) { 271 if (task == xprt->snd_task) 272 return 1; 273 goto out_sleep; 274 } 275 if (req == NULL) { 276 xprt->snd_task = task; 277 return 1; 278 } 279 if (test_bit(XPRT_WRITE_SPACE, &xprt->state)) 280 goto out_unlock; 281 if (!xprt_need_congestion_window_wait(xprt)) { 282 xprt->snd_task = task; 283 return 1; 284 } 285 out_unlock: 286 xprt_clear_locked(xprt); 287 out_sleep: 288 dprintk("RPC: %5u failed to lock transport %p\n", task->tk_pid, xprt); 289 task->tk_status = -EAGAIN; 290 if (RPC_IS_SOFT(task)) 291 rpc_sleep_on_timeout(&xprt->sending, task, NULL, 292 xprt_request_timeout(req)); 293 else 294 rpc_sleep_on(&xprt->sending, task, NULL); 295 return 0; 296 } 297 EXPORT_SYMBOL_GPL(xprt_reserve_xprt_cong); 298 299 static inline int xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task) 300 { 301 int retval; 302 303 if (test_bit(XPRT_LOCKED, &xprt->state) && xprt->snd_task == task) 304 return 1; 305 spin_lock(&xprt->transport_lock); 306 retval = xprt->ops->reserve_xprt(xprt, task); 307 spin_unlock(&xprt->transport_lock); 308 return retval; 309 } 310 311 static bool __xprt_lock_write_func(struct rpc_task *task, void *data) 312 { 313 struct rpc_xprt *xprt = data; 314 315 xprt->snd_task = task; 316 return true; 317 } 318 319 static void __xprt_lock_write_next(struct rpc_xprt *xprt) 320 { 321 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) 322 return; 323 if (test_bit(XPRT_WRITE_SPACE, &xprt->state)) 324 goto out_unlock; 325 if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending, 326 __xprt_lock_write_func, xprt)) 327 return; 328 out_unlock: 329 xprt_clear_locked(xprt); 330 } 331 332 static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt) 333 { 334 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) 335 return; 336 if (test_bit(XPRT_WRITE_SPACE, &xprt->state)) 337 goto out_unlock; 338 if (xprt_need_congestion_window_wait(xprt)) 339 goto out_unlock; 340 if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending, 341 __xprt_lock_write_func, xprt)) 342 return; 343 out_unlock: 344 xprt_clear_locked(xprt); 345 } 346 347 /** 348 * xprt_release_xprt - allow other requests to use a transport 349 * @xprt: transport with other tasks potentially waiting 350 * @task: task that is releasing access to the transport 351 * 352 * Note that "task" can be NULL. No congestion control is provided. 353 */ 354 void xprt_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task) 355 { 356 if (xprt->snd_task == task) { 357 xprt_clear_locked(xprt); 358 __xprt_lock_write_next(xprt); 359 } 360 } 361 EXPORT_SYMBOL_GPL(xprt_release_xprt); 362 363 /** 364 * xprt_release_xprt_cong - allow other requests to use a transport 365 * @xprt: transport with other tasks potentially waiting 366 * @task: task that is releasing access to the transport 367 * 368 * Note that "task" can be NULL. Another task is awoken to use the 369 * transport if the transport's congestion window allows it. 370 */ 371 void xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task) 372 { 373 if (xprt->snd_task == task) { 374 xprt_clear_locked(xprt); 375 __xprt_lock_write_next_cong(xprt); 376 } 377 } 378 EXPORT_SYMBOL_GPL(xprt_release_xprt_cong); 379 380 static inline void xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task) 381 { 382 if (xprt->snd_task != task) 383 return; 384 spin_lock(&xprt->transport_lock); 385 xprt->ops->release_xprt(xprt, task); 386 spin_unlock(&xprt->transport_lock); 387 } 388 389 /* 390 * Van Jacobson congestion avoidance. Check if the congestion window 391 * overflowed. Put the task to sleep if this is the case. 392 */ 393 static int 394 __xprt_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req) 395 { 396 if (req->rq_cong) 397 return 1; 398 dprintk("RPC: %5u xprt_cwnd_limited cong = %lu cwnd = %lu\n", 399 req->rq_task->tk_pid, xprt->cong, xprt->cwnd); 400 if (RPCXPRT_CONGESTED(xprt)) { 401 xprt_set_congestion_window_wait(xprt); 402 return 0; 403 } 404 req->rq_cong = 1; 405 xprt->cong += RPC_CWNDSCALE; 406 return 1; 407 } 408 409 /* 410 * Adjust the congestion window, and wake up the next task 411 * that has been sleeping due to congestion 412 */ 413 static void 414 __xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req) 415 { 416 if (!req->rq_cong) 417 return; 418 req->rq_cong = 0; 419 xprt->cong -= RPC_CWNDSCALE; 420 xprt_test_and_clear_congestion_window_wait(xprt); 421 __xprt_lock_write_next_cong(xprt); 422 } 423 424 /** 425 * xprt_request_get_cong - Request congestion control credits 426 * @xprt: pointer to transport 427 * @req: pointer to RPC request 428 * 429 * Useful for transports that require congestion control. 430 */ 431 bool 432 xprt_request_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req) 433 { 434 bool ret = false; 435 436 if (req->rq_cong) 437 return true; 438 spin_lock(&xprt->transport_lock); 439 ret = __xprt_get_cong(xprt, req) != 0; 440 spin_unlock(&xprt->transport_lock); 441 return ret; 442 } 443 EXPORT_SYMBOL_GPL(xprt_request_get_cong); 444 445 /** 446 * xprt_release_rqst_cong - housekeeping when request is complete 447 * @task: RPC request that recently completed 448 * 449 * Useful for transports that require congestion control. 450 */ 451 void xprt_release_rqst_cong(struct rpc_task *task) 452 { 453 struct rpc_rqst *req = task->tk_rqstp; 454 455 __xprt_put_cong(req->rq_xprt, req); 456 } 457 EXPORT_SYMBOL_GPL(xprt_release_rqst_cong); 458 459 static void xprt_clear_congestion_window_wait_locked(struct rpc_xprt *xprt) 460 { 461 if (test_and_clear_bit(XPRT_CWND_WAIT, &xprt->state)) 462 __xprt_lock_write_next_cong(xprt); 463 } 464 465 /* 466 * Clear the congestion window wait flag and wake up the next 467 * entry on xprt->sending 468 */ 469 static void 470 xprt_clear_congestion_window_wait(struct rpc_xprt *xprt) 471 { 472 if (test_and_clear_bit(XPRT_CWND_WAIT, &xprt->state)) { 473 spin_lock(&xprt->transport_lock); 474 __xprt_lock_write_next_cong(xprt); 475 spin_unlock(&xprt->transport_lock); 476 } 477 } 478 479 /** 480 * xprt_adjust_cwnd - adjust transport congestion window 481 * @xprt: pointer to xprt 482 * @task: recently completed RPC request used to adjust window 483 * @result: result code of completed RPC request 484 * 485 * The transport code maintains an estimate on the maximum number of out- 486 * standing RPC requests, using a smoothed version of the congestion 487 * avoidance implemented in 44BSD. This is basically the Van Jacobson 488 * congestion algorithm: If a retransmit occurs, the congestion window is 489 * halved; otherwise, it is incremented by 1/cwnd when 490 * 491 * - a reply is received and 492 * - a full number of requests are outstanding and 493 * - the congestion window hasn't been updated recently. 494 */ 495 void xprt_adjust_cwnd(struct rpc_xprt *xprt, struct rpc_task *task, int result) 496 { 497 struct rpc_rqst *req = task->tk_rqstp; 498 unsigned long cwnd = xprt->cwnd; 499 500 if (result >= 0 && cwnd <= xprt->cong) { 501 /* The (cwnd >> 1) term makes sure 502 * the result gets rounded properly. */ 503 cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd; 504 if (cwnd > RPC_MAXCWND(xprt)) 505 cwnd = RPC_MAXCWND(xprt); 506 __xprt_lock_write_next_cong(xprt); 507 } else if (result == -ETIMEDOUT) { 508 cwnd >>= 1; 509 if (cwnd < RPC_CWNDSCALE) 510 cwnd = RPC_CWNDSCALE; 511 } 512 dprintk("RPC: cong %ld, cwnd was %ld, now %ld\n", 513 xprt->cong, xprt->cwnd, cwnd); 514 xprt->cwnd = cwnd; 515 __xprt_put_cong(xprt, req); 516 } 517 EXPORT_SYMBOL_GPL(xprt_adjust_cwnd); 518 519 /** 520 * xprt_wake_pending_tasks - wake all tasks on a transport's pending queue 521 * @xprt: transport with waiting tasks 522 * @status: result code to plant in each task before waking it 523 * 524 */ 525 void xprt_wake_pending_tasks(struct rpc_xprt *xprt, int status) 526 { 527 if (status < 0) 528 rpc_wake_up_status(&xprt->pending, status); 529 else 530 rpc_wake_up(&xprt->pending); 531 } 532 EXPORT_SYMBOL_GPL(xprt_wake_pending_tasks); 533 534 /** 535 * xprt_wait_for_buffer_space - wait for transport output buffer to clear 536 * @xprt: transport 537 * 538 * Note that we only set the timer for the case of RPC_IS_SOFT(), since 539 * we don't in general want to force a socket disconnection due to 540 * an incomplete RPC call transmission. 541 */ 542 void xprt_wait_for_buffer_space(struct rpc_xprt *xprt) 543 { 544 set_bit(XPRT_WRITE_SPACE, &xprt->state); 545 } 546 EXPORT_SYMBOL_GPL(xprt_wait_for_buffer_space); 547 548 static bool 549 xprt_clear_write_space_locked(struct rpc_xprt *xprt) 550 { 551 if (test_and_clear_bit(XPRT_WRITE_SPACE, &xprt->state)) { 552 __xprt_lock_write_next(xprt); 553 dprintk("RPC: write space: waking waiting task on " 554 "xprt %p\n", xprt); 555 return true; 556 } 557 return false; 558 } 559 560 /** 561 * xprt_write_space - wake the task waiting for transport output buffer space 562 * @xprt: transport with waiting tasks 563 * 564 * Can be called in a soft IRQ context, so xprt_write_space never sleeps. 565 */ 566 bool xprt_write_space(struct rpc_xprt *xprt) 567 { 568 bool ret; 569 570 if (!test_bit(XPRT_WRITE_SPACE, &xprt->state)) 571 return false; 572 spin_lock(&xprt->transport_lock); 573 ret = xprt_clear_write_space_locked(xprt); 574 spin_unlock(&xprt->transport_lock); 575 return ret; 576 } 577 EXPORT_SYMBOL_GPL(xprt_write_space); 578 579 static unsigned long xprt_abs_ktime_to_jiffies(ktime_t abstime) 580 { 581 s64 delta = ktime_to_ns(ktime_get() - abstime); 582 return likely(delta >= 0) ? 583 jiffies - nsecs_to_jiffies(delta) : 584 jiffies + nsecs_to_jiffies(-delta); 585 } 586 587 static unsigned long xprt_calc_majortimeo(struct rpc_rqst *req) 588 { 589 const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout; 590 unsigned long majortimeo = req->rq_timeout; 591 592 if (to->to_exponential) 593 majortimeo <<= to->to_retries; 594 else 595 majortimeo += to->to_increment * to->to_retries; 596 if (majortimeo > to->to_maxval || majortimeo == 0) 597 majortimeo = to->to_maxval; 598 return majortimeo; 599 } 600 601 static void xprt_reset_majortimeo(struct rpc_rqst *req) 602 { 603 req->rq_majortimeo += xprt_calc_majortimeo(req); 604 } 605 606 static void xprt_init_majortimeo(struct rpc_task *task, struct rpc_rqst *req) 607 { 608 unsigned long time_init; 609 struct rpc_xprt *xprt = req->rq_xprt; 610 611 if (likely(xprt && xprt_connected(xprt))) 612 time_init = jiffies; 613 else 614 time_init = xprt_abs_ktime_to_jiffies(task->tk_start); 615 req->rq_timeout = task->tk_client->cl_timeout->to_initval; 616 req->rq_majortimeo = time_init + xprt_calc_majortimeo(req); 617 } 618 619 /** 620 * xprt_adjust_timeout - adjust timeout values for next retransmit 621 * @req: RPC request containing parameters to use for the adjustment 622 * 623 */ 624 int xprt_adjust_timeout(struct rpc_rqst *req) 625 { 626 struct rpc_xprt *xprt = req->rq_xprt; 627 const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout; 628 int status = 0; 629 630 if (time_before(jiffies, req->rq_majortimeo)) { 631 if (to->to_exponential) 632 req->rq_timeout <<= 1; 633 else 634 req->rq_timeout += to->to_increment; 635 if (to->to_maxval && req->rq_timeout >= to->to_maxval) 636 req->rq_timeout = to->to_maxval; 637 req->rq_retries++; 638 } else { 639 req->rq_timeout = to->to_initval; 640 req->rq_retries = 0; 641 xprt_reset_majortimeo(req); 642 /* Reset the RTT counters == "slow start" */ 643 spin_lock(&xprt->transport_lock); 644 rpc_init_rtt(req->rq_task->tk_client->cl_rtt, to->to_initval); 645 spin_unlock(&xprt->transport_lock); 646 status = -ETIMEDOUT; 647 } 648 649 if (req->rq_timeout == 0) { 650 printk(KERN_WARNING "xprt_adjust_timeout: rq_timeout = 0!\n"); 651 req->rq_timeout = 5 * HZ; 652 } 653 return status; 654 } 655 656 static void xprt_autoclose(struct work_struct *work) 657 { 658 struct rpc_xprt *xprt = 659 container_of(work, struct rpc_xprt, task_cleanup); 660 unsigned int pflags = memalloc_nofs_save(); 661 662 clear_bit(XPRT_CLOSE_WAIT, &xprt->state); 663 xprt->ops->close(xprt); 664 xprt_release_write(xprt, NULL); 665 wake_up_bit(&xprt->state, XPRT_LOCKED); 666 memalloc_nofs_restore(pflags); 667 } 668 669 /** 670 * xprt_disconnect_done - mark a transport as disconnected 671 * @xprt: transport to flag for disconnect 672 * 673 */ 674 void xprt_disconnect_done(struct rpc_xprt *xprt) 675 { 676 dprintk("RPC: disconnected transport %p\n", xprt); 677 spin_lock(&xprt->transport_lock); 678 xprt_clear_connected(xprt); 679 xprt_clear_write_space_locked(xprt); 680 xprt_clear_congestion_window_wait_locked(xprt); 681 xprt_wake_pending_tasks(xprt, -ENOTCONN); 682 spin_unlock(&xprt->transport_lock); 683 } 684 EXPORT_SYMBOL_GPL(xprt_disconnect_done); 685 686 /** 687 * xprt_force_disconnect - force a transport to disconnect 688 * @xprt: transport to disconnect 689 * 690 */ 691 void xprt_force_disconnect(struct rpc_xprt *xprt) 692 { 693 /* Don't race with the test_bit() in xprt_clear_locked() */ 694 spin_lock(&xprt->transport_lock); 695 set_bit(XPRT_CLOSE_WAIT, &xprt->state); 696 /* Try to schedule an autoclose RPC call */ 697 if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0) 698 queue_work(xprtiod_workqueue, &xprt->task_cleanup); 699 else if (xprt->snd_task) 700 rpc_wake_up_queued_task_set_status(&xprt->pending, 701 xprt->snd_task, -ENOTCONN); 702 spin_unlock(&xprt->transport_lock); 703 } 704 EXPORT_SYMBOL_GPL(xprt_force_disconnect); 705 706 static unsigned int 707 xprt_connect_cookie(struct rpc_xprt *xprt) 708 { 709 return READ_ONCE(xprt->connect_cookie); 710 } 711 712 static bool 713 xprt_request_retransmit_after_disconnect(struct rpc_task *task) 714 { 715 struct rpc_rqst *req = task->tk_rqstp; 716 struct rpc_xprt *xprt = req->rq_xprt; 717 718 return req->rq_connect_cookie != xprt_connect_cookie(xprt) || 719 !xprt_connected(xprt); 720 } 721 722 /** 723 * xprt_conditional_disconnect - force a transport to disconnect 724 * @xprt: transport to disconnect 725 * @cookie: 'connection cookie' 726 * 727 * This attempts to break the connection if and only if 'cookie' matches 728 * the current transport 'connection cookie'. It ensures that we don't 729 * try to break the connection more than once when we need to retransmit 730 * a batch of RPC requests. 731 * 732 */ 733 void xprt_conditional_disconnect(struct rpc_xprt *xprt, unsigned int cookie) 734 { 735 /* Don't race with the test_bit() in xprt_clear_locked() */ 736 spin_lock(&xprt->transport_lock); 737 if (cookie != xprt->connect_cookie) 738 goto out; 739 if (test_bit(XPRT_CLOSING, &xprt->state)) 740 goto out; 741 set_bit(XPRT_CLOSE_WAIT, &xprt->state); 742 /* Try to schedule an autoclose RPC call */ 743 if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0) 744 queue_work(xprtiod_workqueue, &xprt->task_cleanup); 745 xprt_wake_pending_tasks(xprt, -EAGAIN); 746 out: 747 spin_unlock(&xprt->transport_lock); 748 } 749 750 static bool 751 xprt_has_timer(const struct rpc_xprt *xprt) 752 { 753 return xprt->idle_timeout != 0; 754 } 755 756 static void 757 xprt_schedule_autodisconnect(struct rpc_xprt *xprt) 758 __must_hold(&xprt->transport_lock) 759 { 760 xprt->last_used = jiffies; 761 if (RB_EMPTY_ROOT(&xprt->recv_queue) && xprt_has_timer(xprt)) 762 mod_timer(&xprt->timer, xprt->last_used + xprt->idle_timeout); 763 } 764 765 static void 766 xprt_init_autodisconnect(struct timer_list *t) 767 { 768 struct rpc_xprt *xprt = from_timer(xprt, t, timer); 769 770 if (!RB_EMPTY_ROOT(&xprt->recv_queue)) 771 return; 772 /* Reset xprt->last_used to avoid connect/autodisconnect cycling */ 773 xprt->last_used = jiffies; 774 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) 775 return; 776 queue_work(xprtiod_workqueue, &xprt->task_cleanup); 777 } 778 779 bool xprt_lock_connect(struct rpc_xprt *xprt, 780 struct rpc_task *task, 781 void *cookie) 782 { 783 bool ret = false; 784 785 spin_lock(&xprt->transport_lock); 786 if (!test_bit(XPRT_LOCKED, &xprt->state)) 787 goto out; 788 if (xprt->snd_task != task) 789 goto out; 790 xprt->snd_task = cookie; 791 ret = true; 792 out: 793 spin_unlock(&xprt->transport_lock); 794 return ret; 795 } 796 797 void xprt_unlock_connect(struct rpc_xprt *xprt, void *cookie) 798 { 799 spin_lock(&xprt->transport_lock); 800 if (xprt->snd_task != cookie) 801 goto out; 802 if (!test_bit(XPRT_LOCKED, &xprt->state)) 803 goto out; 804 xprt->snd_task =NULL; 805 xprt->ops->release_xprt(xprt, NULL); 806 xprt_schedule_autodisconnect(xprt); 807 out: 808 spin_unlock(&xprt->transport_lock); 809 wake_up_bit(&xprt->state, XPRT_LOCKED); 810 } 811 812 /** 813 * xprt_connect - schedule a transport connect operation 814 * @task: RPC task that is requesting the connect 815 * 816 */ 817 void xprt_connect(struct rpc_task *task) 818 { 819 struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt; 820 821 dprintk("RPC: %5u xprt_connect xprt %p %s connected\n", task->tk_pid, 822 xprt, (xprt_connected(xprt) ? "is" : "is not")); 823 824 if (!xprt_bound(xprt)) { 825 task->tk_status = -EAGAIN; 826 return; 827 } 828 if (!xprt_lock_write(xprt, task)) 829 return; 830 831 if (test_and_clear_bit(XPRT_CLOSE_WAIT, &xprt->state)) 832 xprt->ops->close(xprt); 833 834 if (!xprt_connected(xprt)) { 835 task->tk_rqstp->rq_connect_cookie = xprt->connect_cookie; 836 rpc_sleep_on_timeout(&xprt->pending, task, NULL, 837 xprt_request_timeout(task->tk_rqstp)); 838 839 if (test_bit(XPRT_CLOSING, &xprt->state)) 840 return; 841 if (xprt_test_and_set_connecting(xprt)) 842 return; 843 /* Race breaker */ 844 if (!xprt_connected(xprt)) { 845 xprt->stat.connect_start = jiffies; 846 xprt->ops->connect(xprt, task); 847 } else { 848 xprt_clear_connecting(xprt); 849 task->tk_status = 0; 850 rpc_wake_up_queued_task(&xprt->pending, task); 851 } 852 } 853 xprt_release_write(xprt, task); 854 } 855 856 /** 857 * xprt_reconnect_delay - compute the wait before scheduling a connect 858 * @xprt: transport instance 859 * 860 */ 861 unsigned long xprt_reconnect_delay(const struct rpc_xprt *xprt) 862 { 863 unsigned long start, now = jiffies; 864 865 start = xprt->stat.connect_start + xprt->reestablish_timeout; 866 if (time_after(start, now)) 867 return start - now; 868 return 0; 869 } 870 EXPORT_SYMBOL_GPL(xprt_reconnect_delay); 871 872 /** 873 * xprt_reconnect_backoff - compute the new re-establish timeout 874 * @xprt: transport instance 875 * @init_to: initial reestablish timeout 876 * 877 */ 878 void xprt_reconnect_backoff(struct rpc_xprt *xprt, unsigned long init_to) 879 { 880 xprt->reestablish_timeout <<= 1; 881 if (xprt->reestablish_timeout > xprt->max_reconnect_timeout) 882 xprt->reestablish_timeout = xprt->max_reconnect_timeout; 883 if (xprt->reestablish_timeout < init_to) 884 xprt->reestablish_timeout = init_to; 885 } 886 EXPORT_SYMBOL_GPL(xprt_reconnect_backoff); 887 888 enum xprt_xid_rb_cmp { 889 XID_RB_EQUAL, 890 XID_RB_LEFT, 891 XID_RB_RIGHT, 892 }; 893 static enum xprt_xid_rb_cmp 894 xprt_xid_cmp(__be32 xid1, __be32 xid2) 895 { 896 if (xid1 == xid2) 897 return XID_RB_EQUAL; 898 if ((__force u32)xid1 < (__force u32)xid2) 899 return XID_RB_LEFT; 900 return XID_RB_RIGHT; 901 } 902 903 static struct rpc_rqst * 904 xprt_request_rb_find(struct rpc_xprt *xprt, __be32 xid) 905 { 906 struct rb_node *n = xprt->recv_queue.rb_node; 907 struct rpc_rqst *req; 908 909 while (n != NULL) { 910 req = rb_entry(n, struct rpc_rqst, rq_recv); 911 switch (xprt_xid_cmp(xid, req->rq_xid)) { 912 case XID_RB_LEFT: 913 n = n->rb_left; 914 break; 915 case XID_RB_RIGHT: 916 n = n->rb_right; 917 break; 918 case XID_RB_EQUAL: 919 return req; 920 } 921 } 922 return NULL; 923 } 924 925 static void 926 xprt_request_rb_insert(struct rpc_xprt *xprt, struct rpc_rqst *new) 927 { 928 struct rb_node **p = &xprt->recv_queue.rb_node; 929 struct rb_node *n = NULL; 930 struct rpc_rqst *req; 931 932 while (*p != NULL) { 933 n = *p; 934 req = rb_entry(n, struct rpc_rqst, rq_recv); 935 switch(xprt_xid_cmp(new->rq_xid, req->rq_xid)) { 936 case XID_RB_LEFT: 937 p = &n->rb_left; 938 break; 939 case XID_RB_RIGHT: 940 p = &n->rb_right; 941 break; 942 case XID_RB_EQUAL: 943 WARN_ON_ONCE(new != req); 944 return; 945 } 946 } 947 rb_link_node(&new->rq_recv, n, p); 948 rb_insert_color(&new->rq_recv, &xprt->recv_queue); 949 } 950 951 static void 952 xprt_request_rb_remove(struct rpc_xprt *xprt, struct rpc_rqst *req) 953 { 954 rb_erase(&req->rq_recv, &xprt->recv_queue); 955 } 956 957 /** 958 * xprt_lookup_rqst - find an RPC request corresponding to an XID 959 * @xprt: transport on which the original request was transmitted 960 * @xid: RPC XID of incoming reply 961 * 962 * Caller holds xprt->queue_lock. 963 */ 964 struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid) 965 { 966 struct rpc_rqst *entry; 967 968 entry = xprt_request_rb_find(xprt, xid); 969 if (entry != NULL) { 970 trace_xprt_lookup_rqst(xprt, xid, 0); 971 entry->rq_rtt = ktime_sub(ktime_get(), entry->rq_xtime); 972 return entry; 973 } 974 975 dprintk("RPC: xprt_lookup_rqst did not find xid %08x\n", 976 ntohl(xid)); 977 trace_xprt_lookup_rqst(xprt, xid, -ENOENT); 978 xprt->stat.bad_xids++; 979 return NULL; 980 } 981 EXPORT_SYMBOL_GPL(xprt_lookup_rqst); 982 983 static bool 984 xprt_is_pinned_rqst(struct rpc_rqst *req) 985 { 986 return atomic_read(&req->rq_pin) != 0; 987 } 988 989 /** 990 * xprt_pin_rqst - Pin a request on the transport receive list 991 * @req: Request to pin 992 * 993 * Caller must ensure this is atomic with the call to xprt_lookup_rqst() 994 * so should be holding xprt->queue_lock. 995 */ 996 void xprt_pin_rqst(struct rpc_rqst *req) 997 { 998 atomic_inc(&req->rq_pin); 999 } 1000 EXPORT_SYMBOL_GPL(xprt_pin_rqst); 1001 1002 /** 1003 * xprt_unpin_rqst - Unpin a request on the transport receive list 1004 * @req: Request to pin 1005 * 1006 * Caller should be holding xprt->queue_lock. 1007 */ 1008 void xprt_unpin_rqst(struct rpc_rqst *req) 1009 { 1010 if (!test_bit(RPC_TASK_MSG_PIN_WAIT, &req->rq_task->tk_runstate)) { 1011 atomic_dec(&req->rq_pin); 1012 return; 1013 } 1014 if (atomic_dec_and_test(&req->rq_pin)) 1015 wake_up_var(&req->rq_pin); 1016 } 1017 EXPORT_SYMBOL_GPL(xprt_unpin_rqst); 1018 1019 static void xprt_wait_on_pinned_rqst(struct rpc_rqst *req) 1020 { 1021 wait_var_event(&req->rq_pin, !xprt_is_pinned_rqst(req)); 1022 } 1023 1024 static bool 1025 xprt_request_data_received(struct rpc_task *task) 1026 { 1027 return !test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) && 1028 READ_ONCE(task->tk_rqstp->rq_reply_bytes_recvd) != 0; 1029 } 1030 1031 static bool 1032 xprt_request_need_enqueue_receive(struct rpc_task *task, struct rpc_rqst *req) 1033 { 1034 return !test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) && 1035 READ_ONCE(task->tk_rqstp->rq_reply_bytes_recvd) == 0; 1036 } 1037 1038 /** 1039 * xprt_request_enqueue_receive - Add an request to the receive queue 1040 * @task: RPC task 1041 * 1042 */ 1043 void 1044 xprt_request_enqueue_receive(struct rpc_task *task) 1045 { 1046 struct rpc_rqst *req = task->tk_rqstp; 1047 struct rpc_xprt *xprt = req->rq_xprt; 1048 1049 if (!xprt_request_need_enqueue_receive(task, req)) 1050 return; 1051 1052 xprt_request_prepare(task->tk_rqstp); 1053 spin_lock(&xprt->queue_lock); 1054 1055 /* Update the softirq receive buffer */ 1056 memcpy(&req->rq_private_buf, &req->rq_rcv_buf, 1057 sizeof(req->rq_private_buf)); 1058 1059 /* Add request to the receive list */ 1060 xprt_request_rb_insert(xprt, req); 1061 set_bit(RPC_TASK_NEED_RECV, &task->tk_runstate); 1062 spin_unlock(&xprt->queue_lock); 1063 1064 /* Turn off autodisconnect */ 1065 del_singleshot_timer_sync(&xprt->timer); 1066 } 1067 1068 /** 1069 * xprt_request_dequeue_receive_locked - Remove a request from the receive queue 1070 * @task: RPC task 1071 * 1072 * Caller must hold xprt->queue_lock. 1073 */ 1074 static void 1075 xprt_request_dequeue_receive_locked(struct rpc_task *task) 1076 { 1077 struct rpc_rqst *req = task->tk_rqstp; 1078 1079 if (test_and_clear_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) 1080 xprt_request_rb_remove(req->rq_xprt, req); 1081 } 1082 1083 /** 1084 * xprt_update_rtt - Update RPC RTT statistics 1085 * @task: RPC request that recently completed 1086 * 1087 * Caller holds xprt->queue_lock. 1088 */ 1089 void xprt_update_rtt(struct rpc_task *task) 1090 { 1091 struct rpc_rqst *req = task->tk_rqstp; 1092 struct rpc_rtt *rtt = task->tk_client->cl_rtt; 1093 unsigned int timer = task->tk_msg.rpc_proc->p_timer; 1094 long m = usecs_to_jiffies(ktime_to_us(req->rq_rtt)); 1095 1096 if (timer) { 1097 if (req->rq_ntrans == 1) 1098 rpc_update_rtt(rtt, timer, m); 1099 rpc_set_timeo(rtt, timer, req->rq_ntrans - 1); 1100 } 1101 } 1102 EXPORT_SYMBOL_GPL(xprt_update_rtt); 1103 1104 /** 1105 * xprt_complete_rqst - called when reply processing is complete 1106 * @task: RPC request that recently completed 1107 * @copied: actual number of bytes received from the transport 1108 * 1109 * Caller holds xprt->queue_lock. 1110 */ 1111 void xprt_complete_rqst(struct rpc_task *task, int copied) 1112 { 1113 struct rpc_rqst *req = task->tk_rqstp; 1114 struct rpc_xprt *xprt = req->rq_xprt; 1115 1116 dprintk("RPC: %5u xid %08x complete (%d bytes received)\n", 1117 task->tk_pid, ntohl(req->rq_xid), copied); 1118 trace_xprt_complete_rqst(xprt, req->rq_xid, copied); 1119 1120 xprt->stat.recvs++; 1121 1122 req->rq_private_buf.len = copied; 1123 /* Ensure all writes are done before we update */ 1124 /* req->rq_reply_bytes_recvd */ 1125 smp_wmb(); 1126 req->rq_reply_bytes_recvd = copied; 1127 xprt_request_dequeue_receive_locked(task); 1128 rpc_wake_up_queued_task(&xprt->pending, task); 1129 } 1130 EXPORT_SYMBOL_GPL(xprt_complete_rqst); 1131 1132 static void xprt_timer(struct rpc_task *task) 1133 { 1134 struct rpc_rqst *req = task->tk_rqstp; 1135 struct rpc_xprt *xprt = req->rq_xprt; 1136 1137 if (task->tk_status != -ETIMEDOUT) 1138 return; 1139 1140 trace_xprt_timer(xprt, req->rq_xid, task->tk_status); 1141 if (!req->rq_reply_bytes_recvd) { 1142 if (xprt->ops->timer) 1143 xprt->ops->timer(xprt, task); 1144 } else 1145 task->tk_status = 0; 1146 } 1147 1148 /** 1149 * xprt_wait_for_reply_request_def - wait for reply 1150 * @task: pointer to rpc_task 1151 * 1152 * Set a request's retransmit timeout based on the transport's 1153 * default timeout parameters. Used by transports that don't adjust 1154 * the retransmit timeout based on round-trip time estimation, 1155 * and put the task to sleep on the pending queue. 1156 */ 1157 void xprt_wait_for_reply_request_def(struct rpc_task *task) 1158 { 1159 struct rpc_rqst *req = task->tk_rqstp; 1160 1161 rpc_sleep_on_timeout(&req->rq_xprt->pending, task, xprt_timer, 1162 xprt_request_timeout(req)); 1163 } 1164 EXPORT_SYMBOL_GPL(xprt_wait_for_reply_request_def); 1165 1166 /** 1167 * xprt_wait_for_reply_request_rtt - wait for reply using RTT estimator 1168 * @task: pointer to rpc_task 1169 * 1170 * Set a request's retransmit timeout using the RTT estimator, 1171 * and put the task to sleep on the pending queue. 1172 */ 1173 void xprt_wait_for_reply_request_rtt(struct rpc_task *task) 1174 { 1175 int timer = task->tk_msg.rpc_proc->p_timer; 1176 struct rpc_clnt *clnt = task->tk_client; 1177 struct rpc_rtt *rtt = clnt->cl_rtt; 1178 struct rpc_rqst *req = task->tk_rqstp; 1179 unsigned long max_timeout = clnt->cl_timeout->to_maxval; 1180 unsigned long timeout; 1181 1182 timeout = rpc_calc_rto(rtt, timer); 1183 timeout <<= rpc_ntimeo(rtt, timer) + req->rq_retries; 1184 if (timeout > max_timeout || timeout == 0) 1185 timeout = max_timeout; 1186 rpc_sleep_on_timeout(&req->rq_xprt->pending, task, xprt_timer, 1187 jiffies + timeout); 1188 } 1189 EXPORT_SYMBOL_GPL(xprt_wait_for_reply_request_rtt); 1190 1191 /** 1192 * xprt_request_wait_receive - wait for the reply to an RPC request 1193 * @task: RPC task about to send a request 1194 * 1195 */ 1196 void xprt_request_wait_receive(struct rpc_task *task) 1197 { 1198 struct rpc_rqst *req = task->tk_rqstp; 1199 struct rpc_xprt *xprt = req->rq_xprt; 1200 1201 if (!test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) 1202 return; 1203 /* 1204 * Sleep on the pending queue if we're expecting a reply. 1205 * The spinlock ensures atomicity between the test of 1206 * req->rq_reply_bytes_recvd, and the call to rpc_sleep_on(). 1207 */ 1208 spin_lock(&xprt->queue_lock); 1209 if (test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) { 1210 xprt->ops->wait_for_reply_request(task); 1211 /* 1212 * Send an extra queue wakeup call if the 1213 * connection was dropped in case the call to 1214 * rpc_sleep_on() raced. 1215 */ 1216 if (xprt_request_retransmit_after_disconnect(task)) 1217 rpc_wake_up_queued_task_set_status(&xprt->pending, 1218 task, -ENOTCONN); 1219 } 1220 spin_unlock(&xprt->queue_lock); 1221 } 1222 1223 static bool 1224 xprt_request_need_enqueue_transmit(struct rpc_task *task, struct rpc_rqst *req) 1225 { 1226 return !test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate); 1227 } 1228 1229 /** 1230 * xprt_request_enqueue_transmit - queue a task for transmission 1231 * @task: pointer to rpc_task 1232 * 1233 * Add a task to the transmission queue. 1234 */ 1235 void 1236 xprt_request_enqueue_transmit(struct rpc_task *task) 1237 { 1238 struct rpc_rqst *pos, *req = task->tk_rqstp; 1239 struct rpc_xprt *xprt = req->rq_xprt; 1240 1241 if (xprt_request_need_enqueue_transmit(task, req)) { 1242 req->rq_bytes_sent = 0; 1243 spin_lock(&xprt->queue_lock); 1244 /* 1245 * Requests that carry congestion control credits are added 1246 * to the head of the list to avoid starvation issues. 1247 */ 1248 if (req->rq_cong) { 1249 xprt_clear_congestion_window_wait(xprt); 1250 list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) { 1251 if (pos->rq_cong) 1252 continue; 1253 /* Note: req is added _before_ pos */ 1254 list_add_tail(&req->rq_xmit, &pos->rq_xmit); 1255 INIT_LIST_HEAD(&req->rq_xmit2); 1256 trace_xprt_enq_xmit(task, 1); 1257 goto out; 1258 } 1259 } else if (RPC_IS_SWAPPER(task)) { 1260 list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) { 1261 if (pos->rq_cong || pos->rq_bytes_sent) 1262 continue; 1263 if (RPC_IS_SWAPPER(pos->rq_task)) 1264 continue; 1265 /* Note: req is added _before_ pos */ 1266 list_add_tail(&req->rq_xmit, &pos->rq_xmit); 1267 INIT_LIST_HEAD(&req->rq_xmit2); 1268 trace_xprt_enq_xmit(task, 2); 1269 goto out; 1270 } 1271 } else if (!req->rq_seqno) { 1272 list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) { 1273 if (pos->rq_task->tk_owner != task->tk_owner) 1274 continue; 1275 list_add_tail(&req->rq_xmit2, &pos->rq_xmit2); 1276 INIT_LIST_HEAD(&req->rq_xmit); 1277 trace_xprt_enq_xmit(task, 3); 1278 goto out; 1279 } 1280 } 1281 list_add_tail(&req->rq_xmit, &xprt->xmit_queue); 1282 INIT_LIST_HEAD(&req->rq_xmit2); 1283 trace_xprt_enq_xmit(task, 4); 1284 out: 1285 set_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate); 1286 spin_unlock(&xprt->queue_lock); 1287 } 1288 } 1289 1290 /** 1291 * xprt_request_dequeue_transmit_locked - remove a task from the transmission queue 1292 * @task: pointer to rpc_task 1293 * 1294 * Remove a task from the transmission queue 1295 * Caller must hold xprt->queue_lock 1296 */ 1297 static void 1298 xprt_request_dequeue_transmit_locked(struct rpc_task *task) 1299 { 1300 struct rpc_rqst *req = task->tk_rqstp; 1301 1302 if (!test_and_clear_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) 1303 return; 1304 if (!list_empty(&req->rq_xmit)) { 1305 list_del(&req->rq_xmit); 1306 if (!list_empty(&req->rq_xmit2)) { 1307 struct rpc_rqst *next = list_first_entry(&req->rq_xmit2, 1308 struct rpc_rqst, rq_xmit2); 1309 list_del(&req->rq_xmit2); 1310 list_add_tail(&next->rq_xmit, &next->rq_xprt->xmit_queue); 1311 } 1312 } else 1313 list_del(&req->rq_xmit2); 1314 } 1315 1316 /** 1317 * xprt_request_dequeue_transmit - remove a task from the transmission queue 1318 * @task: pointer to rpc_task 1319 * 1320 * Remove a task from the transmission queue 1321 */ 1322 static void 1323 xprt_request_dequeue_transmit(struct rpc_task *task) 1324 { 1325 struct rpc_rqst *req = task->tk_rqstp; 1326 struct rpc_xprt *xprt = req->rq_xprt; 1327 1328 spin_lock(&xprt->queue_lock); 1329 xprt_request_dequeue_transmit_locked(task); 1330 spin_unlock(&xprt->queue_lock); 1331 } 1332 1333 /** 1334 * xprt_request_dequeue_xprt - remove a task from the transmit+receive queue 1335 * @task: pointer to rpc_task 1336 * 1337 * Remove a task from the transmit and receive queues, and ensure that 1338 * it is not pinned by the receive work item. 1339 */ 1340 void 1341 xprt_request_dequeue_xprt(struct rpc_task *task) 1342 { 1343 struct rpc_rqst *req = task->tk_rqstp; 1344 struct rpc_xprt *xprt = req->rq_xprt; 1345 1346 if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate) || 1347 test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) || 1348 xprt_is_pinned_rqst(req)) { 1349 spin_lock(&xprt->queue_lock); 1350 xprt_request_dequeue_transmit_locked(task); 1351 xprt_request_dequeue_receive_locked(task); 1352 while (xprt_is_pinned_rqst(req)) { 1353 set_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate); 1354 spin_unlock(&xprt->queue_lock); 1355 xprt_wait_on_pinned_rqst(req); 1356 spin_lock(&xprt->queue_lock); 1357 clear_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate); 1358 } 1359 spin_unlock(&xprt->queue_lock); 1360 } 1361 } 1362 1363 /** 1364 * xprt_request_prepare - prepare an encoded request for transport 1365 * @req: pointer to rpc_rqst 1366 * 1367 * Calls into the transport layer to do whatever is needed to prepare 1368 * the request for transmission or receive. 1369 */ 1370 void 1371 xprt_request_prepare(struct rpc_rqst *req) 1372 { 1373 struct rpc_xprt *xprt = req->rq_xprt; 1374 1375 if (xprt->ops->prepare_request) 1376 xprt->ops->prepare_request(req); 1377 } 1378 1379 /** 1380 * xprt_request_need_retransmit - Test if a task needs retransmission 1381 * @task: pointer to rpc_task 1382 * 1383 * Test for whether a connection breakage requires the task to retransmit 1384 */ 1385 bool 1386 xprt_request_need_retransmit(struct rpc_task *task) 1387 { 1388 return xprt_request_retransmit_after_disconnect(task); 1389 } 1390 1391 /** 1392 * xprt_prepare_transmit - reserve the transport before sending a request 1393 * @task: RPC task about to send a request 1394 * 1395 */ 1396 bool xprt_prepare_transmit(struct rpc_task *task) 1397 { 1398 struct rpc_rqst *req = task->tk_rqstp; 1399 struct rpc_xprt *xprt = req->rq_xprt; 1400 1401 dprintk("RPC: %5u xprt_prepare_transmit\n", task->tk_pid); 1402 1403 if (!xprt_lock_write(xprt, task)) { 1404 /* Race breaker: someone may have transmitted us */ 1405 if (!test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) 1406 rpc_wake_up_queued_task_set_status(&xprt->sending, 1407 task, 0); 1408 return false; 1409 1410 } 1411 return true; 1412 } 1413 1414 void xprt_end_transmit(struct rpc_task *task) 1415 { 1416 xprt_release_write(task->tk_rqstp->rq_xprt, task); 1417 } 1418 1419 /** 1420 * xprt_request_transmit - send an RPC request on a transport 1421 * @req: pointer to request to transmit 1422 * @snd_task: RPC task that owns the transport lock 1423 * 1424 * This performs the transmission of a single request. 1425 * Note that if the request is not the same as snd_task, then it 1426 * does need to be pinned. 1427 * Returns '0' on success. 1428 */ 1429 static int 1430 xprt_request_transmit(struct rpc_rqst *req, struct rpc_task *snd_task) 1431 { 1432 struct rpc_xprt *xprt = req->rq_xprt; 1433 struct rpc_task *task = req->rq_task; 1434 unsigned int connect_cookie; 1435 int is_retrans = RPC_WAS_SENT(task); 1436 int status; 1437 1438 if (!req->rq_bytes_sent) { 1439 if (xprt_request_data_received(task)) { 1440 status = 0; 1441 goto out_dequeue; 1442 } 1443 /* Verify that our message lies in the RPCSEC_GSS window */ 1444 if (rpcauth_xmit_need_reencode(task)) { 1445 status = -EBADMSG; 1446 goto out_dequeue; 1447 } 1448 if (RPC_SIGNALLED(task)) { 1449 status = -ERESTARTSYS; 1450 goto out_dequeue; 1451 } 1452 } 1453 1454 /* 1455 * Update req->rq_ntrans before transmitting to avoid races with 1456 * xprt_update_rtt(), which needs to know that it is recording a 1457 * reply to the first transmission. 1458 */ 1459 req->rq_ntrans++; 1460 1461 connect_cookie = xprt->connect_cookie; 1462 status = xprt->ops->send_request(req); 1463 if (status != 0) { 1464 req->rq_ntrans--; 1465 trace_xprt_transmit(req, status); 1466 return status; 1467 } 1468 1469 if (is_retrans) 1470 task->tk_client->cl_stats->rpcretrans++; 1471 1472 xprt_inject_disconnect(xprt); 1473 1474 task->tk_flags |= RPC_TASK_SENT; 1475 spin_lock(&xprt->transport_lock); 1476 1477 xprt->stat.sends++; 1478 xprt->stat.req_u += xprt->stat.sends - xprt->stat.recvs; 1479 xprt->stat.bklog_u += xprt->backlog.qlen; 1480 xprt->stat.sending_u += xprt->sending.qlen; 1481 xprt->stat.pending_u += xprt->pending.qlen; 1482 spin_unlock(&xprt->transport_lock); 1483 1484 req->rq_connect_cookie = connect_cookie; 1485 out_dequeue: 1486 trace_xprt_transmit(req, status); 1487 xprt_request_dequeue_transmit(task); 1488 rpc_wake_up_queued_task_set_status(&xprt->sending, task, status); 1489 return status; 1490 } 1491 1492 /** 1493 * xprt_transmit - send an RPC request on a transport 1494 * @task: controlling RPC task 1495 * 1496 * Attempts to drain the transmit queue. On exit, either the transport 1497 * signalled an error that needs to be handled before transmission can 1498 * resume, or @task finished transmitting, and detected that it already 1499 * received a reply. 1500 */ 1501 void 1502 xprt_transmit(struct rpc_task *task) 1503 { 1504 struct rpc_rqst *next, *req = task->tk_rqstp; 1505 struct rpc_xprt *xprt = req->rq_xprt; 1506 int status; 1507 1508 spin_lock(&xprt->queue_lock); 1509 while (!list_empty(&xprt->xmit_queue)) { 1510 next = list_first_entry(&xprt->xmit_queue, 1511 struct rpc_rqst, rq_xmit); 1512 xprt_pin_rqst(next); 1513 spin_unlock(&xprt->queue_lock); 1514 status = xprt_request_transmit(next, task); 1515 if (status == -EBADMSG && next != req) 1516 status = 0; 1517 cond_resched(); 1518 spin_lock(&xprt->queue_lock); 1519 xprt_unpin_rqst(next); 1520 if (status == 0) { 1521 if (!xprt_request_data_received(task) || 1522 test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) 1523 continue; 1524 } else if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) 1525 task->tk_status = status; 1526 break; 1527 } 1528 spin_unlock(&xprt->queue_lock); 1529 } 1530 1531 static void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task) 1532 { 1533 set_bit(XPRT_CONGESTED, &xprt->state); 1534 rpc_sleep_on(&xprt->backlog, task, NULL); 1535 } 1536 1537 static void xprt_wake_up_backlog(struct rpc_xprt *xprt) 1538 { 1539 if (rpc_wake_up_next(&xprt->backlog) == NULL) 1540 clear_bit(XPRT_CONGESTED, &xprt->state); 1541 } 1542 1543 static bool xprt_throttle_congested(struct rpc_xprt *xprt, struct rpc_task *task) 1544 { 1545 bool ret = false; 1546 1547 if (!test_bit(XPRT_CONGESTED, &xprt->state)) 1548 goto out; 1549 spin_lock(&xprt->reserve_lock); 1550 if (test_bit(XPRT_CONGESTED, &xprt->state)) { 1551 rpc_sleep_on(&xprt->backlog, task, NULL); 1552 ret = true; 1553 } 1554 spin_unlock(&xprt->reserve_lock); 1555 out: 1556 return ret; 1557 } 1558 1559 static struct rpc_rqst *xprt_dynamic_alloc_slot(struct rpc_xprt *xprt) 1560 { 1561 struct rpc_rqst *req = ERR_PTR(-EAGAIN); 1562 1563 if (xprt->num_reqs >= xprt->max_reqs) 1564 goto out; 1565 ++xprt->num_reqs; 1566 spin_unlock(&xprt->reserve_lock); 1567 req = kzalloc(sizeof(struct rpc_rqst), GFP_NOFS); 1568 spin_lock(&xprt->reserve_lock); 1569 if (req != NULL) 1570 goto out; 1571 --xprt->num_reqs; 1572 req = ERR_PTR(-ENOMEM); 1573 out: 1574 return req; 1575 } 1576 1577 static bool xprt_dynamic_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req) 1578 { 1579 if (xprt->num_reqs > xprt->min_reqs) { 1580 --xprt->num_reqs; 1581 kfree(req); 1582 return true; 1583 } 1584 return false; 1585 } 1586 1587 void xprt_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task) 1588 { 1589 struct rpc_rqst *req; 1590 1591 spin_lock(&xprt->reserve_lock); 1592 if (!list_empty(&xprt->free)) { 1593 req = list_entry(xprt->free.next, struct rpc_rqst, rq_list); 1594 list_del(&req->rq_list); 1595 goto out_init_req; 1596 } 1597 req = xprt_dynamic_alloc_slot(xprt); 1598 if (!IS_ERR(req)) 1599 goto out_init_req; 1600 switch (PTR_ERR(req)) { 1601 case -ENOMEM: 1602 dprintk("RPC: dynamic allocation of request slot " 1603 "failed! Retrying\n"); 1604 task->tk_status = -ENOMEM; 1605 break; 1606 case -EAGAIN: 1607 xprt_add_backlog(xprt, task); 1608 dprintk("RPC: waiting for request slot\n"); 1609 /* fall through */ 1610 default: 1611 task->tk_status = -EAGAIN; 1612 } 1613 spin_unlock(&xprt->reserve_lock); 1614 return; 1615 out_init_req: 1616 xprt->stat.max_slots = max_t(unsigned int, xprt->stat.max_slots, 1617 xprt->num_reqs); 1618 spin_unlock(&xprt->reserve_lock); 1619 1620 task->tk_status = 0; 1621 task->tk_rqstp = req; 1622 } 1623 EXPORT_SYMBOL_GPL(xprt_alloc_slot); 1624 1625 void xprt_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req) 1626 { 1627 spin_lock(&xprt->reserve_lock); 1628 if (!xprt_dynamic_free_slot(xprt, req)) { 1629 memset(req, 0, sizeof(*req)); /* mark unused */ 1630 list_add(&req->rq_list, &xprt->free); 1631 } 1632 xprt_wake_up_backlog(xprt); 1633 spin_unlock(&xprt->reserve_lock); 1634 } 1635 EXPORT_SYMBOL_GPL(xprt_free_slot); 1636 1637 static void xprt_free_all_slots(struct rpc_xprt *xprt) 1638 { 1639 struct rpc_rqst *req; 1640 while (!list_empty(&xprt->free)) { 1641 req = list_first_entry(&xprt->free, struct rpc_rqst, rq_list); 1642 list_del(&req->rq_list); 1643 kfree(req); 1644 } 1645 } 1646 1647 struct rpc_xprt *xprt_alloc(struct net *net, size_t size, 1648 unsigned int num_prealloc, 1649 unsigned int max_alloc) 1650 { 1651 struct rpc_xprt *xprt; 1652 struct rpc_rqst *req; 1653 int i; 1654 1655 xprt = kzalloc(size, GFP_KERNEL); 1656 if (xprt == NULL) 1657 goto out; 1658 1659 xprt_init(xprt, net); 1660 1661 for (i = 0; i < num_prealloc; i++) { 1662 req = kzalloc(sizeof(struct rpc_rqst), GFP_KERNEL); 1663 if (!req) 1664 goto out_free; 1665 list_add(&req->rq_list, &xprt->free); 1666 } 1667 if (max_alloc > num_prealloc) 1668 xprt->max_reqs = max_alloc; 1669 else 1670 xprt->max_reqs = num_prealloc; 1671 xprt->min_reqs = num_prealloc; 1672 xprt->num_reqs = num_prealloc; 1673 1674 return xprt; 1675 1676 out_free: 1677 xprt_free(xprt); 1678 out: 1679 return NULL; 1680 } 1681 EXPORT_SYMBOL_GPL(xprt_alloc); 1682 1683 void xprt_free(struct rpc_xprt *xprt) 1684 { 1685 put_net(xprt->xprt_net); 1686 xprt_free_all_slots(xprt); 1687 kfree_rcu(xprt, rcu); 1688 } 1689 EXPORT_SYMBOL_GPL(xprt_free); 1690 1691 static void 1692 xprt_init_connect_cookie(struct rpc_rqst *req, struct rpc_xprt *xprt) 1693 { 1694 req->rq_connect_cookie = xprt_connect_cookie(xprt) - 1; 1695 } 1696 1697 static __be32 1698 xprt_alloc_xid(struct rpc_xprt *xprt) 1699 { 1700 __be32 xid; 1701 1702 spin_lock(&xprt->reserve_lock); 1703 xid = (__force __be32)xprt->xid++; 1704 spin_unlock(&xprt->reserve_lock); 1705 return xid; 1706 } 1707 1708 static void 1709 xprt_init_xid(struct rpc_xprt *xprt) 1710 { 1711 xprt->xid = prandom_u32(); 1712 } 1713 1714 static void 1715 xprt_request_init(struct rpc_task *task) 1716 { 1717 struct rpc_xprt *xprt = task->tk_xprt; 1718 struct rpc_rqst *req = task->tk_rqstp; 1719 1720 req->rq_task = task; 1721 req->rq_xprt = xprt; 1722 req->rq_buffer = NULL; 1723 req->rq_xid = xprt_alloc_xid(xprt); 1724 xprt_init_connect_cookie(req, xprt); 1725 req->rq_snd_buf.len = 0; 1726 req->rq_snd_buf.buflen = 0; 1727 req->rq_rcv_buf.len = 0; 1728 req->rq_rcv_buf.buflen = 0; 1729 req->rq_snd_buf.bvec = NULL; 1730 req->rq_rcv_buf.bvec = NULL; 1731 req->rq_release_snd_buf = NULL; 1732 xprt_init_majortimeo(task, req); 1733 dprintk("RPC: %5u reserved req %p xid %08x\n", task->tk_pid, 1734 req, ntohl(req->rq_xid)); 1735 } 1736 1737 static void 1738 xprt_do_reserve(struct rpc_xprt *xprt, struct rpc_task *task) 1739 { 1740 xprt->ops->alloc_slot(xprt, task); 1741 if (task->tk_rqstp != NULL) 1742 xprt_request_init(task); 1743 } 1744 1745 /** 1746 * xprt_reserve - allocate an RPC request slot 1747 * @task: RPC task requesting a slot allocation 1748 * 1749 * If the transport is marked as being congested, or if no more 1750 * slots are available, place the task on the transport's 1751 * backlog queue. 1752 */ 1753 void xprt_reserve(struct rpc_task *task) 1754 { 1755 struct rpc_xprt *xprt = task->tk_xprt; 1756 1757 task->tk_status = 0; 1758 if (task->tk_rqstp != NULL) 1759 return; 1760 1761 task->tk_status = -EAGAIN; 1762 if (!xprt_throttle_congested(xprt, task)) 1763 xprt_do_reserve(xprt, task); 1764 } 1765 1766 /** 1767 * xprt_retry_reserve - allocate an RPC request slot 1768 * @task: RPC task requesting a slot allocation 1769 * 1770 * If no more slots are available, place the task on the transport's 1771 * backlog queue. 1772 * Note that the only difference with xprt_reserve is that we now 1773 * ignore the value of the XPRT_CONGESTED flag. 1774 */ 1775 void xprt_retry_reserve(struct rpc_task *task) 1776 { 1777 struct rpc_xprt *xprt = task->tk_xprt; 1778 1779 task->tk_status = 0; 1780 if (task->tk_rqstp != NULL) 1781 return; 1782 1783 task->tk_status = -EAGAIN; 1784 xprt_do_reserve(xprt, task); 1785 } 1786 1787 /** 1788 * xprt_release - release an RPC request slot 1789 * @task: task which is finished with the slot 1790 * 1791 */ 1792 void xprt_release(struct rpc_task *task) 1793 { 1794 struct rpc_xprt *xprt; 1795 struct rpc_rqst *req = task->tk_rqstp; 1796 1797 if (req == NULL) { 1798 if (task->tk_client) { 1799 xprt = task->tk_xprt; 1800 xprt_release_write(xprt, task); 1801 } 1802 return; 1803 } 1804 1805 xprt = req->rq_xprt; 1806 xprt_request_dequeue_xprt(task); 1807 spin_lock(&xprt->transport_lock); 1808 xprt->ops->release_xprt(xprt, task); 1809 if (xprt->ops->release_request) 1810 xprt->ops->release_request(task); 1811 xprt_schedule_autodisconnect(xprt); 1812 spin_unlock(&xprt->transport_lock); 1813 if (req->rq_buffer) 1814 xprt->ops->buf_free(task); 1815 xprt_inject_disconnect(xprt); 1816 xdr_free_bvec(&req->rq_rcv_buf); 1817 xdr_free_bvec(&req->rq_snd_buf); 1818 if (req->rq_cred != NULL) 1819 put_rpccred(req->rq_cred); 1820 task->tk_rqstp = NULL; 1821 if (req->rq_release_snd_buf) 1822 req->rq_release_snd_buf(req); 1823 1824 dprintk("RPC: %5u release request %p\n", task->tk_pid, req); 1825 if (likely(!bc_prealloc(req))) 1826 xprt->ops->free_slot(xprt, req); 1827 else 1828 xprt_free_bc_request(req); 1829 } 1830 1831 #ifdef CONFIG_SUNRPC_BACKCHANNEL 1832 void 1833 xprt_init_bc_request(struct rpc_rqst *req, struct rpc_task *task) 1834 { 1835 struct xdr_buf *xbufp = &req->rq_snd_buf; 1836 1837 task->tk_rqstp = req; 1838 req->rq_task = task; 1839 xprt_init_connect_cookie(req, req->rq_xprt); 1840 /* 1841 * Set up the xdr_buf length. 1842 * This also indicates that the buffer is XDR encoded already. 1843 */ 1844 xbufp->len = xbufp->head[0].iov_len + xbufp->page_len + 1845 xbufp->tail[0].iov_len; 1846 } 1847 #endif 1848 1849 static void xprt_init(struct rpc_xprt *xprt, struct net *net) 1850 { 1851 kref_init(&xprt->kref); 1852 1853 spin_lock_init(&xprt->transport_lock); 1854 spin_lock_init(&xprt->reserve_lock); 1855 spin_lock_init(&xprt->queue_lock); 1856 1857 INIT_LIST_HEAD(&xprt->free); 1858 xprt->recv_queue = RB_ROOT; 1859 INIT_LIST_HEAD(&xprt->xmit_queue); 1860 #if defined(CONFIG_SUNRPC_BACKCHANNEL) 1861 spin_lock_init(&xprt->bc_pa_lock); 1862 INIT_LIST_HEAD(&xprt->bc_pa_list); 1863 #endif /* CONFIG_SUNRPC_BACKCHANNEL */ 1864 INIT_LIST_HEAD(&xprt->xprt_switch); 1865 1866 xprt->last_used = jiffies; 1867 xprt->cwnd = RPC_INITCWND; 1868 xprt->bind_index = 0; 1869 1870 rpc_init_wait_queue(&xprt->binding, "xprt_binding"); 1871 rpc_init_wait_queue(&xprt->pending, "xprt_pending"); 1872 rpc_init_wait_queue(&xprt->sending, "xprt_sending"); 1873 rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog"); 1874 1875 xprt_init_xid(xprt); 1876 1877 xprt->xprt_net = get_net(net); 1878 } 1879 1880 /** 1881 * xprt_create_transport - create an RPC transport 1882 * @args: rpc transport creation arguments 1883 * 1884 */ 1885 struct rpc_xprt *xprt_create_transport(struct xprt_create *args) 1886 { 1887 struct rpc_xprt *xprt; 1888 struct xprt_class *t; 1889 1890 spin_lock(&xprt_list_lock); 1891 list_for_each_entry(t, &xprt_list, list) { 1892 if (t->ident == args->ident) { 1893 spin_unlock(&xprt_list_lock); 1894 goto found; 1895 } 1896 } 1897 spin_unlock(&xprt_list_lock); 1898 dprintk("RPC: transport (%d) not supported\n", args->ident); 1899 return ERR_PTR(-EIO); 1900 1901 found: 1902 xprt = t->setup(args); 1903 if (IS_ERR(xprt)) { 1904 dprintk("RPC: xprt_create_transport: failed, %ld\n", 1905 -PTR_ERR(xprt)); 1906 goto out; 1907 } 1908 if (args->flags & XPRT_CREATE_NO_IDLE_TIMEOUT) 1909 xprt->idle_timeout = 0; 1910 INIT_WORK(&xprt->task_cleanup, xprt_autoclose); 1911 if (xprt_has_timer(xprt)) 1912 timer_setup(&xprt->timer, xprt_init_autodisconnect, 0); 1913 else 1914 timer_setup(&xprt->timer, NULL, 0); 1915 1916 if (strlen(args->servername) > RPC_MAXNETNAMELEN) { 1917 xprt_destroy(xprt); 1918 return ERR_PTR(-EINVAL); 1919 } 1920 xprt->servername = kstrdup(args->servername, GFP_KERNEL); 1921 if (xprt->servername == NULL) { 1922 xprt_destroy(xprt); 1923 return ERR_PTR(-ENOMEM); 1924 } 1925 1926 rpc_xprt_debugfs_register(xprt); 1927 1928 dprintk("RPC: created transport %p with %u slots\n", xprt, 1929 xprt->max_reqs); 1930 out: 1931 return xprt; 1932 } 1933 1934 static void xprt_destroy_cb(struct work_struct *work) 1935 { 1936 struct rpc_xprt *xprt = 1937 container_of(work, struct rpc_xprt, task_cleanup); 1938 1939 rpc_xprt_debugfs_unregister(xprt); 1940 rpc_destroy_wait_queue(&xprt->binding); 1941 rpc_destroy_wait_queue(&xprt->pending); 1942 rpc_destroy_wait_queue(&xprt->sending); 1943 rpc_destroy_wait_queue(&xprt->backlog); 1944 kfree(xprt->servername); 1945 /* 1946 * Destroy any existing back channel 1947 */ 1948 xprt_destroy_backchannel(xprt, UINT_MAX); 1949 1950 /* 1951 * Tear down transport state and free the rpc_xprt 1952 */ 1953 xprt->ops->destroy(xprt); 1954 } 1955 1956 /** 1957 * xprt_destroy - destroy an RPC transport, killing off all requests. 1958 * @xprt: transport to destroy 1959 * 1960 */ 1961 static void xprt_destroy(struct rpc_xprt *xprt) 1962 { 1963 dprintk("RPC: destroying transport %p\n", xprt); 1964 1965 /* 1966 * Exclude transport connect/disconnect handlers and autoclose 1967 */ 1968 wait_on_bit_lock(&xprt->state, XPRT_LOCKED, TASK_UNINTERRUPTIBLE); 1969 1970 del_timer_sync(&xprt->timer); 1971 1972 /* 1973 * Destroy sockets etc from the system workqueue so they can 1974 * safely flush receive work running on rpciod. 1975 */ 1976 INIT_WORK(&xprt->task_cleanup, xprt_destroy_cb); 1977 schedule_work(&xprt->task_cleanup); 1978 } 1979 1980 static void xprt_destroy_kref(struct kref *kref) 1981 { 1982 xprt_destroy(container_of(kref, struct rpc_xprt, kref)); 1983 } 1984 1985 /** 1986 * xprt_get - return a reference to an RPC transport. 1987 * @xprt: pointer to the transport 1988 * 1989 */ 1990 struct rpc_xprt *xprt_get(struct rpc_xprt *xprt) 1991 { 1992 if (xprt != NULL && kref_get_unless_zero(&xprt->kref)) 1993 return xprt; 1994 return NULL; 1995 } 1996 EXPORT_SYMBOL_GPL(xprt_get); 1997 1998 /** 1999 * xprt_put - release a reference to an RPC transport. 2000 * @xprt: pointer to the transport 2001 * 2002 */ 2003 void xprt_put(struct rpc_xprt *xprt) 2004 { 2005 if (xprt != NULL) 2006 kref_put(&xprt->kref, xprt_destroy_kref); 2007 } 2008 EXPORT_SYMBOL_GPL(xprt_put); 2009