/*
 *  linux/net/sunrpc/xprt.c
 *
 *  This is a generic RPC call interface supporting congestion avoidance,
 *  and asynchronous calls.
 *
 *  The interface works like this:
 *
 *  -	When a process places a call, it allocates a request slot if
 *	one is available. Otherwise, it sleeps on the backlog queue
 *	(xprt_reserve).
 *  -	Next, the caller puts together the RPC message, stuffs it into
 *	the request struct, and calls xprt_transmit().
 *  -	xprt_transmit sends the message and installs the caller on the
 *	transport's wait list. At the same time, if a reply is expected,
 *	it installs a timer that is run after the packet's timeout has
 *	expired.
 *  -	When a packet arrives, the data_ready handler walks the list of
 *	pending requests for that transport. If a matching XID is found, the
 *	caller is woken up, and the timer removed.
 *  -	When no reply arrives within the timeout interval, the timer is
 *	fired by the kernel and runs xprt_timer(). It either adjusts the
 *	timeout values (minor timeout) or wakes up the caller with a status
 *	of -ETIMEDOUT.
 *  -	When the caller receives a notification from RPC that a reply arrived,
 *	it should release the RPC slot, and process the reply.
 *	If the call timed out, it may choose to retry the operation by
 *	adjusting the initial timeout value, and simply calling rpc_call
 *	again.
 *
 *  Support for async RPC is done through a set of RPC-specific scheduling
 *  primitives that `transparently' work for processes as well as async
 *  tasks that rely on callbacks.
 *
 *  Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de>
 *
 *  Transport switch API copyright (C) 2005, Chuck Lever <cel@netapp.com>
 */

#include <linux/module.h>

#include <linux/types.h>
#include <linux/interrupt.h>
#include <linux/workqueue.h>
#include <linux/net.h>
#include <linux/ktime.h>

#include <linux/sunrpc/clnt.h>
#include <linux/sunrpc/metrics.h>
#include <linux/sunrpc/bc_xprt.h>
#include <linux/rcupdate.h>

#include <trace/events/sunrpc.h>

#include "sunrpc.h"

/*
 * Local variables
 */

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY	RPCDBG_XPRT
#endif

/*
 * Local functions
 */
static void	xprt_init(struct rpc_xprt *xprt, struct net *net);
static __be32	xprt_alloc_xid(struct rpc_xprt *xprt);
static void	xprt_connect_status(struct rpc_task *task);
static void	xprt_destroy(struct rpc_xprt *xprt);

static DEFINE_SPINLOCK(xprt_list_lock);
static LIST_HEAD(xprt_list);

/**
 * xprt_register_transport - register a transport implementation
 * @transport: transport to register
 *
 * If a transport implementation is loaded as a kernel module, it can
 * call this interface to make itself known to the RPC client.
 *
 * Returns:
 * 0:		transport successfully registered
 * -EEXIST:	transport already registered
 * -EINVAL:	transport module being unloaded
 */
int xprt_register_transport(struct xprt_class *transport)
{
	struct xprt_class *t;
	int result;

	result = -EEXIST;
	spin_lock(&xprt_list_lock);
	list_for_each_entry(t, &xprt_list, list) {
		/* don't register the same transport class twice */
		if (t->ident == transport->ident)
			goto out;
	}

	list_add_tail(&transport->list, &xprt_list);
	printk(KERN_INFO "RPC: Registered %s transport module.\n",
	       transport->name);
	result = 0;

out:
	spin_unlock(&xprt_list_lock);
	return result;
}
EXPORT_SYMBOL_GPL(xprt_register_transport);

/**
 * xprt_unregister_transport - unregister a transport implementation
 * @transport: transport to unregister
 *
 * Returns:
 * 0:		transport successfully unregistered
 * -ENOENT:	transport never registered
 */
int xprt_unregister_transport(struct xprt_class *transport)
{
	struct xprt_class *t;
	int result;

	result = 0;
	spin_lock(&xprt_list_lock);
	list_for_each_entry(t, &xprt_list, list) {
		if (t == transport) {
			printk(KERN_INFO
				"RPC: Unregistered %s transport module.\n",
				transport->name);
			list_del_init(&transport->list);
			goto out;
		}
	}
	result = -ENOENT;

out:
	spin_unlock(&xprt_list_lock);
	return result;
}
EXPORT_SYMBOL_GPL(xprt_unregister_transport);

/**
 * xprt_load_transport - load a transport implementation
 * @transport_name: transport to load
 *
 * Returns:
 * 0:		transport successfully loaded
 * -ENOENT:	transport module not available
 */
int xprt_load_transport(const char *transport_name)
{
	struct xprt_class *t;
	int result;

	result = 0;
	spin_lock(&xprt_list_lock);
	list_for_each_entry(t, &xprt_list, list) {
		if (strcmp(t->name, transport_name) == 0) {
			spin_unlock(&xprt_list_lock);
			goto out;
		}
	}
	spin_unlock(&xprt_list_lock);
	result = request_module("xprt%s", transport_name);
out:
	return result;
}
EXPORT_SYMBOL_GPL(xprt_load_transport);

static void xprt_clear_locked(struct rpc_xprt *xprt)
{
	xprt->snd_task = NULL;
	if (!test_bit(XPRT_CLOSE_WAIT, &xprt->state)) {
		smp_mb__before_atomic();
		clear_bit(XPRT_LOCKED, &xprt->state);
		smp_mb__after_atomic();
	} else
		queue_work(xprtiod_workqueue, &xprt->task_cleanup);
}

/**
 * xprt_reserve_xprt - serialize write access to transports
 * @task: task that is requesting access to the transport
 * @xprt: pointer to the target transport
 *
 * This prevents mixing the payload of separate requests, and prevents
 * transport connects from colliding with writes. No congestion control
 * is provided.
 */
int xprt_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;

	if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
		if (task == xprt->snd_task)
			return 1;
		goto out_sleep;
	}
	if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
		goto out_unlock;
	xprt->snd_task = task;

	return 1;

out_unlock:
	xprt_clear_locked(xprt);
out_sleep:
	dprintk("RPC: %5u failed to lock transport %p\n",
			task->tk_pid, xprt);
	task->tk_timeout = RPC_IS_SOFT(task) ? req->rq_timeout : 0;
	task->tk_status = -EAGAIN;
	rpc_sleep_on(&xprt->sending, task, NULL);
	return 0;
}
EXPORT_SYMBOL_GPL(xprt_reserve_xprt);

static bool
xprt_need_congestion_window_wait(struct rpc_xprt *xprt)
{
	return test_bit(XPRT_CWND_WAIT, &xprt->state);
}

static void
xprt_set_congestion_window_wait(struct rpc_xprt *xprt)
{
	if (!list_empty(&xprt->xmit_queue)) {
		/* Peek at head of queue to see if it can make progress */
		if (list_first_entry(&xprt->xmit_queue, struct rpc_rqst,
					rq_xmit)->rq_cong)
			return;
	}
	set_bit(XPRT_CWND_WAIT, &xprt->state);
}

static void
xprt_test_and_clear_congestion_window_wait(struct rpc_xprt *xprt)
{
	if (!RPCXPRT_CONGESTED(xprt))
		clear_bit(XPRT_CWND_WAIT, &xprt->state);
}

/*
 * xprt_reserve_xprt_cong - serialize write access to transports
 * @task: task that is requesting access to the transport
 *
 * Same as xprt_reserve_xprt, but Van Jacobson congestion control is
 * integrated into the decision of whether a request is allowed to be
 * woken up and given access to the transport.
 * Note that the lock is only granted if we know there are free slots.
 */
int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;

	if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
		if (task == xprt->snd_task)
			return 1;
		goto out_sleep;
	}
	if (req == NULL) {
		xprt->snd_task = task;
		return 1;
	}
	if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
		goto out_unlock;
	if (!xprt_need_congestion_window_wait(xprt)) {
		xprt->snd_task = task;
		return 1;
	}
out_unlock:
	xprt_clear_locked(xprt);
out_sleep:
	dprintk("RPC: %5u failed to lock transport %p\n", task->tk_pid, xprt);
	task->tk_timeout = RPC_IS_SOFT(task) ? req->rq_timeout : 0;
	task->tk_status = -EAGAIN;
	rpc_sleep_on(&xprt->sending, task, NULL);
	return 0;
}
EXPORT_SYMBOL_GPL(xprt_reserve_xprt_cong);

static inline int xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
	int retval;

	if (test_bit(XPRT_LOCKED, &xprt->state) && xprt->snd_task == task)
		return 1;
	spin_lock_bh(&xprt->transport_lock);
	retval = xprt->ops->reserve_xprt(xprt, task);
	spin_unlock_bh(&xprt->transport_lock);
	return retval;
}

static bool __xprt_lock_write_func(struct rpc_task *task, void *data)
{
	struct rpc_xprt *xprt = data;

	xprt->snd_task = task;
	return true;
}

static void __xprt_lock_write_next(struct rpc_xprt *xprt)
{
	if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
		return;
	if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
		goto out_unlock;
	if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending,
				__xprt_lock_write_func, xprt))
		return;
out_unlock:
	xprt_clear_locked(xprt);
}

static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt)
{
	if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
		return;
	if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
		goto out_unlock;
	if (xprt_need_congestion_window_wait(xprt))
		goto out_unlock;
	if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending,
				__xprt_lock_write_func, xprt))
		return;
out_unlock:
	xprt_clear_locked(xprt);
}

/**
 * xprt_release_xprt - allow other requests to use a transport
 * @xprt: transport with other tasks potentially waiting
 * @task: task that is releasing access to the transport
 *
 * Note that "task" can be NULL. No congestion control is provided.
 */
void xprt_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
{
	if (xprt->snd_task == task) {
		xprt_clear_locked(xprt);
		__xprt_lock_write_next(xprt);
	}
}
EXPORT_SYMBOL_GPL(xprt_release_xprt);

/**
 * xprt_release_xprt_cong - allow other requests to use a transport
 * @xprt: transport with other tasks potentially waiting
 * @task: task that is releasing access to the transport
 *
 * Note that "task" can be NULL. Another task is awoken to use the
 * transport if the transport's congestion window allows it.
 */
void xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
{
	if (xprt->snd_task == task) {
		xprt_clear_locked(xprt);
		__xprt_lock_write_next_cong(xprt);
	}
}
EXPORT_SYMBOL_GPL(xprt_release_xprt_cong);

static inline void xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
	if (xprt->snd_task != task)
		return;
	spin_lock_bh(&xprt->transport_lock);
	xprt->ops->release_xprt(xprt, task);
	spin_unlock_bh(&xprt->transport_lock);
}

/*
 * Van Jacobson congestion avoidance. Check if the congestion window
 * overflowed. Put the task to sleep if this is the case.
 */
static int
__xprt_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
	if (req->rq_cong)
		return 1;
	dprintk("RPC: %5u xprt_cwnd_limited cong = %lu cwnd = %lu\n",
			req->rq_task->tk_pid, xprt->cong, xprt->cwnd);
	if (RPCXPRT_CONGESTED(xprt)) {
		xprt_set_congestion_window_wait(xprt);
		return 0;
	}
	req->rq_cong = 1;
	xprt->cong += RPC_CWNDSCALE;
	return 1;
}

/*
 * Adjust the congestion window, and wake up the next task
 * that has been sleeping due to congestion
 */
static void
__xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
	if (!req->rq_cong)
		return;
	req->rq_cong = 0;
	xprt->cong -= RPC_CWNDSCALE;
	xprt_test_and_clear_congestion_window_wait(xprt);
	__xprt_lock_write_next_cong(xprt);
}

/**
 * xprt_request_get_cong - Request congestion control credits
 * @xprt: pointer to transport
 * @req: pointer to RPC request
 *
 * Useful for transports that require congestion control.
 */
bool
xprt_request_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
	bool ret = false;

	if (req->rq_cong)
		return true;
	spin_lock_bh(&xprt->transport_lock);
	ret = __xprt_get_cong(xprt, req) != 0;
	spin_unlock_bh(&xprt->transport_lock);
	return ret;
}
EXPORT_SYMBOL_GPL(xprt_request_get_cong);

/**
 * xprt_release_rqst_cong - housekeeping when request is complete
 * @task: RPC request that recently completed
 *
 * Useful for transports that require congestion control.
 */
void xprt_release_rqst_cong(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;

	__xprt_put_cong(req->rq_xprt, req);
}
EXPORT_SYMBOL_GPL(xprt_release_rqst_cong);

/*
 * Clear the congestion window wait flag and wake up the next
 * entry on xprt->sending
 */
static void
xprt_clear_congestion_window_wait(struct rpc_xprt *xprt)
{
	if (test_and_clear_bit(XPRT_CWND_WAIT, &xprt->state)) {
		spin_lock_bh(&xprt->transport_lock);
		__xprt_lock_write_next_cong(xprt);
		spin_unlock_bh(&xprt->transport_lock);
	}
}

/**
 * xprt_adjust_cwnd - adjust transport congestion window
 * @xprt: pointer to xprt
 * @task: recently completed RPC request used to adjust window
 * @result: result code of completed RPC request
 *
 * The transport code maintains an estimate on the maximum number of out-
 * standing RPC requests, using a smoothed version of the congestion
 * avoidance implemented in 44BSD. This is basically the Van Jacobson
 * congestion algorithm: If a retransmit occurs, the congestion window is
 * halved; otherwise, it is incremented by 1/cwnd when
 *
 *	-	a reply is received and
 *	-	a full number of requests are outstanding and
 *	-	the congestion window hasn't been updated recently.
 */
void xprt_adjust_cwnd(struct rpc_xprt *xprt, struct rpc_task *task, int result)
{
	struct rpc_rqst *req = task->tk_rqstp;
	unsigned long cwnd = xprt->cwnd;

	if (result >= 0 && cwnd <= xprt->cong) {
		/* The (cwnd >> 1) term makes sure
		 * the result gets rounded properly.
		 */
		cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd;
		if (cwnd > RPC_MAXCWND(xprt))
			cwnd = RPC_MAXCWND(xprt);
		__xprt_lock_write_next_cong(xprt);
	} else if (result == -ETIMEDOUT) {
		cwnd >>= 1;
		if (cwnd < RPC_CWNDSCALE)
			cwnd = RPC_CWNDSCALE;
	}
	dprintk("RPC: cong %ld, cwnd was %ld, now %ld\n",
			xprt->cong, xprt->cwnd, cwnd);
	xprt->cwnd = cwnd;
	__xprt_put_cong(xprt, req);
}
EXPORT_SYMBOL_GPL(xprt_adjust_cwnd);

/**
 * xprt_wake_pending_tasks - wake all tasks on a transport's pending queue
 * @xprt: transport with waiting tasks
 * @status: result code to plant in each task before waking it
 *
 */
void xprt_wake_pending_tasks(struct rpc_xprt *xprt, int status)
{
	if (status < 0)
		rpc_wake_up_status(&xprt->pending, status);
	else
		rpc_wake_up(&xprt->pending);
}
EXPORT_SYMBOL_GPL(xprt_wake_pending_tasks);

/**
 * xprt_wait_for_buffer_space - wait for transport output buffer to clear
 * @xprt: transport
 *
 * Note that we only set the timer for the case of RPC_IS_SOFT(), since
 * we don't in general want to force a socket disconnection due to
 * an incomplete RPC call transmission.
 */
void xprt_wait_for_buffer_space(struct rpc_xprt *xprt)
{
	set_bit(XPRT_WRITE_SPACE, &xprt->state);
}
EXPORT_SYMBOL_GPL(xprt_wait_for_buffer_space);

static bool
xprt_clear_write_space_locked(struct rpc_xprt *xprt)
{
	if (test_and_clear_bit(XPRT_WRITE_SPACE, &xprt->state)) {
		__xprt_lock_write_next(xprt);
		dprintk("RPC: write space: waking waiting task on "
				"xprt %p\n", xprt);
		return true;
	}
	return false;
}

/**
 * xprt_write_space - wake the task waiting for transport output buffer space
 * @xprt: transport with waiting tasks
 *
 * Can be called in a soft IRQ context, so xprt_write_space never sleeps.
 */
bool xprt_write_space(struct rpc_xprt *xprt)
{
	bool ret;

	if (!test_bit(XPRT_WRITE_SPACE, &xprt->state))
		return false;
	spin_lock_bh(&xprt->transport_lock);
	ret = xprt_clear_write_space_locked(xprt);
	spin_unlock_bh(&xprt->transport_lock);
	return ret;
}
EXPORT_SYMBOL_GPL(xprt_write_space);

/**
 * xprt_set_retrans_timeout_def - set a request's retransmit timeout
 * @task: task whose timeout is to be set
 *
 * Set a request's retransmit timeout based on the transport's
 * default timeout parameters. Used by transports that don't adjust
 * the retransmit timeout based on round-trip time estimation.
 */
void xprt_set_retrans_timeout_def(struct rpc_task *task)
{
	task->tk_timeout = task->tk_rqstp->rq_timeout;
}
EXPORT_SYMBOL_GPL(xprt_set_retrans_timeout_def);

/**
 * xprt_set_retrans_timeout_rtt - set a request's retransmit timeout
 * @task: task whose timeout is to be set
 *
 * Set a request's retransmit timeout using the RTT estimator.
 */
void xprt_set_retrans_timeout_rtt(struct rpc_task *task)
{
	int timer = task->tk_msg.rpc_proc->p_timer;
	struct rpc_clnt *clnt = task->tk_client;
	struct rpc_rtt *rtt = clnt->cl_rtt;
	struct rpc_rqst *req = task->tk_rqstp;
	unsigned long max_timeout = clnt->cl_timeout->to_maxval;

	task->tk_timeout = rpc_calc_rto(rtt, timer);
	task->tk_timeout <<= rpc_ntimeo(rtt, timer) + req->rq_retries;
	if (task->tk_timeout > max_timeout || task->tk_timeout == 0)
		task->tk_timeout = max_timeout;
}
EXPORT_SYMBOL_GPL(xprt_set_retrans_timeout_rtt);

static void xprt_reset_majortimeo(struct rpc_rqst *req)
{
	const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout;

	req->rq_majortimeo = req->rq_timeout;
	if (to->to_exponential)
		req->rq_majortimeo <<= to->to_retries;
	else
		req->rq_majortimeo += to->to_increment * to->to_retries;
	if (req->rq_majortimeo > to->to_maxval || req->rq_majortimeo == 0)
		req->rq_majortimeo = to->to_maxval;
	req->rq_majortimeo += jiffies;
}

/**
 * xprt_adjust_timeout - adjust timeout values for next retransmit
 * @req: RPC request containing parameters to use for the adjustment
 *
 */
int xprt_adjust_timeout(struct rpc_rqst *req)
{
	struct rpc_xprt *xprt = req->rq_xprt;
	const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout;
	int status = 0;

	if (time_before(jiffies, req->rq_majortimeo)) {
		if (to->to_exponential)
			req->rq_timeout <<= 1;
		else
			req->rq_timeout += to->to_increment;
		if (to->to_maxval && req->rq_timeout >= to->to_maxval)
			req->rq_timeout = to->to_maxval;
		req->rq_retries++;
	} else {
		req->rq_timeout = to->to_initval;
		req->rq_retries = 0;
		xprt_reset_majortimeo(req);
		/* Reset the RTT counters == "slow start" */
		spin_lock_bh(&xprt->transport_lock);
		rpc_init_rtt(req->rq_task->tk_client->cl_rtt, to->to_initval);
		spin_unlock_bh(&xprt->transport_lock);
		status = -ETIMEDOUT;
	}

	if (req->rq_timeout == 0) {
		printk(KERN_WARNING "xprt_adjust_timeout: rq_timeout = 0!\n");
		req->rq_timeout = 5 * HZ;
	}
	return status;
}

static void xprt_autoclose(struct work_struct *work)
{
	struct rpc_xprt *xprt =
		container_of(work, struct rpc_xprt, task_cleanup);

	clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
	xprt->ops->close(xprt);
	xprt_release_write(xprt, NULL);
	wake_up_bit(&xprt->state, XPRT_LOCKED);
}

/**
 * xprt_disconnect_done - mark a transport as disconnected
 * @xprt: transport to flag for disconnect
 *
 */
void xprt_disconnect_done(struct rpc_xprt *xprt)
{
	dprintk("RPC: disconnected transport %p\n", xprt);
	spin_lock_bh(&xprt->transport_lock);
	xprt_clear_connected(xprt);
	xprt_clear_write_space_locked(xprt);
	xprt_wake_pending_tasks(xprt, -EAGAIN);
	spin_unlock_bh(&xprt->transport_lock);
}
EXPORT_SYMBOL_GPL(xprt_disconnect_done);

/**
 * xprt_force_disconnect - force a transport to disconnect
 * @xprt: transport to disconnect
 *
 */
void xprt_force_disconnect(struct rpc_xprt *xprt)
{
	/* Don't race with the test_bit() in xprt_clear_locked() */
	spin_lock_bh(&xprt->transport_lock);
	set_bit(XPRT_CLOSE_WAIT, &xprt->state);
	/* Try to schedule an autoclose RPC call */
	if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
		queue_work(xprtiod_workqueue, &xprt->task_cleanup);
	xprt_wake_pending_tasks(xprt, -EAGAIN);
	spin_unlock_bh(&xprt->transport_lock);
}
EXPORT_SYMBOL_GPL(xprt_force_disconnect);

static unsigned int
xprt_connect_cookie(struct rpc_xprt *xprt)
{
	return READ_ONCE(xprt->connect_cookie);
}

static bool
xprt_request_retransmit_after_disconnect(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;

	return req->rq_connect_cookie != xprt_connect_cookie(xprt) ||
		!xprt_connected(xprt);
}

/**
 * xprt_conditional_disconnect - force a transport to disconnect
 * @xprt: transport to disconnect
 * @cookie: 'connection cookie'
 *
 * This attempts to break the connection if and only if 'cookie' matches
 * the current transport 'connection cookie'. It ensures that we don't
 * try to break the connection more than once when we need to retransmit
 * a batch of RPC requests.
 *
 */
void xprt_conditional_disconnect(struct rpc_xprt *xprt, unsigned int cookie)
{
	/* Don't race with the test_bit() in xprt_clear_locked() */
	spin_lock_bh(&xprt->transport_lock);
	if (cookie != xprt->connect_cookie)
		goto out;
	if (test_bit(XPRT_CLOSING, &xprt->state))
		goto out;
	set_bit(XPRT_CLOSE_WAIT, &xprt->state);
	/* Try to schedule an autoclose RPC call */
	if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
		queue_work(xprtiod_workqueue, &xprt->task_cleanup);
	xprt_wake_pending_tasks(xprt, -EAGAIN);
out:
	spin_unlock_bh(&xprt->transport_lock);
}

static bool
xprt_has_timer(const struct rpc_xprt *xprt)
{
	return xprt->idle_timeout != 0;
}

static void
xprt_schedule_autodisconnect(struct rpc_xprt *xprt)
	__must_hold(&xprt->transport_lock)
{
	if (RB_EMPTY_ROOT(&xprt->recv_queue) && xprt_has_timer(xprt))
		mod_timer(&xprt->timer, xprt->last_used + xprt->idle_timeout);
}

static void
xprt_init_autodisconnect(struct timer_list *t)
{
	struct rpc_xprt *xprt = from_timer(xprt, t, timer);

	spin_lock(&xprt->transport_lock);
	if (!RB_EMPTY_ROOT(&xprt->recv_queue))
		goto out_abort;
	/* Reset xprt->last_used to avoid connect/autodisconnect cycling */
	xprt->last_used = jiffies;
	if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
		goto out_abort;
	spin_unlock(&xprt->transport_lock);
	queue_work(xprtiod_workqueue, &xprt->task_cleanup);
	return;
out_abort:
	spin_unlock(&xprt->transport_lock);
}

bool xprt_lock_connect(struct rpc_xprt *xprt,
		struct rpc_task *task,
		void *cookie)
{
	bool ret = false;

	spin_lock_bh(&xprt->transport_lock);
	if (!test_bit(XPRT_LOCKED, &xprt->state))
		goto out;
	if (xprt->snd_task != task)
		goto out;
	xprt->snd_task = cookie;
	ret = true;
out:
	spin_unlock_bh(&xprt->transport_lock);
	return ret;
}

void xprt_unlock_connect(struct rpc_xprt *xprt, void *cookie)
{
	spin_lock_bh(&xprt->transport_lock);
	if (xprt->snd_task != cookie)
		goto out;
	if (!test_bit(XPRT_LOCKED, &xprt->state))
		goto out;
	xprt->snd_task = NULL;
	xprt->ops->release_xprt(xprt, NULL);
	xprt_schedule_autodisconnect(xprt);
out:
	spin_unlock_bh(&xprt->transport_lock);
	wake_up_bit(&xprt->state, XPRT_LOCKED);
}

/**
 * xprt_connect - schedule a transport connect operation
 * @task: RPC task that is requesting the connect
 *
 */
void xprt_connect(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;

	dprintk("RPC: %5u xprt_connect xprt %p %s connected\n", task->tk_pid,
			xprt, (xprt_connected(xprt) ? "is" : "is not"));

	if (!xprt_bound(xprt)) {
		task->tk_status = -EAGAIN;
		return;
	}
	if (!xprt_lock_write(xprt, task))
		return;

	if (test_and_clear_bit(XPRT_CLOSE_WAIT, &xprt->state))
		xprt->ops->close(xprt);

	if (!xprt_connected(xprt)) {
		task->tk_timeout = task->tk_rqstp->rq_timeout;
		task->tk_rqstp->rq_connect_cookie = xprt->connect_cookie;
		rpc_sleep_on(&xprt->pending, task, xprt_connect_status);

		if (test_bit(XPRT_CLOSING, &xprt->state))
			return;
		if (xprt_test_and_set_connecting(xprt))
			return;
		xprt->stat.connect_start = jiffies;
		xprt->ops->connect(xprt, task);
	}
	xprt_release_write(xprt, task);
}

static void xprt_connect_status(struct rpc_task *task)
{
	switch (task->tk_status) {
	case 0:
		dprintk("RPC: %5u xprt_connect_status: connection established\n",
				task->tk_pid);
		break;
	case -ECONNREFUSED:
	case -ECONNRESET:
	case -ECONNABORTED:
	case -ENETUNREACH:
	case -EHOSTUNREACH:
	case -EPIPE:
	case -EAGAIN:
		dprintk("RPC: %5u xprt_connect_status: retrying\n", task->tk_pid);
		break;
	case -ETIMEDOUT:
		dprintk("RPC: %5u xprt_connect_status: connect attempt timed "
				"out\n", task->tk_pid);
		break;
	default:
		dprintk("RPC: %5u xprt_connect_status: error %d connecting to "
				"server %s\n", task->tk_pid, -task->tk_status,
				task->tk_rqstp->rq_xprt->servername);
		task->tk_status = -EIO;
	}
}

enum xprt_xid_rb_cmp {
	XID_RB_EQUAL,
	XID_RB_LEFT,
	XID_RB_RIGHT,
};
static enum xprt_xid_rb_cmp
xprt_xid_cmp(__be32 xid1, __be32 xid2)
{
	if (xid1 == xid2)
		return XID_RB_EQUAL;
	if ((__force u32)xid1 < (__force u32)xid2)
		return XID_RB_LEFT;
	return XID_RB_RIGHT;
}

static struct rpc_rqst *
xprt_request_rb_find(struct rpc_xprt *xprt, __be32 xid)
{
	struct rb_node *n = xprt->recv_queue.rb_node;
	struct rpc_rqst *req;

	while (n != NULL) {
		req = rb_entry(n, struct rpc_rqst, rq_recv);
		switch (xprt_xid_cmp(xid, req->rq_xid)) {
		case XID_RB_LEFT:
			n = n->rb_left;
			break;
		case XID_RB_RIGHT:
			n = n->rb_right;
			break;
		case XID_RB_EQUAL:
			return req;
		}
	}
	return NULL;
}

static void
xprt_request_rb_insert(struct rpc_xprt *xprt, struct rpc_rqst *new)
{
	struct rb_node **p = &xprt->recv_queue.rb_node;
	struct rb_node *n = NULL;
	struct rpc_rqst *req;

	while (*p != NULL) {
		n = *p;
		req = rb_entry(n, struct rpc_rqst, rq_recv);
		switch (xprt_xid_cmp(new->rq_xid, req->rq_xid)) {
		case XID_RB_LEFT:
			p = &n->rb_left;
			break;
		case XID_RB_RIGHT:
			p = &n->rb_right;
			break;
		case XID_RB_EQUAL:
			WARN_ON_ONCE(new != req);
			return;
		}
	}
	rb_link_node(&new->rq_recv, n, p);
	rb_insert_color(&new->rq_recv, &xprt->recv_queue);
}

static void
xprt_request_rb_remove(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
	rb_erase(&req->rq_recv, &xprt->recv_queue);
}

/**
 * xprt_lookup_rqst - find an RPC request corresponding to an XID
 * @xprt: transport on which the original request was transmitted
 * @xid: RPC XID of incoming reply
 *
 * Caller holds xprt->queue_lock.
 */
struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid)
{
	struct rpc_rqst *entry;

	entry = xprt_request_rb_find(xprt, xid);
	if (entry != NULL) {
		trace_xprt_lookup_rqst(xprt, xid, 0);
		entry->rq_rtt = ktime_sub(ktime_get(), entry->rq_xtime);
		return entry;
	}

	dprintk("RPC: xprt_lookup_rqst did not find xid %08x\n",
			ntohl(xid));
	trace_xprt_lookup_rqst(xprt, xid, -ENOENT);
	xprt->stat.bad_xids++;
	return NULL;
}
EXPORT_SYMBOL_GPL(xprt_lookup_rqst);

static bool
xprt_is_pinned_rqst(struct rpc_rqst *req)
{
	return atomic_read(&req->rq_pin) != 0;
}

/**
 * xprt_pin_rqst - Pin a request on the transport receive list
 * @req: Request to pin
 *
 * Caller must ensure this is atomic with the call to xprt_lookup_rqst()
 * so should be holding the xprt receive lock.
 */
void xprt_pin_rqst(struct rpc_rqst *req)
{
	atomic_inc(&req->rq_pin);
}
EXPORT_SYMBOL_GPL(xprt_pin_rqst);

/**
 * xprt_unpin_rqst - Unpin a request on the transport receive list
 * @req: Request to unpin
 *
 * Caller should be holding the xprt receive lock.
 */
void xprt_unpin_rqst(struct rpc_rqst *req)
{
	if (!test_bit(RPC_TASK_MSG_PIN_WAIT, &req->rq_task->tk_runstate)) {
		atomic_dec(&req->rq_pin);
		return;
	}
	if (atomic_dec_and_test(&req->rq_pin))
		wake_up_var(&req->rq_pin);
}
EXPORT_SYMBOL_GPL(xprt_unpin_rqst);

static void xprt_wait_on_pinned_rqst(struct rpc_rqst *req)
{
	wait_var_event(&req->rq_pin, !xprt_is_pinned_rqst(req));
}

static bool
xprt_request_data_received(struct rpc_task *task)
{
	return !test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) &&
		READ_ONCE(task->tk_rqstp->rq_reply_bytes_recvd) != 0;
}

static bool
xprt_request_need_enqueue_receive(struct rpc_task *task, struct rpc_rqst *req)
{
	return !test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) &&
		READ_ONCE(task->tk_rqstp->rq_reply_bytes_recvd) == 0;
}

/**
 * xprt_request_enqueue_receive - Add a request to the receive queue
 * @task: RPC task
 *
 */
void
xprt_request_enqueue_receive(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;

	if (!xprt_request_need_enqueue_receive(task, req))
		return;
	spin_lock(&xprt->queue_lock);

	/* Update the softirq receive buffer */
	memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
			sizeof(req->rq_private_buf));

	/* Add request to the receive list */
	xprt_request_rb_insert(xprt, req);
	set_bit(RPC_TASK_NEED_RECV, &task->tk_runstate);
	spin_unlock(&xprt->queue_lock);

	xprt_reset_majortimeo(req);
	/* Turn off autodisconnect */
	del_singleshot_timer_sync(&xprt->timer);
}

/**
 * xprt_request_dequeue_receive_locked - Remove a request from the receive queue
 * @task: RPC task
 *
 * Caller must hold xprt->queue_lock.
 */
static void
xprt_request_dequeue_receive_locked(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;

	if (test_and_clear_bit(RPC_TASK_NEED_RECV, &task->tk_runstate))
		xprt_request_rb_remove(req->rq_xprt, req);
}

/**
 * xprt_update_rtt - Update RPC RTT statistics
 * @task: RPC request that recently completed
 *
 * Caller holds xprt->queue_lock.
 */
void xprt_update_rtt(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_rtt *rtt = task->tk_client->cl_rtt;
	unsigned int timer = task->tk_msg.rpc_proc->p_timer;
	long m = usecs_to_jiffies(ktime_to_us(req->rq_rtt));

	if (timer) {
		if (req->rq_ntrans == 1)
			rpc_update_rtt(rtt, timer, m);
		rpc_set_timeo(rtt, timer, req->rq_ntrans - 1);
	}
}
EXPORT_SYMBOL_GPL(xprt_update_rtt);

/**
 * xprt_complete_rqst - called when reply processing is complete
 * @task: RPC request that recently completed
 * @copied: actual number of bytes received from the transport
 *
 * Caller holds xprt->queue_lock.
 */
void xprt_complete_rqst(struct rpc_task *task, int copied)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;

	dprintk("RPC: %5u xid %08x complete (%d bytes received)\n",
			task->tk_pid, ntohl(req->rq_xid), copied);
	trace_xprt_complete_rqst(xprt, req->rq_xid, copied);

	xprt->stat.recvs++;

	req->rq_private_buf.len = copied;
	/* Ensure all writes are done before we update */
	/* req->rq_reply_bytes_recvd */
	smp_wmb();
	req->rq_reply_bytes_recvd = copied;
	xprt_request_dequeue_receive_locked(task);
	rpc_wake_up_queued_task(&xprt->pending, task);
}
EXPORT_SYMBOL_GPL(xprt_complete_rqst);

static void xprt_timer(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;

	if (task->tk_status != -ETIMEDOUT)
		return;

	trace_xprt_timer(xprt, req->rq_xid, task->tk_status);
	if (!req->rq_reply_bytes_recvd) {
		if (xprt->ops->timer)
			xprt->ops->timer(xprt, task);
	} else
		task->tk_status = 0;
}

/**
 * xprt_request_wait_receive - wait for the reply to an RPC request
 * @task: RPC task about to send a request
 *
 */
void xprt_request_wait_receive(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;

	if (!test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate))
		return;
	/*
	 * Sleep on the pending queue if we're expecting a reply.
	 * The spinlock ensures atomicity between the test of
	 * req->rq_reply_bytes_recvd, and the call to rpc_sleep_on().
	 */
	spin_lock(&xprt->queue_lock);
	if (test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) {
		xprt->ops->set_retrans_timeout(task);
		rpc_sleep_on(&xprt->pending, task, xprt_timer);
		/*
		 * Send an extra queue wakeup call if the
		 * connection was dropped in case the call to
		 * rpc_sleep_on() raced.
		 */
		if (xprt_request_retransmit_after_disconnect(task))
			rpc_wake_up_queued_task_set_status(&xprt->pending,
					task, -ENOTCONN);
	}
	spin_unlock(&xprt->queue_lock);
}

static bool
xprt_request_need_enqueue_transmit(struct rpc_task *task, struct rpc_rqst *req)
{
	return !test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
}

/**
 * xprt_request_enqueue_transmit - queue a task for transmission
 * @task: pointer to rpc_task
 *
 * Add a task to the transmission queue.
 */
void
xprt_request_enqueue_transmit(struct rpc_task *task)
{
	struct rpc_rqst *pos, *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;

	if (xprt_request_need_enqueue_transmit(task, req)) {
		spin_lock(&xprt->queue_lock);
		/*
		 * Requests that carry congestion control credits are added
		 * to the head of the list to avoid starvation issues.
		 */
		if (req->rq_cong) {
			xprt_clear_congestion_window_wait(xprt);
			list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) {
				if (pos->rq_cong)
					continue;
				/* Note: req is added _before_ pos */
				list_add_tail(&req->rq_xmit, &pos->rq_xmit);
				INIT_LIST_HEAD(&req->rq_xmit2);
				goto out;
			}
		} else if (RPC_IS_SWAPPER(task)) {
			list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) {
				if (pos->rq_cong || pos->rq_bytes_sent)
					continue;
				if (RPC_IS_SWAPPER(pos->rq_task))
					continue;
				/* Note: req is added _before_ pos */
				list_add_tail(&req->rq_xmit, &pos->rq_xmit);
				INIT_LIST_HEAD(&req->rq_xmit2);
				goto out;
			}
		} else {
			list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) {
				if (pos->rq_task->tk_owner != task->tk_owner)
					continue;
				list_add_tail(&req->rq_xmit2, &pos->rq_xmit2);
				INIT_LIST_HEAD(&req->rq_xmit);
				goto out;
			}
		}
		list_add_tail(&req->rq_xmit, &xprt->xmit_queue);
		INIT_LIST_HEAD(&req->rq_xmit2);
out:
		set_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
		spin_unlock(&xprt->queue_lock);
	}
}

/**
 * xprt_request_dequeue_transmit_locked - remove a task from the transmission queue
 * @task: pointer to rpc_task
 *
 * Remove a task from the transmission queue
 * Caller must hold xprt->queue_lock
 */
static void
xprt_request_dequeue_transmit_locked(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;

	if (!test_and_clear_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
		return;
	if (!list_empty(&req->rq_xmit)) {
		list_del(&req->rq_xmit);
		if (!list_empty(&req->rq_xmit2)) {
			struct rpc_rqst *next = list_first_entry(&req->rq_xmit2,
					struct rpc_rqst, rq_xmit2);
			list_del(&req->rq_xmit2);
			list_add_tail(&next->rq_xmit, &next->rq_xprt->xmit_queue);
		}
	} else
		list_del(&req->rq_xmit2);
}

/**
 * xprt_request_dequeue_transmit - remove a task from the transmission queue
 * @task: pointer to rpc_task
 *
 * Remove a task from the transmission queue
 */
static void
xprt_request_dequeue_transmit(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;

	spin_lock(&xprt->queue_lock);
	xprt_request_dequeue_transmit_locked(task);
	spin_unlock(&xprt->queue_lock);
}

/**
 * xprt_request_prepare - prepare an encoded request for transport
 * @req: pointer to rpc_rqst
 *
 * Calls into the transport layer to do whatever is needed to prepare
 * the request for transmission or receive.
 */
void
xprt_request_prepare(struct rpc_rqst *req)
{
	struct rpc_xprt *xprt = req->rq_xprt;

	if (xprt->ops->prepare_request)
		xprt->ops->prepare_request(req);
}

/**
 * xprt_request_need_retransmit - Test if a task needs retransmission
 * @task: pointer to rpc_task
 *
 * Test for whether a connection breakage requires the task to retransmit
 */
bool
xprt_request_need_retransmit(struct rpc_task *task)
{
	return xprt_request_retransmit_after_disconnect(task);
}

/**
 * xprt_prepare_transmit - reserve the transport before sending a request
 * @task: RPC task about to send a request
 *
 */
bool xprt_prepare_transmit(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;

	dprintk("RPC: %5u xprt_prepare_transmit\n", task->tk_pid);

	if (!xprt_lock_write(xprt, task)) {
		/* Race breaker: someone may have transmitted us */
		if (!test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
			rpc_wake_up_queued_task_set_status(&xprt->sending,
					task, 0);
		return false;
	}
	return true;
}

void xprt_end_transmit(struct rpc_task *task)
{
	xprt_release_write(task->tk_rqstp->rq_xprt, task);
}

/**
 * xprt_request_transmit - send an RPC request on a transport
 * @req: pointer to request to transmit
 * @snd_task: RPC task that owns the transport lock
 *
 * This performs the transmission of a single request.
 * Note that if the request is not the same as snd_task, then it
 * does need to be pinned.
 * Returns '0' on success.
 */
static int
xprt_request_transmit(struct rpc_rqst *req, struct rpc_task *snd_task)
{
	struct rpc_xprt *xprt = req->rq_xprt;
	struct rpc_task *task = req->rq_task;
	unsigned int connect_cookie;
	int is_retrans = RPC_WAS_SENT(task);
	int status;

	dprintk("RPC: %5u xprt_transmit(%u)\n", task->tk_pid, req->rq_slen);

	if (!req->rq_bytes_sent) {
		if (xprt_request_data_received(task)) {
			status = 0;
			goto out_dequeue;
		}
		/* Verify that our message lies in the RPCSEC_GSS window */
		if (rpcauth_xmit_need_reencode(task)) {
			status = -EBADMSG;
			goto out_dequeue;
		}
	}

	/*
	 * Update req->rq_ntrans before transmitting to avoid races with
	 * xprt_update_rtt(), which needs to know that it is recording a
	 * reply to the first transmission.
	 */
	req->rq_ntrans++;

	connect_cookie = xprt->connect_cookie;
	status = xprt->ops->send_request(req);
	trace_xprt_transmit(xprt, req->rq_xid, status);
	if (status != 0) {
		req->rq_ntrans--;
		return status;
	}

	if (is_retrans)
		task->tk_client->cl_stats->rpcretrans++;

	xprt_inject_disconnect(xprt);

	dprintk("RPC: %5u xmit complete\n", task->tk_pid);
	task->tk_flags |= RPC_TASK_SENT;
	spin_lock_bh(&xprt->transport_lock);

	xprt->stat.sends++;
	xprt->stat.req_u += xprt->stat.sends - xprt->stat.recvs;
	xprt->stat.bklog_u += xprt->backlog.qlen;
	xprt->stat.sending_u += xprt->sending.qlen;
	xprt->stat.pending_u += xprt->pending.qlen;
	spin_unlock_bh(&xprt->transport_lock);

	req->rq_connect_cookie = connect_cookie;
out_dequeue:
	xprt_request_dequeue_transmit(task);
	rpc_wake_up_queued_task_set_status(&xprt->sending, task, status);
	return status;
}

/**
 * xprt_transmit - send an RPC request on a transport
 * @task: controlling RPC task
 *
 * Attempts to drain the transmit queue. On exit, either the transport
 * signalled an error that needs to be handled before transmission can
 * resume, or @task finished transmitting, and detected that it already
 * received a reply.
 */
void
xprt_transmit(struct rpc_task *task)
{
	struct rpc_rqst *next, *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	int status;

	spin_lock(&xprt->queue_lock);
	while (!list_empty(&xprt->xmit_queue)) {
		next = list_first_entry(&xprt->xmit_queue,
				struct rpc_rqst, rq_xmit);
		xprt_pin_rqst(next);
		spin_unlock(&xprt->queue_lock);
		status = xprt_request_transmit(next, task);
		if (status == -EBADMSG && next != req)
			status = 0;
		cond_resched();
		spin_lock(&xprt->queue_lock);
		xprt_unpin_rqst(next);
		if (status == 0) {
			if (!xprt_request_data_received(task) ||
			    test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
				continue;
		} else if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
			task->tk_status = status;
		break;
	}
	spin_unlock(&xprt->queue_lock);
}

static void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task)
{
	set_bit(XPRT_CONGESTED, &xprt->state);
	rpc_sleep_on(&xprt->backlog, task, NULL);
}

static void xprt_wake_up_backlog(struct rpc_xprt *xprt)
{
	if (rpc_wake_up_next(&xprt->backlog) == NULL)
		clear_bit(XPRT_CONGESTED, &xprt->state);
}

static bool xprt_throttle_congested(struct rpc_xprt *xprt, struct rpc_task *task)
{
	bool ret = false;

	if (!test_bit(XPRT_CONGESTED, &xprt->state))
		goto out;
	spin_lock(&xprt->reserve_lock);
	if (test_bit(XPRT_CONGESTED, &xprt->state)) {
		rpc_sleep_on(&xprt->backlog, task, NULL);
		ret = true;
	}
	spin_unlock(&xprt->reserve_lock);
out:
	return ret;
}

static struct rpc_rqst *xprt_dynamic_alloc_slot(struct rpc_xprt *xprt)
{
	struct rpc_rqst *req = ERR_PTR(-EAGAIN);

	if (xprt->num_reqs >= xprt->max_reqs)
		goto out;
	++xprt->num_reqs;
	spin_unlock(&xprt->reserve_lock);
	req = kzalloc(sizeof(struct rpc_rqst), GFP_NOFS);
	spin_lock(&xprt->reserve_lock);
	if (req != NULL)
		goto out;
	--xprt->num_reqs;
	req = ERR_PTR(-ENOMEM);
out:
	return req;
}

static bool
xprt_dynamic_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
	if (xprt->num_reqs > xprt->min_reqs) {
		--xprt->num_reqs;
		kfree(req);
		return true;
	}
	return false;
}

void xprt_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task)
{
	struct rpc_rqst *req;

	spin_lock(&xprt->reserve_lock);
	if (!list_empty(&xprt->free)) {
		req = list_entry(xprt->free.next, struct rpc_rqst, rq_list);
		list_del(&req->rq_list);
		goto out_init_req;
	}
	req = xprt_dynamic_alloc_slot(xprt);
	if (!IS_ERR(req))
		goto out_init_req;
	switch (PTR_ERR(req)) {
	case -ENOMEM:
		dprintk("RPC: dynamic allocation of request slot "
				"failed! Retrying\n");
		task->tk_status = -ENOMEM;
		break;
	case -EAGAIN:
		xprt_add_backlog(xprt, task);
		dprintk("RPC: waiting for request slot\n");
		/* fall through */
	default:
		task->tk_status = -EAGAIN;
	}
	spin_unlock(&xprt->reserve_lock);
	return;
out_init_req:
	xprt->stat.max_slots = max_t(unsigned int, xprt->stat.max_slots,
				     xprt->num_reqs);
	spin_unlock(&xprt->reserve_lock);

	task->tk_status = 0;
	task->tk_rqstp = req;
}
EXPORT_SYMBOL_GPL(xprt_alloc_slot);

void xprt_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
	spin_lock(&xprt->reserve_lock);
	if (!xprt_dynamic_free_slot(xprt, req)) {
		memset(req, 0, sizeof(*req));	/* mark unused */
		list_add(&req->rq_list, &xprt->free);
	}
	xprt_wake_up_backlog(xprt);
	spin_unlock(&xprt->reserve_lock);
}
EXPORT_SYMBOL_GPL(xprt_free_slot);

static void xprt_free_all_slots(struct rpc_xprt *xprt)
{
	struct rpc_rqst *req;
	while (!list_empty(&xprt->free)) {
		req = list_first_entry(&xprt->free, struct rpc_rqst, rq_list);
		list_del(&req->rq_list);
		kfree(req);
	}
}

struct rpc_xprt *xprt_alloc(struct net *net, size_t size,
		unsigned int num_prealloc,
		unsigned int max_alloc)
{
	struct rpc_xprt *xprt;
	struct rpc_rqst *req;
	int i;

	xprt = kzalloc(size, GFP_KERNEL);
	if (xprt == NULL)
		goto out;

	xprt_init(xprt, net);

	for (i = 0; i < num_prealloc; i++) {
		req = kzalloc(sizeof(struct rpc_rqst), GFP_KERNEL);
		if (!req)
			goto out_free;
		list_add(&req->rq_list, &xprt->free);
	}
	if (max_alloc > num_prealloc)
		xprt->max_reqs = max_alloc;
	else
		xprt->max_reqs = num_prealloc;
	xprt->min_reqs = num_prealloc;
	xprt->num_reqs = num_prealloc;

	return xprt;

out_free:
	xprt_free(xprt);
out:
	return NULL;
}
EXPORT_SYMBOL_GPL(xprt_alloc);

void xprt_free(struct rpc_xprt *xprt)
{
	put_net(xprt->xprt_net);
	xprt_free_all_slots(xprt);
	kfree_rcu(xprt, rcu);
}
EXPORT_SYMBOL_GPL(xprt_free);

static void
xprt_init_connect_cookie(struct rpc_rqst *req, struct rpc_xprt *xprt)
{
	req->rq_connect_cookie = xprt_connect_cookie(xprt) - 1;
}

static __be32
xprt_alloc_xid(struct rpc_xprt *xprt)
{
	__be32 xid;

	spin_lock(&xprt->reserve_lock);
	xid = (__force __be32)xprt->xid++;
	spin_unlock(&xprt->reserve_lock);
	return xid;
}

static void
xprt_init_xid(struct rpc_xprt *xprt)
{
	xprt->xid = prandom_u32();
}

static void
xprt_request_init(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_xprt;
	struct rpc_rqst *req = task->tk_rqstp;

	req->rq_timeout = task->tk_client->cl_timeout->to_initval;
	req->rq_task = task;
	req->rq_xprt = xprt;
	req->rq_buffer = NULL;
	req->rq_xid = xprt_alloc_xid(xprt);
	xprt_init_connect_cookie(req, xprt);
	req->rq_bytes_sent = 0;
	req->rq_snd_buf.len = 0;
	req->rq_snd_buf.buflen = 0;
	req->rq_rcv_buf.len = 0;
	req->rq_rcv_buf.buflen = 0;
	req->rq_release_snd_buf = NULL;
	xprt_reset_majortimeo(req);
	dprintk("RPC: %5u reserved req %p xid %08x\n", task->tk_pid,
			req, ntohl(req->rq_xid));
}

static void
xprt_do_reserve(struct rpc_xprt *xprt, struct rpc_task *task)
{
	xprt->ops->alloc_slot(xprt, task);
	if (task->tk_rqstp != NULL)
		xprt_request_init(task);
}

/**
 * xprt_reserve - allocate an RPC request slot
 * @task: RPC task requesting a slot allocation
 *
 * If the transport is marked as being congested, or if no more
 * slots are available, place the task on the transport's
 * backlog queue.
 */
void xprt_reserve(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_xprt;

	task->tk_status = 0;
	if (task->tk_rqstp != NULL)
		return;

	task->tk_timeout = 0;
	task->tk_status = -EAGAIN;
	if (!xprt_throttle_congested(xprt, task))
		xprt_do_reserve(xprt, task);
}

/**
 * xprt_retry_reserve - allocate an RPC request slot
 * @task: RPC task requesting a slot allocation
 *
 * If no more slots are available, place the task on the transport's
 * backlog queue.
 * Note that the only difference with xprt_reserve is that we now
 * ignore the value of the XPRT_CONGESTED flag.
 */
void xprt_retry_reserve(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_xprt;

	task->tk_status = 0;
	if (task->tk_rqstp != NULL)
		return;

	task->tk_timeout = 0;
	task->tk_status = -EAGAIN;
	xprt_do_reserve(xprt, task);
}

static void
xprt_request_dequeue_all(struct rpc_task *task, struct rpc_rqst *req)
{
	struct rpc_xprt *xprt = req->rq_xprt;

	if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate) ||
	    test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) ||
	    xprt_is_pinned_rqst(req)) {
		spin_lock(&xprt->queue_lock);
		xprt_request_dequeue_transmit_locked(task);
		xprt_request_dequeue_receive_locked(task);
		while (xprt_is_pinned_rqst(req)) {
			set_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate);
			spin_unlock(&xprt->queue_lock);
			xprt_wait_on_pinned_rqst(req);
			spin_lock(&xprt->queue_lock);
			clear_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate);
		}
		spin_unlock(&xprt->queue_lock);
	}
}

/**
 * xprt_release - release an RPC request slot
 * @task: task which is finished with the slot
 *
 */
void xprt_release(struct rpc_task *task)
{
	struct rpc_xprt *xprt;
	struct rpc_rqst *req = task->tk_rqstp;

	if (req == NULL) {
		if (task->tk_client) {
			xprt = task->tk_xprt;
			xprt_release_write(xprt, task);
		}
		return;
	}

	xprt = req->rq_xprt;
	if (task->tk_ops->rpc_count_stats != NULL)
		task->tk_ops->rpc_count_stats(task, task->tk_calldata);
	else if (task->tk_client)
		rpc_count_iostats(task, task->tk_client->cl_metrics);
	xprt_request_dequeue_all(task, req);
	spin_lock_bh(&xprt->transport_lock);
	xprt->ops->release_xprt(xprt, task);
	if (xprt->ops->release_request)
		xprt->ops->release_request(task);
	xprt->last_used = jiffies;
	xprt_schedule_autodisconnect(xprt);
	spin_unlock_bh(&xprt->transport_lock);
	if (req->rq_buffer)
		xprt->ops->buf_free(task);
	xprt_inject_disconnect(xprt);
	xdr_free_bvec(&req->rq_rcv_buf);
	if (req->rq_cred != NULL)
		put_rpccred(req->rq_cred);
	task->tk_rqstp = NULL;
	if (req->rq_release_snd_buf)
		req->rq_release_snd_buf(req);

	dprintk("RPC: %5u release request %p\n", task->tk_pid, req);
	if (likely(!bc_prealloc(req)))
		xprt->ops->free_slot(xprt, req);
	else
		xprt_free_bc_request(req);
}

#ifdef CONFIG_SUNRPC_BACKCHANNEL
void
xprt_init_bc_request(struct rpc_rqst *req, struct rpc_task *task)
{
	struct xdr_buf *xbufp = &req->rq_snd_buf;

	task->tk_rqstp = req;
	req->rq_task = task;
	xprt_init_connect_cookie(req, req->rq_xprt);
	/*
	 * Set up the xdr_buf length.
	 * This also indicates that the buffer is XDR encoded already.
	 */
	xbufp->len = xbufp->head[0].iov_len + xbufp->page_len +
		xbufp->tail[0].iov_len;
	req->rq_bytes_sent = 0;
}
#endif

static void xprt_init(struct rpc_xprt *xprt, struct net *net)
{
	kref_init(&xprt->kref);

	spin_lock_init(&xprt->transport_lock);
	spin_lock_init(&xprt->reserve_lock);
	spin_lock_init(&xprt->queue_lock);

	INIT_LIST_HEAD(&xprt->free);
	xprt->recv_queue = RB_ROOT;
	INIT_LIST_HEAD(&xprt->xmit_queue);
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
	spin_lock_init(&xprt->bc_pa_lock);
	INIT_LIST_HEAD(&xprt->bc_pa_list);
#endif /* CONFIG_SUNRPC_BACKCHANNEL */
	INIT_LIST_HEAD(&xprt->xprt_switch);

	xprt->last_used = jiffies;
	xprt->cwnd = RPC_INITCWND;
	xprt->bind_index = 0;

	rpc_init_wait_queue(&xprt->binding, "xprt_binding");
	rpc_init_wait_queue(&xprt->pending, "xprt_pending");
	rpc_init_wait_queue(&xprt->sending, "xprt_sending");
	rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog");

	xprt_init_xid(xprt);

	xprt->xprt_net = get_net(net);
}

/**
 * xprt_create_transport - create an RPC transport
 * @args: rpc transport creation arguments
 *
 */
struct rpc_xprt *xprt_create_transport(struct xprt_create *args)
{
	struct rpc_xprt *xprt;
	struct xprt_class *t;

	spin_lock(&xprt_list_lock);
	list_for_each_entry(t, &xprt_list, list) {
		if (t->ident == args->ident) {
			spin_unlock(&xprt_list_lock);
			goto found;
		}
	}
	spin_unlock(&xprt_list_lock);
	dprintk("RPC: transport (%d) not supported\n", args->ident);
	return ERR_PTR(-EIO);

found:
	xprt = t->setup(args);
	if (IS_ERR(xprt)) {
		dprintk("RPC: xprt_create_transport: failed, %ld\n",
				-PTR_ERR(xprt));
		goto out;
	}
	if (args->flags & XPRT_CREATE_NO_IDLE_TIMEOUT)
		xprt->idle_timeout = 0;
	INIT_WORK(&xprt->task_cleanup, xprt_autoclose);
	if (xprt_has_timer(xprt))
		timer_setup(&xprt->timer, xprt_init_autodisconnect, 0);
	else
		timer_setup(&xprt->timer, NULL, 0);

	if (strlen(args->servername) > RPC_MAXNETNAMELEN) {
		xprt_destroy(xprt);
		return ERR_PTR(-EINVAL);
	}
	xprt->servername = kstrdup(args->servername, GFP_KERNEL);
	if (xprt->servername == NULL) {
		xprt_destroy(xprt);
		return ERR_PTR(-ENOMEM);
	}

	rpc_xprt_debugfs_register(xprt);

	dprintk("RPC: created transport %p with %u slots\n", xprt,
			xprt->max_reqs);
out:
	return xprt;
}

static void xprt_destroy_cb(struct work_struct *work)
{
	struct rpc_xprt *xprt =
		container_of(work, struct rpc_xprt, task_cleanup);

	rpc_xprt_debugfs_unregister(xprt);
	rpc_destroy_wait_queue(&xprt->binding);
	rpc_destroy_wait_queue(&xprt->pending);
	rpc_destroy_wait_queue(&xprt->sending);
	rpc_destroy_wait_queue(&xprt->backlog);
	kfree(xprt->servername);
	/*
	 * Tear down transport state and free the rpc_xprt
	 */
	xprt->ops->destroy(xprt);
}

/**
 * xprt_destroy - destroy an RPC transport, killing off all requests.
 * @xprt: transport to destroy
 *
 */
static void xprt_destroy(struct rpc_xprt *xprt)
{
	dprintk("RPC: destroying transport %p\n", xprt);

	/*
	 * Exclude transport connect/disconnect handlers and autoclose
	 */
	wait_on_bit_lock(&xprt->state, XPRT_LOCKED, TASK_UNINTERRUPTIBLE);

	del_timer_sync(&xprt->timer);

	/*
	 * Destroy sockets etc from the system workqueue so they can
	 * safely flush receive work running on rpciod.
	 */
	INIT_WORK(&xprt->task_cleanup, xprt_destroy_cb);
	schedule_work(&xprt->task_cleanup);
}

static void xprt_destroy_kref(struct kref *kref)
{
	xprt_destroy(container_of(kref, struct rpc_xprt, kref));
}

/**
 * xprt_get - return a reference to an RPC transport.
 * @xprt: pointer to the transport
 *
 */
struct rpc_xprt *xprt_get(struct rpc_xprt *xprt)
{
	if (xprt != NULL && kref_get_unless_zero(&xprt->kref))
		return xprt;
	return NULL;
}
EXPORT_SYMBOL_GPL(xprt_get);

/**
 * xprt_put - release a reference to an RPC transport.
 * @xprt: pointer to the transport
 *
 */
void xprt_put(struct rpc_xprt *xprt)
{
	if (xprt != NULL)
		kref_put(&xprt->kref, xprt_destroy_kref);
}
EXPORT_SYMBOL_GPL(xprt_put);
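
/*
 * Illustrative sketch, not compiled as part of this file: a minimal
 * transport module might use the transport switch API above
 * (xprt_register_transport / xprt_unregister_transport) roughly as shown
 * below. The "example" class, its ident value and its setup callback are
 * hypothetical placeholders; only the registration calls are real.
 */
#if 0
static struct rpc_xprt *example_xprt_setup(struct xprt_create *args);
				/* hypothetical setup callback */

static struct xprt_class example_transport = {
	.list	= LIST_HEAD_INIT(example_transport.list),
	.name	= "example",
	.owner	= THIS_MODULE,
	.ident	= 99,			/* hypothetical unique ident */
	.setup	= example_xprt_setup,
};

static int __init example_transport_init(void)
{
	/* returns -EEXIST if another class already claimed this ident */
	return xprt_register_transport(&example_transport);
}

static void __exit example_transport_exit(void)
{
	xprt_unregister_transport(&example_transport);
}
#endif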