1 /* $NetBSD: svc.c,v 1.21 2000/07/06 03:10:35 christos Exp $ */ 2 3 /* 4 * Sun RPC is a product of Sun Microsystems, Inc. and is provided for 5 * unrestricted use provided that this legend is included on all tape 6 * media and as a part of the software program in whole or part. Users 7 * may copy or modify Sun RPC without charge, but are not authorized 8 * to license or distribute it to anyone else except as part of a product or 9 * program developed by the user. 10 * 11 * SUN RPC IS PROVIDED AS IS WITH NO WARRANTIES OF ANY KIND INCLUDING THE 12 * WARRANTIES OF DESIGN, MERCHANTIBILITY AND FITNESS FOR A PARTICULAR 13 * PURPOSE, OR ARISING FROM A COURSE OF DEALING, USAGE OR TRADE PRACTICE. 14 * 15 * Sun RPC is provided with no support and without any obligation on the 16 * part of Sun Microsystems, Inc. to assist in its use, correction, 17 * modification or enhancement. 18 * 19 * SUN MICROSYSTEMS, INC. SHALL HAVE NO LIABILITY WITH RESPECT TO THE 20 * INFRINGEMENT OF COPYRIGHTS, TRADE SECRETS OR ANY PATENTS BY SUN RPC 21 * OR ANY PART THEREOF. 22 * 23 * In no event will Sun Microsystems, Inc. be liable for any lost revenue 24 * or profits or other special, indirect and consequential damages, even if 25 * Sun has been advised of the possibility of such damages. 26 * 27 * Sun Microsystems, Inc. 28 * 2550 Garcia Avenue 29 * Mountain View, California 94043 30 */ 31 32 #if defined(LIBC_SCCS) && !defined(lint) 33 static char *sccsid2 = "@(#)svc.c 1.44 88/02/08 Copyr 1984 Sun Micro"; 34 static char *sccsid = "@(#)svc.c 2.4 88/08/11 4.0 RPCSRC"; 35 #endif 36 #include <sys/cdefs.h> 37 __FBSDID("$FreeBSD$"); 38 39 /* 40 * svc.c, Server-side remote procedure call interface. 41 * 42 * There are two sets of procedures here. The xprt routines are 43 * for handling transport handles. The svc routines handle the 44 * list of service routines. 45 * 46 * Copyright (C) 1984, Sun Microsystems, Inc. 47 */ 48 49 #include <sys/param.h> 50 #include <sys/lock.h> 51 #include <sys/kernel.h> 52 #include <sys/kthread.h> 53 #include <sys/malloc.h> 54 #include <sys/mbuf.h> 55 #include <sys/mutex.h> 56 #include <sys/proc.h> 57 #include <sys/queue.h> 58 #include <sys/socketvar.h> 59 #include <sys/systm.h> 60 #include <sys/ucred.h> 61 62 #include <rpc/rpc.h> 63 #include <rpc/rpcb_clnt.h> 64 #include <rpc/replay.h> 65 66 #include <rpc/rpc_com.h> 67 68 #define SVC_VERSQUIET 0x0001 /* keep quiet about vers mismatch */ 69 #define version_keepquiet(xp) (SVC_EXT(xp)->xp_flags & SVC_VERSQUIET) 70 71 static struct svc_callout *svc_find(SVCPOOL *pool, rpcprog_t, rpcvers_t, 72 char *); 73 static void svc_new_thread(SVCPOOL *pool); 74 static void xprt_unregister_locked(SVCXPRT *xprt); 75 76 /* *************** SVCXPRT related stuff **************** */ 77 78 static int svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS); 79 static int svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS); 80 81 SVCPOOL* 82 svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base) 83 { 84 SVCPOOL *pool; 85 86 pool = malloc(sizeof(SVCPOOL), M_RPC, M_WAITOK|M_ZERO); 87 88 mtx_init(&pool->sp_lock, "sp_lock", NULL, MTX_DEF); 89 pool->sp_name = name; 90 pool->sp_state = SVCPOOL_INIT; 91 pool->sp_proc = NULL; 92 TAILQ_INIT(&pool->sp_xlist); 93 TAILQ_INIT(&pool->sp_active); 94 TAILQ_INIT(&pool->sp_callouts); 95 LIST_INIT(&pool->sp_threads); 96 LIST_INIT(&pool->sp_idlethreads); 97 pool->sp_minthreads = 1; 98 pool->sp_maxthreads = 1; 99 pool->sp_threadcount = 0; 100 101 /* 102 * Don't use more than a quarter of mbuf clusters or more than 103 * 45Mb buffering requests. 104 */ 105 pool->sp_space_high = nmbclusters * MCLBYTES / 4; 106 if (pool->sp_space_high > 45 << 20) 107 pool->sp_space_high = 45 << 20; 108 pool->sp_space_low = 2 * pool->sp_space_high / 3; 109 110 sysctl_ctx_init(&pool->sp_sysctl); 111 if (sysctl_base) { 112 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO, 113 "minthreads", CTLTYPE_INT | CTLFLAG_RW, 114 pool, 0, svcpool_minthread_sysctl, "I", ""); 115 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO, 116 "maxthreads", CTLTYPE_INT | CTLFLAG_RW, 117 pool, 0, svcpool_maxthread_sysctl, "I", ""); 118 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO, 119 "threads", CTLFLAG_RD, &pool->sp_threadcount, 0, ""); 120 121 SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO, 122 "request_space_used", CTLFLAG_RD, 123 &pool->sp_space_used, 0, 124 "Space in parsed but not handled requests."); 125 126 SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO, 127 "request_space_used_highest", CTLFLAG_RD, 128 &pool->sp_space_used_highest, 0, 129 "Highest space used since reboot."); 130 131 SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO, 132 "request_space_high", CTLFLAG_RW, 133 &pool->sp_space_high, 0, 134 "Maximum space in parsed but not handled requests."); 135 136 SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO, 137 "request_space_low", CTLFLAG_RW, 138 &pool->sp_space_low, 0, 139 "Low water mark for request space."); 140 141 SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO, 142 "request_space_throttled", CTLFLAG_RD, 143 &pool->sp_space_throttled, 0, 144 "Whether nfs requests are currently throttled"); 145 146 SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO, 147 "request_space_throttle_count", CTLFLAG_RD, 148 &pool->sp_space_throttle_count, 0, 149 "Count of times throttling based on request space has occurred"); 150 } 151 152 return pool; 153 } 154 155 void 156 svcpool_destroy(SVCPOOL *pool) 157 { 158 SVCXPRT *xprt, *nxprt; 159 struct svc_callout *s; 160 struct svcxprt_list cleanup; 161 162 TAILQ_INIT(&cleanup); 163 mtx_lock(&pool->sp_lock); 164 165 while (TAILQ_FIRST(&pool->sp_xlist)) { 166 xprt = TAILQ_FIRST(&pool->sp_xlist); 167 xprt_unregister_locked(xprt); 168 TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link); 169 } 170 171 while (TAILQ_FIRST(&pool->sp_callouts)) { 172 s = TAILQ_FIRST(&pool->sp_callouts); 173 mtx_unlock(&pool->sp_lock); 174 svc_unreg(pool, s->sc_prog, s->sc_vers); 175 mtx_lock(&pool->sp_lock); 176 } 177 178 mtx_destroy(&pool->sp_lock); 179 180 TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) { 181 SVC_RELEASE(xprt); 182 } 183 184 if (pool->sp_rcache) 185 replay_freecache(pool->sp_rcache); 186 187 sysctl_ctx_free(&pool->sp_sysctl); 188 free(pool, M_RPC); 189 } 190 191 static bool_t 192 svcpool_active(SVCPOOL *pool) 193 { 194 enum svcpool_state state = pool->sp_state; 195 196 if (state == SVCPOOL_INIT || state == SVCPOOL_CLOSING) 197 return (FALSE); 198 return (TRUE); 199 } 200 201 /* 202 * Sysctl handler to set the minimum thread count on a pool 203 */ 204 static int 205 svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS) 206 { 207 SVCPOOL *pool; 208 int newminthreads, error, n; 209 210 pool = oidp->oid_arg1; 211 newminthreads = pool->sp_minthreads; 212 error = sysctl_handle_int(oidp, &newminthreads, 0, req); 213 if (error == 0 && newminthreads != pool->sp_minthreads) { 214 if (newminthreads > pool->sp_maxthreads) 215 return (EINVAL); 216 mtx_lock(&pool->sp_lock); 217 if (newminthreads > pool->sp_minthreads 218 && svcpool_active(pool)) { 219 /* 220 * If the pool is running and we are 221 * increasing, create some more threads now. 222 */ 223 n = newminthreads - pool->sp_threadcount; 224 if (n > 0) { 225 mtx_unlock(&pool->sp_lock); 226 while (n--) 227 svc_new_thread(pool); 228 mtx_lock(&pool->sp_lock); 229 } 230 } 231 pool->sp_minthreads = newminthreads; 232 mtx_unlock(&pool->sp_lock); 233 } 234 return (error); 235 } 236 237 /* 238 * Sysctl handler to set the maximum thread count on a pool 239 */ 240 static int 241 svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS) 242 { 243 SVCPOOL *pool; 244 SVCTHREAD *st; 245 int newmaxthreads, error; 246 247 pool = oidp->oid_arg1; 248 newmaxthreads = pool->sp_maxthreads; 249 error = sysctl_handle_int(oidp, &newmaxthreads, 0, req); 250 if (error == 0 && newmaxthreads != pool->sp_maxthreads) { 251 if (newmaxthreads < pool->sp_minthreads) 252 return (EINVAL); 253 mtx_lock(&pool->sp_lock); 254 if (newmaxthreads < pool->sp_maxthreads 255 && svcpool_active(pool)) { 256 /* 257 * If the pool is running and we are 258 * decreasing, wake up some idle threads to 259 * encourage them to exit. 260 */ 261 LIST_FOREACH(st, &pool->sp_idlethreads, st_ilink) 262 cv_signal(&st->st_cond); 263 } 264 pool->sp_maxthreads = newmaxthreads; 265 mtx_unlock(&pool->sp_lock); 266 } 267 return (error); 268 } 269 270 /* 271 * Activate a transport handle. 272 */ 273 void 274 xprt_register(SVCXPRT *xprt) 275 { 276 SVCPOOL *pool = xprt->xp_pool; 277 278 mtx_lock(&pool->sp_lock); 279 xprt->xp_registered = TRUE; 280 xprt->xp_active = FALSE; 281 TAILQ_INSERT_TAIL(&pool->sp_xlist, xprt, xp_link); 282 mtx_unlock(&pool->sp_lock); 283 } 284 285 /* 286 * De-activate a transport handle. Note: the locked version doesn't 287 * release the transport - caller must do that after dropping the pool 288 * lock. 289 */ 290 static void 291 xprt_unregister_locked(SVCXPRT *xprt) 292 { 293 SVCPOOL *pool = xprt->xp_pool; 294 295 if (xprt->xp_active) { 296 TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink); 297 xprt->xp_active = FALSE; 298 } 299 TAILQ_REMOVE(&pool->sp_xlist, xprt, xp_link); 300 xprt->xp_registered = FALSE; 301 } 302 303 void 304 xprt_unregister(SVCXPRT *xprt) 305 { 306 SVCPOOL *pool = xprt->xp_pool; 307 308 mtx_lock(&pool->sp_lock); 309 xprt_unregister_locked(xprt); 310 mtx_unlock(&pool->sp_lock); 311 312 SVC_RELEASE(xprt); 313 } 314 315 static void 316 xprt_assignthread(SVCXPRT *xprt) 317 { 318 SVCPOOL *pool = xprt->xp_pool; 319 SVCTHREAD *st; 320 321 /* 322 * Attempt to assign a service thread to this 323 * transport. 324 */ 325 LIST_FOREACH(st, &pool->sp_idlethreads, st_ilink) { 326 if (st->st_xprt == NULL && STAILQ_EMPTY(&st->st_reqs)) 327 break; 328 } 329 if (st) { 330 SVC_ACQUIRE(xprt); 331 xprt->xp_thread = st; 332 st->st_xprt = xprt; 333 cv_signal(&st->st_cond); 334 } else { 335 /* 336 * See if we can create a new thread. The 337 * actual thread creation happens in 338 * svc_run_internal because our locking state 339 * is poorly defined (we are typically called 340 * from a socket upcall). Don't create more 341 * than one thread per second. 342 */ 343 if (pool->sp_state == SVCPOOL_ACTIVE 344 && pool->sp_lastcreatetime < time_uptime 345 && pool->sp_threadcount < pool->sp_maxthreads) { 346 pool->sp_state = SVCPOOL_THREADWANTED; 347 } 348 } 349 } 350 351 void 352 xprt_active(SVCXPRT *xprt) 353 { 354 SVCPOOL *pool = xprt->xp_pool; 355 356 if (!xprt->xp_registered) { 357 /* 358 * Race with xprt_unregister - we lose. 359 */ 360 return; 361 } 362 363 mtx_lock(&pool->sp_lock); 364 365 if (!xprt->xp_active) { 366 TAILQ_INSERT_TAIL(&pool->sp_active, xprt, xp_alink); 367 xprt->xp_active = TRUE; 368 xprt_assignthread(xprt); 369 } 370 371 mtx_unlock(&pool->sp_lock); 372 } 373 374 void 375 xprt_inactive_locked(SVCXPRT *xprt) 376 { 377 SVCPOOL *pool = xprt->xp_pool; 378 379 if (xprt->xp_active) { 380 TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink); 381 xprt->xp_active = FALSE; 382 } 383 } 384 385 void 386 xprt_inactive(SVCXPRT *xprt) 387 { 388 SVCPOOL *pool = xprt->xp_pool; 389 390 mtx_lock(&pool->sp_lock); 391 xprt_inactive_locked(xprt); 392 mtx_unlock(&pool->sp_lock); 393 } 394 395 /* 396 * Add a service program to the callout list. 397 * The dispatch routine will be called when a rpc request for this 398 * program number comes in. 399 */ 400 bool_t 401 svc_reg(SVCXPRT *xprt, const rpcprog_t prog, const rpcvers_t vers, 402 void (*dispatch)(struct svc_req *, SVCXPRT *), 403 const struct netconfig *nconf) 404 { 405 SVCPOOL *pool = xprt->xp_pool; 406 struct svc_callout *s; 407 char *netid = NULL; 408 int flag = 0; 409 410 /* VARIABLES PROTECTED BY svc_lock: s, svc_head */ 411 412 if (xprt->xp_netid) { 413 netid = strdup(xprt->xp_netid, M_RPC); 414 flag = 1; 415 } else if (nconf && nconf->nc_netid) { 416 netid = strdup(nconf->nc_netid, M_RPC); 417 flag = 1; 418 } /* must have been created with svc_raw_create */ 419 if ((netid == NULL) && (flag == 1)) { 420 return (FALSE); 421 } 422 423 mtx_lock(&pool->sp_lock); 424 if ((s = svc_find(pool, prog, vers, netid)) != NULL) { 425 if (netid) 426 free(netid, M_RPC); 427 if (s->sc_dispatch == dispatch) 428 goto rpcb_it; /* he is registering another xptr */ 429 mtx_unlock(&pool->sp_lock); 430 return (FALSE); 431 } 432 s = malloc(sizeof (struct svc_callout), M_RPC, M_NOWAIT); 433 if (s == NULL) { 434 if (netid) 435 free(netid, M_RPC); 436 mtx_unlock(&pool->sp_lock); 437 return (FALSE); 438 } 439 440 s->sc_prog = prog; 441 s->sc_vers = vers; 442 s->sc_dispatch = dispatch; 443 s->sc_netid = netid; 444 TAILQ_INSERT_TAIL(&pool->sp_callouts, s, sc_link); 445 446 if ((xprt->xp_netid == NULL) && (flag == 1) && netid) 447 ((SVCXPRT *) xprt)->xp_netid = strdup(netid, M_RPC); 448 449 rpcb_it: 450 mtx_unlock(&pool->sp_lock); 451 /* now register the information with the local binder service */ 452 if (nconf) { 453 bool_t dummy; 454 struct netconfig tnc; 455 struct netbuf nb; 456 tnc = *nconf; 457 nb.buf = &xprt->xp_ltaddr; 458 nb.len = xprt->xp_ltaddr.ss_len; 459 dummy = rpcb_set(prog, vers, &tnc, &nb); 460 return (dummy); 461 } 462 return (TRUE); 463 } 464 465 /* 466 * Remove a service program from the callout list. 467 */ 468 void 469 svc_unreg(SVCPOOL *pool, const rpcprog_t prog, const rpcvers_t vers) 470 { 471 struct svc_callout *s; 472 473 /* unregister the information anyway */ 474 (void) rpcb_unset(prog, vers, NULL); 475 mtx_lock(&pool->sp_lock); 476 while ((s = svc_find(pool, prog, vers, NULL)) != NULL) { 477 TAILQ_REMOVE(&pool->sp_callouts, s, sc_link); 478 if (s->sc_netid) 479 mem_free(s->sc_netid, sizeof (s->sc_netid) + 1); 480 mem_free(s, sizeof (struct svc_callout)); 481 } 482 mtx_unlock(&pool->sp_lock); 483 } 484 485 /* ********************** CALLOUT list related stuff ************* */ 486 487 /* 488 * Search the callout list for a program number, return the callout 489 * struct. 490 */ 491 static struct svc_callout * 492 svc_find(SVCPOOL *pool, rpcprog_t prog, rpcvers_t vers, char *netid) 493 { 494 struct svc_callout *s; 495 496 mtx_assert(&pool->sp_lock, MA_OWNED); 497 TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) { 498 if (s->sc_prog == prog && s->sc_vers == vers 499 && (netid == NULL || s->sc_netid == NULL || 500 strcmp(netid, s->sc_netid) == 0)) 501 break; 502 } 503 504 return (s); 505 } 506 507 /* ******************* REPLY GENERATION ROUTINES ************ */ 508 509 static bool_t 510 svc_sendreply_common(struct svc_req *rqstp, struct rpc_msg *rply, 511 struct mbuf *body) 512 { 513 SVCXPRT *xprt = rqstp->rq_xprt; 514 bool_t ok; 515 516 if (rqstp->rq_args) { 517 m_freem(rqstp->rq_args); 518 rqstp->rq_args = NULL; 519 } 520 521 if (xprt->xp_pool->sp_rcache) 522 replay_setreply(xprt->xp_pool->sp_rcache, 523 rply, svc_getrpccaller(rqstp), body); 524 525 if (!SVCAUTH_WRAP(&rqstp->rq_auth, &body)) 526 return (FALSE); 527 528 ok = SVC_REPLY(xprt, rply, rqstp->rq_addr, body); 529 if (rqstp->rq_addr) { 530 free(rqstp->rq_addr, M_SONAME); 531 rqstp->rq_addr = NULL; 532 } 533 534 return (ok); 535 } 536 537 /* 538 * Send a reply to an rpc request 539 */ 540 bool_t 541 svc_sendreply(struct svc_req *rqstp, xdrproc_t xdr_results, void * xdr_location) 542 { 543 struct rpc_msg rply; 544 struct mbuf *m; 545 XDR xdrs; 546 bool_t ok; 547 548 rply.rm_xid = rqstp->rq_xid; 549 rply.rm_direction = REPLY; 550 rply.rm_reply.rp_stat = MSG_ACCEPTED; 551 rply.acpted_rply.ar_verf = rqstp->rq_verf; 552 rply.acpted_rply.ar_stat = SUCCESS; 553 rply.acpted_rply.ar_results.where = NULL; 554 rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void; 555 556 MGET(m, M_WAIT, MT_DATA); 557 MCLGET(m, M_WAIT); 558 m->m_len = 0; 559 xdrmbuf_create(&xdrs, m, XDR_ENCODE); 560 ok = xdr_results(&xdrs, xdr_location); 561 XDR_DESTROY(&xdrs); 562 563 if (ok) { 564 return (svc_sendreply_common(rqstp, &rply, m)); 565 } else { 566 m_freem(m); 567 return (FALSE); 568 } 569 } 570 571 bool_t 572 svc_sendreply_mbuf(struct svc_req *rqstp, struct mbuf *m) 573 { 574 struct rpc_msg rply; 575 576 rply.rm_xid = rqstp->rq_xid; 577 rply.rm_direction = REPLY; 578 rply.rm_reply.rp_stat = MSG_ACCEPTED; 579 rply.acpted_rply.ar_verf = rqstp->rq_verf; 580 rply.acpted_rply.ar_stat = SUCCESS; 581 rply.acpted_rply.ar_results.where = NULL; 582 rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void; 583 584 return (svc_sendreply_common(rqstp, &rply, m)); 585 } 586 587 /* 588 * No procedure error reply 589 */ 590 void 591 svcerr_noproc(struct svc_req *rqstp) 592 { 593 SVCXPRT *xprt = rqstp->rq_xprt; 594 struct rpc_msg rply; 595 596 rply.rm_xid = rqstp->rq_xid; 597 rply.rm_direction = REPLY; 598 rply.rm_reply.rp_stat = MSG_ACCEPTED; 599 rply.acpted_rply.ar_verf = rqstp->rq_verf; 600 rply.acpted_rply.ar_stat = PROC_UNAVAIL; 601 602 if (xprt->xp_pool->sp_rcache) 603 replay_setreply(xprt->xp_pool->sp_rcache, 604 &rply, svc_getrpccaller(rqstp), NULL); 605 606 svc_sendreply_common(rqstp, &rply, NULL); 607 } 608 609 /* 610 * Can't decode args error reply 611 */ 612 void 613 svcerr_decode(struct svc_req *rqstp) 614 { 615 SVCXPRT *xprt = rqstp->rq_xprt; 616 struct rpc_msg rply; 617 618 rply.rm_xid = rqstp->rq_xid; 619 rply.rm_direction = REPLY; 620 rply.rm_reply.rp_stat = MSG_ACCEPTED; 621 rply.acpted_rply.ar_verf = rqstp->rq_verf; 622 rply.acpted_rply.ar_stat = GARBAGE_ARGS; 623 624 if (xprt->xp_pool->sp_rcache) 625 replay_setreply(xprt->xp_pool->sp_rcache, 626 &rply, (struct sockaddr *) &xprt->xp_rtaddr, NULL); 627 628 svc_sendreply_common(rqstp, &rply, NULL); 629 } 630 631 /* 632 * Some system error 633 */ 634 void 635 svcerr_systemerr(struct svc_req *rqstp) 636 { 637 SVCXPRT *xprt = rqstp->rq_xprt; 638 struct rpc_msg rply; 639 640 rply.rm_xid = rqstp->rq_xid; 641 rply.rm_direction = REPLY; 642 rply.rm_reply.rp_stat = MSG_ACCEPTED; 643 rply.acpted_rply.ar_verf = rqstp->rq_verf; 644 rply.acpted_rply.ar_stat = SYSTEM_ERR; 645 646 if (xprt->xp_pool->sp_rcache) 647 replay_setreply(xprt->xp_pool->sp_rcache, 648 &rply, svc_getrpccaller(rqstp), NULL); 649 650 svc_sendreply_common(rqstp, &rply, NULL); 651 } 652 653 /* 654 * Authentication error reply 655 */ 656 void 657 svcerr_auth(struct svc_req *rqstp, enum auth_stat why) 658 { 659 SVCXPRT *xprt = rqstp->rq_xprt; 660 struct rpc_msg rply; 661 662 rply.rm_xid = rqstp->rq_xid; 663 rply.rm_direction = REPLY; 664 rply.rm_reply.rp_stat = MSG_DENIED; 665 rply.rjcted_rply.rj_stat = AUTH_ERROR; 666 rply.rjcted_rply.rj_why = why; 667 668 if (xprt->xp_pool->sp_rcache) 669 replay_setreply(xprt->xp_pool->sp_rcache, 670 &rply, svc_getrpccaller(rqstp), NULL); 671 672 svc_sendreply_common(rqstp, &rply, NULL); 673 } 674 675 /* 676 * Auth too weak error reply 677 */ 678 void 679 svcerr_weakauth(struct svc_req *rqstp) 680 { 681 682 svcerr_auth(rqstp, AUTH_TOOWEAK); 683 } 684 685 /* 686 * Program unavailable error reply 687 */ 688 void 689 svcerr_noprog(struct svc_req *rqstp) 690 { 691 SVCXPRT *xprt = rqstp->rq_xprt; 692 struct rpc_msg rply; 693 694 rply.rm_xid = rqstp->rq_xid; 695 rply.rm_direction = REPLY; 696 rply.rm_reply.rp_stat = MSG_ACCEPTED; 697 rply.acpted_rply.ar_verf = rqstp->rq_verf; 698 rply.acpted_rply.ar_stat = PROG_UNAVAIL; 699 700 if (xprt->xp_pool->sp_rcache) 701 replay_setreply(xprt->xp_pool->sp_rcache, 702 &rply, svc_getrpccaller(rqstp), NULL); 703 704 svc_sendreply_common(rqstp, &rply, NULL); 705 } 706 707 /* 708 * Program version mismatch error reply 709 */ 710 void 711 svcerr_progvers(struct svc_req *rqstp, rpcvers_t low_vers, rpcvers_t high_vers) 712 { 713 SVCXPRT *xprt = rqstp->rq_xprt; 714 struct rpc_msg rply; 715 716 rply.rm_xid = rqstp->rq_xid; 717 rply.rm_direction = REPLY; 718 rply.rm_reply.rp_stat = MSG_ACCEPTED; 719 rply.acpted_rply.ar_verf = rqstp->rq_verf; 720 rply.acpted_rply.ar_stat = PROG_MISMATCH; 721 rply.acpted_rply.ar_vers.low = (uint32_t)low_vers; 722 rply.acpted_rply.ar_vers.high = (uint32_t)high_vers; 723 724 if (xprt->xp_pool->sp_rcache) 725 replay_setreply(xprt->xp_pool->sp_rcache, 726 &rply, svc_getrpccaller(rqstp), NULL); 727 728 svc_sendreply_common(rqstp, &rply, NULL); 729 } 730 731 /* 732 * Allocate a new server transport structure. All fields are 733 * initialized to zero and xp_p3 is initialized to point at an 734 * extension structure to hold various flags and authentication 735 * parameters. 736 */ 737 SVCXPRT * 738 svc_xprt_alloc() 739 { 740 SVCXPRT *xprt; 741 SVCXPRT_EXT *ext; 742 743 xprt = mem_alloc(sizeof(SVCXPRT)); 744 memset(xprt, 0, sizeof(SVCXPRT)); 745 ext = mem_alloc(sizeof(SVCXPRT_EXT)); 746 memset(ext, 0, sizeof(SVCXPRT_EXT)); 747 xprt->xp_p3 = ext; 748 refcount_init(&xprt->xp_refs, 1); 749 750 return (xprt); 751 } 752 753 /* 754 * Free a server transport structure. 755 */ 756 void 757 svc_xprt_free(xprt) 758 SVCXPRT *xprt; 759 { 760 761 mem_free(xprt->xp_p3, sizeof(SVCXPRT_EXT)); 762 mem_free(xprt, sizeof(SVCXPRT)); 763 } 764 765 /* ******************* SERVER INPUT STUFF ******************* */ 766 767 /* 768 * Read RPC requests from a transport and queue them to be 769 * executed. We handle authentication and replay cache replies here. 770 * Actually dispatching the RPC is deferred till svc_executereq. 771 */ 772 static enum xprt_stat 773 svc_getreq(SVCXPRT *xprt, struct svc_req **rqstp_ret) 774 { 775 SVCPOOL *pool = xprt->xp_pool; 776 struct svc_req *r; 777 struct rpc_msg msg; 778 struct mbuf *args; 779 enum xprt_stat stat; 780 781 /* now receive msgs from xprtprt (support batch calls) */ 782 r = malloc(sizeof(*r), M_RPC, M_WAITOK|M_ZERO); 783 784 msg.rm_call.cb_cred.oa_base = r->rq_credarea; 785 msg.rm_call.cb_verf.oa_base = &r->rq_credarea[MAX_AUTH_BYTES]; 786 r->rq_clntcred = &r->rq_credarea[2*MAX_AUTH_BYTES]; 787 if (SVC_RECV(xprt, &msg, &r->rq_addr, &args)) { 788 enum auth_stat why; 789 790 /* 791 * Handle replays and authenticate before queuing the 792 * request to be executed. 793 */ 794 SVC_ACQUIRE(xprt); 795 r->rq_xprt = xprt; 796 if (pool->sp_rcache) { 797 struct rpc_msg repmsg; 798 struct mbuf *repbody; 799 enum replay_state rs; 800 rs = replay_find(pool->sp_rcache, &msg, 801 svc_getrpccaller(r), &repmsg, &repbody); 802 switch (rs) { 803 case RS_NEW: 804 break; 805 case RS_DONE: 806 SVC_REPLY(xprt, &repmsg, r->rq_addr, 807 repbody); 808 if (r->rq_addr) { 809 free(r->rq_addr, M_SONAME); 810 r->rq_addr = NULL; 811 } 812 goto call_done; 813 814 default: 815 goto call_done; 816 } 817 } 818 819 r->rq_xid = msg.rm_xid; 820 r->rq_prog = msg.rm_call.cb_prog; 821 r->rq_vers = msg.rm_call.cb_vers; 822 r->rq_proc = msg.rm_call.cb_proc; 823 r->rq_size = sizeof(*r) + m_length(args, NULL); 824 r->rq_args = args; 825 if ((why = _authenticate(r, &msg)) != AUTH_OK) { 826 /* 827 * RPCSEC_GSS uses this return code 828 * for requests that form part of its 829 * context establishment protocol and 830 * should not be dispatched to the 831 * application. 832 */ 833 if (why != RPCSEC_GSS_NODISPATCH) 834 svcerr_auth(r, why); 835 goto call_done; 836 } 837 838 if (!SVCAUTH_UNWRAP(&r->rq_auth, &r->rq_args)) { 839 svcerr_decode(r); 840 goto call_done; 841 } 842 843 /* 844 * Everything checks out, return request to caller. 845 */ 846 *rqstp_ret = r; 847 r = NULL; 848 } 849 call_done: 850 if (r) { 851 svc_freereq(r); 852 r = NULL; 853 } 854 if ((stat = SVC_STAT(xprt)) == XPRT_DIED) { 855 xprt_unregister(xprt); 856 } 857 858 return (stat); 859 } 860 861 static void 862 svc_executereq(struct svc_req *rqstp) 863 { 864 SVCXPRT *xprt = rqstp->rq_xprt; 865 SVCPOOL *pool = xprt->xp_pool; 866 int prog_found; 867 rpcvers_t low_vers; 868 rpcvers_t high_vers; 869 struct svc_callout *s; 870 871 /* now match message with a registered service*/ 872 prog_found = FALSE; 873 low_vers = (rpcvers_t) -1L; 874 high_vers = (rpcvers_t) 0L; 875 TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) { 876 if (s->sc_prog == rqstp->rq_prog) { 877 if (s->sc_vers == rqstp->rq_vers) { 878 /* 879 * We hand ownership of r to the 880 * dispatch method - they must call 881 * svc_freereq. 882 */ 883 (*s->sc_dispatch)(rqstp, xprt); 884 return; 885 } /* found correct version */ 886 prog_found = TRUE; 887 if (s->sc_vers < low_vers) 888 low_vers = s->sc_vers; 889 if (s->sc_vers > high_vers) 890 high_vers = s->sc_vers; 891 } /* found correct program */ 892 } 893 894 /* 895 * if we got here, the program or version 896 * is not served ... 897 */ 898 if (prog_found) 899 svcerr_progvers(rqstp, low_vers, high_vers); 900 else 901 svcerr_noprog(rqstp); 902 903 svc_freereq(rqstp); 904 } 905 906 static void 907 svc_checkidle(SVCPOOL *pool) 908 { 909 SVCXPRT *xprt, *nxprt; 910 time_t timo; 911 struct svcxprt_list cleanup; 912 913 TAILQ_INIT(&cleanup); 914 TAILQ_FOREACH_SAFE(xprt, &pool->sp_xlist, xp_link, nxprt) { 915 /* 916 * Only some transports have idle timers. Don't time 917 * something out which is just waking up. 918 */ 919 if (!xprt->xp_idletimeout || xprt->xp_thread) 920 continue; 921 922 timo = xprt->xp_lastactive + xprt->xp_idletimeout; 923 if (time_uptime > timo) { 924 xprt_unregister_locked(xprt); 925 TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link); 926 } 927 } 928 929 mtx_unlock(&pool->sp_lock); 930 TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) { 931 SVC_RELEASE(xprt); 932 } 933 mtx_lock(&pool->sp_lock); 934 935 } 936 937 static void 938 svc_assign_waiting_sockets(SVCPOOL *pool) 939 { 940 SVCXPRT *xprt; 941 942 TAILQ_FOREACH(xprt, &pool->sp_active, xp_alink) { 943 if (!xprt->xp_thread) { 944 xprt_assignthread(xprt); 945 } 946 } 947 } 948 949 static bool_t 950 svc_request_space_available(SVCPOOL *pool) 951 { 952 953 mtx_assert(&pool->sp_lock, MA_OWNED); 954 955 if (pool->sp_space_throttled) { 956 /* 957 * Below the low-water yet? If so, assign any waiting sockets. 958 */ 959 if (pool->sp_space_used < pool->sp_space_low) { 960 pool->sp_space_throttled = FALSE; 961 svc_assign_waiting_sockets(pool); 962 return TRUE; 963 } 964 965 return FALSE; 966 } else { 967 if (pool->sp_space_used 968 >= pool->sp_space_high) { 969 pool->sp_space_throttled = TRUE; 970 pool->sp_space_throttle_count++; 971 return FALSE; 972 } 973 974 return TRUE; 975 } 976 } 977 978 static void 979 svc_run_internal(SVCPOOL *pool, bool_t ismaster) 980 { 981 SVCTHREAD *st, *stpref; 982 SVCXPRT *xprt; 983 enum xprt_stat stat; 984 struct svc_req *rqstp; 985 int error; 986 987 st = mem_alloc(sizeof(*st)); 988 st->st_xprt = NULL; 989 STAILQ_INIT(&st->st_reqs); 990 cv_init(&st->st_cond, "rpcsvc"); 991 992 mtx_lock(&pool->sp_lock); 993 LIST_INSERT_HEAD(&pool->sp_threads, st, st_link); 994 995 /* 996 * If we are a new thread which was spawned to cope with 997 * increased load, set the state back to SVCPOOL_ACTIVE. 998 */ 999 if (pool->sp_state == SVCPOOL_THREADSTARTING) 1000 pool->sp_state = SVCPOOL_ACTIVE; 1001 1002 while (pool->sp_state != SVCPOOL_CLOSING) { 1003 /* 1004 * Check for idle transports once per second. 1005 */ 1006 if (time_uptime > pool->sp_lastidlecheck) { 1007 pool->sp_lastidlecheck = time_uptime; 1008 svc_checkidle(pool); 1009 } 1010 1011 xprt = st->st_xprt; 1012 if (!xprt && STAILQ_EMPTY(&st->st_reqs)) { 1013 /* 1014 * Enforce maxthreads count. 1015 */ 1016 if (pool->sp_threadcount > pool->sp_maxthreads) 1017 break; 1018 1019 /* 1020 * Before sleeping, see if we can find an 1021 * active transport which isn't being serviced 1022 * by a thread. 1023 */ 1024 if (svc_request_space_available(pool)) { 1025 TAILQ_FOREACH(xprt, &pool->sp_active, 1026 xp_alink) { 1027 if (!xprt->xp_thread) { 1028 SVC_ACQUIRE(xprt); 1029 xprt->xp_thread = st; 1030 st->st_xprt = xprt; 1031 break; 1032 } 1033 } 1034 } 1035 if (st->st_xprt) 1036 continue; 1037 1038 LIST_INSERT_HEAD(&pool->sp_idlethreads, st, st_ilink); 1039 error = cv_timedwait_sig(&st->st_cond, &pool->sp_lock, 1040 5 * hz); 1041 LIST_REMOVE(st, st_ilink); 1042 1043 /* 1044 * Reduce worker thread count when idle. 1045 */ 1046 if (error == EWOULDBLOCK) { 1047 if (!ismaster 1048 && (pool->sp_threadcount 1049 > pool->sp_minthreads) 1050 && !st->st_xprt 1051 && STAILQ_EMPTY(&st->st_reqs)) 1052 break; 1053 } 1054 if (error == EWOULDBLOCK) 1055 continue; 1056 if (error) { 1057 if (pool->sp_state != SVCPOOL_CLOSING) { 1058 mtx_unlock(&pool->sp_lock); 1059 svc_exit(pool); 1060 mtx_lock(&pool->sp_lock); 1061 } 1062 break; 1063 } 1064 1065 if (pool->sp_state == SVCPOOL_THREADWANTED) { 1066 pool->sp_state = SVCPOOL_THREADSTARTING; 1067 pool->sp_lastcreatetime = time_uptime; 1068 mtx_unlock(&pool->sp_lock); 1069 svc_new_thread(pool); 1070 mtx_lock(&pool->sp_lock); 1071 } 1072 continue; 1073 } 1074 1075 if (xprt) { 1076 /* 1077 * Drain the transport socket and queue up any 1078 * RPCs. 1079 */ 1080 xprt->xp_lastactive = time_uptime; 1081 stat = XPRT_IDLE; 1082 do { 1083 if (!svc_request_space_available(pool)) 1084 break; 1085 rqstp = NULL; 1086 mtx_unlock(&pool->sp_lock); 1087 stat = svc_getreq(xprt, &rqstp); 1088 mtx_lock(&pool->sp_lock); 1089 if (rqstp) { 1090 /* 1091 * See if the application has 1092 * a preference for some other 1093 * thread. 1094 */ 1095 stpref = st; 1096 if (pool->sp_assign) 1097 stpref = pool->sp_assign(st, 1098 rqstp); 1099 1100 pool->sp_space_used += 1101 rqstp->rq_size; 1102 if (pool->sp_space_used 1103 > pool->sp_space_used_highest) 1104 pool->sp_space_used_highest = 1105 pool->sp_space_used; 1106 rqstp->rq_thread = stpref; 1107 STAILQ_INSERT_TAIL(&stpref->st_reqs, 1108 rqstp, rq_link); 1109 stpref->st_reqcount++; 1110 1111 /* 1112 * If we assigned the request 1113 * to another thread, make 1114 * sure its awake and continue 1115 * reading from the 1116 * socket. Otherwise, try to 1117 * find some other thread to 1118 * read from the socket and 1119 * execute the request 1120 * immediately. 1121 */ 1122 if (stpref != st) { 1123 cv_signal(&stpref->st_cond); 1124 continue; 1125 } else { 1126 break; 1127 } 1128 } 1129 } while (stat == XPRT_MOREREQS 1130 && pool->sp_state != SVCPOOL_CLOSING); 1131 1132 /* 1133 * Move this transport to the end of the 1134 * active list to ensure fairness when 1135 * multiple transports are active. If this was 1136 * the last queued request, svc_getreq will 1137 * end up calling xprt_inactive to remove from 1138 * the active list. 1139 */ 1140 xprt->xp_thread = NULL; 1141 st->st_xprt = NULL; 1142 if (xprt->xp_active) { 1143 xprt_assignthread(xprt); 1144 TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink); 1145 TAILQ_INSERT_TAIL(&pool->sp_active, xprt, 1146 xp_alink); 1147 } 1148 mtx_unlock(&pool->sp_lock); 1149 SVC_RELEASE(xprt); 1150 mtx_lock(&pool->sp_lock); 1151 } 1152 1153 /* 1154 * Execute what we have queued. 1155 */ 1156 while ((rqstp = STAILQ_FIRST(&st->st_reqs)) != NULL) { 1157 size_t sz = rqstp->rq_size; 1158 mtx_unlock(&pool->sp_lock); 1159 svc_executereq(rqstp); 1160 mtx_lock(&pool->sp_lock); 1161 pool->sp_space_used -= sz; 1162 } 1163 } 1164 1165 if (st->st_xprt) { 1166 xprt = st->st_xprt; 1167 st->st_xprt = NULL; 1168 SVC_RELEASE(xprt); 1169 } 1170 1171 KASSERT(STAILQ_EMPTY(&st->st_reqs), ("stray reqs on exit")); 1172 LIST_REMOVE(st, st_link); 1173 pool->sp_threadcount--; 1174 1175 mtx_unlock(&pool->sp_lock); 1176 1177 cv_destroy(&st->st_cond); 1178 mem_free(st, sizeof(*st)); 1179 1180 if (!ismaster) 1181 wakeup(pool); 1182 } 1183 1184 static void 1185 svc_thread_start(void *arg) 1186 { 1187 1188 svc_run_internal((SVCPOOL *) arg, FALSE); 1189 kthread_exit(); 1190 } 1191 1192 static void 1193 svc_new_thread(SVCPOOL *pool) 1194 { 1195 struct thread *td; 1196 1197 pool->sp_threadcount++; 1198 kthread_add(svc_thread_start, pool, 1199 pool->sp_proc, &td, 0, 0, 1200 "%s: service", pool->sp_name); 1201 } 1202 1203 void 1204 svc_run(SVCPOOL *pool) 1205 { 1206 int i; 1207 struct proc *p; 1208 struct thread *td; 1209 1210 p = curproc; 1211 td = curthread; 1212 snprintf(td->td_name, sizeof(td->td_name), 1213 "%s: master", pool->sp_name); 1214 pool->sp_state = SVCPOOL_ACTIVE; 1215 pool->sp_proc = p; 1216 pool->sp_lastcreatetime = time_uptime; 1217 pool->sp_threadcount = 1; 1218 1219 for (i = 1; i < pool->sp_minthreads; i++) { 1220 svc_new_thread(pool); 1221 } 1222 1223 svc_run_internal(pool, TRUE); 1224 1225 mtx_lock(&pool->sp_lock); 1226 while (pool->sp_threadcount > 0) 1227 msleep(pool, &pool->sp_lock, 0, "svcexit", 0); 1228 mtx_unlock(&pool->sp_lock); 1229 } 1230 1231 void 1232 svc_exit(SVCPOOL *pool) 1233 { 1234 SVCTHREAD *st; 1235 1236 mtx_lock(&pool->sp_lock); 1237 1238 pool->sp_state = SVCPOOL_CLOSING; 1239 LIST_FOREACH(st, &pool->sp_idlethreads, st_ilink) 1240 cv_signal(&st->st_cond); 1241 1242 mtx_unlock(&pool->sp_lock); 1243 } 1244 1245 bool_t 1246 svc_getargs(struct svc_req *rqstp, xdrproc_t xargs, void *args) 1247 { 1248 struct mbuf *m; 1249 XDR xdrs; 1250 bool_t stat; 1251 1252 m = rqstp->rq_args; 1253 rqstp->rq_args = NULL; 1254 1255 xdrmbuf_create(&xdrs, m, XDR_DECODE); 1256 stat = xargs(&xdrs, args); 1257 XDR_DESTROY(&xdrs); 1258 1259 return (stat); 1260 } 1261 1262 bool_t 1263 svc_freeargs(struct svc_req *rqstp, xdrproc_t xargs, void *args) 1264 { 1265 XDR xdrs; 1266 1267 if (rqstp->rq_addr) { 1268 free(rqstp->rq_addr, M_SONAME); 1269 rqstp->rq_addr = NULL; 1270 } 1271 1272 xdrs.x_op = XDR_FREE; 1273 return (xargs(&xdrs, args)); 1274 } 1275 1276 void 1277 svc_freereq(struct svc_req *rqstp) 1278 { 1279 SVCTHREAD *st; 1280 SVCXPRT *xprt; 1281 SVCPOOL *pool; 1282 1283 st = rqstp->rq_thread; 1284 xprt = rqstp->rq_xprt; 1285 if (xprt) 1286 pool = xprt->xp_pool; 1287 else 1288 pool = NULL; 1289 if (st) { 1290 mtx_lock(&pool->sp_lock); 1291 KASSERT(rqstp == STAILQ_FIRST(&st->st_reqs), 1292 ("Freeing request out of order")); 1293 STAILQ_REMOVE_HEAD(&st->st_reqs, rq_link); 1294 st->st_reqcount--; 1295 if (pool->sp_done) 1296 pool->sp_done(st, rqstp); 1297 mtx_unlock(&pool->sp_lock); 1298 } 1299 1300 if (rqstp->rq_auth.svc_ah_ops) 1301 SVCAUTH_RELEASE(&rqstp->rq_auth); 1302 1303 if (rqstp->rq_xprt) { 1304 SVC_RELEASE(rqstp->rq_xprt); 1305 } 1306 1307 if (rqstp->rq_addr) 1308 free(rqstp->rq_addr, M_SONAME); 1309 1310 if (rqstp->rq_args) 1311 m_freem(rqstp->rq_args); 1312 1313 free(rqstp, M_RPC); 1314 } 1315