/*
 * linux/net/sunrpc/svc_xprt.c
 *
 * Author: Tom Tucker <tom@opengridcomputing.com>
 */

#include <linux/sched.h>
#include <linux/smp_lock.h>
#include <linux/errno.h>
#include <linux/freezer.h>
#include <linux/kthread.h>
#include <net/sock.h>
#include <linux/sunrpc/stats.h>
#include <linux/sunrpc/svc_xprt.h>
#include <linux/sunrpc/svcsock.h>

#define RPCDBG_FACILITY	RPCDBG_SVCXPRT

#define SVC_MAX_WAKING 5

static struct svc_deferred_req *svc_deferred_dequeue(struct svc_xprt *xprt);
static int svc_deferred_recv(struct svc_rqst *rqstp);
static struct cache_deferred_req *svc_defer(struct cache_req *req);
static void svc_age_temp_xprts(unsigned long closure);

/* apparently the "standard" is that clients close
 * idle connections after 5 minutes, servers after
 * 6 minutes
 *   http://www.connectathon.org/talks96/nfstcp.pdf
 */
static int svc_conn_age_period = 6*60;

/* List of registered transport classes */
static DEFINE_SPINLOCK(svc_xprt_class_lock);
static LIST_HEAD(svc_xprt_class_list);

/* SMP locking strategy:
 *
 *	svc_pool->sp_lock protects most of the fields of that pool.
 *	svc_serv->sv_lock protects sv_tempsocks, sv_permsocks, sv_tmpcnt.
 *	when both need to be taken (rare), svc_serv->sv_lock is first.
 *	BKL protects svc_serv->sv_nrthread.
 *	svc_sock->sk_lock protects the svc_sock->sk_deferred list
 *	and the ->sk_info_authunix cache.
 *
 *	The XPT_BUSY bit in xprt->xpt_flags prevents a transport being
 *	enqueued multiply. During normal transport processing this bit
 *	is set by svc_xprt_enqueue and cleared by svc_xprt_received.
 *	Providers should not manipulate this bit directly.
 *
 *	Some flags can be set to certain values at any time
 *	providing that certain rules are followed:
 *
 *	XPT_CONN, XPT_DATA:
 *		- Can be set or cleared at any time.
 *		- After a set, svc_xprt_enqueue must be called to enqueue
 *		  the transport for processing.
 *		- After a clear, the transport must be read/accepted.
 *		  If this succeeds, it must be set again.
 *	XPT_CLOSE:
 *		- Can be set at any time. It is never cleared.
 *	XPT_DEAD:
 *		- Can only be set while XPT_BUSY is held which ensures
 *		  that no other thread will be using the transport or will
 *		  try to set XPT_DEAD.
 */
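
/*
 * Illustrative note (editorial addition, not part of the original file):
 * a typical provider follows the rules above roughly like this in its
 * data-ready callback, once it has found its svc_xprt:
 *
 *	set_bit(XPT_DATA, &xprt->xpt_flags);
 *	svc_xprt_enqueue(xprt);
 *
 * A server thread then services the transport; XPT_DATA is cleared only
 * when a read attempt finds no more data, and svc_xprt_received() drops
 * XPT_BUSY so the transport can be enqueued again.
 */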

int svc_reg_xprt_class(struct svc_xprt_class *xcl)
{
	struct svc_xprt_class *cl;
	int res = -EEXIST;

	dprintk("svc: Adding svc transport class '%s'\n", xcl->xcl_name);

	INIT_LIST_HEAD(&xcl->xcl_list);
	spin_lock(&svc_xprt_class_lock);
	/* Make sure there isn't already a class with the same name */
	list_for_each_entry(cl, &svc_xprt_class_list, xcl_list) {
		if (strcmp(xcl->xcl_name, cl->xcl_name) == 0)
			goto out;
	}
	list_add_tail(&xcl->xcl_list, &svc_xprt_class_list);
	res = 0;
out:
	spin_unlock(&svc_xprt_class_lock);
	return res;
}
EXPORT_SYMBOL_GPL(svc_reg_xprt_class);

void svc_unreg_xprt_class(struct svc_xprt_class *xcl)
{
	dprintk("svc: Removing svc transport class '%s'\n", xcl->xcl_name);
	spin_lock(&svc_xprt_class_lock);
	list_del_init(&xcl->xcl_list);
	spin_unlock(&svc_xprt_class_lock);
}
EXPORT_SYMBOL_GPL(svc_unreg_xprt_class);

/*
 * Format the transport list for printing
 */
int svc_print_xprts(char *buf, int maxlen)
{
	struct list_head *le;
	char tmpstr[80];
	int len = 0;
	buf[0] = '\0';

	spin_lock(&svc_xprt_class_lock);
	list_for_each(le, &svc_xprt_class_list) {
		int slen;
		struct svc_xprt_class *xcl =
			list_entry(le, struct svc_xprt_class, xcl_list);

		sprintf(tmpstr, "%s %d\n", xcl->xcl_name, xcl->xcl_max_payload);
		slen = strlen(tmpstr);
		if (len + slen > maxlen)
			break;
		len += slen;
		strcat(buf, tmpstr);
	}
	spin_unlock(&svc_xprt_class_lock);

	return len;
}

static void svc_xprt_free(struct kref *kref)
{
	struct svc_xprt *xprt =
		container_of(kref, struct svc_xprt, xpt_ref);
	struct module *owner = xprt->xpt_class->xcl_owner;
	if (test_bit(XPT_CACHE_AUTH, &xprt->xpt_flags)
	    && xprt->xpt_auth_cache != NULL)
		svcauth_unix_info_release(xprt->xpt_auth_cache);
	xprt->xpt_ops->xpo_free(xprt);
	module_put(owner);
}

void svc_xprt_put(struct svc_xprt *xprt)
{
	kref_put(&xprt->xpt_ref, svc_xprt_free);
}
EXPORT_SYMBOL_GPL(svc_xprt_put);
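
/*
 * Illustrative sketch (editorial addition): a transport module such as
 * svcsock or svcrdma registers its class from module init, roughly:
 *
 *	static struct svc_xprt_class my_xprt_class = {
 *		.xcl_name	 = "example",
 *		.xcl_owner	 = THIS_MODULE,
 *		.xcl_ops	 = &my_xprt_ops,
 *		.xcl_max_payload = RPCSVC_MAXPAYLOAD,
 *	};
 *
 *	svc_reg_xprt_class(&my_xprt_class);
 *
 * "example" and my_xprt_ops are hypothetical names; the ops table must
 * provide the xpo_* methods used in this file (xpo_create, xpo_accept,
 * xpo_recvfrom, xpo_sendto, xpo_release_rqst, xpo_has_wspace,
 * xpo_detach, xpo_free).
 */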

/*
 * Called by transport drivers to initialize the transport independent
 * portion of the transport instance.
 */
void svc_xprt_init(struct svc_xprt_class *xcl, struct svc_xprt *xprt,
		   struct svc_serv *serv)
{
	memset(xprt, 0, sizeof(*xprt));
	xprt->xpt_class = xcl;
	xprt->xpt_ops = xcl->xcl_ops;
	kref_init(&xprt->xpt_ref);
	xprt->xpt_server = serv;
	INIT_LIST_HEAD(&xprt->xpt_list);
	INIT_LIST_HEAD(&xprt->xpt_ready);
	INIT_LIST_HEAD(&xprt->xpt_deferred);
	mutex_init(&xprt->xpt_mutex);
	spin_lock_init(&xprt->xpt_lock);
	set_bit(XPT_BUSY, &xprt->xpt_flags);
}
EXPORT_SYMBOL_GPL(svc_xprt_init);

static struct svc_xprt *__svc_xpo_create(struct svc_xprt_class *xcl,
					 struct svc_serv *serv,
					 const int family,
					 const unsigned short port,
					 int flags)
{
	struct sockaddr_in sin = {
		.sin_family		= AF_INET,
		.sin_addr.s_addr	= htonl(INADDR_ANY),
		.sin_port		= htons(port),
	};
	struct sockaddr_in6 sin6 = {
		.sin6_family		= AF_INET6,
		.sin6_addr		= IN6ADDR_ANY_INIT,
		.sin6_port		= htons(port),
	};
	struct sockaddr *sap;
	size_t len;

	switch (family) {
	case PF_INET:
		sap = (struct sockaddr *)&sin;
		len = sizeof(sin);
		break;
	case PF_INET6:
		sap = (struct sockaddr *)&sin6;
		len = sizeof(sin6);
		break;
	default:
		return ERR_PTR(-EAFNOSUPPORT);
	}

	return xcl->xcl_ops->xpo_create(serv, sap, len, flags);
}

int svc_create_xprt(struct svc_serv *serv, const char *xprt_name,
		    const int family, const unsigned short port,
		    int flags)
{
	struct svc_xprt_class *xcl;

	dprintk("svc: creating transport %s[%d]\n", xprt_name, port);
	spin_lock(&svc_xprt_class_lock);
	list_for_each_entry(xcl, &svc_xprt_class_list, xcl_list) {
		struct svc_xprt *newxprt;

		if (strcmp(xprt_name, xcl->xcl_name))
			continue;

		if (!try_module_get(xcl->xcl_owner))
			goto err;

		spin_unlock(&svc_xprt_class_lock);
		newxprt = __svc_xpo_create(xcl, serv, family, port, flags);
		if (IS_ERR(newxprt)) {
			module_put(xcl->xcl_owner);
			return PTR_ERR(newxprt);
		}

		clear_bit(XPT_TEMP, &newxprt->xpt_flags);
		spin_lock_bh(&serv->sv_lock);
		list_add(&newxprt->xpt_list, &serv->sv_permsocks);
		spin_unlock_bh(&serv->sv_lock);
		clear_bit(XPT_BUSY, &newxprt->xpt_flags);
		return svc_xprt_local_port(newxprt);
	}
 err:
	spin_unlock(&svc_xprt_class_lock);
	dprintk("svc: transport %s not found\n", xprt_name);
	return -ENOENT;
}
EXPORT_SYMBOL_GPL(svc_create_xprt);
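
/*
 * Illustrative sketch (editorial addition): a service sets up its
 * permanent listeners by class name, for example (the port value is a
 * placeholder, and SVC_SOCK_DEFAULTS is assumed to come from svcsock.h):
 *
 *	err = svc_create_xprt(serv, "tcp", PF_INET, 2049, SVC_SOCK_DEFAULTS);
 *	if (err < 0)
 *		return err;
 *
 * On success the return value is the transport's bound local port.
 */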

/*
 * Copy the local and remote xprt addresses to the rqstp structure
 */
void svc_xprt_copy_addrs(struct svc_rqst *rqstp, struct svc_xprt *xprt)
{
	struct sockaddr *sin;

	memcpy(&rqstp->rq_addr, &xprt->xpt_remote, xprt->xpt_remotelen);
	rqstp->rq_addrlen = xprt->xpt_remotelen;

	/*
	 * Destination address in request is needed for binding the
	 * source address in RPC replies/callbacks later.
	 */
	sin = (struct sockaddr *)&xprt->xpt_local;
	switch (sin->sa_family) {
	case AF_INET:
		rqstp->rq_daddr.addr = ((struct sockaddr_in *)sin)->sin_addr;
		break;
	case AF_INET6:
		rqstp->rq_daddr.addr6 = ((struct sockaddr_in6 *)sin)->sin6_addr;
		break;
	}
}
EXPORT_SYMBOL_GPL(svc_xprt_copy_addrs);

/**
 * svc_print_addr - Format rq_addr field for printing
 * @rqstp: svc_rqst struct containing address to print
 * @buf: target buffer for formatted address
 * @len: length of target buffer
 *
 */
char *svc_print_addr(struct svc_rqst *rqstp, char *buf, size_t len)
{
	return __svc_print_addr(svc_addr(rqstp), buf, len);
}
EXPORT_SYMBOL_GPL(svc_print_addr);

/*
 * Queue up an idle server thread.  Must have pool->sp_lock held.
 * Note: this is really a stack rather than a queue, so that we only
 * use as many different threads as we need, and the rest don't pollute
 * the cache.
 */
static void svc_thread_enqueue(struct svc_pool *pool, struct svc_rqst *rqstp)
{
	list_add(&rqstp->rq_list, &pool->sp_threads);
}

/*
 * Dequeue an nfsd thread.  Must have pool->sp_lock held.
 */
static void svc_thread_dequeue(struct svc_pool *pool, struct svc_rqst *rqstp)
{
	list_del(&rqstp->rq_list);
}

/*
 * Queue up a transport with data pending. If there are idle nfsd
 * processes, wake 'em up.
 *
 */
void svc_xprt_enqueue(struct svc_xprt *xprt)
{
	struct svc_serv *serv = xprt->xpt_server;
	struct svc_pool *pool;
	struct svc_rqst *rqstp;
	int cpu;
	int thread_avail;

	if (!(xprt->xpt_flags &
	      ((1<<XPT_CONN)|(1<<XPT_DATA)|(1<<XPT_CLOSE)|(1<<XPT_DEFERRED))))
		return;

	cpu = get_cpu();
	pool = svc_pool_for_cpu(xprt->xpt_server, cpu);
	put_cpu();

	spin_lock_bh(&pool->sp_lock);

	if (test_bit(XPT_DEAD, &xprt->xpt_flags)) {
		/* Don't enqueue dead transports */
		dprintk("svc: transport %p is dead, not enqueued\n", xprt);
		goto out_unlock;
	}

	pool->sp_stats.packets++;

	/* Mark transport as busy. It will remain in this state until
	 * the provider calls svc_xprt_received. We update XPT_BUSY
	 * atomically because it also guards against trying to enqueue
	 * the transport twice.
	 */
	if (test_and_set_bit(XPT_BUSY, &xprt->xpt_flags)) {
		/* Don't enqueue transport while already enqueued */
		dprintk("svc: transport %p busy, not enqueued\n", xprt);
		goto out_unlock;
	}
	BUG_ON(xprt->xpt_pool != NULL);
	xprt->xpt_pool = pool;

	/* Handle pending connection */
	if (test_bit(XPT_CONN, &xprt->xpt_flags))
		goto process;

	/* Handle close in-progress */
	if (test_bit(XPT_CLOSE, &xprt->xpt_flags))
		goto process;

	/* Check if we have space to reply to a request */
	if (!xprt->xpt_ops->xpo_has_wspace(xprt)) {
		/* Don't enqueue while not enough space for reply */
		dprintk("svc: no write space, transport %p not enqueued\n",
			xprt);
		xprt->xpt_pool = NULL;
		clear_bit(XPT_BUSY, &xprt->xpt_flags);
		goto out_unlock;
	}

 process:
	/* Work out whether threads are available */
	thread_avail = !list_empty(&pool->sp_threads);	/* threads are asleep */
	if (pool->sp_nwaking >= SVC_MAX_WAKING) {
		/* too many threads are runnable and trying to wake up */
		thread_avail = 0;
		pool->sp_stats.overloads_avoided++;
	}

	if (thread_avail) {
		rqstp = list_entry(pool->sp_threads.next,
				   struct svc_rqst,
				   rq_list);
		dprintk("svc: transport %p served by daemon %p\n",
			xprt, rqstp);
		svc_thread_dequeue(pool, rqstp);
		if (rqstp->rq_xprt)
			printk(KERN_ERR
				"svc_xprt_enqueue: server %p, rq_xprt=%p!\n",
				rqstp, rqstp->rq_xprt);
		rqstp->rq_xprt = xprt;
		svc_xprt_get(xprt);
		rqstp->rq_reserved = serv->sv_max_mesg;
		atomic_add(rqstp->rq_reserved, &xprt->xpt_reserved);
		rqstp->rq_waking = 1;
		pool->sp_nwaking++;
		pool->sp_stats.threads_woken++;
		BUG_ON(xprt->xpt_pool != pool);
		wake_up(&rqstp->rq_wait);
	} else {
		dprintk("svc: transport %p put into queue\n", xprt);
		list_add_tail(&xprt->xpt_ready, &pool->sp_sockets);
		pool->sp_stats.sockets_queued++;
		BUG_ON(xprt->xpt_pool != pool);
	}

out_unlock:
	spin_unlock_bh(&pool->sp_lock);
}
EXPORT_SYMBOL_GPL(svc_xprt_enqueue);

/*
 * Dequeue the first transport.  Must be called with the pool->sp_lock held.
 */
static struct svc_xprt *svc_xprt_dequeue(struct svc_pool *pool)
{
	struct svc_xprt *xprt;

	if (list_empty(&pool->sp_sockets))
		return NULL;

	xprt = list_entry(pool->sp_sockets.next,
			  struct svc_xprt, xpt_ready);
	list_del_init(&xprt->xpt_ready);

	dprintk("svc: transport %p dequeued, inuse=%d\n",
		xprt, atomic_read(&xprt->xpt_ref.refcount));

	return xprt;
}

/*
 * svc_xprt_received conditionally queues the transport for processing
 * by another thread. The caller must hold the XPT_BUSY bit and must
 * not thereafter touch transport data.
 *
 * Note: XPT_DATA only gets cleared when a read-attempt finds no (or
 * insufficient) data.
 */
void svc_xprt_received(struct svc_xprt *xprt)
{
	BUG_ON(!test_bit(XPT_BUSY, &xprt->xpt_flags));
	xprt->xpt_pool = NULL;
	clear_bit(XPT_BUSY, &xprt->xpt_flags);
	svc_xprt_enqueue(xprt);
}
EXPORT_SYMBOL_GPL(svc_xprt_received);
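
/*
 * Editorial note: svc_reserve() below is normally called by a service
 * once it knows its reply will be small, so the write-space check in
 * svc_xprt_enqueue() does not hold other requests back needlessly. A
 * purely hypothetical handler might do:
 *
 *	svc_reserve(rqstp, 512);
 *
 * where 512 is an assumed upper bound on the reply size. The initial
 * reservation is serv->sv_max_mesg, set when the request was handed to
 * a thread in svc_xprt_enqueue() or svc_recv().
 */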

/**
 * svc_reserve - change the space reserved for the reply to a request.
 * @rqstp:  The request in question
 * @space: new max space to reserve
 *
 * Each request reserves some space on the output queue of the transport
 * to make sure the reply fits.  This function reduces that reserved
 * space to be the amount of space used already, plus @space.
 *
 */
void svc_reserve(struct svc_rqst *rqstp, int space)
{
	space += rqstp->rq_res.head[0].iov_len;

	if (space < rqstp->rq_reserved) {
		struct svc_xprt *xprt = rqstp->rq_xprt;
		atomic_sub((rqstp->rq_reserved - space), &xprt->xpt_reserved);
		rqstp->rq_reserved = space;

		svc_xprt_enqueue(xprt);
	}
}
EXPORT_SYMBOL_GPL(svc_reserve);

static void svc_xprt_release(struct svc_rqst *rqstp)
{
	struct svc_xprt *xprt = rqstp->rq_xprt;

	rqstp->rq_xprt->xpt_ops->xpo_release_rqst(rqstp);

	kfree(rqstp->rq_deferred);
	rqstp->rq_deferred = NULL;

	svc_free_res_pages(rqstp);
	rqstp->rq_res.page_len = 0;
	rqstp->rq_res.page_base = 0;

	/* Reset response buffer and release
	 * the reservation.
	 * But first, check that enough space was reserved
	 * for the reply, otherwise we have a bug!
	 */
	if ((rqstp->rq_res.len) > rqstp->rq_reserved)
		printk(KERN_ERR "RPC request reserved %d but used %d\n",
		       rqstp->rq_reserved,
		       rqstp->rq_res.len);

	rqstp->rq_res.head[0].iov_len = 0;
	svc_reserve(rqstp, 0);
	rqstp->rq_xprt = NULL;

	svc_xprt_put(xprt);
}

/*
 * External function to wake up a server waiting for data
 * This really only makes sense for services like lockd
 * which have exactly one thread anyway.
 */
void svc_wake_up(struct svc_serv *serv)
{
	struct svc_rqst *rqstp;
	unsigned int i;
	struct svc_pool *pool;

	for (i = 0; i < serv->sv_nrpools; i++) {
		pool = &serv->sv_pools[i];

		spin_lock_bh(&pool->sp_lock);
		if (!list_empty(&pool->sp_threads)) {
			rqstp = list_entry(pool->sp_threads.next,
					   struct svc_rqst,
					   rq_list);
			dprintk("svc: daemon %p woken up.\n", rqstp);
			/*
			svc_thread_dequeue(pool, rqstp);
			rqstp->rq_xprt = NULL;
			 */
			wake_up(&rqstp->rq_wait);
		}
		spin_unlock_bh(&pool->sp_lock);
	}
}
EXPORT_SYMBOL_GPL(svc_wake_up);

int svc_port_is_privileged(struct sockaddr *sin)
{
	switch (sin->sa_family) {
	case AF_INET:
		return ntohs(((struct sockaddr_in *)sin)->sin_port)
			< PROT_SOCK;
	case AF_INET6:
		return ntohs(((struct sockaddr_in6 *)sin)->sin6_port)
			< PROT_SOCK;
	default:
		return 0;
	}
}

/*
 * Make sure that we don't have too many active connections. If we have,
 * something must be dropped. It's not clear what will happen if we allow
 * "too many" connections, but when dealing with network-facing software,
 * we have to code defensively. Here we do that by imposing hard limits.
 *
 * There's no point in trying to do random drop here for DoS
 * prevention. The NFS client does 1 reconnect in 15 seconds. An
 * attacker can easily beat that.
 *
 * The only somewhat efficient mechanism would be to drop old
 * connections from the same IP first. But right now we don't even
 * record the client IP in svc_sock.
 *
 * Single-threaded services that expect a lot of clients will probably
 * need to set sv_maxconn to override the default value, which is based
 * on the number of threads.
 */
static void svc_check_conn_limits(struct svc_serv *serv)
{
	unsigned int limit = serv->sv_maxconn ? serv->sv_maxconn :
				(serv->sv_nrthreads+3) * 20;

	if (serv->sv_tmpcnt > limit) {
		struct svc_xprt *xprt = NULL;
		spin_lock_bh(&serv->sv_lock);
		if (!list_empty(&serv->sv_tempsocks)) {
			if (net_ratelimit()) {
				/* Try to help the admin */
				printk(KERN_NOTICE "%s: too many open "
				       "connections, consider increasing %s\n",
				       serv->sv_name, serv->sv_maxconn ?
				       "the max number of connections." :
				       "the number of threads.");
			}
			/*
			 * Always select the oldest connection. It's not fair,
			 * but so is life
			 */
			xprt = list_entry(serv->sv_tempsocks.prev,
					  struct svc_xprt,
					  xpt_list);
			set_bit(XPT_CLOSE, &xprt->xpt_flags);
			svc_xprt_get(xprt);
		}
		spin_unlock_bh(&serv->sv_lock);

		if (xprt) {
			svc_xprt_enqueue(xprt);
			svc_xprt_put(xprt);
		}
	}
}

/*
 * Receive the next request on any transport.  This code is carefully
 * organised not to touch any cachelines in the shared svc_serv
 * structure, only cachelines in the local svc_pool.
 */
int svc_recv(struct svc_rqst *rqstp, long timeout)
{
	struct svc_xprt *xprt = NULL;
	struct svc_serv *serv = rqstp->rq_server;
	struct svc_pool *pool = rqstp->rq_pool;
	int len, i;
	int pages;
	struct xdr_buf *arg;
	DECLARE_WAITQUEUE(wait, current);
	long time_left;

	dprintk("svc: server %p waiting for data (to = %ld)\n",
		rqstp, timeout);

	if (rqstp->rq_xprt)
		printk(KERN_ERR
			"svc_recv: service %p, transport not NULL!\n",
			rqstp);
	if (waitqueue_active(&rqstp->rq_wait))
		printk(KERN_ERR
			"svc_recv: service %p, wait queue active!\n",
			rqstp);

	/* now allocate needed pages.  If we get a failure, sleep briefly */
	pages = (serv->sv_max_mesg + PAGE_SIZE) / PAGE_SIZE;
	for (i = 0; i < pages ; i++)
		while (rqstp->rq_pages[i] == NULL) {
			struct page *p = alloc_page(GFP_KERNEL);
			if (!p) {
				set_current_state(TASK_INTERRUPTIBLE);
				if (signalled() || kthread_should_stop()) {
					set_current_state(TASK_RUNNING);
					return -EINTR;
				}
				schedule_timeout(msecs_to_jiffies(500));
			}
			rqstp->rq_pages[i] = p;
		}
	rqstp->rq_pages[i++] = NULL; /* this might be seen in nfs_read_actor */
	BUG_ON(pages >= RPCSVC_MAXPAGES);

	/* Make arg->head point to first page and arg->pages point to rest */
	arg = &rqstp->rq_arg;
	arg->head[0].iov_base = page_address(rqstp->rq_pages[0]);
	arg->head[0].iov_len = PAGE_SIZE;
	arg->pages = rqstp->rq_pages + 1;
	arg->page_base = 0;
	/* save at least one page for response */
	arg->page_len = (pages-2)*PAGE_SIZE;
	arg->len = (pages-1)*PAGE_SIZE;
	arg->tail[0].iov_len = 0;

	try_to_freeze();
	cond_resched();
	if (signalled() || kthread_should_stop())
		return -EINTR;

	spin_lock_bh(&pool->sp_lock);
	if (rqstp->rq_waking) {
		rqstp->rq_waking = 0;
		pool->sp_nwaking--;
		BUG_ON(pool->sp_nwaking < 0);
	}
	xprt = svc_xprt_dequeue(pool);
	if (xprt) {
		rqstp->rq_xprt = xprt;
		svc_xprt_get(xprt);
		rqstp->rq_reserved = serv->sv_max_mesg;
		atomic_add(rqstp->rq_reserved, &xprt->xpt_reserved);
	} else {
		/* No data pending. Go to sleep */
		svc_thread_enqueue(pool, rqstp);

		/*
		 * We have to be able to interrupt this wait
		 * to bring down the daemons ...
		 */
		set_current_state(TASK_INTERRUPTIBLE);

		/*
		 * checking kthread_should_stop() here allows us to avoid
		 * locking and signalling when stopping kthreads that call
		 * svc_recv. If the thread has already been woken up, then
		 * we can exit here without sleeping. If not, then it
		 * will be woken up quickly during the schedule_timeout.
		 */
		if (kthread_should_stop()) {
			set_current_state(TASK_RUNNING);
			spin_unlock_bh(&pool->sp_lock);
			return -EINTR;
		}

		add_wait_queue(&rqstp->rq_wait, &wait);
		spin_unlock_bh(&pool->sp_lock);

		time_left = schedule_timeout(timeout);

		try_to_freeze();

		spin_lock_bh(&pool->sp_lock);
		remove_wait_queue(&rqstp->rq_wait, &wait);
		if (!time_left)
			pool->sp_stats.threads_timedout++;

		xprt = rqstp->rq_xprt;
		if (!xprt) {
			svc_thread_dequeue(pool, rqstp);
			spin_unlock_bh(&pool->sp_lock);
			dprintk("svc: server %p, no data yet\n", rqstp);
			if (signalled() || kthread_should_stop())
				return -EINTR;
			else
				return -EAGAIN;
		}
	}
	spin_unlock_bh(&pool->sp_lock);

	len = 0;
	if (test_bit(XPT_CLOSE, &xprt->xpt_flags)) {
		dprintk("svc_recv: found XPT_CLOSE\n");
		svc_delete_xprt(xprt);
	} else if (test_bit(XPT_LISTENER, &xprt->xpt_flags)) {
		struct svc_xprt *newxpt;
		newxpt = xprt->xpt_ops->xpo_accept(xprt);
		if (newxpt) {
			/*
			 * We know this module_get will succeed because the
			 * listener holds a reference too
			 */
			__module_get(newxpt->xpt_class->xcl_owner);
			svc_check_conn_limits(xprt->xpt_server);
			spin_lock_bh(&serv->sv_lock);
			set_bit(XPT_TEMP, &newxpt->xpt_flags);
			list_add(&newxpt->xpt_list, &serv->sv_tempsocks);
			serv->sv_tmpcnt++;
			if (serv->sv_temptimer.function == NULL) {
				/* setup timer to age temp transports */
				setup_timer(&serv->sv_temptimer,
					    svc_age_temp_xprts,
					    (unsigned long)serv);
				mod_timer(&serv->sv_temptimer,
					  jiffies + svc_conn_age_period * HZ);
			}
			spin_unlock_bh(&serv->sv_lock);
			svc_xprt_received(newxpt);
		}
		svc_xprt_received(xprt);
	} else {
		dprintk("svc: server %p, pool %u, transport %p, inuse=%d\n",
			rqstp, pool->sp_id, xprt,
			atomic_read(&xprt->xpt_ref.refcount));
		rqstp->rq_deferred = svc_deferred_dequeue(xprt);
		if (rqstp->rq_deferred) {
			svc_xprt_received(xprt);
			len = svc_deferred_recv(rqstp);
		} else
			len = xprt->xpt_ops->xpo_recvfrom(rqstp);
		dprintk("svc: got len=%d\n", len);
	}

	/* No data, incomplete (TCP) read, or accept() */
	if (len == 0 || len == -EAGAIN) {
		rqstp->rq_res.len = 0;
		svc_xprt_release(rqstp);
		return -EAGAIN;
	}
	clear_bit(XPT_OLD, &xprt->xpt_flags);

	rqstp->rq_secure = svc_port_is_privileged(svc_addr(rqstp));
	rqstp->rq_chandle.defer = svc_defer;

	if (serv->sv_stats)
		serv->sv_stats->netcnt++;
	return len;
}
EXPORT_SYMBOL_GPL(svc_recv);

/*
 * Drop request
 */
void svc_drop(struct svc_rqst *rqstp)
{
	dprintk("svc: xprt %p dropped request\n", rqstp->rq_xprt);
	svc_xprt_release(rqstp);
}
EXPORT_SYMBOL_GPL(svc_drop);
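
/*
 * Illustrative sketch (editorial addition): svc_recv(), svc_send() and
 * svc_drop() are driven from a service's thread function. A minimal
 * loop, modelled loosely on what nfsd and lockd do, might look like:
 *
 *	while (!kthread_should_stop()) {
 *		err = svc_recv(rqstp, 60 * 60 * HZ);
 *		if (err == -EINTR)
 *			break;
 *		if (err == -EAGAIN)
 *			continue;
 *		svc_process(rqstp);
 *	}
 *
 * svc_process() (in svc.c) dispatches the request and ends by calling
 * svc_send(); the one-hour timeout is only an example.
 */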

/*
 * Return reply to client.
 */
int svc_send(struct svc_rqst *rqstp)
{
	struct svc_xprt *xprt;
	int len;
	struct xdr_buf *xb;

	xprt = rqstp->rq_xprt;
	if (!xprt)
		return -EFAULT;

	/* release the receive skb before sending the reply */
	rqstp->rq_xprt->xpt_ops->xpo_release_rqst(rqstp);

	/* calculate over-all length */
	xb = &rqstp->rq_res;
	xb->len = xb->head[0].iov_len +
		xb->page_len +
		xb->tail[0].iov_len;

	/* Grab mutex to serialize outgoing data. */
	mutex_lock(&xprt->xpt_mutex);
	if (test_bit(XPT_DEAD, &xprt->xpt_flags))
		len = -ENOTCONN;
	else
		len = xprt->xpt_ops->xpo_sendto(rqstp);
	mutex_unlock(&xprt->xpt_mutex);
	svc_xprt_release(rqstp);

	if (len == -ECONNREFUSED || len == -ENOTCONN || len == -EAGAIN)
		return 0;
	return len;
}

/*
 * Timer function to close old temporary transports, using
 * a mark-and-sweep algorithm.
 */
static void svc_age_temp_xprts(unsigned long closure)
{
	struct svc_serv *serv = (struct svc_serv *)closure;
	struct svc_xprt *xprt;
	struct list_head *le, *next;
	LIST_HEAD(to_be_aged);

	dprintk("svc_age_temp_xprts\n");

	if (!spin_trylock_bh(&serv->sv_lock)) {
		/* busy, try again 1 sec later */
		dprintk("svc_age_temp_xprts: busy\n");
		mod_timer(&serv->sv_temptimer, jiffies + HZ);
		return;
	}

	list_for_each_safe(le, next, &serv->sv_tempsocks) {
		xprt = list_entry(le, struct svc_xprt, xpt_list);

		/* First time through, just mark it OLD. Second time
		 * through, close it. */
		if (!test_and_set_bit(XPT_OLD, &xprt->xpt_flags))
			continue;
		if (atomic_read(&xprt->xpt_ref.refcount) > 1
		    || test_bit(XPT_BUSY, &xprt->xpt_flags))
			continue;
		svc_xprt_get(xprt);
		list_move(le, &to_be_aged);
		set_bit(XPT_CLOSE, &xprt->xpt_flags);
		set_bit(XPT_DETACHED, &xprt->xpt_flags);
	}
	spin_unlock_bh(&serv->sv_lock);

	while (!list_empty(&to_be_aged)) {
		le = to_be_aged.next;
		/* fiddling the xpt_list node is safe 'cos we're XPT_DETACHED */
		list_del_init(le);
		xprt = list_entry(le, struct svc_xprt, xpt_list);

		dprintk("queuing xprt %p for closing\n", xprt);

		/* a thread will dequeue and close it soon */
		svc_xprt_enqueue(xprt);
		svc_xprt_put(xprt);
	}

	mod_timer(&serv->sv_temptimer, jiffies + svc_conn_age_period * HZ);
}

/*
 * Remove a dead transport
 */
void svc_delete_xprt(struct svc_xprt *xprt)
{
	struct svc_serv *serv = xprt->xpt_server;
	struct svc_deferred_req *dr;

	/* Only do this once */
	if (test_and_set_bit(XPT_DEAD, &xprt->xpt_flags))
		return;

	dprintk("svc: svc_delete_xprt(%p)\n", xprt);
	xprt->xpt_ops->xpo_detach(xprt);

	spin_lock_bh(&serv->sv_lock);
	if (!test_and_set_bit(XPT_DETACHED, &xprt->xpt_flags))
		list_del_init(&xprt->xpt_list);
	/*
	 * We used to delete the transport from whichever list
	 * its sk_xprt.xpt_ready node was on, but we don't actually
	 * need to.  This is because the only time we're called
	 * while still attached to a queue, the queue itself
	 * is about to be destroyed (in svc_destroy).
	 */
	if (test_bit(XPT_TEMP, &xprt->xpt_flags))
		serv->sv_tmpcnt--;

	for (dr = svc_deferred_dequeue(xprt); dr;
	     dr = svc_deferred_dequeue(xprt)) {
		svc_xprt_put(xprt);
		kfree(dr);
	}

	svc_xprt_put(xprt);
	spin_unlock_bh(&serv->sv_lock);
}

void svc_close_xprt(struct svc_xprt *xprt)
{
	set_bit(XPT_CLOSE, &xprt->xpt_flags);
	if (test_and_set_bit(XPT_BUSY, &xprt->xpt_flags))
		/* someone else will have to effect the close */
		return;

	svc_xprt_get(xprt);
	svc_delete_xprt(xprt);
	clear_bit(XPT_BUSY, &xprt->xpt_flags);
	svc_xprt_put(xprt);
}
EXPORT_SYMBOL_GPL(svc_close_xprt);

void svc_close_all(struct list_head *xprt_list)
{
	struct svc_xprt *xprt;
	struct svc_xprt *tmp;

	list_for_each_entry_safe(xprt, tmp, xprt_list, xpt_list) {
		set_bit(XPT_CLOSE, &xprt->xpt_flags);
		if (test_bit(XPT_BUSY, &xprt->xpt_flags)) {
			/* Waiting to be processed, but no threads left,
			 * So just remove it from the waiting list
			 */
			list_del_init(&xprt->xpt_ready);
			clear_bit(XPT_BUSY, &xprt->xpt_flags);
		}
		svc_close_xprt(xprt);
	}
}

/*
 * Handle defer and revisit of requests
 */

static void svc_revisit(struct cache_deferred_req *dreq, int too_many)
{
	struct svc_deferred_req *dr =
		container_of(dreq, struct svc_deferred_req, handle);
	struct svc_xprt *xprt = dr->xprt;

	spin_lock(&xprt->xpt_lock);
	set_bit(XPT_DEFERRED, &xprt->xpt_flags);
	if (too_many || test_bit(XPT_DEAD, &xprt->xpt_flags)) {
		spin_unlock(&xprt->xpt_lock);
		dprintk("revisit canceled\n");
		svc_xprt_put(xprt);
		kfree(dr);
		return;
	}
	dprintk("revisit queued\n");
	dr->xprt = NULL;
	list_add(&dr->handle.recent, &xprt->xpt_deferred);
	spin_unlock(&xprt->xpt_lock);
	svc_xprt_enqueue(xprt);
	svc_xprt_put(xprt);
}
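
/*
 * Editorial note on the defer/revisit path handled below: while a
 * request is being processed, a cache lookup (e.g. in svcauth) that
 * cannot complete immediately calls rqstp->rq_chandle.defer(), which is
 * svc_defer() below. The request is copied into a svc_deferred_req and
 * dropped for now. When the cache item later becomes valid, the cache
 * calls ->revisit (svc_revisit above), which puts the saved request on
 * xpt_deferred and enqueues the transport; svc_recv() then picks it up
 * via svc_deferred_dequeue() and replays it with svc_deferred_recv().
 */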

/*
 * Save the request off for later processing. The request buffer looks
 * like this:
 *
 * <xprt-header><rpc-header><rpc-pagelist><rpc-tail>
 *
 * This code can only handle requests that consist of an xprt-header
 * and rpc-header.
 */
static struct cache_deferred_req *svc_defer(struct cache_req *req)
{
	struct svc_rqst *rqstp = container_of(req, struct svc_rqst, rq_chandle);
	struct svc_deferred_req *dr;

	if (rqstp->rq_arg.page_len || !rqstp->rq_usedeferral)
		return NULL; /* if more than a page, give up FIXME */
	if (rqstp->rq_deferred) {
		dr = rqstp->rq_deferred;
		rqstp->rq_deferred = NULL;
	} else {
		size_t skip;
		size_t size;
		/* FIXME maybe discard if size too large */
		size = sizeof(struct svc_deferred_req) + rqstp->rq_arg.len;
		dr = kmalloc(size, GFP_KERNEL);
		if (dr == NULL)
			return NULL;

		dr->handle.owner = rqstp->rq_server;
		dr->prot = rqstp->rq_prot;
		memcpy(&dr->addr, &rqstp->rq_addr, rqstp->rq_addrlen);
		dr->addrlen = rqstp->rq_addrlen;
		dr->daddr = rqstp->rq_daddr;
		dr->argslen = rqstp->rq_arg.len >> 2;
		dr->xprt_hlen = rqstp->rq_xprt_hlen;

		/* back up head to the start of the buffer and copy */
		skip = rqstp->rq_arg.len - rqstp->rq_arg.head[0].iov_len;
		memcpy(dr->args, rqstp->rq_arg.head[0].iov_base - skip,
		       dr->argslen << 2);
	}
	svc_xprt_get(rqstp->rq_xprt);
	dr->xprt = rqstp->rq_xprt;

	dr->handle.revisit = svc_revisit;
	return &dr->handle;
}

/*
 * recv data from a deferred request into an active one
 */
static int svc_deferred_recv(struct svc_rqst *rqstp)
{
	struct svc_deferred_req *dr = rqstp->rq_deferred;

	/* setup iov_base past transport header */
	rqstp->rq_arg.head[0].iov_base = dr->args + (dr->xprt_hlen>>2);
	/* The iov_len does not include the transport header bytes */
	rqstp->rq_arg.head[0].iov_len = (dr->argslen<<2) - dr->xprt_hlen;
	rqstp->rq_arg.page_len = 0;
	/* The rq_arg.len includes the transport header bytes */
	rqstp->rq_arg.len = dr->argslen<<2;
	rqstp->rq_prot = dr->prot;
	memcpy(&rqstp->rq_addr, &dr->addr, dr->addrlen);
	rqstp->rq_addrlen = dr->addrlen;
	/* Save off transport header len in case we get deferred again */
	rqstp->rq_xprt_hlen = dr->xprt_hlen;
	rqstp->rq_daddr = dr->daddr;
	rqstp->rq_respages = rqstp->rq_pages;
	return (dr->argslen<<2) - dr->xprt_hlen;
}


static struct svc_deferred_req *svc_deferred_dequeue(struct svc_xprt *xprt)
{
	struct svc_deferred_req *dr = NULL;

	if (!test_bit(XPT_DEFERRED, &xprt->xpt_flags))
		return NULL;
	spin_lock(&xprt->xpt_lock);
	clear_bit(XPT_DEFERRED, &xprt->xpt_flags);
	if (!list_empty(&xprt->xpt_deferred)) {
		dr = list_entry(xprt->xpt_deferred.next,
				struct svc_deferred_req,
				handle.recent);
		list_del_init(&dr->handle.recent);
		set_bit(XPT_DEFERRED, &xprt->xpt_flags);
	}
	spin_unlock(&xprt->xpt_lock);
	return dr;
}
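
/*
 * Illustrative sketch (editorial addition): svc_find_xprt() below is a
 * lookup helper for callers that need to know whether a listener of a
 * given class exists, for example (values are placeholders):
 *
 *	xprt = svc_find_xprt(serv, "tcp", AF_UNSPEC, 0);
 *	if (xprt) {
 *		port = svc_xprt_local_port(xprt);
 *		svc_xprt_put(xprt);
 *	}
 *
 * The reference taken by svc_find_xprt() must be dropped with
 * svc_xprt_put() once the caller is done with the transport.
 */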

/**
 * svc_find_xprt - find an RPC transport instance
 * @serv: pointer to svc_serv to search
 * @xcl_name: C string containing transport's class name
 * @af: Address family of transport's local address
 * @port: transport's IP port number
 *
 * Return the transport instance pointer for the endpoint accepting
 * connections/peer traffic from the specified transport class,
 * address family and port.
 *
 * Specifying 0 for the address family or port is effectively a
 * wild-card, and will result in matching the first transport in the
 * service's list that has a matching class name.
 */
struct svc_xprt *svc_find_xprt(struct svc_serv *serv, const char *xcl_name,
			       const sa_family_t af, const unsigned short port)
{
	struct svc_xprt *xprt;
	struct svc_xprt *found = NULL;

	/* Sanity check the args */
	if (serv == NULL || xcl_name == NULL)
		return found;

	spin_lock_bh(&serv->sv_lock);
	list_for_each_entry(xprt, &serv->sv_permsocks, xpt_list) {
		if (strcmp(xprt->xpt_class->xcl_name, xcl_name))
			continue;
		if (af != AF_UNSPEC && af != xprt->xpt_local.ss_family)
			continue;
		if (port != 0 && port != svc_xprt_local_port(xprt))
			continue;
		found = xprt;
		svc_xprt_get(xprt);
		break;
	}
	spin_unlock_bh(&serv->sv_lock);
	return found;
}
EXPORT_SYMBOL_GPL(svc_find_xprt);

static int svc_one_xprt_name(const struct svc_xprt *xprt,
			     char *pos, int remaining)
{
	int len;

	len = snprintf(pos, remaining, "%s %u\n",
		       xprt->xpt_class->xcl_name,
		       svc_xprt_local_port(xprt));
	if (len >= remaining)
		return -ENAMETOOLONG;
	return len;
}

/**
 * svc_xprt_names - format a buffer with a list of transport names
 * @serv: pointer to an RPC service
 * @buf: pointer to a buffer to be filled in
 * @buflen: length of buffer to be filled in
 *
 * Fills in @buf with a string containing a list of transport names,
 * each name terminated with '\n'.
 *
 * Returns positive length of the filled-in string on success; otherwise
 * a negative errno value is returned if an error occurs.
 */
int svc_xprt_names(struct svc_serv *serv, char *buf, const int buflen)
{
	struct svc_xprt *xprt;
	int len, totlen;
	char *pos;

	/* Sanity check args */
	if (!serv)
		return 0;

	spin_lock_bh(&serv->sv_lock);

	pos = buf;
	totlen = 0;
	list_for_each_entry(xprt, &serv->sv_permsocks, xpt_list) {
		len = svc_one_xprt_name(xprt, pos, buflen - totlen);
		if (len < 0) {
			*buf = '\0';
			totlen = len;
		}
		if (len <= 0)
			break;

		pos += len;
		totlen += len;
	}

	spin_unlock_bh(&serv->sv_lock);
	return totlen;
}
EXPORT_SYMBOL_GPL(svc_xprt_names);


/*----------------------------------------------------------------------------*/

static void *svc_pool_stats_start(struct seq_file *m, loff_t *pos)
{
	unsigned int pidx = (unsigned int)*pos;
	struct svc_serv *serv = m->private;

	dprintk("svc_pool_stats_start, *pidx=%u\n", pidx);

	lock_kernel();
	/* bump up the pseudo refcount while traversing */
	svc_get(serv);
	unlock_kernel();

	if (!pidx)
		return SEQ_START_TOKEN;
	return (pidx > serv->sv_nrpools ?
			NULL : &serv->sv_pools[pidx-1]);
}

static void *svc_pool_stats_next(struct seq_file *m, void *p, loff_t *pos)
{
	struct svc_pool *pool = p;
	struct svc_serv *serv = m->private;

	dprintk("svc_pool_stats_next, *pos=%llu\n", *pos);

	if (p == SEQ_START_TOKEN) {
		pool = &serv->sv_pools[0];
	} else {
		unsigned int pidx = (pool - &serv->sv_pools[0]);
		if (pidx < serv->sv_nrpools-1)
			pool = &serv->sv_pools[pidx+1];
		else
			pool = NULL;
	}
	++*pos;
	return pool;
}

static void svc_pool_stats_stop(struct seq_file *m, void *p)
{
	struct svc_serv *serv = m->private;

	lock_kernel();
	/* this function really, really should have been called svc_put() */
	svc_destroy(serv);
	unlock_kernel();
}

static int svc_pool_stats_show(struct seq_file *m, void *p)
{
	struct svc_pool *pool = p;

	if (p == SEQ_START_TOKEN) {
		seq_puts(m, "# pool packets-arrived sockets-enqueued threads-woken overloads-avoided threads-timedout\n");
		return 0;
	}

	seq_printf(m, "%u %lu %lu %lu %lu %lu\n",
		pool->sp_id,
		pool->sp_stats.packets,
		pool->sp_stats.sockets_queued,
		pool->sp_stats.threads_woken,
		pool->sp_stats.overloads_avoided,
		pool->sp_stats.threads_timedout);

	return 0;
}

static const struct seq_operations svc_pool_stats_seq_ops = {
	.start	= svc_pool_stats_start,
	.next	= svc_pool_stats_next,
	.stop	= svc_pool_stats_stop,
	.show	= svc_pool_stats_show,
};

int svc_pool_stats_open(struct svc_serv *serv, struct file *file)
{
	int err;

	err = seq_open(file, &svc_pool_stats_seq_ops);
	if (!err)
		((struct seq_file *) file->private_data)->private = serv;
	return err;
}
EXPORT_SYMBOL(svc_pool_stats_open);

/*----------------------------------------------------------------------------*/