1 /* $NetBSD: svc.c,v 1.21 2000/07/06 03:10:35 christos Exp $ */ 2 3 /* 4 * Sun RPC is a product of Sun Microsystems, Inc. and is provided for 5 * unrestricted use provided that this legend is included on all tape 6 * media and as a part of the software program in whole or part. Users 7 * may copy or modify Sun RPC without charge, but are not authorized 8 * to license or distribute it to anyone else except as part of a product or 9 * program developed by the user. 10 * 11 * SUN RPC IS PROVIDED AS IS WITH NO WARRANTIES OF ANY KIND INCLUDING THE 12 * WARRANTIES OF DESIGN, MERCHANTIBILITY AND FITNESS FOR A PARTICULAR 13 * PURPOSE, OR ARISING FROM A COURSE OF DEALING, USAGE OR TRADE PRACTICE. 14 * 15 * Sun RPC is provided with no support and without any obligation on the 16 * part of Sun Microsystems, Inc. to assist in its use, correction, 17 * modification or enhancement. 18 * 19 * SUN MICROSYSTEMS, INC. SHALL HAVE NO LIABILITY WITH RESPECT TO THE 20 * INFRINGEMENT OF COPYRIGHTS, TRADE SECRETS OR ANY PATENTS BY SUN RPC 21 * OR ANY PART THEREOF. 22 * 23 * In no event will Sun Microsystems, Inc. be liable for any lost revenue 24 * or profits or other special, indirect and consequential damages, even if 25 * Sun has been advised of the possibility of such damages. 26 * 27 * Sun Microsystems, Inc. 28 * 2550 Garcia Avenue 29 * Mountain View, California 94043 30 */ 31 32 #if defined(LIBC_SCCS) && !defined(lint) 33 static char *sccsid2 = "@(#)svc.c 1.44 88/02/08 Copyr 1984 Sun Micro"; 34 static char *sccsid = "@(#)svc.c 2.4 88/08/11 4.0 RPCSRC"; 35 #endif 36 #include <sys/cdefs.h> 37 __FBSDID("$FreeBSD$"); 38 39 /* 40 * svc.c, Server-side remote procedure call interface. 41 * 42 * There are two sets of procedures here. The xprt routines are 43 * for handling transport handles. The svc routines handle the 44 * list of service routines. 45 * 46 * Copyright (C) 1984, Sun Microsystems, Inc. 47 */ 48 49 #include <sys/param.h> 50 #include <sys/lock.h> 51 #include <sys/kernel.h> 52 #include <sys/kthread.h> 53 #include <sys/malloc.h> 54 #include <sys/mbuf.h> 55 #include <sys/mutex.h> 56 #include <sys/proc.h> 57 #include <sys/queue.h> 58 #include <sys/socketvar.h> 59 #include <sys/systm.h> 60 #include <sys/ucred.h> 61 62 #include <rpc/rpc.h> 63 #include <rpc/rpcb_clnt.h> 64 #include <rpc/replay.h> 65 66 #include <rpc/rpc_com.h> 67 68 #define SVC_VERSQUIET 0x0001 /* keep quiet about vers mismatch */ 69 #define version_keepquiet(xp) (SVC_EXT(xp)->xp_flags & SVC_VERSQUIET) 70 71 static struct svc_callout *svc_find(SVCPOOL *pool, rpcprog_t, rpcvers_t, 72 char *); 73 static void svc_new_thread(SVCPOOL *pool); 74 static void xprt_unregister_locked(SVCXPRT *xprt); 75 76 /* *************** SVCXPRT related stuff **************** */ 77 78 static int svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS); 79 static int svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS); 80 81 SVCPOOL* 82 svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base) 83 { 84 SVCPOOL *pool; 85 86 pool = malloc(sizeof(SVCPOOL), M_RPC, M_WAITOK|M_ZERO); 87 88 mtx_init(&pool->sp_lock, "sp_lock", NULL, MTX_DEF); 89 pool->sp_name = name; 90 pool->sp_state = SVCPOOL_INIT; 91 pool->sp_proc = NULL; 92 TAILQ_INIT(&pool->sp_xlist); 93 TAILQ_INIT(&pool->sp_active); 94 TAILQ_INIT(&pool->sp_callouts); 95 LIST_INIT(&pool->sp_threads); 96 LIST_INIT(&pool->sp_idlethreads); 97 pool->sp_minthreads = 1; 98 pool->sp_maxthreads = 1; 99 pool->sp_threadcount = 0; 100 101 /* 102 * Don't use more than a quarter of mbuf clusters or more than 103 * 45Mb buffering requests. 104 */ 105 pool->sp_space_high = nmbclusters * MCLBYTES / 4; 106 if (pool->sp_space_high > 45 << 20) 107 pool->sp_space_high = 45 << 20; 108 pool->sp_space_low = 2 * pool->sp_space_high / 3; 109 110 sysctl_ctx_init(&pool->sp_sysctl); 111 if (sysctl_base) { 112 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO, 113 "minthreads", CTLTYPE_INT | CTLFLAG_RW, 114 pool, 0, svcpool_minthread_sysctl, "I", ""); 115 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO, 116 "maxthreads", CTLTYPE_INT | CTLFLAG_RW, 117 pool, 0, svcpool_maxthread_sysctl, "I", ""); 118 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO, 119 "threads", CTLFLAG_RD, &pool->sp_threadcount, 0, ""); 120 121 SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO, 122 "request_space_used", CTLFLAG_RD, 123 &pool->sp_space_used, 0, 124 "Space in parsed but not handled requests."); 125 126 SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO, 127 "request_space_used_highest", CTLFLAG_RD, 128 &pool->sp_space_used_highest, 0, 129 "Highest space used since reboot."); 130 131 SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO, 132 "request_space_high", CTLFLAG_RW, 133 &pool->sp_space_high, 0, 134 "Maximum space in parsed but not handled requests."); 135 136 SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO, 137 "request_space_low", CTLFLAG_RW, 138 &pool->sp_space_low, 0, 139 "Low water mark for request space."); 140 141 SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO, 142 "request_space_throttled", CTLFLAG_RD, 143 &pool->sp_space_throttled, 0, 144 "Whether nfs requests are currently throttled"); 145 146 SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO, 147 "request_space_throttle_count", CTLFLAG_RD, 148 &pool->sp_space_throttle_count, 0, 149 "Count of times throttling based on request space has occurred"); 150 } 151 152 return pool; 153 } 154 155 void 156 svcpool_destroy(SVCPOOL *pool) 157 { 158 SVCXPRT *xprt, *nxprt; 159 struct svc_callout *s; 160 struct svcxprt_list cleanup; 161 162 TAILQ_INIT(&cleanup); 163 mtx_lock(&pool->sp_lock); 164 165 while (TAILQ_FIRST(&pool->sp_xlist)) { 166 xprt = TAILQ_FIRST(&pool->sp_xlist); 167 xprt_unregister_locked(xprt); 168 TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link); 169 } 170 171 while (TAILQ_FIRST(&pool->sp_callouts)) { 172 s = TAILQ_FIRST(&pool->sp_callouts); 173 mtx_unlock(&pool->sp_lock); 174 svc_unreg(pool, s->sc_prog, s->sc_vers); 175 mtx_lock(&pool->sp_lock); 176 } 177 178 TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) { 179 SVC_RELEASE(xprt); 180 } 181 182 mtx_destroy(&pool->sp_lock); 183 184 if (pool->sp_rcache) 185 replay_freecache(pool->sp_rcache); 186 187 sysctl_ctx_free(&pool->sp_sysctl); 188 free(pool, M_RPC); 189 } 190 191 static bool_t 192 svcpool_active(SVCPOOL *pool) 193 { 194 enum svcpool_state state = pool->sp_state; 195 196 if (state == SVCPOOL_INIT || state == SVCPOOL_CLOSING) 197 return (FALSE); 198 return (TRUE); 199 } 200 201 /* 202 * Sysctl handler to set the minimum thread count on a pool 203 */ 204 static int 205 svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS) 206 { 207 SVCPOOL *pool; 208 int newminthreads, error, n; 209 210 pool = oidp->oid_arg1; 211 newminthreads = pool->sp_minthreads; 212 error = sysctl_handle_int(oidp, &newminthreads, 0, req); 213 if (error == 0 && newminthreads != pool->sp_minthreads) { 214 if (newminthreads > pool->sp_maxthreads) 215 return (EINVAL); 216 mtx_lock(&pool->sp_lock); 217 if (newminthreads > pool->sp_minthreads 218 && svcpool_active(pool)) { 219 /* 220 * If the pool is running and we are 221 * increasing, create some more threads now. 222 */ 223 n = newminthreads - pool->sp_threadcount; 224 if (n > 0) { 225 mtx_unlock(&pool->sp_lock); 226 while (n--) 227 svc_new_thread(pool); 228 mtx_lock(&pool->sp_lock); 229 } 230 } 231 pool->sp_minthreads = newminthreads; 232 mtx_unlock(&pool->sp_lock); 233 } 234 return (error); 235 } 236 237 /* 238 * Sysctl handler to set the maximum thread count on a pool 239 */ 240 static int 241 svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS) 242 { 243 SVCPOOL *pool; 244 SVCTHREAD *st; 245 int newmaxthreads, error; 246 247 pool = oidp->oid_arg1; 248 newmaxthreads = pool->sp_maxthreads; 249 error = sysctl_handle_int(oidp, &newmaxthreads, 0, req); 250 if (error == 0 && newmaxthreads != pool->sp_maxthreads) { 251 if (newmaxthreads < pool->sp_minthreads) 252 return (EINVAL); 253 mtx_lock(&pool->sp_lock); 254 if (newmaxthreads < pool->sp_maxthreads 255 && svcpool_active(pool)) { 256 /* 257 * If the pool is running and we are 258 * decreasing, wake up some idle threads to 259 * encourage them to exit. 260 */ 261 LIST_FOREACH(st, &pool->sp_idlethreads, st_ilink) 262 cv_signal(&st->st_cond); 263 } 264 pool->sp_maxthreads = newmaxthreads; 265 mtx_unlock(&pool->sp_lock); 266 } 267 return (error); 268 } 269 270 /* 271 * Activate a transport handle. 272 */ 273 void 274 xprt_register(SVCXPRT *xprt) 275 { 276 SVCPOOL *pool = xprt->xp_pool; 277 278 mtx_lock(&pool->sp_lock); 279 xprt->xp_registered = TRUE; 280 xprt->xp_active = FALSE; 281 TAILQ_INSERT_TAIL(&pool->sp_xlist, xprt, xp_link); 282 mtx_unlock(&pool->sp_lock); 283 } 284 285 /* 286 * De-activate a transport handle. Note: the locked version doesn't 287 * release the transport - caller must do that after dropping the pool 288 * lock. 289 */ 290 static void 291 xprt_unregister_locked(SVCXPRT *xprt) 292 { 293 SVCPOOL *pool = xprt->xp_pool; 294 295 if (xprt->xp_active) { 296 TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink); 297 xprt->xp_active = FALSE; 298 } 299 TAILQ_REMOVE(&pool->sp_xlist, xprt, xp_link); 300 xprt->xp_registered = FALSE; 301 } 302 303 void 304 xprt_unregister(SVCXPRT *xprt) 305 { 306 SVCPOOL *pool = xprt->xp_pool; 307 308 mtx_lock(&pool->sp_lock); 309 xprt_unregister_locked(xprt); 310 mtx_unlock(&pool->sp_lock); 311 312 SVC_RELEASE(xprt); 313 } 314 315 static void 316 xprt_assignthread(SVCXPRT *xprt) 317 { 318 SVCPOOL *pool = xprt->xp_pool; 319 SVCTHREAD *st; 320 321 /* 322 * Attempt to assign a service thread to this 323 * transport. 324 */ 325 LIST_FOREACH(st, &pool->sp_idlethreads, st_ilink) { 326 if (st->st_xprt == NULL && STAILQ_EMPTY(&st->st_reqs)) 327 break; 328 } 329 if (st) { 330 SVC_ACQUIRE(xprt); 331 xprt->xp_thread = st; 332 st->st_xprt = xprt; 333 cv_signal(&st->st_cond); 334 } else { 335 /* 336 * See if we can create a new thread. The 337 * actual thread creation happens in 338 * svc_run_internal because our locking state 339 * is poorly defined (we are typically called 340 * from a socket upcall). Don't create more 341 * than one thread per second. 342 */ 343 if (pool->sp_state == SVCPOOL_ACTIVE 344 && pool->sp_lastcreatetime < time_uptime 345 && pool->sp_threadcount < pool->sp_maxthreads) { 346 pool->sp_state = SVCPOOL_THREADWANTED; 347 } 348 } 349 } 350 351 void 352 xprt_active(SVCXPRT *xprt) 353 { 354 SVCPOOL *pool = xprt->xp_pool; 355 356 mtx_lock(&pool->sp_lock); 357 358 if (!xprt->xp_registered) { 359 /* 360 * Race with xprt_unregister - we lose. 361 */ 362 mtx_unlock(&pool->sp_lock); 363 return; 364 } 365 366 if (!xprt->xp_active) { 367 TAILQ_INSERT_TAIL(&pool->sp_active, xprt, xp_alink); 368 xprt->xp_active = TRUE; 369 xprt_assignthread(xprt); 370 } 371 372 mtx_unlock(&pool->sp_lock); 373 } 374 375 void 376 xprt_inactive_locked(SVCXPRT *xprt) 377 { 378 SVCPOOL *pool = xprt->xp_pool; 379 380 if (xprt->xp_active) { 381 TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink); 382 xprt->xp_active = FALSE; 383 } 384 } 385 386 void 387 xprt_inactive(SVCXPRT *xprt) 388 { 389 SVCPOOL *pool = xprt->xp_pool; 390 391 mtx_lock(&pool->sp_lock); 392 xprt_inactive_locked(xprt); 393 mtx_unlock(&pool->sp_lock); 394 } 395 396 /* 397 * Add a service program to the callout list. 398 * The dispatch routine will be called when a rpc request for this 399 * program number comes in. 400 */ 401 bool_t 402 svc_reg(SVCXPRT *xprt, const rpcprog_t prog, const rpcvers_t vers, 403 void (*dispatch)(struct svc_req *, SVCXPRT *), 404 const struct netconfig *nconf) 405 { 406 SVCPOOL *pool = xprt->xp_pool; 407 struct svc_callout *s; 408 char *netid = NULL; 409 int flag = 0; 410 411 /* VARIABLES PROTECTED BY svc_lock: s, svc_head */ 412 413 if (xprt->xp_netid) { 414 netid = strdup(xprt->xp_netid, M_RPC); 415 flag = 1; 416 } else if (nconf && nconf->nc_netid) { 417 netid = strdup(nconf->nc_netid, M_RPC); 418 flag = 1; 419 } /* must have been created with svc_raw_create */ 420 if ((netid == NULL) && (flag == 1)) { 421 return (FALSE); 422 } 423 424 mtx_lock(&pool->sp_lock); 425 if ((s = svc_find(pool, prog, vers, netid)) != NULL) { 426 if (netid) 427 free(netid, M_RPC); 428 if (s->sc_dispatch == dispatch) 429 goto rpcb_it; /* he is registering another xptr */ 430 mtx_unlock(&pool->sp_lock); 431 return (FALSE); 432 } 433 s = malloc(sizeof (struct svc_callout), M_RPC, M_NOWAIT); 434 if (s == NULL) { 435 if (netid) 436 free(netid, M_RPC); 437 mtx_unlock(&pool->sp_lock); 438 return (FALSE); 439 } 440 441 s->sc_prog = prog; 442 s->sc_vers = vers; 443 s->sc_dispatch = dispatch; 444 s->sc_netid = netid; 445 TAILQ_INSERT_TAIL(&pool->sp_callouts, s, sc_link); 446 447 if ((xprt->xp_netid == NULL) && (flag == 1) && netid) 448 ((SVCXPRT *) xprt)->xp_netid = strdup(netid, M_RPC); 449 450 rpcb_it: 451 mtx_unlock(&pool->sp_lock); 452 /* now register the information with the local binder service */ 453 if (nconf) { 454 bool_t dummy; 455 struct netconfig tnc; 456 struct netbuf nb; 457 tnc = *nconf; 458 nb.buf = &xprt->xp_ltaddr; 459 nb.len = xprt->xp_ltaddr.ss_len; 460 dummy = rpcb_set(prog, vers, &tnc, &nb); 461 return (dummy); 462 } 463 return (TRUE); 464 } 465 466 /* 467 * Remove a service program from the callout list. 468 */ 469 void 470 svc_unreg(SVCPOOL *pool, const rpcprog_t prog, const rpcvers_t vers) 471 { 472 struct svc_callout *s; 473 474 /* unregister the information anyway */ 475 (void) rpcb_unset(prog, vers, NULL); 476 mtx_lock(&pool->sp_lock); 477 while ((s = svc_find(pool, prog, vers, NULL)) != NULL) { 478 TAILQ_REMOVE(&pool->sp_callouts, s, sc_link); 479 if (s->sc_netid) 480 mem_free(s->sc_netid, sizeof (s->sc_netid) + 1); 481 mem_free(s, sizeof (struct svc_callout)); 482 } 483 mtx_unlock(&pool->sp_lock); 484 } 485 486 /* ********************** CALLOUT list related stuff ************* */ 487 488 /* 489 * Search the callout list for a program number, return the callout 490 * struct. 491 */ 492 static struct svc_callout * 493 svc_find(SVCPOOL *pool, rpcprog_t prog, rpcvers_t vers, char *netid) 494 { 495 struct svc_callout *s; 496 497 mtx_assert(&pool->sp_lock, MA_OWNED); 498 TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) { 499 if (s->sc_prog == prog && s->sc_vers == vers 500 && (netid == NULL || s->sc_netid == NULL || 501 strcmp(netid, s->sc_netid) == 0)) 502 break; 503 } 504 505 return (s); 506 } 507 508 /* ******************* REPLY GENERATION ROUTINES ************ */ 509 510 static bool_t 511 svc_sendreply_common(struct svc_req *rqstp, struct rpc_msg *rply, 512 struct mbuf *body) 513 { 514 SVCXPRT *xprt = rqstp->rq_xprt; 515 bool_t ok; 516 517 if (rqstp->rq_args) { 518 m_freem(rqstp->rq_args); 519 rqstp->rq_args = NULL; 520 } 521 522 if (xprt->xp_pool->sp_rcache) 523 replay_setreply(xprt->xp_pool->sp_rcache, 524 rply, svc_getrpccaller(rqstp), body); 525 526 if (!SVCAUTH_WRAP(&rqstp->rq_auth, &body)) 527 return (FALSE); 528 529 ok = SVC_REPLY(xprt, rply, rqstp->rq_addr, body); 530 if (rqstp->rq_addr) { 531 free(rqstp->rq_addr, M_SONAME); 532 rqstp->rq_addr = NULL; 533 } 534 535 return (ok); 536 } 537 538 /* 539 * Send a reply to an rpc request 540 */ 541 bool_t 542 svc_sendreply(struct svc_req *rqstp, xdrproc_t xdr_results, void * xdr_location) 543 { 544 struct rpc_msg rply; 545 struct mbuf *m; 546 XDR xdrs; 547 bool_t ok; 548 549 rply.rm_xid = rqstp->rq_xid; 550 rply.rm_direction = REPLY; 551 rply.rm_reply.rp_stat = MSG_ACCEPTED; 552 rply.acpted_rply.ar_verf = rqstp->rq_verf; 553 rply.acpted_rply.ar_stat = SUCCESS; 554 rply.acpted_rply.ar_results.where = NULL; 555 rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void; 556 557 MGET(m, M_WAIT, MT_DATA); 558 MCLGET(m, M_WAIT); 559 m->m_len = 0; 560 xdrmbuf_create(&xdrs, m, XDR_ENCODE); 561 ok = xdr_results(&xdrs, xdr_location); 562 XDR_DESTROY(&xdrs); 563 564 if (ok) { 565 return (svc_sendreply_common(rqstp, &rply, m)); 566 } else { 567 m_freem(m); 568 return (FALSE); 569 } 570 } 571 572 bool_t 573 svc_sendreply_mbuf(struct svc_req *rqstp, struct mbuf *m) 574 { 575 struct rpc_msg rply; 576 577 rply.rm_xid = rqstp->rq_xid; 578 rply.rm_direction = REPLY; 579 rply.rm_reply.rp_stat = MSG_ACCEPTED; 580 rply.acpted_rply.ar_verf = rqstp->rq_verf; 581 rply.acpted_rply.ar_stat = SUCCESS; 582 rply.acpted_rply.ar_results.where = NULL; 583 rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void; 584 585 return (svc_sendreply_common(rqstp, &rply, m)); 586 } 587 588 /* 589 * No procedure error reply 590 */ 591 void 592 svcerr_noproc(struct svc_req *rqstp) 593 { 594 SVCXPRT *xprt = rqstp->rq_xprt; 595 struct rpc_msg rply; 596 597 rply.rm_xid = rqstp->rq_xid; 598 rply.rm_direction = REPLY; 599 rply.rm_reply.rp_stat = MSG_ACCEPTED; 600 rply.acpted_rply.ar_verf = rqstp->rq_verf; 601 rply.acpted_rply.ar_stat = PROC_UNAVAIL; 602 603 if (xprt->xp_pool->sp_rcache) 604 replay_setreply(xprt->xp_pool->sp_rcache, 605 &rply, svc_getrpccaller(rqstp), NULL); 606 607 svc_sendreply_common(rqstp, &rply, NULL); 608 } 609 610 /* 611 * Can't decode args error reply 612 */ 613 void 614 svcerr_decode(struct svc_req *rqstp) 615 { 616 SVCXPRT *xprt = rqstp->rq_xprt; 617 struct rpc_msg rply; 618 619 rply.rm_xid = rqstp->rq_xid; 620 rply.rm_direction = REPLY; 621 rply.rm_reply.rp_stat = MSG_ACCEPTED; 622 rply.acpted_rply.ar_verf = rqstp->rq_verf; 623 rply.acpted_rply.ar_stat = GARBAGE_ARGS; 624 625 if (xprt->xp_pool->sp_rcache) 626 replay_setreply(xprt->xp_pool->sp_rcache, 627 &rply, (struct sockaddr *) &xprt->xp_rtaddr, NULL); 628 629 svc_sendreply_common(rqstp, &rply, NULL); 630 } 631 632 /* 633 * Some system error 634 */ 635 void 636 svcerr_systemerr(struct svc_req *rqstp) 637 { 638 SVCXPRT *xprt = rqstp->rq_xprt; 639 struct rpc_msg rply; 640 641 rply.rm_xid = rqstp->rq_xid; 642 rply.rm_direction = REPLY; 643 rply.rm_reply.rp_stat = MSG_ACCEPTED; 644 rply.acpted_rply.ar_verf = rqstp->rq_verf; 645 rply.acpted_rply.ar_stat = SYSTEM_ERR; 646 647 if (xprt->xp_pool->sp_rcache) 648 replay_setreply(xprt->xp_pool->sp_rcache, 649 &rply, svc_getrpccaller(rqstp), NULL); 650 651 svc_sendreply_common(rqstp, &rply, NULL); 652 } 653 654 /* 655 * Authentication error reply 656 */ 657 void 658 svcerr_auth(struct svc_req *rqstp, enum auth_stat why) 659 { 660 SVCXPRT *xprt = rqstp->rq_xprt; 661 struct rpc_msg rply; 662 663 rply.rm_xid = rqstp->rq_xid; 664 rply.rm_direction = REPLY; 665 rply.rm_reply.rp_stat = MSG_DENIED; 666 rply.rjcted_rply.rj_stat = AUTH_ERROR; 667 rply.rjcted_rply.rj_why = why; 668 669 if (xprt->xp_pool->sp_rcache) 670 replay_setreply(xprt->xp_pool->sp_rcache, 671 &rply, svc_getrpccaller(rqstp), NULL); 672 673 svc_sendreply_common(rqstp, &rply, NULL); 674 } 675 676 /* 677 * Auth too weak error reply 678 */ 679 void 680 svcerr_weakauth(struct svc_req *rqstp) 681 { 682 683 svcerr_auth(rqstp, AUTH_TOOWEAK); 684 } 685 686 /* 687 * Program unavailable error reply 688 */ 689 void 690 svcerr_noprog(struct svc_req *rqstp) 691 { 692 SVCXPRT *xprt = rqstp->rq_xprt; 693 struct rpc_msg rply; 694 695 rply.rm_xid = rqstp->rq_xid; 696 rply.rm_direction = REPLY; 697 rply.rm_reply.rp_stat = MSG_ACCEPTED; 698 rply.acpted_rply.ar_verf = rqstp->rq_verf; 699 rply.acpted_rply.ar_stat = PROG_UNAVAIL; 700 701 if (xprt->xp_pool->sp_rcache) 702 replay_setreply(xprt->xp_pool->sp_rcache, 703 &rply, svc_getrpccaller(rqstp), NULL); 704 705 svc_sendreply_common(rqstp, &rply, NULL); 706 } 707 708 /* 709 * Program version mismatch error reply 710 */ 711 void 712 svcerr_progvers(struct svc_req *rqstp, rpcvers_t low_vers, rpcvers_t high_vers) 713 { 714 SVCXPRT *xprt = rqstp->rq_xprt; 715 struct rpc_msg rply; 716 717 rply.rm_xid = rqstp->rq_xid; 718 rply.rm_direction = REPLY; 719 rply.rm_reply.rp_stat = MSG_ACCEPTED; 720 rply.acpted_rply.ar_verf = rqstp->rq_verf; 721 rply.acpted_rply.ar_stat = PROG_MISMATCH; 722 rply.acpted_rply.ar_vers.low = (uint32_t)low_vers; 723 rply.acpted_rply.ar_vers.high = (uint32_t)high_vers; 724 725 if (xprt->xp_pool->sp_rcache) 726 replay_setreply(xprt->xp_pool->sp_rcache, 727 &rply, svc_getrpccaller(rqstp), NULL); 728 729 svc_sendreply_common(rqstp, &rply, NULL); 730 } 731 732 /* 733 * Allocate a new server transport structure. All fields are 734 * initialized to zero and xp_p3 is initialized to point at an 735 * extension structure to hold various flags and authentication 736 * parameters. 737 */ 738 SVCXPRT * 739 svc_xprt_alloc() 740 { 741 SVCXPRT *xprt; 742 SVCXPRT_EXT *ext; 743 744 xprt = mem_alloc(sizeof(SVCXPRT)); 745 memset(xprt, 0, sizeof(SVCXPRT)); 746 ext = mem_alloc(sizeof(SVCXPRT_EXT)); 747 memset(ext, 0, sizeof(SVCXPRT_EXT)); 748 xprt->xp_p3 = ext; 749 refcount_init(&xprt->xp_refs, 1); 750 751 return (xprt); 752 } 753 754 /* 755 * Free a server transport structure. 756 */ 757 void 758 svc_xprt_free(xprt) 759 SVCXPRT *xprt; 760 { 761 762 mem_free(xprt->xp_p3, sizeof(SVCXPRT_EXT)); 763 mem_free(xprt, sizeof(SVCXPRT)); 764 } 765 766 /* ******************* SERVER INPUT STUFF ******************* */ 767 768 /* 769 * Read RPC requests from a transport and queue them to be 770 * executed. We handle authentication and replay cache replies here. 771 * Actually dispatching the RPC is deferred till svc_executereq. 772 */ 773 static enum xprt_stat 774 svc_getreq(SVCXPRT *xprt, struct svc_req **rqstp_ret) 775 { 776 SVCPOOL *pool = xprt->xp_pool; 777 struct svc_req *r; 778 struct rpc_msg msg; 779 struct mbuf *args; 780 enum xprt_stat stat; 781 782 /* now receive msgs from xprtprt (support batch calls) */ 783 r = malloc(sizeof(*r), M_RPC, M_WAITOK|M_ZERO); 784 785 msg.rm_call.cb_cred.oa_base = r->rq_credarea; 786 msg.rm_call.cb_verf.oa_base = &r->rq_credarea[MAX_AUTH_BYTES]; 787 r->rq_clntcred = &r->rq_credarea[2*MAX_AUTH_BYTES]; 788 if (SVC_RECV(xprt, &msg, &r->rq_addr, &args)) { 789 enum auth_stat why; 790 791 /* 792 * Handle replays and authenticate before queuing the 793 * request to be executed. 794 */ 795 SVC_ACQUIRE(xprt); 796 r->rq_xprt = xprt; 797 if (pool->sp_rcache) { 798 struct rpc_msg repmsg; 799 struct mbuf *repbody; 800 enum replay_state rs; 801 rs = replay_find(pool->sp_rcache, &msg, 802 svc_getrpccaller(r), &repmsg, &repbody); 803 switch (rs) { 804 case RS_NEW: 805 break; 806 case RS_DONE: 807 SVC_REPLY(xprt, &repmsg, r->rq_addr, 808 repbody); 809 if (r->rq_addr) { 810 free(r->rq_addr, M_SONAME); 811 r->rq_addr = NULL; 812 } 813 goto call_done; 814 815 default: 816 goto call_done; 817 } 818 } 819 820 r->rq_xid = msg.rm_xid; 821 r->rq_prog = msg.rm_call.cb_prog; 822 r->rq_vers = msg.rm_call.cb_vers; 823 r->rq_proc = msg.rm_call.cb_proc; 824 r->rq_size = sizeof(*r) + m_length(args, NULL); 825 r->rq_args = args; 826 if ((why = _authenticate(r, &msg)) != AUTH_OK) { 827 /* 828 * RPCSEC_GSS uses this return code 829 * for requests that form part of its 830 * context establishment protocol and 831 * should not be dispatched to the 832 * application. 833 */ 834 if (why != RPCSEC_GSS_NODISPATCH) 835 svcerr_auth(r, why); 836 goto call_done; 837 } 838 839 if (!SVCAUTH_UNWRAP(&r->rq_auth, &r->rq_args)) { 840 svcerr_decode(r); 841 goto call_done; 842 } 843 844 /* 845 * Everything checks out, return request to caller. 846 */ 847 *rqstp_ret = r; 848 r = NULL; 849 } 850 call_done: 851 if (r) { 852 svc_freereq(r); 853 r = NULL; 854 } 855 if ((stat = SVC_STAT(xprt)) == XPRT_DIED) { 856 xprt_unregister(xprt); 857 } 858 859 return (stat); 860 } 861 862 static void 863 svc_executereq(struct svc_req *rqstp) 864 { 865 SVCXPRT *xprt = rqstp->rq_xprt; 866 SVCPOOL *pool = xprt->xp_pool; 867 int prog_found; 868 rpcvers_t low_vers; 869 rpcvers_t high_vers; 870 struct svc_callout *s; 871 872 /* now match message with a registered service*/ 873 prog_found = FALSE; 874 low_vers = (rpcvers_t) -1L; 875 high_vers = (rpcvers_t) 0L; 876 TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) { 877 if (s->sc_prog == rqstp->rq_prog) { 878 if (s->sc_vers == rqstp->rq_vers) { 879 /* 880 * We hand ownership of r to the 881 * dispatch method - they must call 882 * svc_freereq. 883 */ 884 (*s->sc_dispatch)(rqstp, xprt); 885 return; 886 } /* found correct version */ 887 prog_found = TRUE; 888 if (s->sc_vers < low_vers) 889 low_vers = s->sc_vers; 890 if (s->sc_vers > high_vers) 891 high_vers = s->sc_vers; 892 } /* found correct program */ 893 } 894 895 /* 896 * if we got here, the program or version 897 * is not served ... 898 */ 899 if (prog_found) 900 svcerr_progvers(rqstp, low_vers, high_vers); 901 else 902 svcerr_noprog(rqstp); 903 904 svc_freereq(rqstp); 905 } 906 907 static void 908 svc_checkidle(SVCPOOL *pool) 909 { 910 SVCXPRT *xprt, *nxprt; 911 time_t timo; 912 struct svcxprt_list cleanup; 913 914 TAILQ_INIT(&cleanup); 915 TAILQ_FOREACH_SAFE(xprt, &pool->sp_xlist, xp_link, nxprt) { 916 /* 917 * Only some transports have idle timers. Don't time 918 * something out which is just waking up. 919 */ 920 if (!xprt->xp_idletimeout || xprt->xp_thread) 921 continue; 922 923 timo = xprt->xp_lastactive + xprt->xp_idletimeout; 924 if (time_uptime > timo) { 925 xprt_unregister_locked(xprt); 926 TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link); 927 } 928 } 929 930 mtx_unlock(&pool->sp_lock); 931 TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) { 932 SVC_RELEASE(xprt); 933 } 934 mtx_lock(&pool->sp_lock); 935 936 } 937 938 static void 939 svc_assign_waiting_sockets(SVCPOOL *pool) 940 { 941 SVCXPRT *xprt; 942 943 TAILQ_FOREACH(xprt, &pool->sp_active, xp_alink) { 944 if (!xprt->xp_thread) { 945 xprt_assignthread(xprt); 946 } 947 } 948 } 949 950 static bool_t 951 svc_request_space_available(SVCPOOL *pool) 952 { 953 954 mtx_assert(&pool->sp_lock, MA_OWNED); 955 956 if (pool->sp_space_throttled) { 957 /* 958 * Below the low-water yet? If so, assign any waiting sockets. 959 */ 960 if (pool->sp_space_used < pool->sp_space_low) { 961 pool->sp_space_throttled = FALSE; 962 svc_assign_waiting_sockets(pool); 963 return TRUE; 964 } 965 966 return FALSE; 967 } else { 968 if (pool->sp_space_used 969 >= pool->sp_space_high) { 970 pool->sp_space_throttled = TRUE; 971 pool->sp_space_throttle_count++; 972 return FALSE; 973 } 974 975 return TRUE; 976 } 977 } 978 979 static void 980 svc_run_internal(SVCPOOL *pool, bool_t ismaster) 981 { 982 SVCTHREAD *st, *stpref; 983 SVCXPRT *xprt; 984 enum xprt_stat stat; 985 struct svc_req *rqstp; 986 int error; 987 988 st = mem_alloc(sizeof(*st)); 989 st->st_xprt = NULL; 990 STAILQ_INIT(&st->st_reqs); 991 cv_init(&st->st_cond, "rpcsvc"); 992 993 mtx_lock(&pool->sp_lock); 994 LIST_INSERT_HEAD(&pool->sp_threads, st, st_link); 995 996 /* 997 * If we are a new thread which was spawned to cope with 998 * increased load, set the state back to SVCPOOL_ACTIVE. 999 */ 1000 if (pool->sp_state == SVCPOOL_THREADSTARTING) 1001 pool->sp_state = SVCPOOL_ACTIVE; 1002 1003 while (pool->sp_state != SVCPOOL_CLOSING) { 1004 /* 1005 * Check for idle transports once per second. 1006 */ 1007 if (time_uptime > pool->sp_lastidlecheck) { 1008 pool->sp_lastidlecheck = time_uptime; 1009 svc_checkidle(pool); 1010 } 1011 1012 xprt = st->st_xprt; 1013 if (!xprt && STAILQ_EMPTY(&st->st_reqs)) { 1014 /* 1015 * Enforce maxthreads count. 1016 */ 1017 if (pool->sp_threadcount > pool->sp_maxthreads) 1018 break; 1019 1020 /* 1021 * Before sleeping, see if we can find an 1022 * active transport which isn't being serviced 1023 * by a thread. 1024 */ 1025 if (svc_request_space_available(pool)) { 1026 TAILQ_FOREACH(xprt, &pool->sp_active, 1027 xp_alink) { 1028 if (!xprt->xp_thread) { 1029 SVC_ACQUIRE(xprt); 1030 xprt->xp_thread = st; 1031 st->st_xprt = xprt; 1032 break; 1033 } 1034 } 1035 } 1036 if (st->st_xprt) 1037 continue; 1038 1039 LIST_INSERT_HEAD(&pool->sp_idlethreads, st, st_ilink); 1040 error = cv_timedwait_sig(&st->st_cond, &pool->sp_lock, 1041 5 * hz); 1042 LIST_REMOVE(st, st_ilink); 1043 1044 /* 1045 * Reduce worker thread count when idle. 1046 */ 1047 if (error == EWOULDBLOCK) { 1048 if (!ismaster 1049 && (pool->sp_threadcount 1050 > pool->sp_minthreads) 1051 && !st->st_xprt 1052 && STAILQ_EMPTY(&st->st_reqs)) 1053 break; 1054 } 1055 if (error == EWOULDBLOCK) 1056 continue; 1057 if (error) { 1058 if (pool->sp_state != SVCPOOL_CLOSING) { 1059 mtx_unlock(&pool->sp_lock); 1060 svc_exit(pool); 1061 mtx_lock(&pool->sp_lock); 1062 } 1063 break; 1064 } 1065 1066 if (pool->sp_state == SVCPOOL_THREADWANTED) { 1067 pool->sp_state = SVCPOOL_THREADSTARTING; 1068 pool->sp_lastcreatetime = time_uptime; 1069 mtx_unlock(&pool->sp_lock); 1070 svc_new_thread(pool); 1071 mtx_lock(&pool->sp_lock); 1072 } 1073 continue; 1074 } 1075 1076 if (xprt) { 1077 /* 1078 * Drain the transport socket and queue up any 1079 * RPCs. 1080 */ 1081 xprt->xp_lastactive = time_uptime; 1082 stat = XPRT_IDLE; 1083 do { 1084 if (!svc_request_space_available(pool)) 1085 break; 1086 rqstp = NULL; 1087 mtx_unlock(&pool->sp_lock); 1088 stat = svc_getreq(xprt, &rqstp); 1089 mtx_lock(&pool->sp_lock); 1090 if (rqstp) { 1091 /* 1092 * See if the application has 1093 * a preference for some other 1094 * thread. 1095 */ 1096 stpref = st; 1097 if (pool->sp_assign) 1098 stpref = pool->sp_assign(st, 1099 rqstp); 1100 1101 pool->sp_space_used += 1102 rqstp->rq_size; 1103 if (pool->sp_space_used 1104 > pool->sp_space_used_highest) 1105 pool->sp_space_used_highest = 1106 pool->sp_space_used; 1107 rqstp->rq_thread = stpref; 1108 STAILQ_INSERT_TAIL(&stpref->st_reqs, 1109 rqstp, rq_link); 1110 stpref->st_reqcount++; 1111 1112 /* 1113 * If we assigned the request 1114 * to another thread, make 1115 * sure its awake and continue 1116 * reading from the 1117 * socket. Otherwise, try to 1118 * find some other thread to 1119 * read from the socket and 1120 * execute the request 1121 * immediately. 1122 */ 1123 if (stpref != st) { 1124 cv_signal(&stpref->st_cond); 1125 continue; 1126 } else { 1127 break; 1128 } 1129 } 1130 } while (stat == XPRT_MOREREQS 1131 && pool->sp_state != SVCPOOL_CLOSING); 1132 1133 /* 1134 * Move this transport to the end of the 1135 * active list to ensure fairness when 1136 * multiple transports are active. If this was 1137 * the last queued request, svc_getreq will 1138 * end up calling xprt_inactive to remove from 1139 * the active list. 1140 */ 1141 xprt->xp_thread = NULL; 1142 st->st_xprt = NULL; 1143 if (xprt->xp_active) { 1144 xprt_assignthread(xprt); 1145 TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink); 1146 TAILQ_INSERT_TAIL(&pool->sp_active, xprt, 1147 xp_alink); 1148 } 1149 mtx_unlock(&pool->sp_lock); 1150 SVC_RELEASE(xprt); 1151 mtx_lock(&pool->sp_lock); 1152 } 1153 1154 /* 1155 * Execute what we have queued. 1156 */ 1157 while ((rqstp = STAILQ_FIRST(&st->st_reqs)) != NULL) { 1158 size_t sz = rqstp->rq_size; 1159 mtx_unlock(&pool->sp_lock); 1160 svc_executereq(rqstp); 1161 mtx_lock(&pool->sp_lock); 1162 pool->sp_space_used -= sz; 1163 } 1164 } 1165 1166 if (st->st_xprt) { 1167 xprt = st->st_xprt; 1168 st->st_xprt = NULL; 1169 SVC_RELEASE(xprt); 1170 } 1171 1172 KASSERT(STAILQ_EMPTY(&st->st_reqs), ("stray reqs on exit")); 1173 LIST_REMOVE(st, st_link); 1174 pool->sp_threadcount--; 1175 1176 mtx_unlock(&pool->sp_lock); 1177 1178 cv_destroy(&st->st_cond); 1179 mem_free(st, sizeof(*st)); 1180 1181 if (!ismaster) 1182 wakeup(pool); 1183 } 1184 1185 static void 1186 svc_thread_start(void *arg) 1187 { 1188 1189 svc_run_internal((SVCPOOL *) arg, FALSE); 1190 kthread_exit(); 1191 } 1192 1193 static void 1194 svc_new_thread(SVCPOOL *pool) 1195 { 1196 struct thread *td; 1197 1198 pool->sp_threadcount++; 1199 kthread_add(svc_thread_start, pool, 1200 pool->sp_proc, &td, 0, 0, 1201 "%s: service", pool->sp_name); 1202 } 1203 1204 void 1205 svc_run(SVCPOOL *pool) 1206 { 1207 int i; 1208 struct proc *p; 1209 struct thread *td; 1210 1211 p = curproc; 1212 td = curthread; 1213 snprintf(td->td_name, sizeof(td->td_name), 1214 "%s: master", pool->sp_name); 1215 pool->sp_state = SVCPOOL_ACTIVE; 1216 pool->sp_proc = p; 1217 pool->sp_lastcreatetime = time_uptime; 1218 pool->sp_threadcount = 1; 1219 1220 for (i = 1; i < pool->sp_minthreads; i++) { 1221 svc_new_thread(pool); 1222 } 1223 1224 svc_run_internal(pool, TRUE); 1225 1226 mtx_lock(&pool->sp_lock); 1227 while (pool->sp_threadcount > 0) 1228 msleep(pool, &pool->sp_lock, 0, "svcexit", 0); 1229 mtx_unlock(&pool->sp_lock); 1230 } 1231 1232 void 1233 svc_exit(SVCPOOL *pool) 1234 { 1235 SVCTHREAD *st; 1236 1237 mtx_lock(&pool->sp_lock); 1238 1239 pool->sp_state = SVCPOOL_CLOSING; 1240 LIST_FOREACH(st, &pool->sp_idlethreads, st_ilink) 1241 cv_signal(&st->st_cond); 1242 1243 mtx_unlock(&pool->sp_lock); 1244 } 1245 1246 bool_t 1247 svc_getargs(struct svc_req *rqstp, xdrproc_t xargs, void *args) 1248 { 1249 struct mbuf *m; 1250 XDR xdrs; 1251 bool_t stat; 1252 1253 m = rqstp->rq_args; 1254 rqstp->rq_args = NULL; 1255 1256 xdrmbuf_create(&xdrs, m, XDR_DECODE); 1257 stat = xargs(&xdrs, args); 1258 XDR_DESTROY(&xdrs); 1259 1260 return (stat); 1261 } 1262 1263 bool_t 1264 svc_freeargs(struct svc_req *rqstp, xdrproc_t xargs, void *args) 1265 { 1266 XDR xdrs; 1267 1268 if (rqstp->rq_addr) { 1269 free(rqstp->rq_addr, M_SONAME); 1270 rqstp->rq_addr = NULL; 1271 } 1272 1273 xdrs.x_op = XDR_FREE; 1274 return (xargs(&xdrs, args)); 1275 } 1276 1277 void 1278 svc_freereq(struct svc_req *rqstp) 1279 { 1280 SVCTHREAD *st; 1281 SVCXPRT *xprt; 1282 SVCPOOL *pool; 1283 1284 st = rqstp->rq_thread; 1285 xprt = rqstp->rq_xprt; 1286 if (xprt) 1287 pool = xprt->xp_pool; 1288 else 1289 pool = NULL; 1290 if (st) { 1291 mtx_lock(&pool->sp_lock); 1292 KASSERT(rqstp == STAILQ_FIRST(&st->st_reqs), 1293 ("Freeing request out of order")); 1294 STAILQ_REMOVE_HEAD(&st->st_reqs, rq_link); 1295 st->st_reqcount--; 1296 if (pool->sp_done) 1297 pool->sp_done(st, rqstp); 1298 mtx_unlock(&pool->sp_lock); 1299 } 1300 1301 if (rqstp->rq_auth.svc_ah_ops) 1302 SVCAUTH_RELEASE(&rqstp->rq_auth); 1303 1304 if (rqstp->rq_xprt) { 1305 SVC_RELEASE(rqstp->rq_xprt); 1306 } 1307 1308 if (rqstp->rq_addr) 1309 free(rqstp->rq_addr, M_SONAME); 1310 1311 if (rqstp->rq_args) 1312 m_freem(rqstp->rq_args); 1313 1314 free(rqstp, M_RPC); 1315 } 1316