/*
 * linux/net/sunrpc/xprt.c
 *
 * This is a generic RPC call interface supporting congestion avoidance
 * and asynchronous calls.
 *
 * The interface works like this:
 *
 * - When a process places a call, it allocates a request slot if
 *   one is available. Otherwise, it sleeps on the backlog queue
 *   (xprt_reserve).
 * - Next, the caller puts together the RPC message, stuffs it into
 *   the request struct, and calls xprt_call().
 * - xprt_call transmits the message and installs the caller on the
 *   socket's wait list. At the same time, it installs a timer that
 *   is run after the packet's timeout has expired.
 * - When a packet arrives, the data_ready handler walks the list of
 *   pending requests for that socket. If a matching XID is found, the
 *   caller is woken up, and the timer removed.
 * - When no reply arrives within the timeout interval, the timer is
 *   fired by the kernel and runs xprt_timer(). It either adjusts the
 *   timeout values (minor timeout) or wakes up the caller with a status
 *   of -ETIMEDOUT.
 * - When the caller receives a notification from RPC that a reply arrived,
 *   it should release the RPC slot and process the reply.
 *   If the call timed out, it may choose to retry the operation by
 *   adjusting the initial timeout value, and simply calling rpc_call
 *   again.
 *
 * Support for async RPC is done through a set of RPC-specific scheduling
 * primitives that `transparently' work for processes as well as async
 * tasks that rely on callbacks.
 *
 * Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de>
 *
 * TCP callback races fixes (C) 1998 Red Hat Software <alan@redhat.com>
 * TCP send fixes (C) 1998 Red Hat Software <alan@redhat.com>
 * TCP NFS related read + write fixes
 *  (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
 *
 * Rewrite of large parts of the code in order to stabilize TCP stuff.
 * Fix behaviour when socket buffer is full.
 * (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no>
 */

#include <linux/types.h>
#include <linux/slab.h>
#include <linux/capability.h>
#include <linux/sched.h>
#include <linux/errno.h>
#include <linux/socket.h>
#include <linux/in.h>
#include <linux/net.h>
#include <linux/mm.h>
#include <linux/udp.h>
#include <linux/tcp.h>
#include <linux/sunrpc/clnt.h>
#include <linux/file.h>
#include <linux/workqueue.h>
#include <linux/random.h>

#include <net/sock.h>
#include <net/checksum.h>
#include <net/udp.h>
#include <net/tcp.h>

/*
 * Local variables
 */

#ifdef RPC_DEBUG
# undef RPC_DEBUG_DATA
# define RPCDBG_FACILITY RPCDBG_XPRT
#endif

#define XPRT_MAX_BACKOFF	(8)
#define XPRT_IDLE_TIMEOUT	(5*60*HZ)
#define XPRT_MAX_RESVPORT	(800)

/*
 * Local functions
 */
static void xprt_request_init(struct rpc_task *, struct rpc_xprt *);
static inline void do_xprt_reserve(struct rpc_task *);
static void xprt_disconnect(struct rpc_xprt *);
static void xprt_connect_status(struct rpc_task *task);
static struct rpc_xprt *xprt_setup(int proto, struct sockaddr_in *ap,
				   struct rpc_timeout *to);
static struct socket *xprt_create_socket(struct rpc_xprt *, int, int);
static void xprt_bind_socket(struct rpc_xprt *, struct socket *);
static int __xprt_get_cong(struct rpc_xprt *, struct rpc_task *);

static int xprt_clear_backlog(struct rpc_xprt *xprt);

#ifdef RPC_DEBUG_DATA
/*
 * Print the buffer contents (first 128 bytes only--just enough for
 * diropres return).
 */
static void
xprt_pktdump(char *msg, u32 *packet, unsigned int count)
{
	u8 *buf = (u8 *) packet;
	int j;

	dprintk("RPC: %s\n", msg);
	for (j = 0; j < count && j < 128; j += 4) {
		if (!(j & 31)) {
			if (j)
				dprintk("\n");
			dprintk("0x%04x ", j);
		}
		dprintk("%02x%02x%02x%02x ",
			buf[j], buf[j+1], buf[j+2], buf[j+3]);
	}
	dprintk("\n");
}
#else
static inline void
xprt_pktdump(char *msg, u32 *packet, unsigned int count)
{
	/* NOP */
}
#endif

/*
 * Look up RPC transport given an INET socket
 */
static inline struct rpc_xprt *
xprt_from_sock(struct sock *sk)
{
	return (struct rpc_xprt *) sk->sk_user_data;
}

/*
 * Serialize write access to sockets, in order to prevent different
 * requests from interfering with each other.
 * Also prevents TCP socket connects from colliding with writes.
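 *
 * The pattern used by __xprt_lock_write()/__xprt_release_write() below is
 * "atomically try to take the XPRT_LOCKED bit, otherwise queue yourself and
 * wait".  What follows is a minimal user-space sketch of that idea only
 * (hypothetical demo_* names, C11 atomics, congestion-window check and the
 * wake-up of the next waiter omitted); it is not the kernel API.
 */

#include <stdatomic.h>
#include <stdbool.h>

struct demo_xprt {
	atomic_flag locked;	/* analogue of XPRT_LOCKED; init with ATOMIC_FLAG_INIT */
	void *owner;		/* analogue of xprt->snd_task */
};

/* Try to become the sender; on contention the caller queues itself via the
 * supplied callback (the analogue of rpc_sleep_on()) and returns false.
 */
static bool demo_lock_write(struct demo_xprt *x, void *task,
			    void (*sleep_on_queue)(void *task))
{
	if (atomic_flag_test_and_set(&x->locked)) {
		if (x->owner == task)	/* we already hold it */
			return true;
		sleep_on_queue(task);
		return false;
	}
	x->owner = task;
	return true;
}

static void demo_release_write(struct demo_xprt *x, void *task)
{
	if (x->owner == task) {
		x->owner = NULL;
		atomic_flag_clear(&x->locked);
		/* a full version would now wake the next queued task */
	}
}

/*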
139 */ 140 static int 141 __xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task) 142 { 143 struct rpc_rqst *req = task->tk_rqstp; 144 145 if (test_and_set_bit(XPRT_LOCKED, &xprt->sockstate)) { 146 if (task == xprt->snd_task) 147 return 1; 148 goto out_sleep; 149 } 150 if (xprt->nocong || __xprt_get_cong(xprt, task)) { 151 xprt->snd_task = task; 152 if (req) { 153 req->rq_bytes_sent = 0; 154 req->rq_ntrans++; 155 } 156 return 1; 157 } 158 smp_mb__before_clear_bit(); 159 clear_bit(XPRT_LOCKED, &xprt->sockstate); 160 smp_mb__after_clear_bit(); 161 out_sleep: 162 dprintk("RPC: %4d failed to lock socket %p\n", task->tk_pid, xprt); 163 task->tk_timeout = 0; 164 task->tk_status = -EAGAIN; 165 if (req && req->rq_ntrans) 166 rpc_sleep_on(&xprt->resend, task, NULL, NULL); 167 else 168 rpc_sleep_on(&xprt->sending, task, NULL, NULL); 169 return 0; 170 } 171 172 static inline int 173 xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task) 174 { 175 int retval; 176 177 spin_lock_bh(&xprt->sock_lock); 178 retval = __xprt_lock_write(xprt, task); 179 spin_unlock_bh(&xprt->sock_lock); 180 return retval; 181 } 182 183 184 static void 185 __xprt_lock_write_next(struct rpc_xprt *xprt) 186 { 187 struct rpc_task *task; 188 189 if (test_and_set_bit(XPRT_LOCKED, &xprt->sockstate)) 190 return; 191 if (!xprt->nocong && RPCXPRT_CONGESTED(xprt)) 192 goto out_unlock; 193 task = rpc_wake_up_next(&xprt->resend); 194 if (!task) { 195 task = rpc_wake_up_next(&xprt->sending); 196 if (!task) 197 goto out_unlock; 198 } 199 if (xprt->nocong || __xprt_get_cong(xprt, task)) { 200 struct rpc_rqst *req = task->tk_rqstp; 201 xprt->snd_task = task; 202 if (req) { 203 req->rq_bytes_sent = 0; 204 req->rq_ntrans++; 205 } 206 return; 207 } 208 out_unlock: 209 smp_mb__before_clear_bit(); 210 clear_bit(XPRT_LOCKED, &xprt->sockstate); 211 smp_mb__after_clear_bit(); 212 } 213 214 /* 215 * Releases the socket for use by other requests. 216 */ 217 static void 218 __xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task) 219 { 220 if (xprt->snd_task == task) { 221 xprt->snd_task = NULL; 222 smp_mb__before_clear_bit(); 223 clear_bit(XPRT_LOCKED, &xprt->sockstate); 224 smp_mb__after_clear_bit(); 225 __xprt_lock_write_next(xprt); 226 } 227 } 228 229 static inline void 230 xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task) 231 { 232 spin_lock_bh(&xprt->sock_lock); 233 __xprt_release_write(xprt, task); 234 spin_unlock_bh(&xprt->sock_lock); 235 } 236 237 /* 238 * Write data to socket. 239 */ 240 static inline int 241 xprt_sendmsg(struct rpc_xprt *xprt, struct rpc_rqst *req) 242 { 243 struct socket *sock = xprt->sock; 244 struct xdr_buf *xdr = &req->rq_snd_buf; 245 struct sockaddr *addr = NULL; 246 int addrlen = 0; 247 unsigned int skip; 248 int result; 249 250 if (!sock) 251 return -ENOTCONN; 252 253 xprt_pktdump("packet data:", 254 req->rq_svec->iov_base, 255 req->rq_svec->iov_len); 256 257 /* For UDP, we need to provide an address */ 258 if (!xprt->stream) { 259 addr = (struct sockaddr *) &xprt->addr; 260 addrlen = sizeof(xprt->addr); 261 } 262 /* Dont repeat bytes */ 263 skip = req->rq_bytes_sent; 264 265 clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags); 266 result = xdr_sendpages(sock, addr, addrlen, xdr, skip, MSG_DONTWAIT); 267 268 dprintk("RPC: xprt_sendmsg(%d) = %d\n", xdr->len - skip, result); 269 270 if (result >= 0) 271 return result; 272 273 switch (result) { 274 case -ECONNREFUSED: 275 /* When the server has died, an ICMP port unreachable message 276 * prompts ECONNREFUSED. 
277 */ 278 case -EAGAIN: 279 break; 280 case -ECONNRESET: 281 case -ENOTCONN: 282 case -EPIPE: 283 /* connection broken */ 284 if (xprt->stream) 285 result = -ENOTCONN; 286 break; 287 default: 288 printk(KERN_NOTICE "RPC: sendmsg returned error %d\n", -result); 289 } 290 return result; 291 } 292 293 /* 294 * Van Jacobson congestion avoidance. Check if the congestion window 295 * overflowed. Put the task to sleep if this is the case. 296 */ 297 static int 298 __xprt_get_cong(struct rpc_xprt *xprt, struct rpc_task *task) 299 { 300 struct rpc_rqst *req = task->tk_rqstp; 301 302 if (req->rq_cong) 303 return 1; 304 dprintk("RPC: %4d xprt_cwnd_limited cong = %ld cwnd = %ld\n", 305 task->tk_pid, xprt->cong, xprt->cwnd); 306 if (RPCXPRT_CONGESTED(xprt)) 307 return 0; 308 req->rq_cong = 1; 309 xprt->cong += RPC_CWNDSCALE; 310 return 1; 311 } 312 313 /* 314 * Adjust the congestion window, and wake up the next task 315 * that has been sleeping due to congestion 316 */ 317 static void 318 __xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req) 319 { 320 if (!req->rq_cong) 321 return; 322 req->rq_cong = 0; 323 xprt->cong -= RPC_CWNDSCALE; 324 __xprt_lock_write_next(xprt); 325 } 326 327 /* 328 * Adjust RPC congestion window 329 * We use a time-smoothed congestion estimator to avoid heavy oscillation. 330 */ 331 static void 332 xprt_adjust_cwnd(struct rpc_xprt *xprt, int result) 333 { 334 unsigned long cwnd; 335 336 cwnd = xprt->cwnd; 337 if (result >= 0 && cwnd <= xprt->cong) { 338 /* The (cwnd >> 1) term makes sure 339 * the result gets rounded properly. */ 340 cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd; 341 if (cwnd > RPC_MAXCWND(xprt)) 342 cwnd = RPC_MAXCWND(xprt); 343 __xprt_lock_write_next(xprt); 344 } else if (result == -ETIMEDOUT) { 345 cwnd >>= 1; 346 if (cwnd < RPC_CWNDSCALE) 347 cwnd = RPC_CWNDSCALE; 348 } 349 dprintk("RPC: cong %ld, cwnd was %ld, now %ld\n", 350 xprt->cong, xprt->cwnd, cwnd); 351 xprt->cwnd = cwnd; 352 } 353 354 /* 355 * Reset the major timeout value 356 */ 357 static void xprt_reset_majortimeo(struct rpc_rqst *req) 358 { 359 struct rpc_timeout *to = &req->rq_xprt->timeout; 360 361 req->rq_majortimeo = req->rq_timeout; 362 if (to->to_exponential) 363 req->rq_majortimeo <<= to->to_retries; 364 else 365 req->rq_majortimeo += to->to_increment * to->to_retries; 366 if (req->rq_majortimeo > to->to_maxval || req->rq_majortimeo == 0) 367 req->rq_majortimeo = to->to_maxval; 368 req->rq_majortimeo += jiffies; 369 } 370 371 /* 372 * Adjust timeout values etc for next retransmit 373 */ 374 int xprt_adjust_timeout(struct rpc_rqst *req) 375 { 376 struct rpc_xprt *xprt = req->rq_xprt; 377 struct rpc_timeout *to = &xprt->timeout; 378 int status = 0; 379 380 if (time_before(jiffies, req->rq_majortimeo)) { 381 if (to->to_exponential) 382 req->rq_timeout <<= 1; 383 else 384 req->rq_timeout += to->to_increment; 385 if (to->to_maxval && req->rq_timeout >= to->to_maxval) 386 req->rq_timeout = to->to_maxval; 387 req->rq_retries++; 388 pprintk("RPC: %lu retrans\n", jiffies); 389 } else { 390 req->rq_timeout = to->to_initval; 391 req->rq_retries = 0; 392 xprt_reset_majortimeo(req); 393 /* Reset the RTT counters == "slow start" */ 394 spin_lock_bh(&xprt->sock_lock); 395 rpc_init_rtt(req->rq_task->tk_client->cl_rtt, to->to_initval); 396 spin_unlock_bh(&xprt->sock_lock); 397 pprintk("RPC: %lu timeout\n", jiffies); 398 status = -ETIMEDOUT; 399 } 400 401 if (req->rq_timeout == 0) { 402 printk(KERN_WARNING "xprt_adjust_timeout: rq_timeout = 0!\n"); 403 req->rq_timeout = 5 * HZ; 404 
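	/*
	 * Worked example of the backoff arithmetic above (added for
	 * illustration), using the UDP defaults that xprt_default_timeout()
	 * installs via xprt_set_timeout(to, 5, 5 * HZ): to_initval =
	 * to_increment = 5s, to_retries = 5, to_maxval = 25s,
	 * to_exponential = 0.  Successive retransmit intervals grow
	 * linearly (5s, then 10s, then 15s, ...), with rq_timeout clamped
	 * at to_maxval.  xprt_reset_majortimeo() also clamps rq_majortimeo
	 * at to_maxval, so roughly 25 seconds after the request was last
	 * (re)queued the next timer pop takes the else branch: rq_timeout
	 * and rq_retries are reset and -ETIMEDOUT is returned so the caller
	 * can decide whether to start over.
	 */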
} 405 return status; 406 } 407 408 /* 409 * Close down a transport socket 410 */ 411 static void 412 xprt_close(struct rpc_xprt *xprt) 413 { 414 struct socket *sock = xprt->sock; 415 struct sock *sk = xprt->inet; 416 417 if (!sk) 418 return; 419 420 write_lock_bh(&sk->sk_callback_lock); 421 xprt->inet = NULL; 422 xprt->sock = NULL; 423 424 sk->sk_user_data = NULL; 425 sk->sk_data_ready = xprt->old_data_ready; 426 sk->sk_state_change = xprt->old_state_change; 427 sk->sk_write_space = xprt->old_write_space; 428 write_unlock_bh(&sk->sk_callback_lock); 429 430 sk->sk_no_check = 0; 431 432 sock_release(sock); 433 } 434 435 static void 436 xprt_socket_autoclose(void *args) 437 { 438 struct rpc_xprt *xprt = (struct rpc_xprt *)args; 439 440 xprt_disconnect(xprt); 441 xprt_close(xprt); 442 xprt_release_write(xprt, NULL); 443 } 444 445 /* 446 * Mark a transport as disconnected 447 */ 448 static void 449 xprt_disconnect(struct rpc_xprt *xprt) 450 { 451 dprintk("RPC: disconnected transport %p\n", xprt); 452 spin_lock_bh(&xprt->sock_lock); 453 xprt_clear_connected(xprt); 454 rpc_wake_up_status(&xprt->pending, -ENOTCONN); 455 spin_unlock_bh(&xprt->sock_lock); 456 } 457 458 /* 459 * Used to allow disconnection when we've been idle 460 */ 461 static void 462 xprt_init_autodisconnect(unsigned long data) 463 { 464 struct rpc_xprt *xprt = (struct rpc_xprt *)data; 465 466 spin_lock(&xprt->sock_lock); 467 if (!list_empty(&xprt->recv) || xprt->shutdown) 468 goto out_abort; 469 if (test_and_set_bit(XPRT_LOCKED, &xprt->sockstate)) 470 goto out_abort; 471 spin_unlock(&xprt->sock_lock); 472 /* Let keventd close the socket */ 473 if (test_bit(XPRT_CONNECTING, &xprt->sockstate) != 0) 474 xprt_release_write(xprt, NULL); 475 else 476 schedule_work(&xprt->task_cleanup); 477 return; 478 out_abort: 479 spin_unlock(&xprt->sock_lock); 480 } 481 482 static void xprt_socket_connect(void *args) 483 { 484 struct rpc_xprt *xprt = (struct rpc_xprt *)args; 485 struct socket *sock = xprt->sock; 486 int status = -EIO; 487 488 if (xprt->shutdown || xprt->addr.sin_port == 0) 489 goto out; 490 491 /* 492 * Start by resetting any existing state 493 */ 494 xprt_close(xprt); 495 sock = xprt_create_socket(xprt, xprt->prot, xprt->resvport); 496 if (sock == NULL) { 497 /* couldn't create socket or bind to reserved port; 498 * this is likely a permanent error, so cause an abort */ 499 goto out; 500 } 501 xprt_bind_socket(xprt, sock); 502 xprt_sock_setbufsize(xprt); 503 504 status = 0; 505 if (!xprt->stream) 506 goto out; 507 508 /* 509 * Tell the socket layer to start connecting... 510 */ 511 status = sock->ops->connect(sock, (struct sockaddr *) &xprt->addr, 512 sizeof(xprt->addr), O_NONBLOCK); 513 dprintk("RPC: %p connect status %d connected %d sock state %d\n", 514 xprt, -status, xprt_connected(xprt), sock->sk->sk_state); 515 if (status < 0) { 516 switch (status) { 517 case -EINPROGRESS: 518 case -EALREADY: 519 goto out_clear; 520 } 521 } 522 out: 523 if (status < 0) 524 rpc_wake_up_status(&xprt->pending, status); 525 else 526 rpc_wake_up(&xprt->pending); 527 out_clear: 528 smp_mb__before_clear_bit(); 529 clear_bit(XPRT_CONNECTING, &xprt->sockstate); 530 smp_mb__after_clear_bit(); 531 } 532 533 /* 534 * Attempt to connect a TCP socket. 535 * 536 */ 537 void xprt_connect(struct rpc_task *task) 538 { 539 struct rpc_xprt *xprt = task->tk_xprt; 540 541 dprintk("RPC: %4d xprt_connect xprt %p %s connected\n", task->tk_pid, 542 xprt, (xprt_connected(xprt) ? 
"is" : "is not")); 543 544 if (xprt->shutdown) { 545 task->tk_status = -EIO; 546 return; 547 } 548 if (!xprt->addr.sin_port) { 549 task->tk_status = -EIO; 550 return; 551 } 552 if (!xprt_lock_write(xprt, task)) 553 return; 554 if (xprt_connected(xprt)) 555 goto out_write; 556 557 if (task->tk_rqstp) 558 task->tk_rqstp->rq_bytes_sent = 0; 559 560 task->tk_timeout = RPC_CONNECT_TIMEOUT; 561 rpc_sleep_on(&xprt->pending, task, xprt_connect_status, NULL); 562 if (!test_and_set_bit(XPRT_CONNECTING, &xprt->sockstate)) { 563 /* Note: if we are here due to a dropped connection 564 * we delay reconnecting by RPC_REESTABLISH_TIMEOUT/HZ 565 * seconds 566 */ 567 if (xprt->sock != NULL) 568 schedule_delayed_work(&xprt->sock_connect, 569 RPC_REESTABLISH_TIMEOUT); 570 else { 571 schedule_work(&xprt->sock_connect); 572 if (!RPC_IS_ASYNC(task)) 573 flush_scheduled_work(); 574 } 575 } 576 return; 577 out_write: 578 xprt_release_write(xprt, task); 579 } 580 581 /* 582 * We arrive here when awoken from waiting on connection establishment. 583 */ 584 static void 585 xprt_connect_status(struct rpc_task *task) 586 { 587 struct rpc_xprt *xprt = task->tk_xprt; 588 589 if (task->tk_status >= 0) { 590 dprintk("RPC: %4d xprt_connect_status: connection established\n", 591 task->tk_pid); 592 return; 593 } 594 595 /* if soft mounted, just cause this RPC to fail */ 596 if (RPC_IS_SOFT(task)) 597 task->tk_status = -EIO; 598 599 switch (task->tk_status) { 600 case -ECONNREFUSED: 601 case -ECONNRESET: 602 case -ENOTCONN: 603 return; 604 case -ETIMEDOUT: 605 dprintk("RPC: %4d xprt_connect_status: timed out\n", 606 task->tk_pid); 607 break; 608 default: 609 printk(KERN_ERR "RPC: error %d connecting to server %s\n", 610 -task->tk_status, task->tk_client->cl_server); 611 } 612 xprt_release_write(xprt, task); 613 } 614 615 /* 616 * Look up the RPC request corresponding to a reply, and then lock it. 617 */ 618 static inline struct rpc_rqst * 619 xprt_lookup_rqst(struct rpc_xprt *xprt, u32 xid) 620 { 621 struct list_head *pos; 622 struct rpc_rqst *req = NULL; 623 624 list_for_each(pos, &xprt->recv) { 625 struct rpc_rqst *entry = list_entry(pos, struct rpc_rqst, rq_list); 626 if (entry->rq_xid == xid) { 627 req = entry; 628 break; 629 } 630 } 631 return req; 632 } 633 634 /* 635 * Complete reply received. 636 * The TCP code relies on us to remove the request from xprt->pending. 
637 */ 638 static void 639 xprt_complete_rqst(struct rpc_xprt *xprt, struct rpc_rqst *req, int copied) 640 { 641 struct rpc_task *task = req->rq_task; 642 struct rpc_clnt *clnt = task->tk_client; 643 644 /* Adjust congestion window */ 645 if (!xprt->nocong) { 646 unsigned timer = task->tk_msg.rpc_proc->p_timer; 647 xprt_adjust_cwnd(xprt, copied); 648 __xprt_put_cong(xprt, req); 649 if (timer) { 650 if (req->rq_ntrans == 1) 651 rpc_update_rtt(clnt->cl_rtt, timer, 652 (long)jiffies - req->rq_xtime); 653 rpc_set_timeo(clnt->cl_rtt, timer, req->rq_ntrans - 1); 654 } 655 } 656 657 #ifdef RPC_PROFILE 658 /* Profile only reads for now */ 659 if (copied > 1024) { 660 static unsigned long nextstat; 661 static unsigned long pkt_rtt, pkt_len, pkt_cnt; 662 663 pkt_cnt++; 664 pkt_len += req->rq_slen + copied; 665 pkt_rtt += jiffies - req->rq_xtime; 666 if (time_before(nextstat, jiffies)) { 667 printk("RPC: %lu %ld cwnd\n", jiffies, xprt->cwnd); 668 printk("RPC: %ld %ld %ld %ld stat\n", 669 jiffies, pkt_cnt, pkt_len, pkt_rtt); 670 pkt_rtt = pkt_len = pkt_cnt = 0; 671 nextstat = jiffies + 5 * HZ; 672 } 673 } 674 #endif 675 676 dprintk("RPC: %4d has input (%d bytes)\n", task->tk_pid, copied); 677 list_del_init(&req->rq_list); 678 req->rq_received = req->rq_private_buf.len = copied; 679 680 /* ... and wake up the process. */ 681 rpc_wake_up_task(task); 682 return; 683 } 684 685 static size_t 686 skb_read_bits(skb_reader_t *desc, void *to, size_t len) 687 { 688 if (len > desc->count) 689 len = desc->count; 690 if (skb_copy_bits(desc->skb, desc->offset, to, len)) 691 return 0; 692 desc->count -= len; 693 desc->offset += len; 694 return len; 695 } 696 697 static size_t 698 skb_read_and_csum_bits(skb_reader_t *desc, void *to, size_t len) 699 { 700 unsigned int csum2, pos; 701 702 if (len > desc->count) 703 len = desc->count; 704 pos = desc->offset; 705 csum2 = skb_copy_and_csum_bits(desc->skb, pos, to, len, 0); 706 desc->csum = csum_block_add(desc->csum, csum2, pos); 707 desc->count -= len; 708 desc->offset += len; 709 return len; 710 } 711 712 /* 713 * We have set things up such that we perform the checksum of the UDP 714 * packet in parallel with the copies into the RPC client iovec. -DaveM 715 */ 716 int 717 csum_partial_copy_to_xdr(struct xdr_buf *xdr, struct sk_buff *skb) 718 { 719 skb_reader_t desc; 720 721 desc.skb = skb; 722 desc.offset = sizeof(struct udphdr); 723 desc.count = skb->len - desc.offset; 724 725 if (skb->ip_summed == CHECKSUM_UNNECESSARY) 726 goto no_checksum; 727 728 desc.csum = csum_partial(skb->data, desc.offset, skb->csum); 729 if (xdr_partial_copy_from_skb(xdr, 0, &desc, skb_read_and_csum_bits) < 0) 730 return -1; 731 if (desc.offset != skb->len) { 732 unsigned int csum2; 733 csum2 = skb_checksum(skb, desc.offset, skb->len - desc.offset, 0); 734 desc.csum = csum_block_add(desc.csum, csum2, desc.offset); 735 } 736 if (desc.count) 737 return -1; 738 if ((unsigned short)csum_fold(desc.csum)) 739 return -1; 740 return 0; 741 no_checksum: 742 if (xdr_partial_copy_from_skb(xdr, 0, &desc, skb_read_bits) < 0) 743 return -1; 744 if (desc.count) 745 return -1; 746 return 0; 747 } 748 749 /* 750 * Input handler for RPC replies. Called from a bottom half and hence 751 * atomic. 
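 *
 * All udp_data_ready() below really needs in order to find the right
 * request is the XID, i.e. the first 32-bit word of the RPC reply.  In the
 * kernel handler the datagram still carries its UDP header, hence the
 * sizeof(struct udphdr) offset; from user space the equivalent is simply
 * reading the first word of the payload.  A rough, hypothetical user-space
 * illustration (the demo_* name is made up):
 */

#include <stdint.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>

/* Receive one RPC/UDP reply on a connected socket and return its XID in
 * host byte order; 0 means the datagram was too short to be a reply.
 */
static uint32_t demo_recv_reply_xid(int udp_fd, void *buf, size_t buflen)
{
	uint32_t xid_be;
	ssize_t n = recv(udp_fd, buf, buflen, 0);

	if (n < (ssize_t)sizeof(xid_be))
		return 0;
	memcpy(&xid_be, buf, sizeof(xid_be));
	return ntohl(xid_be);
}

/*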
752 */ 753 static void 754 udp_data_ready(struct sock *sk, int len) 755 { 756 struct rpc_task *task; 757 struct rpc_xprt *xprt; 758 struct rpc_rqst *rovr; 759 struct sk_buff *skb; 760 int err, repsize, copied; 761 u32 _xid, *xp; 762 763 read_lock(&sk->sk_callback_lock); 764 dprintk("RPC: udp_data_ready...\n"); 765 if (!(xprt = xprt_from_sock(sk))) { 766 printk("RPC: udp_data_ready request not found!\n"); 767 goto out; 768 } 769 770 dprintk("RPC: udp_data_ready client %p\n", xprt); 771 772 if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL) 773 goto out; 774 775 if (xprt->shutdown) 776 goto dropit; 777 778 repsize = skb->len - sizeof(struct udphdr); 779 if (repsize < 4) { 780 printk("RPC: impossible RPC reply size %d!\n", repsize); 781 goto dropit; 782 } 783 784 /* Copy the XID from the skb... */ 785 xp = skb_header_pointer(skb, sizeof(struct udphdr), 786 sizeof(_xid), &_xid); 787 if (xp == NULL) 788 goto dropit; 789 790 /* Look up and lock the request corresponding to the given XID */ 791 spin_lock(&xprt->sock_lock); 792 rovr = xprt_lookup_rqst(xprt, *xp); 793 if (!rovr) 794 goto out_unlock; 795 task = rovr->rq_task; 796 797 dprintk("RPC: %4d received reply\n", task->tk_pid); 798 799 if ((copied = rovr->rq_private_buf.buflen) > repsize) 800 copied = repsize; 801 802 /* Suck it into the iovec, verify checksum if not done by hw. */ 803 if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb)) 804 goto out_unlock; 805 806 /* Something worked... */ 807 dst_confirm(skb->dst); 808 809 xprt_complete_rqst(xprt, rovr, copied); 810 811 out_unlock: 812 spin_unlock(&xprt->sock_lock); 813 dropit: 814 skb_free_datagram(sk, skb); 815 out: 816 read_unlock(&sk->sk_callback_lock); 817 } 818 819 /* 820 * Copy from an skb into memory and shrink the skb. 821 */ 822 static inline size_t 823 tcp_copy_data(skb_reader_t *desc, void *p, size_t len) 824 { 825 if (len > desc->count) 826 len = desc->count; 827 if (skb_copy_bits(desc->skb, desc->offset, p, len)) { 828 dprintk("RPC: failed to copy %zu bytes from skb. %zu bytes remain\n", 829 len, desc->count); 830 return 0; 831 } 832 desc->offset += len; 833 desc->count -= len; 834 dprintk("RPC: copied %zu bytes from skb. 
%zu bytes remain\n", 835 len, desc->count); 836 return len; 837 } 838 839 /* 840 * TCP read fragment marker 841 */ 842 static inline void 843 tcp_read_fraghdr(struct rpc_xprt *xprt, skb_reader_t *desc) 844 { 845 size_t len, used; 846 char *p; 847 848 p = ((char *) &xprt->tcp_recm) + xprt->tcp_offset; 849 len = sizeof(xprt->tcp_recm) - xprt->tcp_offset; 850 used = tcp_copy_data(desc, p, len); 851 xprt->tcp_offset += used; 852 if (used != len) 853 return; 854 xprt->tcp_reclen = ntohl(xprt->tcp_recm); 855 if (xprt->tcp_reclen & 0x80000000) 856 xprt->tcp_flags |= XPRT_LAST_FRAG; 857 else 858 xprt->tcp_flags &= ~XPRT_LAST_FRAG; 859 xprt->tcp_reclen &= 0x7fffffff; 860 xprt->tcp_flags &= ~XPRT_COPY_RECM; 861 xprt->tcp_offset = 0; 862 /* Sanity check of the record length */ 863 if (xprt->tcp_reclen < 4) { 864 printk(KERN_ERR "RPC: Invalid TCP record fragment length\n"); 865 xprt_disconnect(xprt); 866 } 867 dprintk("RPC: reading TCP record fragment of length %d\n", 868 xprt->tcp_reclen); 869 } 870 871 static void 872 tcp_check_recm(struct rpc_xprt *xprt) 873 { 874 dprintk("RPC: xprt = %p, tcp_copied = %lu, tcp_offset = %u, tcp_reclen = %u, tcp_flags = %lx\n", 875 xprt, xprt->tcp_copied, xprt->tcp_offset, xprt->tcp_reclen, xprt->tcp_flags); 876 if (xprt->tcp_offset == xprt->tcp_reclen) { 877 xprt->tcp_flags |= XPRT_COPY_RECM; 878 xprt->tcp_offset = 0; 879 if (xprt->tcp_flags & XPRT_LAST_FRAG) { 880 xprt->tcp_flags &= ~XPRT_COPY_DATA; 881 xprt->tcp_flags |= XPRT_COPY_XID; 882 xprt->tcp_copied = 0; 883 } 884 } 885 } 886 887 /* 888 * TCP read xid 889 */ 890 static inline void 891 tcp_read_xid(struct rpc_xprt *xprt, skb_reader_t *desc) 892 { 893 size_t len, used; 894 char *p; 895 896 len = sizeof(xprt->tcp_xid) - xprt->tcp_offset; 897 dprintk("RPC: reading XID (%Zu bytes)\n", len); 898 p = ((char *) &xprt->tcp_xid) + xprt->tcp_offset; 899 used = tcp_copy_data(desc, p, len); 900 xprt->tcp_offset += used; 901 if (used != len) 902 return; 903 xprt->tcp_flags &= ~XPRT_COPY_XID; 904 xprt->tcp_flags |= XPRT_COPY_DATA; 905 xprt->tcp_copied = 4; 906 dprintk("RPC: reading reply for XID %08x\n", 907 ntohl(xprt->tcp_xid)); 908 tcp_check_recm(xprt); 909 } 910 911 /* 912 * TCP read and complete request 913 */ 914 static inline void 915 tcp_read_request(struct rpc_xprt *xprt, skb_reader_t *desc) 916 { 917 struct rpc_rqst *req; 918 struct xdr_buf *rcvbuf; 919 size_t len; 920 ssize_t r; 921 922 /* Find and lock the request corresponding to this xid */ 923 spin_lock(&xprt->sock_lock); 924 req = xprt_lookup_rqst(xprt, xprt->tcp_xid); 925 if (!req) { 926 xprt->tcp_flags &= ~XPRT_COPY_DATA; 927 dprintk("RPC: XID %08x request not found!\n", 928 ntohl(xprt->tcp_xid)); 929 spin_unlock(&xprt->sock_lock); 930 return; 931 } 932 933 rcvbuf = &req->rq_private_buf; 934 len = desc->count; 935 if (len > xprt->tcp_reclen - xprt->tcp_offset) { 936 skb_reader_t my_desc; 937 938 len = xprt->tcp_reclen - xprt->tcp_offset; 939 memcpy(&my_desc, desc, sizeof(my_desc)); 940 my_desc.count = len; 941 r = xdr_partial_copy_from_skb(rcvbuf, xprt->tcp_copied, 942 &my_desc, tcp_copy_data); 943 desc->count -= r; 944 desc->offset += r; 945 } else 946 r = xdr_partial_copy_from_skb(rcvbuf, xprt->tcp_copied, 947 desc, tcp_copy_data); 948 949 if (r > 0) { 950 xprt->tcp_copied += r; 951 xprt->tcp_offset += r; 952 } 953 if (r != len) { 954 /* Error when copying to the receive buffer, 955 * usually because we weren't able to allocate 956 * additional buffer pages. 
All we can do now 957 * is turn off XPRT_COPY_DATA, so the request 958 * will not receive any additional updates, 959 * and time out. 960 * Any remaining data from this record will 961 * be discarded. 962 */ 963 xprt->tcp_flags &= ~XPRT_COPY_DATA; 964 dprintk("RPC: XID %08x truncated request\n", 965 ntohl(xprt->tcp_xid)); 966 dprintk("RPC: xprt = %p, tcp_copied = %lu, tcp_offset = %u, tcp_reclen = %u\n", 967 xprt, xprt->tcp_copied, xprt->tcp_offset, xprt->tcp_reclen); 968 goto out; 969 } 970 971 dprintk("RPC: XID %08x read %Zd bytes\n", 972 ntohl(xprt->tcp_xid), r); 973 dprintk("RPC: xprt = %p, tcp_copied = %lu, tcp_offset = %u, tcp_reclen = %u\n", 974 xprt, xprt->tcp_copied, xprt->tcp_offset, xprt->tcp_reclen); 975 976 if (xprt->tcp_copied == req->rq_private_buf.buflen) 977 xprt->tcp_flags &= ~XPRT_COPY_DATA; 978 else if (xprt->tcp_offset == xprt->tcp_reclen) { 979 if (xprt->tcp_flags & XPRT_LAST_FRAG) 980 xprt->tcp_flags &= ~XPRT_COPY_DATA; 981 } 982 983 out: 984 if (!(xprt->tcp_flags & XPRT_COPY_DATA)) { 985 dprintk("RPC: %4d received reply complete\n", 986 req->rq_task->tk_pid); 987 xprt_complete_rqst(xprt, req, xprt->tcp_copied); 988 } 989 spin_unlock(&xprt->sock_lock); 990 tcp_check_recm(xprt); 991 } 992 993 /* 994 * TCP discard extra bytes from a short read 995 */ 996 static inline void 997 tcp_read_discard(struct rpc_xprt *xprt, skb_reader_t *desc) 998 { 999 size_t len; 1000 1001 len = xprt->tcp_reclen - xprt->tcp_offset; 1002 if (len > desc->count) 1003 len = desc->count; 1004 desc->count -= len; 1005 desc->offset += len; 1006 xprt->tcp_offset += len; 1007 dprintk("RPC: discarded %Zu bytes\n", len); 1008 tcp_check_recm(xprt); 1009 } 1010 1011 /* 1012 * TCP record receive routine 1013 * We first have to grab the record marker, then the XID, then the data. 1014 */ 1015 static int 1016 tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, 1017 unsigned int offset, size_t len) 1018 { 1019 struct rpc_xprt *xprt = rd_desc->arg.data; 1020 skb_reader_t desc = { 1021 .skb = skb, 1022 .offset = offset, 1023 .count = len, 1024 .csum = 0 1025 }; 1026 1027 dprintk("RPC: tcp_data_recv\n"); 1028 do { 1029 /* Read in a new fragment marker if necessary */ 1030 /* Can we ever really expect to get completely empty fragments? 
*/ 1031 if (xprt->tcp_flags & XPRT_COPY_RECM) { 1032 tcp_read_fraghdr(xprt, &desc); 1033 continue; 1034 } 1035 /* Read in the xid if necessary */ 1036 if (xprt->tcp_flags & XPRT_COPY_XID) { 1037 tcp_read_xid(xprt, &desc); 1038 continue; 1039 } 1040 /* Read in the request data */ 1041 if (xprt->tcp_flags & XPRT_COPY_DATA) { 1042 tcp_read_request(xprt, &desc); 1043 continue; 1044 } 1045 /* Skip over any trailing bytes on short reads */ 1046 tcp_read_discard(xprt, &desc); 1047 } while (desc.count); 1048 dprintk("RPC: tcp_data_recv done\n"); 1049 return len - desc.count; 1050 } 1051 1052 static void tcp_data_ready(struct sock *sk, int bytes) 1053 { 1054 struct rpc_xprt *xprt; 1055 read_descriptor_t rd_desc; 1056 1057 read_lock(&sk->sk_callback_lock); 1058 dprintk("RPC: tcp_data_ready...\n"); 1059 if (!(xprt = xprt_from_sock(sk))) { 1060 printk("RPC: tcp_data_ready socket info not found!\n"); 1061 goto out; 1062 } 1063 if (xprt->shutdown) 1064 goto out; 1065 1066 /* We use rd_desc to pass struct xprt to tcp_data_recv */ 1067 rd_desc.arg.data = xprt; 1068 rd_desc.count = 65536; 1069 tcp_read_sock(sk, &rd_desc, tcp_data_recv); 1070 out: 1071 read_unlock(&sk->sk_callback_lock); 1072 } 1073 1074 static void 1075 tcp_state_change(struct sock *sk) 1076 { 1077 struct rpc_xprt *xprt; 1078 1079 read_lock(&sk->sk_callback_lock); 1080 if (!(xprt = xprt_from_sock(sk))) 1081 goto out; 1082 dprintk("RPC: tcp_state_change client %p...\n", xprt); 1083 dprintk("RPC: state %x conn %d dead %d zapped %d\n", 1084 sk->sk_state, xprt_connected(xprt), 1085 sock_flag(sk, SOCK_DEAD), 1086 sock_flag(sk, SOCK_ZAPPED)); 1087 1088 switch (sk->sk_state) { 1089 case TCP_ESTABLISHED: 1090 spin_lock_bh(&xprt->sock_lock); 1091 if (!xprt_test_and_set_connected(xprt)) { 1092 /* Reset TCP record info */ 1093 xprt->tcp_offset = 0; 1094 xprt->tcp_reclen = 0; 1095 xprt->tcp_copied = 0; 1096 xprt->tcp_flags = XPRT_COPY_RECM | XPRT_COPY_XID; 1097 rpc_wake_up(&xprt->pending); 1098 } 1099 spin_unlock_bh(&xprt->sock_lock); 1100 break; 1101 case TCP_SYN_SENT: 1102 case TCP_SYN_RECV: 1103 break; 1104 default: 1105 xprt_disconnect(xprt); 1106 break; 1107 } 1108 out: 1109 read_unlock(&sk->sk_callback_lock); 1110 } 1111 1112 /* 1113 * Called when more output buffer space is available for this socket. 1114 * We try not to wake our writers until they can make "significant" 1115 * progress, otherwise we'll waste resources thrashing sock_sendmsg 1116 * with a bunch of small requests. 1117 */ 1118 static void 1119 xprt_write_space(struct sock *sk) 1120 { 1121 struct rpc_xprt *xprt; 1122 struct socket *sock; 1123 1124 read_lock(&sk->sk_callback_lock); 1125 if (!(xprt = xprt_from_sock(sk)) || !(sock = sk->sk_socket)) 1126 goto out; 1127 if (xprt->shutdown) 1128 goto out; 1129 1130 /* Wait until we have enough socket memory */ 1131 if (xprt->stream) { 1132 /* from net/core/stream.c:sk_stream_write_space */ 1133 if (sk_stream_wspace(sk) < sk_stream_min_wspace(sk)) 1134 goto out; 1135 } else { 1136 /* from net/core/sock.c:sock_def_write_space */ 1137 if (!sock_writeable(sk)) 1138 goto out; 1139 } 1140 1141 if (!test_and_clear_bit(SOCK_NOSPACE, &sock->flags)) 1142 goto out; 1143 1144 spin_lock_bh(&xprt->sock_lock); 1145 if (xprt->snd_task) 1146 rpc_wake_up_task(xprt->snd_task); 1147 spin_unlock_bh(&xprt->sock_lock); 1148 out: 1149 read_unlock(&sk->sk_callback_lock); 1150 } 1151 1152 /* 1153 * RPC receive timeout handler. 
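 *
 * (Aside, before the timer code: the TCP receive path above, i.e.
 * tcp_read_fraghdr(), tcp_read_xid() and tcp_read_request(), is driven by
 * the 4-byte record marker that RPC record marking (RFC 1831) prescribes
 * for stream transports: the top bit flags the last fragment of a record,
 * the low 31 bits give the fragment length.  A hypothetical stand-alone
 * decoder of that marker, for illustration only:)
 */

#include <stdint.h>
#include <stdbool.h>
#include <arpa/inet.h>

struct demo_frag_hdr {
	bool last;	/* top bit set: this fragment ends the record */
	uint32_t len;	/* low 31 bits: fragment length in bytes */
};

/* Decode a record marker as received off the wire (network byte order),
 * mirroring what tcp_read_fraghdr() does with XPRT_LAST_FRAG and
 * xprt->tcp_reclen.
 */
static struct demo_frag_hdr demo_decode_marker(uint32_t marker_be)
{
	uint32_t m = ntohl(marker_be);
	struct demo_frag_hdr hdr = {
		.last = (m & 0x80000000u) != 0,
		.len = m & 0x7fffffffu,
	};

	return hdr;
}

/*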
1154 */ 1155 static void 1156 xprt_timer(struct rpc_task *task) 1157 { 1158 struct rpc_rqst *req = task->tk_rqstp; 1159 struct rpc_xprt *xprt = req->rq_xprt; 1160 1161 spin_lock(&xprt->sock_lock); 1162 if (req->rq_received) 1163 goto out; 1164 1165 xprt_adjust_cwnd(req->rq_xprt, -ETIMEDOUT); 1166 __xprt_put_cong(xprt, req); 1167 1168 dprintk("RPC: %4d xprt_timer (%s request)\n", 1169 task->tk_pid, req ? "pending" : "backlogged"); 1170 1171 task->tk_status = -ETIMEDOUT; 1172 out: 1173 task->tk_timeout = 0; 1174 rpc_wake_up_task(task); 1175 spin_unlock(&xprt->sock_lock); 1176 } 1177 1178 /* 1179 * Place the actual RPC call. 1180 * We have to copy the iovec because sendmsg fiddles with its contents. 1181 */ 1182 int 1183 xprt_prepare_transmit(struct rpc_task *task) 1184 { 1185 struct rpc_rqst *req = task->tk_rqstp; 1186 struct rpc_xprt *xprt = req->rq_xprt; 1187 int err = 0; 1188 1189 dprintk("RPC: %4d xprt_prepare_transmit\n", task->tk_pid); 1190 1191 if (xprt->shutdown) 1192 return -EIO; 1193 1194 spin_lock_bh(&xprt->sock_lock); 1195 if (req->rq_received && !req->rq_bytes_sent) { 1196 err = req->rq_received; 1197 goto out_unlock; 1198 } 1199 if (!__xprt_lock_write(xprt, task)) { 1200 err = -EAGAIN; 1201 goto out_unlock; 1202 } 1203 1204 if (!xprt_connected(xprt)) { 1205 err = -ENOTCONN; 1206 goto out_unlock; 1207 } 1208 out_unlock: 1209 spin_unlock_bh(&xprt->sock_lock); 1210 return err; 1211 } 1212 1213 void 1214 xprt_transmit(struct rpc_task *task) 1215 { 1216 struct rpc_clnt *clnt = task->tk_client; 1217 struct rpc_rqst *req = task->tk_rqstp; 1218 struct rpc_xprt *xprt = req->rq_xprt; 1219 int status, retry = 0; 1220 1221 1222 dprintk("RPC: %4d xprt_transmit(%u)\n", task->tk_pid, req->rq_slen); 1223 1224 /* set up everything as needed. */ 1225 /* Write the record marker */ 1226 if (xprt->stream) { 1227 u32 *marker = req->rq_svec[0].iov_base; 1228 1229 *marker = htonl(0x80000000|(req->rq_slen-sizeof(*marker))); 1230 } 1231 1232 smp_rmb(); 1233 if (!req->rq_received) { 1234 if (list_empty(&req->rq_list)) { 1235 spin_lock_bh(&xprt->sock_lock); 1236 /* Update the softirq receive buffer */ 1237 memcpy(&req->rq_private_buf, &req->rq_rcv_buf, 1238 sizeof(req->rq_private_buf)); 1239 /* Add request to the receive list */ 1240 list_add_tail(&req->rq_list, &xprt->recv); 1241 spin_unlock_bh(&xprt->sock_lock); 1242 xprt_reset_majortimeo(req); 1243 /* Turn off autodisconnect */ 1244 del_singleshot_timer_sync(&xprt->timer); 1245 } 1246 } else if (!req->rq_bytes_sent) 1247 return; 1248 1249 /* Continue transmitting the packet/record. We must be careful 1250 * to cope with writespace callbacks arriving _after_ we have 1251 * called xprt_sendmsg(). 1252 */ 1253 while (1) { 1254 req->rq_xtime = jiffies; 1255 status = xprt_sendmsg(xprt, req); 1256 1257 if (status < 0) 1258 break; 1259 1260 if (xprt->stream) { 1261 req->rq_bytes_sent += status; 1262 1263 /* If we've sent the entire packet, immediately 1264 * reset the count of bytes sent. 
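			 *
			 * For example (illustrative numbers): with
			 * rq_slen = 1024 on a stream socket, a first
			 * xprt_sendmsg() that accepts only 400 bytes leaves
			 * rq_bytes_sent = 400; the next pass around the loop
			 * resends from that offset (it becomes the 'skip'
			 * handed to xdr_sendpages), and only once
			 * rq_bytes_sent reaches 1024 do we reset it and jump
			 * to out_receive to arm the reply timeout.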
*/ 1265 if (req->rq_bytes_sent >= req->rq_slen) { 1266 req->rq_bytes_sent = 0; 1267 goto out_receive; 1268 } 1269 } else { 1270 if (status >= req->rq_slen) 1271 goto out_receive; 1272 status = -EAGAIN; 1273 break; 1274 } 1275 1276 dprintk("RPC: %4d xmit incomplete (%d left of %d)\n", 1277 task->tk_pid, req->rq_slen - req->rq_bytes_sent, 1278 req->rq_slen); 1279 1280 status = -EAGAIN; 1281 if (retry++ > 50) 1282 break; 1283 } 1284 1285 /* Note: at this point, task->tk_sleeping has not yet been set, 1286 * hence there is no danger of the waking up task being put on 1287 * schedq, and being picked up by a parallel run of rpciod(). 1288 */ 1289 task->tk_status = status; 1290 1291 switch (status) { 1292 case -EAGAIN: 1293 if (test_bit(SOCK_ASYNC_NOSPACE, &xprt->sock->flags)) { 1294 /* Protect against races with xprt_write_space */ 1295 spin_lock_bh(&xprt->sock_lock); 1296 /* Don't race with disconnect */ 1297 if (!xprt_connected(xprt)) 1298 task->tk_status = -ENOTCONN; 1299 else if (test_bit(SOCK_NOSPACE, &xprt->sock->flags)) { 1300 task->tk_timeout = req->rq_timeout; 1301 rpc_sleep_on(&xprt->pending, task, NULL, NULL); 1302 } 1303 spin_unlock_bh(&xprt->sock_lock); 1304 return; 1305 } 1306 /* Keep holding the socket if it is blocked */ 1307 rpc_delay(task, HZ>>4); 1308 return; 1309 case -ECONNREFUSED: 1310 task->tk_timeout = RPC_REESTABLISH_TIMEOUT; 1311 rpc_sleep_on(&xprt->sending, task, NULL, NULL); 1312 case -ENOTCONN: 1313 return; 1314 default: 1315 if (xprt->stream) 1316 xprt_disconnect(xprt); 1317 } 1318 xprt_release_write(xprt, task); 1319 return; 1320 out_receive: 1321 dprintk("RPC: %4d xmit complete\n", task->tk_pid); 1322 /* Set the task's receive timeout value */ 1323 spin_lock_bh(&xprt->sock_lock); 1324 if (!xprt->nocong) { 1325 int timer = task->tk_msg.rpc_proc->p_timer; 1326 task->tk_timeout = rpc_calc_rto(clnt->cl_rtt, timer); 1327 task->tk_timeout <<= rpc_ntimeo(clnt->cl_rtt, timer) + req->rq_retries; 1328 if (task->tk_timeout > xprt->timeout.to_maxval || task->tk_timeout == 0) 1329 task->tk_timeout = xprt->timeout.to_maxval; 1330 } else 1331 task->tk_timeout = req->rq_timeout; 1332 /* Don't race with disconnect */ 1333 if (!xprt_connected(xprt)) 1334 task->tk_status = -ENOTCONN; 1335 else if (!req->rq_received) 1336 rpc_sleep_on(&xprt->pending, task, NULL, xprt_timer); 1337 __xprt_release_write(xprt, task); 1338 spin_unlock_bh(&xprt->sock_lock); 1339 } 1340 1341 /* 1342 * Reserve an RPC call slot. 
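 *
 * do_xprt_reserve() below hands out pre-allocated rpc_rqst slots from
 * xprt->free and parks callers on xprt->backlog when none are left.  A
 * stripped-down sketch of that free-list discipline (hypothetical demo_*
 * names, no locking or backlog queue shown):
 */

#include <stddef.h>

struct demo_slot {
	struct demo_slot *next_free;
	/* ... per-request state ... */
};

struct demo_slot_table {
	struct demo_slot *free;		/* head of the free list */
};

/* Take a slot if one is available; NULL corresponds to the -EAGAIN /
 * sleep-on-backlog path in do_xprt_reserve().
 */
static struct demo_slot *demo_reserve(struct demo_slot_table *t)
{
	struct demo_slot *s = t->free;

	if (s != NULL)
		t->free = s->next_free;
	return s;
}

/* Putting a slot back makes it available to the next waiter, much as
 * xprt_release() followed by xprt_clear_backlog() does.
 */
static void demo_release(struct demo_slot_table *t, struct demo_slot *s)
{
	s->next_free = t->free;
	t->free = s;
}

/*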
1343 */ 1344 static inline void 1345 do_xprt_reserve(struct rpc_task *task) 1346 { 1347 struct rpc_xprt *xprt = task->tk_xprt; 1348 1349 task->tk_status = 0; 1350 if (task->tk_rqstp) 1351 return; 1352 if (!list_empty(&xprt->free)) { 1353 struct rpc_rqst *req = list_entry(xprt->free.next, struct rpc_rqst, rq_list); 1354 list_del_init(&req->rq_list); 1355 task->tk_rqstp = req; 1356 xprt_request_init(task, xprt); 1357 return; 1358 } 1359 dprintk("RPC: waiting for request slot\n"); 1360 task->tk_status = -EAGAIN; 1361 task->tk_timeout = 0; 1362 rpc_sleep_on(&xprt->backlog, task, NULL, NULL); 1363 } 1364 1365 void 1366 xprt_reserve(struct rpc_task *task) 1367 { 1368 struct rpc_xprt *xprt = task->tk_xprt; 1369 1370 task->tk_status = -EIO; 1371 if (!xprt->shutdown) { 1372 spin_lock(&xprt->xprt_lock); 1373 do_xprt_reserve(task); 1374 spin_unlock(&xprt->xprt_lock); 1375 } 1376 } 1377 1378 /* 1379 * Allocate a 'unique' XID 1380 */ 1381 static inline u32 xprt_alloc_xid(struct rpc_xprt *xprt) 1382 { 1383 return xprt->xid++; 1384 } 1385 1386 static inline void xprt_init_xid(struct rpc_xprt *xprt) 1387 { 1388 get_random_bytes(&xprt->xid, sizeof(xprt->xid)); 1389 } 1390 1391 /* 1392 * Initialize RPC request 1393 */ 1394 static void 1395 xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt) 1396 { 1397 struct rpc_rqst *req = task->tk_rqstp; 1398 1399 req->rq_timeout = xprt->timeout.to_initval; 1400 req->rq_task = task; 1401 req->rq_xprt = xprt; 1402 req->rq_xid = xprt_alloc_xid(xprt); 1403 dprintk("RPC: %4d reserved req %p xid %08x\n", task->tk_pid, 1404 req, ntohl(req->rq_xid)); 1405 } 1406 1407 /* 1408 * Release an RPC call slot 1409 */ 1410 void 1411 xprt_release(struct rpc_task *task) 1412 { 1413 struct rpc_xprt *xprt = task->tk_xprt; 1414 struct rpc_rqst *req; 1415 1416 if (!(req = task->tk_rqstp)) 1417 return; 1418 spin_lock_bh(&xprt->sock_lock); 1419 __xprt_release_write(xprt, task); 1420 __xprt_put_cong(xprt, req); 1421 if (!list_empty(&req->rq_list)) 1422 list_del(&req->rq_list); 1423 xprt->last_used = jiffies; 1424 if (list_empty(&xprt->recv) && !xprt->shutdown) 1425 mod_timer(&xprt->timer, xprt->last_used + XPRT_IDLE_TIMEOUT); 1426 spin_unlock_bh(&xprt->sock_lock); 1427 task->tk_rqstp = NULL; 1428 memset(req, 0, sizeof(*req)); /* mark unused */ 1429 1430 dprintk("RPC: %4d release request %p\n", task->tk_pid, req); 1431 1432 spin_lock(&xprt->xprt_lock); 1433 list_add(&req->rq_list, &xprt->free); 1434 xprt_clear_backlog(xprt); 1435 spin_unlock(&xprt->xprt_lock); 1436 } 1437 1438 /* 1439 * Set default timeout parameters 1440 */ 1441 static void 1442 xprt_default_timeout(struct rpc_timeout *to, int proto) 1443 { 1444 if (proto == IPPROTO_UDP) 1445 xprt_set_timeout(to, 5, 5 * HZ); 1446 else 1447 xprt_set_timeout(to, 5, 60 * HZ); 1448 } 1449 1450 /* 1451 * Set constant timeout 1452 */ 1453 void 1454 xprt_set_timeout(struct rpc_timeout *to, unsigned int retr, unsigned long incr) 1455 { 1456 to->to_initval = 1457 to->to_increment = incr; 1458 to->to_maxval = incr * retr; 1459 to->to_retries = retr; 1460 to->to_exponential = 0; 1461 } 1462 1463 unsigned int xprt_udp_slot_table_entries = RPC_DEF_SLOT_TABLE; 1464 unsigned int xprt_tcp_slot_table_entries = RPC_DEF_SLOT_TABLE; 1465 1466 /* 1467 * Initialize an RPC client 1468 */ 1469 static struct rpc_xprt * 1470 xprt_setup(int proto, struct sockaddr_in *ap, struct rpc_timeout *to) 1471 { 1472 struct rpc_xprt *xprt; 1473 unsigned int entries; 1474 size_t slot_table_size; 1475 struct rpc_rqst *req; 1476 1477 dprintk("RPC: setting up %s 
transport...\n", 1478 proto == IPPROTO_UDP? "UDP" : "TCP"); 1479 1480 entries = (proto == IPPROTO_TCP)? 1481 xprt_tcp_slot_table_entries : xprt_udp_slot_table_entries; 1482 1483 if ((xprt = kmalloc(sizeof(struct rpc_xprt), GFP_KERNEL)) == NULL) 1484 return ERR_PTR(-ENOMEM); 1485 memset(xprt, 0, sizeof(*xprt)); /* Nnnngh! */ 1486 xprt->max_reqs = entries; 1487 slot_table_size = entries * sizeof(xprt->slot[0]); 1488 xprt->slot = kmalloc(slot_table_size, GFP_KERNEL); 1489 if (xprt->slot == NULL) { 1490 kfree(xprt); 1491 return ERR_PTR(-ENOMEM); 1492 } 1493 memset(xprt->slot, 0, slot_table_size); 1494 1495 xprt->addr = *ap; 1496 xprt->prot = proto; 1497 xprt->stream = (proto == IPPROTO_TCP)? 1 : 0; 1498 if (xprt->stream) { 1499 xprt->cwnd = RPC_MAXCWND(xprt); 1500 xprt->nocong = 1; 1501 xprt->max_payload = (1U << 31) - 1; 1502 } else { 1503 xprt->cwnd = RPC_INITCWND; 1504 xprt->max_payload = (1U << 16) - (MAX_HEADER << 3); 1505 } 1506 spin_lock_init(&xprt->sock_lock); 1507 spin_lock_init(&xprt->xprt_lock); 1508 init_waitqueue_head(&xprt->cong_wait); 1509 1510 INIT_LIST_HEAD(&xprt->free); 1511 INIT_LIST_HEAD(&xprt->recv); 1512 INIT_WORK(&xprt->sock_connect, xprt_socket_connect, xprt); 1513 INIT_WORK(&xprt->task_cleanup, xprt_socket_autoclose, xprt); 1514 init_timer(&xprt->timer); 1515 xprt->timer.function = xprt_init_autodisconnect; 1516 xprt->timer.data = (unsigned long) xprt; 1517 xprt->last_used = jiffies; 1518 xprt->port = XPRT_MAX_RESVPORT; 1519 1520 /* Set timeout parameters */ 1521 if (to) { 1522 xprt->timeout = *to; 1523 } else 1524 xprt_default_timeout(&xprt->timeout, xprt->prot); 1525 1526 rpc_init_wait_queue(&xprt->pending, "xprt_pending"); 1527 rpc_init_wait_queue(&xprt->sending, "xprt_sending"); 1528 rpc_init_wait_queue(&xprt->resend, "xprt_resend"); 1529 rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog"); 1530 1531 /* initialize free list */ 1532 for (req = &xprt->slot[entries-1]; req >= &xprt->slot[0]; req--) 1533 list_add(&req->rq_list, &xprt->free); 1534 1535 xprt_init_xid(xprt); 1536 1537 /* Check whether we want to use a reserved port */ 1538 xprt->resvport = capable(CAP_NET_BIND_SERVICE) ? 1 : 0; 1539 1540 dprintk("RPC: created transport %p with %u slots\n", xprt, 1541 xprt->max_reqs); 1542 1543 return xprt; 1544 } 1545 1546 /* 1547 * Bind to a reserved port 1548 */ 1549 static inline int xprt_bindresvport(struct rpc_xprt *xprt, struct socket *sock) 1550 { 1551 struct sockaddr_in myaddr = { 1552 .sin_family = AF_INET, 1553 }; 1554 int err, port; 1555 1556 /* Were we already bound to a given port? 
Try to reuse it */ 1557 port = xprt->port; 1558 do { 1559 myaddr.sin_port = htons(port); 1560 err = sock->ops->bind(sock, (struct sockaddr *) &myaddr, 1561 sizeof(myaddr)); 1562 if (err == 0) { 1563 xprt->port = port; 1564 return 0; 1565 } 1566 if (--port == 0) 1567 port = XPRT_MAX_RESVPORT; 1568 } while (err == -EADDRINUSE && port != xprt->port); 1569 1570 printk("RPC: Can't bind to reserved port (%d).\n", -err); 1571 return err; 1572 } 1573 1574 static void 1575 xprt_bind_socket(struct rpc_xprt *xprt, struct socket *sock) 1576 { 1577 struct sock *sk = sock->sk; 1578 1579 if (xprt->inet) 1580 return; 1581 1582 write_lock_bh(&sk->sk_callback_lock); 1583 sk->sk_user_data = xprt; 1584 xprt->old_data_ready = sk->sk_data_ready; 1585 xprt->old_state_change = sk->sk_state_change; 1586 xprt->old_write_space = sk->sk_write_space; 1587 if (xprt->prot == IPPROTO_UDP) { 1588 sk->sk_data_ready = udp_data_ready; 1589 sk->sk_no_check = UDP_CSUM_NORCV; 1590 xprt_set_connected(xprt); 1591 } else { 1592 tcp_sk(sk)->nonagle = 1; /* disable Nagle's algorithm */ 1593 sk->sk_data_ready = tcp_data_ready; 1594 sk->sk_state_change = tcp_state_change; 1595 xprt_clear_connected(xprt); 1596 } 1597 sk->sk_write_space = xprt_write_space; 1598 1599 /* Reset to new socket */ 1600 xprt->sock = sock; 1601 xprt->inet = sk; 1602 write_unlock_bh(&sk->sk_callback_lock); 1603 1604 return; 1605 } 1606 1607 /* 1608 * Set socket buffer length 1609 */ 1610 void 1611 xprt_sock_setbufsize(struct rpc_xprt *xprt) 1612 { 1613 struct sock *sk = xprt->inet; 1614 1615 if (xprt->stream) 1616 return; 1617 if (xprt->rcvsize) { 1618 sk->sk_userlocks |= SOCK_RCVBUF_LOCK; 1619 sk->sk_rcvbuf = xprt->rcvsize * xprt->max_reqs * 2; 1620 } 1621 if (xprt->sndsize) { 1622 sk->sk_userlocks |= SOCK_SNDBUF_LOCK; 1623 sk->sk_sndbuf = xprt->sndsize * xprt->max_reqs * 2; 1624 sk->sk_write_space(sk); 1625 } 1626 } 1627 1628 /* 1629 * Datastream sockets are created here, but xprt_connect will create 1630 * and connect stream sockets. 1631 */ 1632 static struct socket * xprt_create_socket(struct rpc_xprt *xprt, int proto, int resvport) 1633 { 1634 struct socket *sock; 1635 int type, err; 1636 1637 dprintk("RPC: xprt_create_socket(%s %d)\n", 1638 (proto == IPPROTO_UDP)? "udp" : "tcp", proto); 1639 1640 type = (proto == IPPROTO_UDP)? SOCK_DGRAM : SOCK_STREAM; 1641 1642 if ((err = sock_create_kern(PF_INET, type, proto, &sock)) < 0) { 1643 printk("RPC: can't create socket (%d).\n", -err); 1644 return NULL; 1645 } 1646 1647 /* If the caller has the capability, bind to a reserved port */ 1648 if (resvport && xprt_bindresvport(xprt, sock) < 0) { 1649 printk("RPC: can't bind to reserved port.\n"); 1650 goto failed; 1651 } 1652 1653 return sock; 1654 1655 failed: 1656 sock_release(sock); 1657 return NULL; 1658 } 1659 1660 /* 1661 * Create an RPC client transport given the protocol and peer address. 1662 */ 1663 struct rpc_xprt * 1664 xprt_create_proto(int proto, struct sockaddr_in *sap, struct rpc_timeout *to) 1665 { 1666 struct rpc_xprt *xprt; 1667 1668 xprt = xprt_setup(proto, sap, to); 1669 if (IS_ERR(xprt)) 1670 dprintk("RPC: xprt_create_proto failed\n"); 1671 else 1672 dprintk("RPC: xprt_create_proto created xprt %p\n", xprt); 1673 return xprt; 1674 } 1675 1676 /* 1677 * Prepare for transport shutdown. 
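 *
 * (Aside, referring back to xprt_bindresvport() above: claiming a
 * privileged source port is just a matter of walking downward from
 * XPRT_MAX_RESVPORT until bind() stops failing with EADDRINUSE.  A rough
 * user-space equivalent follows; the name is hypothetical and it needs
 * CAP_NET_BIND_SERVICE:)
 */

#include <string.h>
#include <errno.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/socket.h>

/* Bind 'fd' to the highest free port at or below 'max_port' (e.g. 800, as
 * XPRT_MAX_RESVPORT above).  Returns the port bound, or -1 on failure.
 */
static int demo_bind_resvport(int fd, int max_port)
{
	struct sockaddr_in addr;
	int port;

	memset(&addr, 0, sizeof(addr));
	addr.sin_family = AF_INET;
	addr.sin_addr.s_addr = htonl(INADDR_ANY);

	for (port = max_port; port > 0; port--) {
		addr.sin_port = htons(port);
		if (bind(fd, (struct sockaddr *)&addr, sizeof(addr)) == 0)
			return port;
		if (errno != EADDRINUSE)
			return -1;
	}
	return -1;
}

/*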
1678 */ 1679 static void 1680 xprt_shutdown(struct rpc_xprt *xprt) 1681 { 1682 xprt->shutdown = 1; 1683 rpc_wake_up(&xprt->sending); 1684 rpc_wake_up(&xprt->resend); 1685 rpc_wake_up(&xprt->pending); 1686 rpc_wake_up(&xprt->backlog); 1687 wake_up(&xprt->cong_wait); 1688 del_timer_sync(&xprt->timer); 1689 1690 /* synchronously wait for connect worker to finish */ 1691 cancel_delayed_work(&xprt->sock_connect); 1692 flush_scheduled_work(); 1693 } 1694 1695 /* 1696 * Clear the xprt backlog queue 1697 */ 1698 static int 1699 xprt_clear_backlog(struct rpc_xprt *xprt) { 1700 rpc_wake_up_next(&xprt->backlog); 1701 wake_up(&xprt->cong_wait); 1702 return 1; 1703 } 1704 1705 /* 1706 * Destroy an RPC transport, killing off all requests. 1707 */ 1708 int 1709 xprt_destroy(struct rpc_xprt *xprt) 1710 { 1711 dprintk("RPC: destroying transport %p\n", xprt); 1712 xprt_shutdown(xprt); 1713 xprt_disconnect(xprt); 1714 xprt_close(xprt); 1715 kfree(xprt->slot); 1716 kfree(xprt); 1717 1718 return 0; 1719 } 1720
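
/*
 * To tie the pieces together, a schematic of how a caller elsewhere in the
 * RPC client might create and tear down a transport using the entry points
 * defined above.  This is an illustrative sketch only (error checking with
 * IS_ERR() and the rpc_clnt plumbing that normally drives these calls are
 * omitted), not an additional interface exported by this file.
 */
#if 0	/* example only, not compiled */
static struct rpc_xprt *example_make_udp_transport(__u32 server_ip)
{
	struct sockaddr_in sin;
	struct rpc_timeout to;
	struct rpc_xprt *xprt;

	memset(&sin, 0, sizeof(sin));
	sin.sin_family = AF_INET;
	sin.sin_addr.s_addr = server_ip;
	sin.sin_port = htons(111);		/* e.g. the portmapper */

	xprt_set_timeout(&to, 5, 5 * HZ);	/* 5 retries, 5 second steps */

	xprt = xprt_create_proto(IPPROTO_UDP, &sin, &to);

	/* ... use the transport via an rpc_clnt; when finished: */
	/* xprt_destroy(xprt); */
	return xprt;
}
#endif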