1 // SPDX-License-Identifier: GPL-2.0-only 2 /* 3 * linux/net/sunrpc/xprt.c 4 * 5 * This is a generic RPC call interface supporting congestion avoidance, 6 * and asynchronous calls. 7 * 8 * The interface works like this: 9 * 10 * - When a process places a call, it allocates a request slot if 11 * one is available. Otherwise, it sleeps on the backlog queue 12 * (xprt_reserve). 13 * - Next, the caller puts together the RPC message, stuffs it into 14 * the request struct, and calls xprt_transmit(). 15 * - xprt_transmit sends the message and installs the caller on the 16 * transport's wait list. At the same time, if a reply is expected, 17 * it installs a timer that is run after the packet's timeout has 18 * expired. 19 * - When a packet arrives, the data_ready handler walks the list of 20 * pending requests for that transport. If a matching XID is found, the 21 * caller is woken up, and the timer removed. 22 * - When no reply arrives within the timeout interval, the timer is 23 * fired by the kernel and runs xprt_timer(). It either adjusts the 24 * timeout values (minor timeout) or wakes up the caller with a status 25 * of -ETIMEDOUT. 26 * - When the caller receives a notification from RPC that a reply arrived, 27 * it should release the RPC slot, and process the reply. 28 * If the call timed out, it may choose to retry the operation by 29 * adjusting the initial timeout value, and simply calling rpc_call 30 * again. 31 * 32 * Support for async RPC is done through a set of RPC-specific scheduling 33 * primitives that `transparently' work for processes as well as async 34 * tasks that rely on callbacks. 35 * 36 * Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de> 37 * 38 * Transport switch API copyright (C) 2005, Chuck Lever <cel@netapp.com> 39 */ 40 41 #include <linux/module.h> 42 43 #include <linux/types.h> 44 #include <linux/interrupt.h> 45 #include <linux/workqueue.h> 46 #include <linux/net.h> 47 #include <linux/ktime.h> 48 49 #include <linux/sunrpc/clnt.h> 50 #include <linux/sunrpc/metrics.h> 51 #include <linux/sunrpc/bc_xprt.h> 52 #include <linux/rcupdate.h> 53 #include <linux/sched/mm.h> 54 55 #include <trace/events/sunrpc.h> 56 57 #include "sunrpc.h" 58 59 /* 60 * Local variables 61 */ 62 63 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG) 64 # define RPCDBG_FACILITY RPCDBG_XPRT 65 #endif 66 67 /* 68 * Local functions 69 */ 70 static void xprt_init(struct rpc_xprt *xprt, struct net *net); 71 static __be32 xprt_alloc_xid(struct rpc_xprt *xprt); 72 static void xprt_destroy(struct rpc_xprt *xprt); 73 static void xprt_request_init(struct rpc_task *task); 74 75 static DEFINE_SPINLOCK(xprt_list_lock); 76 static LIST_HEAD(xprt_list); 77 78 static unsigned long xprt_request_timeout(const struct rpc_rqst *req) 79 { 80 unsigned long timeout = jiffies + req->rq_timeout; 81 82 if (time_before(timeout, req->rq_majortimeo)) 83 return timeout; 84 return req->rq_majortimeo; 85 } 86 87 /** 88 * xprt_register_transport - register a transport implementation 89 * @transport: transport to register 90 * 91 * If a transport implementation is loaded as a kernel module, it can 92 * call this interface to make itself known to the RPC client. 93 * 94 * Returns: 95 * 0: transport successfully registered 96 * -EEXIST: transport already registered 97 * -EINVAL: transport module being unloaded 98 */ 99 int xprt_register_transport(struct xprt_class *transport) 100 { 101 struct xprt_class *t; 102 int result; 103 104 result = -EEXIST; 105 spin_lock(&xprt_list_lock); 106 list_for_each_entry(t, &xprt_list, list) { 107 /* don't register the same transport class twice */ 108 if (t->ident == transport->ident) 109 goto out; 110 } 111 112 list_add_tail(&transport->list, &xprt_list); 113 printk(KERN_INFO "RPC: Registered %s transport module.\n", 114 transport->name); 115 result = 0; 116 117 out: 118 spin_unlock(&xprt_list_lock); 119 return result; 120 } 121 EXPORT_SYMBOL_GPL(xprt_register_transport); 122 123 /** 124 * xprt_unregister_transport - unregister a transport implementation 125 * @transport: transport to unregister 126 * 127 * Returns: 128 * 0: transport successfully unregistered 129 * -ENOENT: transport never registered 130 */ 131 int xprt_unregister_transport(struct xprt_class *transport) 132 { 133 struct xprt_class *t; 134 int result; 135 136 result = 0; 137 spin_lock(&xprt_list_lock); 138 list_for_each_entry(t, &xprt_list, list) { 139 if (t == transport) { 140 printk(KERN_INFO 141 "RPC: Unregistered %s transport module.\n", 142 transport->name); 143 list_del_init(&transport->list); 144 goto out; 145 } 146 } 147 result = -ENOENT; 148 149 out: 150 spin_unlock(&xprt_list_lock); 151 return result; 152 } 153 EXPORT_SYMBOL_GPL(xprt_unregister_transport); 154 155 static void 156 xprt_class_release(const struct xprt_class *t) 157 { 158 module_put(t->owner); 159 } 160 161 static const struct xprt_class * 162 xprt_class_find_by_ident_locked(int ident) 163 { 164 const struct xprt_class *t; 165 166 list_for_each_entry(t, &xprt_list, list) { 167 if (t->ident != ident) 168 continue; 169 if (!try_module_get(t->owner)) 170 continue; 171 return t; 172 } 173 return NULL; 174 } 175 176 static const struct xprt_class * 177 xprt_class_find_by_ident(int ident) 178 { 179 const struct xprt_class *t; 180 181 spin_lock(&xprt_list_lock); 182 t = xprt_class_find_by_ident_locked(ident); 183 spin_unlock(&xprt_list_lock); 184 return t; 185 } 186 187 static const struct xprt_class * 188 xprt_class_find_by_netid_locked(const char *netid) 189 { 190 const struct xprt_class *t; 191 unsigned int i; 192 193 list_for_each_entry(t, &xprt_list, list) { 194 for (i = 0; t->netid[i][0] != '\0'; i++) { 195 if (strcmp(t->netid[i], netid) != 0) 196 continue; 197 if (!try_module_get(t->owner)) 198 continue; 199 return t; 200 } 201 } 202 return NULL; 203 } 204 205 static const struct xprt_class * 206 xprt_class_find_by_netid(const char *netid) 207 { 208 const struct xprt_class *t; 209 210 spin_lock(&xprt_list_lock); 211 t = xprt_class_find_by_netid_locked(netid); 212 if (!t) { 213 spin_unlock(&xprt_list_lock); 214 request_module("rpc%s", netid); 215 spin_lock(&xprt_list_lock); 216 t = xprt_class_find_by_netid_locked(netid); 217 } 218 spin_unlock(&xprt_list_lock); 219 return t; 220 } 221 222 /** 223 * xprt_find_transport_ident - convert a netid into a transport identifier 224 * @netid: transport to load 225 * 226 * Returns: 227 * > 0: transport identifier 228 * -ENOENT: transport module not available 229 */ 230 int xprt_find_transport_ident(const char *netid) 231 { 232 const struct xprt_class *t; 233 int ret; 234 235 t = xprt_class_find_by_netid(netid); 236 if (!t) 237 return -ENOENT; 238 ret = t->ident; 239 xprt_class_release(t); 240 return ret; 241 } 242 EXPORT_SYMBOL_GPL(xprt_find_transport_ident); 243 244 static void xprt_clear_locked(struct rpc_xprt *xprt) 245 { 246 xprt->snd_task = NULL; 247 if (!test_bit(XPRT_CLOSE_WAIT, &xprt->state)) { 248 smp_mb__before_atomic(); 249 clear_bit(XPRT_LOCKED, &xprt->state); 250 smp_mb__after_atomic(); 251 } else 252 queue_work(xprtiod_workqueue, &xprt->task_cleanup); 253 } 254 255 /** 256 * xprt_reserve_xprt - serialize write access to transports 257 * @task: task that is requesting access to the transport 258 * @xprt: pointer to the target transport 259 * 260 * This prevents mixing the payload of separate requests, and prevents 261 * transport connects from colliding with writes. No congestion control 262 * is provided. 263 */ 264 int xprt_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task) 265 { 266 struct rpc_rqst *req = task->tk_rqstp; 267 268 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) { 269 if (task == xprt->snd_task) 270 goto out_locked; 271 goto out_sleep; 272 } 273 if (test_bit(XPRT_WRITE_SPACE, &xprt->state)) 274 goto out_unlock; 275 xprt->snd_task = task; 276 277 out_locked: 278 trace_xprt_reserve_xprt(xprt, task); 279 return 1; 280 281 out_unlock: 282 xprt_clear_locked(xprt); 283 out_sleep: 284 task->tk_status = -EAGAIN; 285 if (RPC_IS_SOFT(task)) 286 rpc_sleep_on_timeout(&xprt->sending, task, NULL, 287 xprt_request_timeout(req)); 288 else 289 rpc_sleep_on(&xprt->sending, task, NULL); 290 return 0; 291 } 292 EXPORT_SYMBOL_GPL(xprt_reserve_xprt); 293 294 static bool 295 xprt_need_congestion_window_wait(struct rpc_xprt *xprt) 296 { 297 return test_bit(XPRT_CWND_WAIT, &xprt->state); 298 } 299 300 static void 301 xprt_set_congestion_window_wait(struct rpc_xprt *xprt) 302 { 303 if (!list_empty(&xprt->xmit_queue)) { 304 /* Peek at head of queue to see if it can make progress */ 305 if (list_first_entry(&xprt->xmit_queue, struct rpc_rqst, 306 rq_xmit)->rq_cong) 307 return; 308 } 309 set_bit(XPRT_CWND_WAIT, &xprt->state); 310 } 311 312 static void 313 xprt_test_and_clear_congestion_window_wait(struct rpc_xprt *xprt) 314 { 315 if (!RPCXPRT_CONGESTED(xprt)) 316 clear_bit(XPRT_CWND_WAIT, &xprt->state); 317 } 318 319 /* 320 * xprt_reserve_xprt_cong - serialize write access to transports 321 * @task: task that is requesting access to the transport 322 * 323 * Same as xprt_reserve_xprt, but Van Jacobson congestion control is 324 * integrated into the decision of whether a request is allowed to be 325 * woken up and given access to the transport. 326 * Note that the lock is only granted if we know there are free slots. 327 */ 328 int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task) 329 { 330 struct rpc_rqst *req = task->tk_rqstp; 331 332 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) { 333 if (task == xprt->snd_task) 334 goto out_locked; 335 goto out_sleep; 336 } 337 if (req == NULL) { 338 xprt->snd_task = task; 339 goto out_locked; 340 } 341 if (test_bit(XPRT_WRITE_SPACE, &xprt->state)) 342 goto out_unlock; 343 if (!xprt_need_congestion_window_wait(xprt)) { 344 xprt->snd_task = task; 345 goto out_locked; 346 } 347 out_unlock: 348 xprt_clear_locked(xprt); 349 out_sleep: 350 task->tk_status = -EAGAIN; 351 if (RPC_IS_SOFT(task)) 352 rpc_sleep_on_timeout(&xprt->sending, task, NULL, 353 xprt_request_timeout(req)); 354 else 355 rpc_sleep_on(&xprt->sending, task, NULL); 356 return 0; 357 out_locked: 358 trace_xprt_reserve_cong(xprt, task); 359 return 1; 360 } 361 EXPORT_SYMBOL_GPL(xprt_reserve_xprt_cong); 362 363 static inline int xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task) 364 { 365 int retval; 366 367 if (test_bit(XPRT_LOCKED, &xprt->state) && xprt->snd_task == task) 368 return 1; 369 spin_lock(&xprt->transport_lock); 370 retval = xprt->ops->reserve_xprt(xprt, task); 371 spin_unlock(&xprt->transport_lock); 372 return retval; 373 } 374 375 static bool __xprt_lock_write_func(struct rpc_task *task, void *data) 376 { 377 struct rpc_xprt *xprt = data; 378 379 xprt->snd_task = task; 380 return true; 381 } 382 383 static void __xprt_lock_write_next(struct rpc_xprt *xprt) 384 { 385 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) 386 return; 387 if (test_bit(XPRT_WRITE_SPACE, &xprt->state)) 388 goto out_unlock; 389 if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending, 390 __xprt_lock_write_func, xprt)) 391 return; 392 out_unlock: 393 xprt_clear_locked(xprt); 394 } 395 396 static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt) 397 { 398 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) 399 return; 400 if (test_bit(XPRT_WRITE_SPACE, &xprt->state)) 401 goto out_unlock; 402 if (xprt_need_congestion_window_wait(xprt)) 403 goto out_unlock; 404 if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending, 405 __xprt_lock_write_func, xprt)) 406 return; 407 out_unlock: 408 xprt_clear_locked(xprt); 409 } 410 411 /** 412 * xprt_release_xprt - allow other requests to use a transport 413 * @xprt: transport with other tasks potentially waiting 414 * @task: task that is releasing access to the transport 415 * 416 * Note that "task" can be NULL. No congestion control is provided. 417 */ 418 void xprt_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task) 419 { 420 if (xprt->snd_task == task) { 421 xprt_clear_locked(xprt); 422 __xprt_lock_write_next(xprt); 423 } 424 trace_xprt_release_xprt(xprt, task); 425 } 426 EXPORT_SYMBOL_GPL(xprt_release_xprt); 427 428 /** 429 * xprt_release_xprt_cong - allow other requests to use a transport 430 * @xprt: transport with other tasks potentially waiting 431 * @task: task that is releasing access to the transport 432 * 433 * Note that "task" can be NULL. Another task is awoken to use the 434 * transport if the transport's congestion window allows it. 435 */ 436 void xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task) 437 { 438 if (xprt->snd_task == task) { 439 xprt_clear_locked(xprt); 440 __xprt_lock_write_next_cong(xprt); 441 } 442 trace_xprt_release_cong(xprt, task); 443 } 444 EXPORT_SYMBOL_GPL(xprt_release_xprt_cong); 445 446 static inline void xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task) 447 { 448 if (xprt->snd_task != task) 449 return; 450 spin_lock(&xprt->transport_lock); 451 xprt->ops->release_xprt(xprt, task); 452 spin_unlock(&xprt->transport_lock); 453 } 454 455 /* 456 * Van Jacobson congestion avoidance. Check if the congestion window 457 * overflowed. Put the task to sleep if this is the case. 458 */ 459 static int 460 __xprt_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req) 461 { 462 if (req->rq_cong) 463 return 1; 464 trace_xprt_get_cong(xprt, req->rq_task); 465 if (RPCXPRT_CONGESTED(xprt)) { 466 xprt_set_congestion_window_wait(xprt); 467 return 0; 468 } 469 req->rq_cong = 1; 470 xprt->cong += RPC_CWNDSCALE; 471 return 1; 472 } 473 474 /* 475 * Adjust the congestion window, and wake up the next task 476 * that has been sleeping due to congestion 477 */ 478 static void 479 __xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req) 480 { 481 if (!req->rq_cong) 482 return; 483 req->rq_cong = 0; 484 xprt->cong -= RPC_CWNDSCALE; 485 xprt_test_and_clear_congestion_window_wait(xprt); 486 trace_xprt_put_cong(xprt, req->rq_task); 487 __xprt_lock_write_next_cong(xprt); 488 } 489 490 /** 491 * xprt_request_get_cong - Request congestion control credits 492 * @xprt: pointer to transport 493 * @req: pointer to RPC request 494 * 495 * Useful for transports that require congestion control. 496 */ 497 bool 498 xprt_request_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req) 499 { 500 bool ret = false; 501 502 if (req->rq_cong) 503 return true; 504 spin_lock(&xprt->transport_lock); 505 ret = __xprt_get_cong(xprt, req) != 0; 506 spin_unlock(&xprt->transport_lock); 507 return ret; 508 } 509 EXPORT_SYMBOL_GPL(xprt_request_get_cong); 510 511 /** 512 * xprt_release_rqst_cong - housekeeping when request is complete 513 * @task: RPC request that recently completed 514 * 515 * Useful for transports that require congestion control. 516 */ 517 void xprt_release_rqst_cong(struct rpc_task *task) 518 { 519 struct rpc_rqst *req = task->tk_rqstp; 520 521 __xprt_put_cong(req->rq_xprt, req); 522 } 523 EXPORT_SYMBOL_GPL(xprt_release_rqst_cong); 524 525 static void xprt_clear_congestion_window_wait_locked(struct rpc_xprt *xprt) 526 { 527 if (test_and_clear_bit(XPRT_CWND_WAIT, &xprt->state)) 528 __xprt_lock_write_next_cong(xprt); 529 } 530 531 /* 532 * Clear the congestion window wait flag and wake up the next 533 * entry on xprt->sending 534 */ 535 static void 536 xprt_clear_congestion_window_wait(struct rpc_xprt *xprt) 537 { 538 if (test_and_clear_bit(XPRT_CWND_WAIT, &xprt->state)) { 539 spin_lock(&xprt->transport_lock); 540 __xprt_lock_write_next_cong(xprt); 541 spin_unlock(&xprt->transport_lock); 542 } 543 } 544 545 /** 546 * xprt_adjust_cwnd - adjust transport congestion window 547 * @xprt: pointer to xprt 548 * @task: recently completed RPC request used to adjust window 549 * @result: result code of completed RPC request 550 * 551 * The transport code maintains an estimate on the maximum number of out- 552 * standing RPC requests, using a smoothed version of the congestion 553 * avoidance implemented in 44BSD. This is basically the Van Jacobson 554 * congestion algorithm: If a retransmit occurs, the congestion window is 555 * halved; otherwise, it is incremented by 1/cwnd when 556 * 557 * - a reply is received and 558 * - a full number of requests are outstanding and 559 * - the congestion window hasn't been updated recently. 560 */ 561 void xprt_adjust_cwnd(struct rpc_xprt *xprt, struct rpc_task *task, int result) 562 { 563 struct rpc_rqst *req = task->tk_rqstp; 564 unsigned long cwnd = xprt->cwnd; 565 566 if (result >= 0 && cwnd <= xprt->cong) { 567 /* The (cwnd >> 1) term makes sure 568 * the result gets rounded properly. */ 569 cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd; 570 if (cwnd > RPC_MAXCWND(xprt)) 571 cwnd = RPC_MAXCWND(xprt); 572 __xprt_lock_write_next_cong(xprt); 573 } else if (result == -ETIMEDOUT) { 574 cwnd >>= 1; 575 if (cwnd < RPC_CWNDSCALE) 576 cwnd = RPC_CWNDSCALE; 577 } 578 dprintk("RPC: cong %ld, cwnd was %ld, now %ld\n", 579 xprt->cong, xprt->cwnd, cwnd); 580 xprt->cwnd = cwnd; 581 __xprt_put_cong(xprt, req); 582 } 583 EXPORT_SYMBOL_GPL(xprt_adjust_cwnd); 584 585 /** 586 * xprt_wake_pending_tasks - wake all tasks on a transport's pending queue 587 * @xprt: transport with waiting tasks 588 * @status: result code to plant in each task before waking it 589 * 590 */ 591 void xprt_wake_pending_tasks(struct rpc_xprt *xprt, int status) 592 { 593 if (status < 0) 594 rpc_wake_up_status(&xprt->pending, status); 595 else 596 rpc_wake_up(&xprt->pending); 597 } 598 EXPORT_SYMBOL_GPL(xprt_wake_pending_tasks); 599 600 /** 601 * xprt_wait_for_buffer_space - wait for transport output buffer to clear 602 * @xprt: transport 603 * 604 * Note that we only set the timer for the case of RPC_IS_SOFT(), since 605 * we don't in general want to force a socket disconnection due to 606 * an incomplete RPC call transmission. 607 */ 608 void xprt_wait_for_buffer_space(struct rpc_xprt *xprt) 609 { 610 set_bit(XPRT_WRITE_SPACE, &xprt->state); 611 } 612 EXPORT_SYMBOL_GPL(xprt_wait_for_buffer_space); 613 614 static bool 615 xprt_clear_write_space_locked(struct rpc_xprt *xprt) 616 { 617 if (test_and_clear_bit(XPRT_WRITE_SPACE, &xprt->state)) { 618 __xprt_lock_write_next(xprt); 619 dprintk("RPC: write space: waking waiting task on " 620 "xprt %p\n", xprt); 621 return true; 622 } 623 return false; 624 } 625 626 /** 627 * xprt_write_space - wake the task waiting for transport output buffer space 628 * @xprt: transport with waiting tasks 629 * 630 * Can be called in a soft IRQ context, so xprt_write_space never sleeps. 631 */ 632 bool xprt_write_space(struct rpc_xprt *xprt) 633 { 634 bool ret; 635 636 if (!test_bit(XPRT_WRITE_SPACE, &xprt->state)) 637 return false; 638 spin_lock(&xprt->transport_lock); 639 ret = xprt_clear_write_space_locked(xprt); 640 spin_unlock(&xprt->transport_lock); 641 return ret; 642 } 643 EXPORT_SYMBOL_GPL(xprt_write_space); 644 645 static unsigned long xprt_abs_ktime_to_jiffies(ktime_t abstime) 646 { 647 s64 delta = ktime_to_ns(ktime_get() - abstime); 648 return likely(delta >= 0) ? 649 jiffies - nsecs_to_jiffies(delta) : 650 jiffies + nsecs_to_jiffies(-delta); 651 } 652 653 static unsigned long xprt_calc_majortimeo(struct rpc_rqst *req) 654 { 655 const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout; 656 unsigned long majortimeo = req->rq_timeout; 657 658 if (to->to_exponential) 659 majortimeo <<= to->to_retries; 660 else 661 majortimeo += to->to_increment * to->to_retries; 662 if (majortimeo > to->to_maxval || majortimeo == 0) 663 majortimeo = to->to_maxval; 664 return majortimeo; 665 } 666 667 static void xprt_reset_majortimeo(struct rpc_rqst *req) 668 { 669 req->rq_majortimeo += xprt_calc_majortimeo(req); 670 } 671 672 static void xprt_reset_minortimeo(struct rpc_rqst *req) 673 { 674 req->rq_minortimeo += req->rq_timeout; 675 } 676 677 static void xprt_init_majortimeo(struct rpc_task *task, struct rpc_rqst *req) 678 { 679 unsigned long time_init; 680 struct rpc_xprt *xprt = req->rq_xprt; 681 682 if (likely(xprt && xprt_connected(xprt))) 683 time_init = jiffies; 684 else 685 time_init = xprt_abs_ktime_to_jiffies(task->tk_start); 686 req->rq_timeout = task->tk_client->cl_timeout->to_initval; 687 req->rq_majortimeo = time_init + xprt_calc_majortimeo(req); 688 req->rq_minortimeo = time_init + req->rq_timeout; 689 } 690 691 /** 692 * xprt_adjust_timeout - adjust timeout values for next retransmit 693 * @req: RPC request containing parameters to use for the adjustment 694 * 695 */ 696 int xprt_adjust_timeout(struct rpc_rqst *req) 697 { 698 struct rpc_xprt *xprt = req->rq_xprt; 699 const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout; 700 int status = 0; 701 702 if (time_before(jiffies, req->rq_majortimeo)) { 703 if (time_before(jiffies, req->rq_minortimeo)) 704 return status; 705 if (to->to_exponential) 706 req->rq_timeout <<= 1; 707 else 708 req->rq_timeout += to->to_increment; 709 if (to->to_maxval && req->rq_timeout >= to->to_maxval) 710 req->rq_timeout = to->to_maxval; 711 req->rq_retries++; 712 } else { 713 req->rq_timeout = to->to_initval; 714 req->rq_retries = 0; 715 xprt_reset_majortimeo(req); 716 /* Reset the RTT counters == "slow start" */ 717 spin_lock(&xprt->transport_lock); 718 rpc_init_rtt(req->rq_task->tk_client->cl_rtt, to->to_initval); 719 spin_unlock(&xprt->transport_lock); 720 status = -ETIMEDOUT; 721 } 722 xprt_reset_minortimeo(req); 723 724 if (req->rq_timeout == 0) { 725 printk(KERN_WARNING "xprt_adjust_timeout: rq_timeout = 0!\n"); 726 req->rq_timeout = 5 * HZ; 727 } 728 return status; 729 } 730 731 static void xprt_autoclose(struct work_struct *work) 732 { 733 struct rpc_xprt *xprt = 734 container_of(work, struct rpc_xprt, task_cleanup); 735 unsigned int pflags = memalloc_nofs_save(); 736 737 trace_xprt_disconnect_auto(xprt); 738 clear_bit(XPRT_CLOSE_WAIT, &xprt->state); 739 xprt->ops->close(xprt); 740 xprt_release_write(xprt, NULL); 741 wake_up_bit(&xprt->state, XPRT_LOCKED); 742 memalloc_nofs_restore(pflags); 743 } 744 745 /** 746 * xprt_disconnect_done - mark a transport as disconnected 747 * @xprt: transport to flag for disconnect 748 * 749 */ 750 void xprt_disconnect_done(struct rpc_xprt *xprt) 751 { 752 trace_xprt_disconnect_done(xprt); 753 spin_lock(&xprt->transport_lock); 754 xprt_clear_connected(xprt); 755 xprt_clear_write_space_locked(xprt); 756 xprt_clear_congestion_window_wait_locked(xprt); 757 xprt_wake_pending_tasks(xprt, -ENOTCONN); 758 spin_unlock(&xprt->transport_lock); 759 } 760 EXPORT_SYMBOL_GPL(xprt_disconnect_done); 761 762 /** 763 * xprt_force_disconnect - force a transport to disconnect 764 * @xprt: transport to disconnect 765 * 766 */ 767 void xprt_force_disconnect(struct rpc_xprt *xprt) 768 { 769 trace_xprt_disconnect_force(xprt); 770 771 /* Don't race with the test_bit() in xprt_clear_locked() */ 772 spin_lock(&xprt->transport_lock); 773 set_bit(XPRT_CLOSE_WAIT, &xprt->state); 774 /* Try to schedule an autoclose RPC call */ 775 if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0) 776 queue_work(xprtiod_workqueue, &xprt->task_cleanup); 777 else if (xprt->snd_task) 778 rpc_wake_up_queued_task_set_status(&xprt->pending, 779 xprt->snd_task, -ENOTCONN); 780 spin_unlock(&xprt->transport_lock); 781 } 782 EXPORT_SYMBOL_GPL(xprt_force_disconnect); 783 784 static unsigned int 785 xprt_connect_cookie(struct rpc_xprt *xprt) 786 { 787 return READ_ONCE(xprt->connect_cookie); 788 } 789 790 static bool 791 xprt_request_retransmit_after_disconnect(struct rpc_task *task) 792 { 793 struct rpc_rqst *req = task->tk_rqstp; 794 struct rpc_xprt *xprt = req->rq_xprt; 795 796 return req->rq_connect_cookie != xprt_connect_cookie(xprt) || 797 !xprt_connected(xprt); 798 } 799 800 /** 801 * xprt_conditional_disconnect - force a transport to disconnect 802 * @xprt: transport to disconnect 803 * @cookie: 'connection cookie' 804 * 805 * This attempts to break the connection if and only if 'cookie' matches 806 * the current transport 'connection cookie'. It ensures that we don't 807 * try to break the connection more than once when we need to retransmit 808 * a batch of RPC requests. 809 * 810 */ 811 void xprt_conditional_disconnect(struct rpc_xprt *xprt, unsigned int cookie) 812 { 813 /* Don't race with the test_bit() in xprt_clear_locked() */ 814 spin_lock(&xprt->transport_lock); 815 if (cookie != xprt->connect_cookie) 816 goto out; 817 if (test_bit(XPRT_CLOSING, &xprt->state)) 818 goto out; 819 set_bit(XPRT_CLOSE_WAIT, &xprt->state); 820 /* Try to schedule an autoclose RPC call */ 821 if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0) 822 queue_work(xprtiod_workqueue, &xprt->task_cleanup); 823 xprt_wake_pending_tasks(xprt, -EAGAIN); 824 out: 825 spin_unlock(&xprt->transport_lock); 826 } 827 828 static bool 829 xprt_has_timer(const struct rpc_xprt *xprt) 830 { 831 return xprt->idle_timeout != 0; 832 } 833 834 static void 835 xprt_schedule_autodisconnect(struct rpc_xprt *xprt) 836 __must_hold(&xprt->transport_lock) 837 { 838 xprt->last_used = jiffies; 839 if (RB_EMPTY_ROOT(&xprt->recv_queue) && xprt_has_timer(xprt)) 840 mod_timer(&xprt->timer, xprt->last_used + xprt->idle_timeout); 841 } 842 843 static void 844 xprt_init_autodisconnect(struct timer_list *t) 845 { 846 struct rpc_xprt *xprt = from_timer(xprt, t, timer); 847 848 if (!RB_EMPTY_ROOT(&xprt->recv_queue)) 849 return; 850 /* Reset xprt->last_used to avoid connect/autodisconnect cycling */ 851 xprt->last_used = jiffies; 852 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) 853 return; 854 queue_work(xprtiod_workqueue, &xprt->task_cleanup); 855 } 856 857 bool xprt_lock_connect(struct rpc_xprt *xprt, 858 struct rpc_task *task, 859 void *cookie) 860 { 861 bool ret = false; 862 863 spin_lock(&xprt->transport_lock); 864 if (!test_bit(XPRT_LOCKED, &xprt->state)) 865 goto out; 866 if (xprt->snd_task != task) 867 goto out; 868 xprt->snd_task = cookie; 869 ret = true; 870 out: 871 spin_unlock(&xprt->transport_lock); 872 return ret; 873 } 874 875 void xprt_unlock_connect(struct rpc_xprt *xprt, void *cookie) 876 { 877 spin_lock(&xprt->transport_lock); 878 if (xprt->snd_task != cookie) 879 goto out; 880 if (!test_bit(XPRT_LOCKED, &xprt->state)) 881 goto out; 882 xprt->snd_task =NULL; 883 xprt->ops->release_xprt(xprt, NULL); 884 xprt_schedule_autodisconnect(xprt); 885 out: 886 spin_unlock(&xprt->transport_lock); 887 wake_up_bit(&xprt->state, XPRT_LOCKED); 888 } 889 890 /** 891 * xprt_connect - schedule a transport connect operation 892 * @task: RPC task that is requesting the connect 893 * 894 */ 895 void xprt_connect(struct rpc_task *task) 896 { 897 struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt; 898 899 trace_xprt_connect(xprt); 900 901 if (!xprt_bound(xprt)) { 902 task->tk_status = -EAGAIN; 903 return; 904 } 905 if (!xprt_lock_write(xprt, task)) 906 return; 907 908 if (test_and_clear_bit(XPRT_CLOSE_WAIT, &xprt->state)) { 909 trace_xprt_disconnect_cleanup(xprt); 910 xprt->ops->close(xprt); 911 } 912 913 if (!xprt_connected(xprt)) { 914 task->tk_rqstp->rq_connect_cookie = xprt->connect_cookie; 915 rpc_sleep_on_timeout(&xprt->pending, task, NULL, 916 xprt_request_timeout(task->tk_rqstp)); 917 918 if (test_bit(XPRT_CLOSING, &xprt->state)) 919 return; 920 if (xprt_test_and_set_connecting(xprt)) 921 return; 922 /* Race breaker */ 923 if (!xprt_connected(xprt)) { 924 xprt->stat.connect_start = jiffies; 925 xprt->ops->connect(xprt, task); 926 } else { 927 xprt_clear_connecting(xprt); 928 task->tk_status = 0; 929 rpc_wake_up_queued_task(&xprt->pending, task); 930 } 931 } 932 xprt_release_write(xprt, task); 933 } 934 935 /** 936 * xprt_reconnect_delay - compute the wait before scheduling a connect 937 * @xprt: transport instance 938 * 939 */ 940 unsigned long xprt_reconnect_delay(const struct rpc_xprt *xprt) 941 { 942 unsigned long start, now = jiffies; 943 944 start = xprt->stat.connect_start + xprt->reestablish_timeout; 945 if (time_after(start, now)) 946 return start - now; 947 return 0; 948 } 949 EXPORT_SYMBOL_GPL(xprt_reconnect_delay); 950 951 /** 952 * xprt_reconnect_backoff - compute the new re-establish timeout 953 * @xprt: transport instance 954 * @init_to: initial reestablish timeout 955 * 956 */ 957 void xprt_reconnect_backoff(struct rpc_xprt *xprt, unsigned long init_to) 958 { 959 xprt->reestablish_timeout <<= 1; 960 if (xprt->reestablish_timeout > xprt->max_reconnect_timeout) 961 xprt->reestablish_timeout = xprt->max_reconnect_timeout; 962 if (xprt->reestablish_timeout < init_to) 963 xprt->reestablish_timeout = init_to; 964 } 965 EXPORT_SYMBOL_GPL(xprt_reconnect_backoff); 966 967 enum xprt_xid_rb_cmp { 968 XID_RB_EQUAL, 969 XID_RB_LEFT, 970 XID_RB_RIGHT, 971 }; 972 static enum xprt_xid_rb_cmp 973 xprt_xid_cmp(__be32 xid1, __be32 xid2) 974 { 975 if (xid1 == xid2) 976 return XID_RB_EQUAL; 977 if ((__force u32)xid1 < (__force u32)xid2) 978 return XID_RB_LEFT; 979 return XID_RB_RIGHT; 980 } 981 982 static struct rpc_rqst * 983 xprt_request_rb_find(struct rpc_xprt *xprt, __be32 xid) 984 { 985 struct rb_node *n = xprt->recv_queue.rb_node; 986 struct rpc_rqst *req; 987 988 while (n != NULL) { 989 req = rb_entry(n, struct rpc_rqst, rq_recv); 990 switch (xprt_xid_cmp(xid, req->rq_xid)) { 991 case XID_RB_LEFT: 992 n = n->rb_left; 993 break; 994 case XID_RB_RIGHT: 995 n = n->rb_right; 996 break; 997 case XID_RB_EQUAL: 998 return req; 999 } 1000 } 1001 return NULL; 1002 } 1003 1004 static void 1005 xprt_request_rb_insert(struct rpc_xprt *xprt, struct rpc_rqst *new) 1006 { 1007 struct rb_node **p = &xprt->recv_queue.rb_node; 1008 struct rb_node *n = NULL; 1009 struct rpc_rqst *req; 1010 1011 while (*p != NULL) { 1012 n = *p; 1013 req = rb_entry(n, struct rpc_rqst, rq_recv); 1014 switch(xprt_xid_cmp(new->rq_xid, req->rq_xid)) { 1015 case XID_RB_LEFT: 1016 p = &n->rb_left; 1017 break; 1018 case XID_RB_RIGHT: 1019 p = &n->rb_right; 1020 break; 1021 case XID_RB_EQUAL: 1022 WARN_ON_ONCE(new != req); 1023 return; 1024 } 1025 } 1026 rb_link_node(&new->rq_recv, n, p); 1027 rb_insert_color(&new->rq_recv, &xprt->recv_queue); 1028 } 1029 1030 static void 1031 xprt_request_rb_remove(struct rpc_xprt *xprt, struct rpc_rqst *req) 1032 { 1033 rb_erase(&req->rq_recv, &xprt->recv_queue); 1034 } 1035 1036 /** 1037 * xprt_lookup_rqst - find an RPC request corresponding to an XID 1038 * @xprt: transport on which the original request was transmitted 1039 * @xid: RPC XID of incoming reply 1040 * 1041 * Caller holds xprt->queue_lock. 1042 */ 1043 struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid) 1044 { 1045 struct rpc_rqst *entry; 1046 1047 entry = xprt_request_rb_find(xprt, xid); 1048 if (entry != NULL) { 1049 trace_xprt_lookup_rqst(xprt, xid, 0); 1050 entry->rq_rtt = ktime_sub(ktime_get(), entry->rq_xtime); 1051 return entry; 1052 } 1053 1054 dprintk("RPC: xprt_lookup_rqst did not find xid %08x\n", 1055 ntohl(xid)); 1056 trace_xprt_lookup_rqst(xprt, xid, -ENOENT); 1057 xprt->stat.bad_xids++; 1058 return NULL; 1059 } 1060 EXPORT_SYMBOL_GPL(xprt_lookup_rqst); 1061 1062 static bool 1063 xprt_is_pinned_rqst(struct rpc_rqst *req) 1064 { 1065 return atomic_read(&req->rq_pin) != 0; 1066 } 1067 1068 /** 1069 * xprt_pin_rqst - Pin a request on the transport receive list 1070 * @req: Request to pin 1071 * 1072 * Caller must ensure this is atomic with the call to xprt_lookup_rqst() 1073 * so should be holding xprt->queue_lock. 1074 */ 1075 void xprt_pin_rqst(struct rpc_rqst *req) 1076 { 1077 atomic_inc(&req->rq_pin); 1078 } 1079 EXPORT_SYMBOL_GPL(xprt_pin_rqst); 1080 1081 /** 1082 * xprt_unpin_rqst - Unpin a request on the transport receive list 1083 * @req: Request to pin 1084 * 1085 * Caller should be holding xprt->queue_lock. 1086 */ 1087 void xprt_unpin_rqst(struct rpc_rqst *req) 1088 { 1089 if (!test_bit(RPC_TASK_MSG_PIN_WAIT, &req->rq_task->tk_runstate)) { 1090 atomic_dec(&req->rq_pin); 1091 return; 1092 } 1093 if (atomic_dec_and_test(&req->rq_pin)) 1094 wake_up_var(&req->rq_pin); 1095 } 1096 EXPORT_SYMBOL_GPL(xprt_unpin_rqst); 1097 1098 static void xprt_wait_on_pinned_rqst(struct rpc_rqst *req) 1099 { 1100 wait_var_event(&req->rq_pin, !xprt_is_pinned_rqst(req)); 1101 } 1102 1103 static bool 1104 xprt_request_data_received(struct rpc_task *task) 1105 { 1106 return !test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) && 1107 READ_ONCE(task->tk_rqstp->rq_reply_bytes_recvd) != 0; 1108 } 1109 1110 static bool 1111 xprt_request_need_enqueue_receive(struct rpc_task *task, struct rpc_rqst *req) 1112 { 1113 return !test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) && 1114 READ_ONCE(task->tk_rqstp->rq_reply_bytes_recvd) == 0; 1115 } 1116 1117 /** 1118 * xprt_request_enqueue_receive - Add an request to the receive queue 1119 * @task: RPC task 1120 * 1121 */ 1122 void 1123 xprt_request_enqueue_receive(struct rpc_task *task) 1124 { 1125 struct rpc_rqst *req = task->tk_rqstp; 1126 struct rpc_xprt *xprt = req->rq_xprt; 1127 1128 if (!xprt_request_need_enqueue_receive(task, req)) 1129 return; 1130 1131 xprt_request_prepare(task->tk_rqstp); 1132 spin_lock(&xprt->queue_lock); 1133 1134 /* Update the softirq receive buffer */ 1135 memcpy(&req->rq_private_buf, &req->rq_rcv_buf, 1136 sizeof(req->rq_private_buf)); 1137 1138 /* Add request to the receive list */ 1139 xprt_request_rb_insert(xprt, req); 1140 set_bit(RPC_TASK_NEED_RECV, &task->tk_runstate); 1141 spin_unlock(&xprt->queue_lock); 1142 1143 /* Turn off autodisconnect */ 1144 del_singleshot_timer_sync(&xprt->timer); 1145 } 1146 1147 /** 1148 * xprt_request_dequeue_receive_locked - Remove a request from the receive queue 1149 * @task: RPC task 1150 * 1151 * Caller must hold xprt->queue_lock. 1152 */ 1153 static void 1154 xprt_request_dequeue_receive_locked(struct rpc_task *task) 1155 { 1156 struct rpc_rqst *req = task->tk_rqstp; 1157 1158 if (test_and_clear_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) 1159 xprt_request_rb_remove(req->rq_xprt, req); 1160 } 1161 1162 /** 1163 * xprt_update_rtt - Update RPC RTT statistics 1164 * @task: RPC request that recently completed 1165 * 1166 * Caller holds xprt->queue_lock. 1167 */ 1168 void xprt_update_rtt(struct rpc_task *task) 1169 { 1170 struct rpc_rqst *req = task->tk_rqstp; 1171 struct rpc_rtt *rtt = task->tk_client->cl_rtt; 1172 unsigned int timer = task->tk_msg.rpc_proc->p_timer; 1173 long m = usecs_to_jiffies(ktime_to_us(req->rq_rtt)); 1174 1175 if (timer) { 1176 if (req->rq_ntrans == 1) 1177 rpc_update_rtt(rtt, timer, m); 1178 rpc_set_timeo(rtt, timer, req->rq_ntrans - 1); 1179 } 1180 } 1181 EXPORT_SYMBOL_GPL(xprt_update_rtt); 1182 1183 /** 1184 * xprt_complete_rqst - called when reply processing is complete 1185 * @task: RPC request that recently completed 1186 * @copied: actual number of bytes received from the transport 1187 * 1188 * Caller holds xprt->queue_lock. 1189 */ 1190 void xprt_complete_rqst(struct rpc_task *task, int copied) 1191 { 1192 struct rpc_rqst *req = task->tk_rqstp; 1193 struct rpc_xprt *xprt = req->rq_xprt; 1194 1195 xprt->stat.recvs++; 1196 1197 req->rq_private_buf.len = copied; 1198 /* Ensure all writes are done before we update */ 1199 /* req->rq_reply_bytes_recvd */ 1200 smp_wmb(); 1201 req->rq_reply_bytes_recvd = copied; 1202 xprt_request_dequeue_receive_locked(task); 1203 rpc_wake_up_queued_task(&xprt->pending, task); 1204 } 1205 EXPORT_SYMBOL_GPL(xprt_complete_rqst); 1206 1207 static void xprt_timer(struct rpc_task *task) 1208 { 1209 struct rpc_rqst *req = task->tk_rqstp; 1210 struct rpc_xprt *xprt = req->rq_xprt; 1211 1212 if (task->tk_status != -ETIMEDOUT) 1213 return; 1214 1215 trace_xprt_timer(xprt, req->rq_xid, task->tk_status); 1216 if (!req->rq_reply_bytes_recvd) { 1217 if (xprt->ops->timer) 1218 xprt->ops->timer(xprt, task); 1219 } else 1220 task->tk_status = 0; 1221 } 1222 1223 /** 1224 * xprt_wait_for_reply_request_def - wait for reply 1225 * @task: pointer to rpc_task 1226 * 1227 * Set a request's retransmit timeout based on the transport's 1228 * default timeout parameters. Used by transports that don't adjust 1229 * the retransmit timeout based on round-trip time estimation, 1230 * and put the task to sleep on the pending queue. 1231 */ 1232 void xprt_wait_for_reply_request_def(struct rpc_task *task) 1233 { 1234 struct rpc_rqst *req = task->tk_rqstp; 1235 1236 rpc_sleep_on_timeout(&req->rq_xprt->pending, task, xprt_timer, 1237 xprt_request_timeout(req)); 1238 } 1239 EXPORT_SYMBOL_GPL(xprt_wait_for_reply_request_def); 1240 1241 /** 1242 * xprt_wait_for_reply_request_rtt - wait for reply using RTT estimator 1243 * @task: pointer to rpc_task 1244 * 1245 * Set a request's retransmit timeout using the RTT estimator, 1246 * and put the task to sleep on the pending queue. 1247 */ 1248 void xprt_wait_for_reply_request_rtt(struct rpc_task *task) 1249 { 1250 int timer = task->tk_msg.rpc_proc->p_timer; 1251 struct rpc_clnt *clnt = task->tk_client; 1252 struct rpc_rtt *rtt = clnt->cl_rtt; 1253 struct rpc_rqst *req = task->tk_rqstp; 1254 unsigned long max_timeout = clnt->cl_timeout->to_maxval; 1255 unsigned long timeout; 1256 1257 timeout = rpc_calc_rto(rtt, timer); 1258 timeout <<= rpc_ntimeo(rtt, timer) + req->rq_retries; 1259 if (timeout > max_timeout || timeout == 0) 1260 timeout = max_timeout; 1261 rpc_sleep_on_timeout(&req->rq_xprt->pending, task, xprt_timer, 1262 jiffies + timeout); 1263 } 1264 EXPORT_SYMBOL_GPL(xprt_wait_for_reply_request_rtt); 1265 1266 /** 1267 * xprt_request_wait_receive - wait for the reply to an RPC request 1268 * @task: RPC task about to send a request 1269 * 1270 */ 1271 void xprt_request_wait_receive(struct rpc_task *task) 1272 { 1273 struct rpc_rqst *req = task->tk_rqstp; 1274 struct rpc_xprt *xprt = req->rq_xprt; 1275 1276 if (!test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) 1277 return; 1278 /* 1279 * Sleep on the pending queue if we're expecting a reply. 1280 * The spinlock ensures atomicity between the test of 1281 * req->rq_reply_bytes_recvd, and the call to rpc_sleep_on(). 1282 */ 1283 spin_lock(&xprt->queue_lock); 1284 if (test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) { 1285 xprt->ops->wait_for_reply_request(task); 1286 /* 1287 * Send an extra queue wakeup call if the 1288 * connection was dropped in case the call to 1289 * rpc_sleep_on() raced. 1290 */ 1291 if (xprt_request_retransmit_after_disconnect(task)) 1292 rpc_wake_up_queued_task_set_status(&xprt->pending, 1293 task, -ENOTCONN); 1294 } 1295 spin_unlock(&xprt->queue_lock); 1296 } 1297 1298 static bool 1299 xprt_request_need_enqueue_transmit(struct rpc_task *task, struct rpc_rqst *req) 1300 { 1301 return !test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate); 1302 } 1303 1304 /** 1305 * xprt_request_enqueue_transmit - queue a task for transmission 1306 * @task: pointer to rpc_task 1307 * 1308 * Add a task to the transmission queue. 1309 */ 1310 void 1311 xprt_request_enqueue_transmit(struct rpc_task *task) 1312 { 1313 struct rpc_rqst *pos, *req = task->tk_rqstp; 1314 struct rpc_xprt *xprt = req->rq_xprt; 1315 1316 if (xprt_request_need_enqueue_transmit(task, req)) { 1317 req->rq_bytes_sent = 0; 1318 spin_lock(&xprt->queue_lock); 1319 /* 1320 * Requests that carry congestion control credits are added 1321 * to the head of the list to avoid starvation issues. 1322 */ 1323 if (req->rq_cong) { 1324 xprt_clear_congestion_window_wait(xprt); 1325 list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) { 1326 if (pos->rq_cong) 1327 continue; 1328 /* Note: req is added _before_ pos */ 1329 list_add_tail(&req->rq_xmit, &pos->rq_xmit); 1330 INIT_LIST_HEAD(&req->rq_xmit2); 1331 goto out; 1332 } 1333 } else if (RPC_IS_SWAPPER(task)) { 1334 list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) { 1335 if (pos->rq_cong || pos->rq_bytes_sent) 1336 continue; 1337 if (RPC_IS_SWAPPER(pos->rq_task)) 1338 continue; 1339 /* Note: req is added _before_ pos */ 1340 list_add_tail(&req->rq_xmit, &pos->rq_xmit); 1341 INIT_LIST_HEAD(&req->rq_xmit2); 1342 goto out; 1343 } 1344 } else if (!req->rq_seqno) { 1345 list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) { 1346 if (pos->rq_task->tk_owner != task->tk_owner) 1347 continue; 1348 list_add_tail(&req->rq_xmit2, &pos->rq_xmit2); 1349 INIT_LIST_HEAD(&req->rq_xmit); 1350 goto out; 1351 } 1352 } 1353 list_add_tail(&req->rq_xmit, &xprt->xmit_queue); 1354 INIT_LIST_HEAD(&req->rq_xmit2); 1355 out: 1356 atomic_long_inc(&xprt->xmit_queuelen); 1357 set_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate); 1358 spin_unlock(&xprt->queue_lock); 1359 } 1360 } 1361 1362 /** 1363 * xprt_request_dequeue_transmit_locked - remove a task from the transmission queue 1364 * @task: pointer to rpc_task 1365 * 1366 * Remove a task from the transmission queue 1367 * Caller must hold xprt->queue_lock 1368 */ 1369 static void 1370 xprt_request_dequeue_transmit_locked(struct rpc_task *task) 1371 { 1372 struct rpc_rqst *req = task->tk_rqstp; 1373 1374 if (!test_and_clear_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) 1375 return; 1376 if (!list_empty(&req->rq_xmit)) { 1377 list_del(&req->rq_xmit); 1378 if (!list_empty(&req->rq_xmit2)) { 1379 struct rpc_rqst *next = list_first_entry(&req->rq_xmit2, 1380 struct rpc_rqst, rq_xmit2); 1381 list_del(&req->rq_xmit2); 1382 list_add_tail(&next->rq_xmit, &next->rq_xprt->xmit_queue); 1383 } 1384 } else 1385 list_del(&req->rq_xmit2); 1386 atomic_long_dec(&req->rq_xprt->xmit_queuelen); 1387 } 1388 1389 /** 1390 * xprt_request_dequeue_transmit - remove a task from the transmission queue 1391 * @task: pointer to rpc_task 1392 * 1393 * Remove a task from the transmission queue 1394 */ 1395 static void 1396 xprt_request_dequeue_transmit(struct rpc_task *task) 1397 { 1398 struct rpc_rqst *req = task->tk_rqstp; 1399 struct rpc_xprt *xprt = req->rq_xprt; 1400 1401 spin_lock(&xprt->queue_lock); 1402 xprt_request_dequeue_transmit_locked(task); 1403 spin_unlock(&xprt->queue_lock); 1404 } 1405 1406 /** 1407 * xprt_request_dequeue_xprt - remove a task from the transmit+receive queue 1408 * @task: pointer to rpc_task 1409 * 1410 * Remove a task from the transmit and receive queues, and ensure that 1411 * it is not pinned by the receive work item. 1412 */ 1413 void 1414 xprt_request_dequeue_xprt(struct rpc_task *task) 1415 { 1416 struct rpc_rqst *req = task->tk_rqstp; 1417 struct rpc_xprt *xprt = req->rq_xprt; 1418 1419 if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate) || 1420 test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) || 1421 xprt_is_pinned_rqst(req)) { 1422 spin_lock(&xprt->queue_lock); 1423 xprt_request_dequeue_transmit_locked(task); 1424 xprt_request_dequeue_receive_locked(task); 1425 while (xprt_is_pinned_rqst(req)) { 1426 set_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate); 1427 spin_unlock(&xprt->queue_lock); 1428 xprt_wait_on_pinned_rqst(req); 1429 spin_lock(&xprt->queue_lock); 1430 clear_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate); 1431 } 1432 spin_unlock(&xprt->queue_lock); 1433 } 1434 } 1435 1436 /** 1437 * xprt_request_prepare - prepare an encoded request for transport 1438 * @req: pointer to rpc_rqst 1439 * 1440 * Calls into the transport layer to do whatever is needed to prepare 1441 * the request for transmission or receive. 1442 */ 1443 void 1444 xprt_request_prepare(struct rpc_rqst *req) 1445 { 1446 struct rpc_xprt *xprt = req->rq_xprt; 1447 1448 if (xprt->ops->prepare_request) 1449 xprt->ops->prepare_request(req); 1450 } 1451 1452 /** 1453 * xprt_request_need_retransmit - Test if a task needs retransmission 1454 * @task: pointer to rpc_task 1455 * 1456 * Test for whether a connection breakage requires the task to retransmit 1457 */ 1458 bool 1459 xprt_request_need_retransmit(struct rpc_task *task) 1460 { 1461 return xprt_request_retransmit_after_disconnect(task); 1462 } 1463 1464 /** 1465 * xprt_prepare_transmit - reserve the transport before sending a request 1466 * @task: RPC task about to send a request 1467 * 1468 */ 1469 bool xprt_prepare_transmit(struct rpc_task *task) 1470 { 1471 struct rpc_rqst *req = task->tk_rqstp; 1472 struct rpc_xprt *xprt = req->rq_xprt; 1473 1474 if (!xprt_lock_write(xprt, task)) { 1475 /* Race breaker: someone may have transmitted us */ 1476 if (!test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) 1477 rpc_wake_up_queued_task_set_status(&xprt->sending, 1478 task, 0); 1479 return false; 1480 1481 } 1482 return true; 1483 } 1484 1485 void xprt_end_transmit(struct rpc_task *task) 1486 { 1487 struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt; 1488 1489 xprt_inject_disconnect(xprt); 1490 xprt_release_write(xprt, task); 1491 } 1492 1493 /** 1494 * xprt_request_transmit - send an RPC request on a transport 1495 * @req: pointer to request to transmit 1496 * @snd_task: RPC task that owns the transport lock 1497 * 1498 * This performs the transmission of a single request. 1499 * Note that if the request is not the same as snd_task, then it 1500 * does need to be pinned. 1501 * Returns '0' on success. 1502 */ 1503 static int 1504 xprt_request_transmit(struct rpc_rqst *req, struct rpc_task *snd_task) 1505 { 1506 struct rpc_xprt *xprt = req->rq_xprt; 1507 struct rpc_task *task = req->rq_task; 1508 unsigned int connect_cookie; 1509 int is_retrans = RPC_WAS_SENT(task); 1510 int status; 1511 1512 if (!req->rq_bytes_sent) { 1513 if (xprt_request_data_received(task)) { 1514 status = 0; 1515 goto out_dequeue; 1516 } 1517 /* Verify that our message lies in the RPCSEC_GSS window */ 1518 if (rpcauth_xmit_need_reencode(task)) { 1519 status = -EBADMSG; 1520 goto out_dequeue; 1521 } 1522 if (RPC_SIGNALLED(task)) { 1523 status = -ERESTARTSYS; 1524 goto out_dequeue; 1525 } 1526 } 1527 1528 /* 1529 * Update req->rq_ntrans before transmitting to avoid races with 1530 * xprt_update_rtt(), which needs to know that it is recording a 1531 * reply to the first transmission. 1532 */ 1533 req->rq_ntrans++; 1534 1535 trace_rpc_xdr_sendto(task, &req->rq_snd_buf); 1536 connect_cookie = xprt->connect_cookie; 1537 status = xprt->ops->send_request(req); 1538 if (status != 0) { 1539 req->rq_ntrans--; 1540 trace_xprt_transmit(req, status); 1541 return status; 1542 } 1543 1544 if (is_retrans) { 1545 task->tk_client->cl_stats->rpcretrans++; 1546 trace_xprt_retransmit(req); 1547 } 1548 1549 xprt_inject_disconnect(xprt); 1550 1551 task->tk_flags |= RPC_TASK_SENT; 1552 spin_lock(&xprt->transport_lock); 1553 1554 xprt->stat.sends++; 1555 xprt->stat.req_u += xprt->stat.sends - xprt->stat.recvs; 1556 xprt->stat.bklog_u += xprt->backlog.qlen; 1557 xprt->stat.sending_u += xprt->sending.qlen; 1558 xprt->stat.pending_u += xprt->pending.qlen; 1559 spin_unlock(&xprt->transport_lock); 1560 1561 req->rq_connect_cookie = connect_cookie; 1562 out_dequeue: 1563 trace_xprt_transmit(req, status); 1564 xprt_request_dequeue_transmit(task); 1565 rpc_wake_up_queued_task_set_status(&xprt->sending, task, status); 1566 return status; 1567 } 1568 1569 /** 1570 * xprt_transmit - send an RPC request on a transport 1571 * @task: controlling RPC task 1572 * 1573 * Attempts to drain the transmit queue. On exit, either the transport 1574 * signalled an error that needs to be handled before transmission can 1575 * resume, or @task finished transmitting, and detected that it already 1576 * received a reply. 1577 */ 1578 void 1579 xprt_transmit(struct rpc_task *task) 1580 { 1581 struct rpc_rqst *next, *req = task->tk_rqstp; 1582 struct rpc_xprt *xprt = req->rq_xprt; 1583 int counter, status; 1584 1585 spin_lock(&xprt->queue_lock); 1586 counter = 0; 1587 while (!list_empty(&xprt->xmit_queue)) { 1588 if (++counter == 20) 1589 break; 1590 next = list_first_entry(&xprt->xmit_queue, 1591 struct rpc_rqst, rq_xmit); 1592 xprt_pin_rqst(next); 1593 spin_unlock(&xprt->queue_lock); 1594 status = xprt_request_transmit(next, task); 1595 if (status == -EBADMSG && next != req) 1596 status = 0; 1597 spin_lock(&xprt->queue_lock); 1598 xprt_unpin_rqst(next); 1599 if (status == 0) { 1600 if (!xprt_request_data_received(task) || 1601 test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) 1602 continue; 1603 } else if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) 1604 task->tk_status = status; 1605 break; 1606 } 1607 spin_unlock(&xprt->queue_lock); 1608 } 1609 1610 static void xprt_complete_request_init(struct rpc_task *task) 1611 { 1612 if (task->tk_rqstp) 1613 xprt_request_init(task); 1614 } 1615 1616 void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task) 1617 { 1618 set_bit(XPRT_CONGESTED, &xprt->state); 1619 rpc_sleep_on(&xprt->backlog, task, xprt_complete_request_init); 1620 } 1621 EXPORT_SYMBOL_GPL(xprt_add_backlog); 1622 1623 static bool __xprt_set_rq(struct rpc_task *task, void *data) 1624 { 1625 struct rpc_rqst *req = data; 1626 1627 if (task->tk_rqstp == NULL) { 1628 memset(req, 0, sizeof(*req)); /* mark unused */ 1629 task->tk_rqstp = req; 1630 return true; 1631 } 1632 return false; 1633 } 1634 1635 bool xprt_wake_up_backlog(struct rpc_xprt *xprt, struct rpc_rqst *req) 1636 { 1637 if (rpc_wake_up_first(&xprt->backlog, __xprt_set_rq, req) == NULL) { 1638 clear_bit(XPRT_CONGESTED, &xprt->state); 1639 return false; 1640 } 1641 return true; 1642 } 1643 EXPORT_SYMBOL_GPL(xprt_wake_up_backlog); 1644 1645 static bool xprt_throttle_congested(struct rpc_xprt *xprt, struct rpc_task *task) 1646 { 1647 bool ret = false; 1648 1649 if (!test_bit(XPRT_CONGESTED, &xprt->state)) 1650 goto out; 1651 spin_lock(&xprt->reserve_lock); 1652 if (test_bit(XPRT_CONGESTED, &xprt->state)) { 1653 xprt_add_backlog(xprt, task); 1654 ret = true; 1655 } 1656 spin_unlock(&xprt->reserve_lock); 1657 out: 1658 return ret; 1659 } 1660 1661 static struct rpc_rqst *xprt_dynamic_alloc_slot(struct rpc_xprt *xprt) 1662 { 1663 struct rpc_rqst *req = ERR_PTR(-EAGAIN); 1664 1665 if (xprt->num_reqs >= xprt->max_reqs) 1666 goto out; 1667 ++xprt->num_reqs; 1668 spin_unlock(&xprt->reserve_lock); 1669 req = kzalloc(sizeof(struct rpc_rqst), GFP_NOFS); 1670 spin_lock(&xprt->reserve_lock); 1671 if (req != NULL) 1672 goto out; 1673 --xprt->num_reqs; 1674 req = ERR_PTR(-ENOMEM); 1675 out: 1676 return req; 1677 } 1678 1679 static bool xprt_dynamic_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req) 1680 { 1681 if (xprt->num_reqs > xprt->min_reqs) { 1682 --xprt->num_reqs; 1683 kfree(req); 1684 return true; 1685 } 1686 return false; 1687 } 1688 1689 void xprt_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task) 1690 { 1691 struct rpc_rqst *req; 1692 1693 spin_lock(&xprt->reserve_lock); 1694 if (!list_empty(&xprt->free)) { 1695 req = list_entry(xprt->free.next, struct rpc_rqst, rq_list); 1696 list_del(&req->rq_list); 1697 goto out_init_req; 1698 } 1699 req = xprt_dynamic_alloc_slot(xprt); 1700 if (!IS_ERR(req)) 1701 goto out_init_req; 1702 switch (PTR_ERR(req)) { 1703 case -ENOMEM: 1704 dprintk("RPC: dynamic allocation of request slot " 1705 "failed! Retrying\n"); 1706 task->tk_status = -ENOMEM; 1707 break; 1708 case -EAGAIN: 1709 xprt_add_backlog(xprt, task); 1710 dprintk("RPC: waiting for request slot\n"); 1711 fallthrough; 1712 default: 1713 task->tk_status = -EAGAIN; 1714 } 1715 spin_unlock(&xprt->reserve_lock); 1716 return; 1717 out_init_req: 1718 xprt->stat.max_slots = max_t(unsigned int, xprt->stat.max_slots, 1719 xprt->num_reqs); 1720 spin_unlock(&xprt->reserve_lock); 1721 1722 task->tk_status = 0; 1723 task->tk_rqstp = req; 1724 } 1725 EXPORT_SYMBOL_GPL(xprt_alloc_slot); 1726 1727 void xprt_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req) 1728 { 1729 spin_lock(&xprt->reserve_lock); 1730 if (!xprt_wake_up_backlog(xprt, req) && 1731 !xprt_dynamic_free_slot(xprt, req)) { 1732 memset(req, 0, sizeof(*req)); /* mark unused */ 1733 list_add(&req->rq_list, &xprt->free); 1734 } 1735 spin_unlock(&xprt->reserve_lock); 1736 } 1737 EXPORT_SYMBOL_GPL(xprt_free_slot); 1738 1739 static void xprt_free_all_slots(struct rpc_xprt *xprt) 1740 { 1741 struct rpc_rqst *req; 1742 while (!list_empty(&xprt->free)) { 1743 req = list_first_entry(&xprt->free, struct rpc_rqst, rq_list); 1744 list_del(&req->rq_list); 1745 kfree(req); 1746 } 1747 } 1748 1749 struct rpc_xprt *xprt_alloc(struct net *net, size_t size, 1750 unsigned int num_prealloc, 1751 unsigned int max_alloc) 1752 { 1753 struct rpc_xprt *xprt; 1754 struct rpc_rqst *req; 1755 int i; 1756 1757 xprt = kzalloc(size, GFP_KERNEL); 1758 if (xprt == NULL) 1759 goto out; 1760 1761 xprt_init(xprt, net); 1762 1763 for (i = 0; i < num_prealloc; i++) { 1764 req = kzalloc(sizeof(struct rpc_rqst), GFP_KERNEL); 1765 if (!req) 1766 goto out_free; 1767 list_add(&req->rq_list, &xprt->free); 1768 } 1769 if (max_alloc > num_prealloc) 1770 xprt->max_reqs = max_alloc; 1771 else 1772 xprt->max_reqs = num_prealloc; 1773 xprt->min_reqs = num_prealloc; 1774 xprt->num_reqs = num_prealloc; 1775 1776 return xprt; 1777 1778 out_free: 1779 xprt_free(xprt); 1780 out: 1781 return NULL; 1782 } 1783 EXPORT_SYMBOL_GPL(xprt_alloc); 1784 1785 void xprt_free(struct rpc_xprt *xprt) 1786 { 1787 put_net(xprt->xprt_net); 1788 xprt_free_all_slots(xprt); 1789 kfree_rcu(xprt, rcu); 1790 } 1791 EXPORT_SYMBOL_GPL(xprt_free); 1792 1793 static void 1794 xprt_init_connect_cookie(struct rpc_rqst *req, struct rpc_xprt *xprt) 1795 { 1796 req->rq_connect_cookie = xprt_connect_cookie(xprt) - 1; 1797 } 1798 1799 static __be32 1800 xprt_alloc_xid(struct rpc_xprt *xprt) 1801 { 1802 __be32 xid; 1803 1804 spin_lock(&xprt->reserve_lock); 1805 xid = (__force __be32)xprt->xid++; 1806 spin_unlock(&xprt->reserve_lock); 1807 return xid; 1808 } 1809 1810 static void 1811 xprt_init_xid(struct rpc_xprt *xprt) 1812 { 1813 xprt->xid = prandom_u32(); 1814 } 1815 1816 static void 1817 xprt_request_init(struct rpc_task *task) 1818 { 1819 struct rpc_xprt *xprt = task->tk_xprt; 1820 struct rpc_rqst *req = task->tk_rqstp; 1821 1822 req->rq_task = task; 1823 req->rq_xprt = xprt; 1824 req->rq_buffer = NULL; 1825 req->rq_xid = xprt_alloc_xid(xprt); 1826 xprt_init_connect_cookie(req, xprt); 1827 req->rq_snd_buf.len = 0; 1828 req->rq_snd_buf.buflen = 0; 1829 req->rq_rcv_buf.len = 0; 1830 req->rq_rcv_buf.buflen = 0; 1831 req->rq_snd_buf.bvec = NULL; 1832 req->rq_rcv_buf.bvec = NULL; 1833 req->rq_release_snd_buf = NULL; 1834 xprt_init_majortimeo(task, req); 1835 1836 trace_xprt_reserve(req); 1837 } 1838 1839 static void 1840 xprt_do_reserve(struct rpc_xprt *xprt, struct rpc_task *task) 1841 { 1842 xprt->ops->alloc_slot(xprt, task); 1843 if (task->tk_rqstp != NULL) 1844 xprt_request_init(task); 1845 } 1846 1847 /** 1848 * xprt_reserve - allocate an RPC request slot 1849 * @task: RPC task requesting a slot allocation 1850 * 1851 * If the transport is marked as being congested, or if no more 1852 * slots are available, place the task on the transport's 1853 * backlog queue. 1854 */ 1855 void xprt_reserve(struct rpc_task *task) 1856 { 1857 struct rpc_xprt *xprt = task->tk_xprt; 1858 1859 task->tk_status = 0; 1860 if (task->tk_rqstp != NULL) 1861 return; 1862 1863 task->tk_status = -EAGAIN; 1864 if (!xprt_throttle_congested(xprt, task)) 1865 xprt_do_reserve(xprt, task); 1866 } 1867 1868 /** 1869 * xprt_retry_reserve - allocate an RPC request slot 1870 * @task: RPC task requesting a slot allocation 1871 * 1872 * If no more slots are available, place the task on the transport's 1873 * backlog queue. 1874 * Note that the only difference with xprt_reserve is that we now 1875 * ignore the value of the XPRT_CONGESTED flag. 1876 */ 1877 void xprt_retry_reserve(struct rpc_task *task) 1878 { 1879 struct rpc_xprt *xprt = task->tk_xprt; 1880 1881 task->tk_status = 0; 1882 if (task->tk_rqstp != NULL) 1883 return; 1884 1885 task->tk_status = -EAGAIN; 1886 xprt_do_reserve(xprt, task); 1887 } 1888 1889 /** 1890 * xprt_release - release an RPC request slot 1891 * @task: task which is finished with the slot 1892 * 1893 */ 1894 void xprt_release(struct rpc_task *task) 1895 { 1896 struct rpc_xprt *xprt; 1897 struct rpc_rqst *req = task->tk_rqstp; 1898 1899 if (req == NULL) { 1900 if (task->tk_client) { 1901 xprt = task->tk_xprt; 1902 xprt_release_write(xprt, task); 1903 } 1904 return; 1905 } 1906 1907 xprt = req->rq_xprt; 1908 xprt_request_dequeue_xprt(task); 1909 spin_lock(&xprt->transport_lock); 1910 xprt->ops->release_xprt(xprt, task); 1911 if (xprt->ops->release_request) 1912 xprt->ops->release_request(task); 1913 xprt_schedule_autodisconnect(xprt); 1914 spin_unlock(&xprt->transport_lock); 1915 if (req->rq_buffer) 1916 xprt->ops->buf_free(task); 1917 xdr_free_bvec(&req->rq_rcv_buf); 1918 xdr_free_bvec(&req->rq_snd_buf); 1919 if (req->rq_cred != NULL) 1920 put_rpccred(req->rq_cred); 1921 if (req->rq_release_snd_buf) 1922 req->rq_release_snd_buf(req); 1923 1924 task->tk_rqstp = NULL; 1925 if (likely(!bc_prealloc(req))) 1926 xprt->ops->free_slot(xprt, req); 1927 else 1928 xprt_free_bc_request(req); 1929 } 1930 1931 #ifdef CONFIG_SUNRPC_BACKCHANNEL 1932 void 1933 xprt_init_bc_request(struct rpc_rqst *req, struct rpc_task *task) 1934 { 1935 struct xdr_buf *xbufp = &req->rq_snd_buf; 1936 1937 task->tk_rqstp = req; 1938 req->rq_task = task; 1939 xprt_init_connect_cookie(req, req->rq_xprt); 1940 /* 1941 * Set up the xdr_buf length. 1942 * This also indicates that the buffer is XDR encoded already. 1943 */ 1944 xbufp->len = xbufp->head[0].iov_len + xbufp->page_len + 1945 xbufp->tail[0].iov_len; 1946 } 1947 #endif 1948 1949 static void xprt_init(struct rpc_xprt *xprt, struct net *net) 1950 { 1951 kref_init(&xprt->kref); 1952 1953 spin_lock_init(&xprt->transport_lock); 1954 spin_lock_init(&xprt->reserve_lock); 1955 spin_lock_init(&xprt->queue_lock); 1956 1957 INIT_LIST_HEAD(&xprt->free); 1958 xprt->recv_queue = RB_ROOT; 1959 INIT_LIST_HEAD(&xprt->xmit_queue); 1960 #if defined(CONFIG_SUNRPC_BACKCHANNEL) 1961 spin_lock_init(&xprt->bc_pa_lock); 1962 INIT_LIST_HEAD(&xprt->bc_pa_list); 1963 #endif /* CONFIG_SUNRPC_BACKCHANNEL */ 1964 INIT_LIST_HEAD(&xprt->xprt_switch); 1965 1966 xprt->last_used = jiffies; 1967 xprt->cwnd = RPC_INITCWND; 1968 xprt->bind_index = 0; 1969 1970 rpc_init_wait_queue(&xprt->binding, "xprt_binding"); 1971 rpc_init_wait_queue(&xprt->pending, "xprt_pending"); 1972 rpc_init_wait_queue(&xprt->sending, "xprt_sending"); 1973 rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog"); 1974 1975 xprt_init_xid(xprt); 1976 1977 xprt->xprt_net = get_net(net); 1978 } 1979 1980 /** 1981 * xprt_create_transport - create an RPC transport 1982 * @args: rpc transport creation arguments 1983 * 1984 */ 1985 struct rpc_xprt *xprt_create_transport(struct xprt_create *args) 1986 { 1987 struct rpc_xprt *xprt; 1988 const struct xprt_class *t; 1989 1990 t = xprt_class_find_by_ident(args->ident); 1991 if (!t) { 1992 dprintk("RPC: transport (%d) not supported\n", args->ident); 1993 return ERR_PTR(-EIO); 1994 } 1995 1996 xprt = t->setup(args); 1997 xprt_class_release(t); 1998 1999 if (IS_ERR(xprt)) 2000 goto out; 2001 if (args->flags & XPRT_CREATE_NO_IDLE_TIMEOUT) 2002 xprt->idle_timeout = 0; 2003 INIT_WORK(&xprt->task_cleanup, xprt_autoclose); 2004 if (xprt_has_timer(xprt)) 2005 timer_setup(&xprt->timer, xprt_init_autodisconnect, 0); 2006 else 2007 timer_setup(&xprt->timer, NULL, 0); 2008 2009 if (strlen(args->servername) > RPC_MAXNETNAMELEN) { 2010 xprt_destroy(xprt); 2011 return ERR_PTR(-EINVAL); 2012 } 2013 xprt->servername = kstrdup(args->servername, GFP_KERNEL); 2014 if (xprt->servername == NULL) { 2015 xprt_destroy(xprt); 2016 return ERR_PTR(-ENOMEM); 2017 } 2018 2019 rpc_xprt_debugfs_register(xprt); 2020 2021 trace_xprt_create(xprt); 2022 out: 2023 return xprt; 2024 } 2025 2026 static void xprt_destroy_cb(struct work_struct *work) 2027 { 2028 struct rpc_xprt *xprt = 2029 container_of(work, struct rpc_xprt, task_cleanup); 2030 2031 trace_xprt_destroy(xprt); 2032 2033 rpc_xprt_debugfs_unregister(xprt); 2034 rpc_destroy_wait_queue(&xprt->binding); 2035 rpc_destroy_wait_queue(&xprt->pending); 2036 rpc_destroy_wait_queue(&xprt->sending); 2037 rpc_destroy_wait_queue(&xprt->backlog); 2038 kfree(xprt->servername); 2039 /* 2040 * Destroy any existing back channel 2041 */ 2042 xprt_destroy_backchannel(xprt, UINT_MAX); 2043 2044 /* 2045 * Tear down transport state and free the rpc_xprt 2046 */ 2047 xprt->ops->destroy(xprt); 2048 } 2049 2050 /** 2051 * xprt_destroy - destroy an RPC transport, killing off all requests. 2052 * @xprt: transport to destroy 2053 * 2054 */ 2055 static void xprt_destroy(struct rpc_xprt *xprt) 2056 { 2057 /* 2058 * Exclude transport connect/disconnect handlers and autoclose 2059 */ 2060 wait_on_bit_lock(&xprt->state, XPRT_LOCKED, TASK_UNINTERRUPTIBLE); 2061 2062 del_timer_sync(&xprt->timer); 2063 2064 /* 2065 * Destroy sockets etc from the system workqueue so they can 2066 * safely flush receive work running on rpciod. 2067 */ 2068 INIT_WORK(&xprt->task_cleanup, xprt_destroy_cb); 2069 schedule_work(&xprt->task_cleanup); 2070 } 2071 2072 static void xprt_destroy_kref(struct kref *kref) 2073 { 2074 xprt_destroy(container_of(kref, struct rpc_xprt, kref)); 2075 } 2076 2077 /** 2078 * xprt_get - return a reference to an RPC transport. 2079 * @xprt: pointer to the transport 2080 * 2081 */ 2082 struct rpc_xprt *xprt_get(struct rpc_xprt *xprt) 2083 { 2084 if (xprt != NULL && kref_get_unless_zero(&xprt->kref)) 2085 return xprt; 2086 return NULL; 2087 } 2088 EXPORT_SYMBOL_GPL(xprt_get); 2089 2090 /** 2091 * xprt_put - release a reference to an RPC transport. 2092 * @xprt: pointer to the transport 2093 * 2094 */ 2095 void xprt_put(struct rpc_xprt *xprt) 2096 { 2097 if (xprt != NULL) 2098 kref_put(&xprt->kref, xprt_destroy_kref); 2099 } 2100 EXPORT_SYMBOL_GPL(xprt_put); 2101