1 /* $NetBSD: svc.c,v 1.21 2000/07/06 03:10:35 christos Exp $ */ 2 3 /*- 4 * Copyright (c) 2009, Sun Microsystems, Inc. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions are met: 9 * - Redistributions of source code must retain the above copyright notice, 10 * this list of conditions and the following disclaimer. 11 * - Redistributions in binary form must reproduce the above copyright notice, 12 * this list of conditions and the following disclaimer in the documentation 13 * and/or other materials provided with the distribution. 14 * - Neither the name of Sun Microsystems, Inc. nor the names of its 15 * contributors may be used to endorse or promote products derived 16 * from this software without specific prior written permission. 17 * 18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 19 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 21 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 22 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 23 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 24 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 25 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 26 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 27 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 28 * POSSIBILITY OF SUCH DAMAGE. 29 */ 30 31 #if defined(LIBC_SCCS) && !defined(lint) 32 static char *sccsid2 = "@(#)svc.c 1.44 88/02/08 Copyr 1984 Sun Micro"; 33 static char *sccsid = "@(#)svc.c 2.4 88/08/11 4.0 RPCSRC"; 34 #endif 35 #include <sys/cdefs.h> 36 __FBSDID("$FreeBSD$"); 37 38 /* 39 * svc.c, Server-side remote procedure call interface. 40 * 41 * There are two sets of procedures here. The xprt routines are 42 * for handling transport handles. The svc routines handle the 43 * list of service routines. 44 * 45 * Copyright (C) 1984, Sun Microsystems, Inc. 46 */ 47 48 #include <sys/param.h> 49 #include <sys/lock.h> 50 #include <sys/kernel.h> 51 #include <sys/kthread.h> 52 #include <sys/malloc.h> 53 #include <sys/mbuf.h> 54 #include <sys/mutex.h> 55 #include <sys/proc.h> 56 #include <sys/queue.h> 57 #include <sys/socketvar.h> 58 #include <sys/systm.h> 59 #include <sys/smp.h> 60 #include <sys/sx.h> 61 #include <sys/ucred.h> 62 63 #include <rpc/rpc.h> 64 #include <rpc/rpcb_clnt.h> 65 #include <rpc/replay.h> 66 67 #include <rpc/rpc_com.h> 68 69 #define SVC_VERSQUIET 0x0001 /* keep quiet about vers mismatch */ 70 #define version_keepquiet(xp) (SVC_EXT(xp)->xp_flags & SVC_VERSQUIET) 71 72 static struct svc_callout *svc_find(SVCPOOL *pool, rpcprog_t, rpcvers_t, 73 char *); 74 static void svc_new_thread(SVCGROUP *grp); 75 static void xprt_unregister_locked(SVCXPRT *xprt); 76 static void svc_change_space_used(SVCPOOL *pool, long delta); 77 static bool_t svc_request_space_available(SVCPOOL *pool); 78 79 /* *************** SVCXPRT related stuff **************** */ 80 81 static int svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS); 82 static int svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS); 83 static int svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS); 84 85 SVCPOOL* 86 svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base) 87 { 88 SVCPOOL *pool; 89 SVCGROUP *grp; 90 int g; 91 92 pool = malloc(sizeof(SVCPOOL), M_RPC, M_WAITOK|M_ZERO); 93 94 mtx_init(&pool->sp_lock, "sp_lock", NULL, MTX_DEF); 95 pool->sp_name = name; 96 pool->sp_state = SVCPOOL_INIT; 97 pool->sp_proc = NULL; 98 TAILQ_INIT(&pool->sp_callouts); 99 TAILQ_INIT(&pool->sp_lcallouts); 100 pool->sp_minthreads = 1; 101 pool->sp_maxthreads = 1; 102 pool->sp_groupcount = 1; 103 for (g = 0; g < SVC_MAXGROUPS; g++) { 104 grp = &pool->sp_groups[g]; 105 mtx_init(&grp->sg_lock, "sg_lock", NULL, MTX_DEF); 106 grp->sg_pool = pool; 107 grp->sg_state = SVCPOOL_ACTIVE; 108 TAILQ_INIT(&grp->sg_xlist); 109 TAILQ_INIT(&grp->sg_active); 110 LIST_INIT(&grp->sg_idlethreads); 111 grp->sg_minthreads = 1; 112 grp->sg_maxthreads = 1; 113 } 114 115 /* 116 * Don't use more than a quarter of mbuf clusters. Nota bene: 117 * nmbclusters is an int, but nmbclusters*MCLBYTES may overflow 118 * on LP64 architectures, so cast to u_long to avoid undefined 119 * behavior. (ILP32 architectures cannot have nmbclusters 120 * large enough to overflow for other reasons.) 121 */ 122 pool->sp_space_high = (u_long)nmbclusters * MCLBYTES / 4; 123 pool->sp_space_low = (pool->sp_space_high / 3) * 2; 124 125 sysctl_ctx_init(&pool->sp_sysctl); 126 if (sysctl_base) { 127 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO, 128 "minthreads", CTLTYPE_INT | CTLFLAG_RW, 129 pool, 0, svcpool_minthread_sysctl, "I", 130 "Minimal number of threads"); 131 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO, 132 "maxthreads", CTLTYPE_INT | CTLFLAG_RW, 133 pool, 0, svcpool_maxthread_sysctl, "I", 134 "Maximal number of threads"); 135 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO, 136 "threads", CTLTYPE_INT | CTLFLAG_RD, 137 pool, 0, svcpool_threads_sysctl, "I", 138 "Current number of threads"); 139 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO, 140 "groups", CTLFLAG_RD, &pool->sp_groupcount, 0, 141 "Number of thread groups"); 142 143 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO, 144 "request_space_used", CTLFLAG_RD, 145 &pool->sp_space_used, 146 "Space in parsed but not handled requests."); 147 148 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO, 149 "request_space_used_highest", CTLFLAG_RD, 150 &pool->sp_space_used_highest, 151 "Highest space used since reboot."); 152 153 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO, 154 "request_space_high", CTLFLAG_RW, 155 &pool->sp_space_high, 156 "Maximum space in parsed but not handled requests."); 157 158 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO, 159 "request_space_low", CTLFLAG_RW, 160 &pool->sp_space_low, 161 "Low water mark for request space."); 162 163 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO, 164 "request_space_throttled", CTLFLAG_RD, 165 &pool->sp_space_throttled, 0, 166 "Whether nfs requests are currently throttled"); 167 168 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO, 169 "request_space_throttle_count", CTLFLAG_RD, 170 &pool->sp_space_throttle_count, 0, 171 "Count of times throttling based on request space has occurred"); 172 } 173 174 return pool; 175 } 176 177 void 178 svcpool_destroy(SVCPOOL *pool) 179 { 180 SVCGROUP *grp; 181 SVCXPRT *xprt, *nxprt; 182 struct svc_callout *s; 183 struct svc_loss_callout *sl; 184 struct svcxprt_list cleanup; 185 int g; 186 187 TAILQ_INIT(&cleanup); 188 189 for (g = 0; g < SVC_MAXGROUPS; g++) { 190 grp = &pool->sp_groups[g]; 191 mtx_lock(&grp->sg_lock); 192 while ((xprt = TAILQ_FIRST(&grp->sg_xlist)) != NULL) { 193 xprt_unregister_locked(xprt); 194 TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link); 195 } 196 mtx_unlock(&grp->sg_lock); 197 } 198 TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) { 199 SVC_RELEASE(xprt); 200 } 201 202 mtx_lock(&pool->sp_lock); 203 while ((s = TAILQ_FIRST(&pool->sp_callouts)) != NULL) { 204 mtx_unlock(&pool->sp_lock); 205 svc_unreg(pool, s->sc_prog, s->sc_vers); 206 mtx_lock(&pool->sp_lock); 207 } 208 while ((sl = TAILQ_FIRST(&pool->sp_lcallouts)) != NULL) { 209 mtx_unlock(&pool->sp_lock); 210 svc_loss_unreg(pool, sl->slc_dispatch); 211 mtx_lock(&pool->sp_lock); 212 } 213 mtx_unlock(&pool->sp_lock); 214 215 for (g = 0; g < SVC_MAXGROUPS; g++) { 216 grp = &pool->sp_groups[g]; 217 mtx_destroy(&grp->sg_lock); 218 } 219 mtx_destroy(&pool->sp_lock); 220 221 if (pool->sp_rcache) 222 replay_freecache(pool->sp_rcache); 223 224 sysctl_ctx_free(&pool->sp_sysctl); 225 free(pool, M_RPC); 226 } 227 228 /* 229 * Sysctl handler to get the present thread count on a pool 230 */ 231 static int 232 svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS) 233 { 234 SVCPOOL *pool; 235 int threads, error, g; 236 237 pool = oidp->oid_arg1; 238 threads = 0; 239 mtx_lock(&pool->sp_lock); 240 for (g = 0; g < pool->sp_groupcount; g++) 241 threads += pool->sp_groups[g].sg_threadcount; 242 mtx_unlock(&pool->sp_lock); 243 error = sysctl_handle_int(oidp, &threads, 0, req); 244 return (error); 245 } 246 247 /* 248 * Sysctl handler to set the minimum thread count on a pool 249 */ 250 static int 251 svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS) 252 { 253 SVCPOOL *pool; 254 int newminthreads, error, g; 255 256 pool = oidp->oid_arg1; 257 newminthreads = pool->sp_minthreads; 258 error = sysctl_handle_int(oidp, &newminthreads, 0, req); 259 if (error == 0 && newminthreads != pool->sp_minthreads) { 260 if (newminthreads > pool->sp_maxthreads) 261 return (EINVAL); 262 mtx_lock(&pool->sp_lock); 263 pool->sp_minthreads = newminthreads; 264 for (g = 0; g < pool->sp_groupcount; g++) { 265 pool->sp_groups[g].sg_minthreads = max(1, 266 pool->sp_minthreads / pool->sp_groupcount); 267 } 268 mtx_unlock(&pool->sp_lock); 269 } 270 return (error); 271 } 272 273 /* 274 * Sysctl handler to set the maximum thread count on a pool 275 */ 276 static int 277 svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS) 278 { 279 SVCPOOL *pool; 280 int newmaxthreads, error, g; 281 282 pool = oidp->oid_arg1; 283 newmaxthreads = pool->sp_maxthreads; 284 error = sysctl_handle_int(oidp, &newmaxthreads, 0, req); 285 if (error == 0 && newmaxthreads != pool->sp_maxthreads) { 286 if (newmaxthreads < pool->sp_minthreads) 287 return (EINVAL); 288 mtx_lock(&pool->sp_lock); 289 pool->sp_maxthreads = newmaxthreads; 290 for (g = 0; g < pool->sp_groupcount; g++) { 291 pool->sp_groups[g].sg_maxthreads = max(1, 292 pool->sp_maxthreads / pool->sp_groupcount); 293 } 294 mtx_unlock(&pool->sp_lock); 295 } 296 return (error); 297 } 298 299 /* 300 * Activate a transport handle. 301 */ 302 void 303 xprt_register(SVCXPRT *xprt) 304 { 305 SVCPOOL *pool = xprt->xp_pool; 306 SVCGROUP *grp; 307 int g; 308 309 SVC_ACQUIRE(xprt); 310 g = atomic_fetchadd_int(&pool->sp_nextgroup, 1) % pool->sp_groupcount; 311 xprt->xp_group = grp = &pool->sp_groups[g]; 312 mtx_lock(&grp->sg_lock); 313 xprt->xp_registered = TRUE; 314 xprt->xp_active = FALSE; 315 TAILQ_INSERT_TAIL(&grp->sg_xlist, xprt, xp_link); 316 mtx_unlock(&grp->sg_lock); 317 } 318 319 /* 320 * De-activate a transport handle. Note: the locked version doesn't 321 * release the transport - caller must do that after dropping the pool 322 * lock. 323 */ 324 static void 325 xprt_unregister_locked(SVCXPRT *xprt) 326 { 327 SVCGROUP *grp = xprt->xp_group; 328 329 mtx_assert(&grp->sg_lock, MA_OWNED); 330 KASSERT(xprt->xp_registered == TRUE, 331 ("xprt_unregister_locked: not registered")); 332 xprt_inactive_locked(xprt); 333 TAILQ_REMOVE(&grp->sg_xlist, xprt, xp_link); 334 xprt->xp_registered = FALSE; 335 } 336 337 void 338 xprt_unregister(SVCXPRT *xprt) 339 { 340 SVCGROUP *grp = xprt->xp_group; 341 342 mtx_lock(&grp->sg_lock); 343 if (xprt->xp_registered == FALSE) { 344 /* Already unregistered by another thread */ 345 mtx_unlock(&grp->sg_lock); 346 return; 347 } 348 xprt_unregister_locked(xprt); 349 mtx_unlock(&grp->sg_lock); 350 351 SVC_RELEASE(xprt); 352 } 353 354 /* 355 * Attempt to assign a service thread to this transport. 356 */ 357 static int 358 xprt_assignthread(SVCXPRT *xprt) 359 { 360 SVCGROUP *grp = xprt->xp_group; 361 SVCTHREAD *st; 362 363 mtx_assert(&grp->sg_lock, MA_OWNED); 364 st = LIST_FIRST(&grp->sg_idlethreads); 365 if (st) { 366 LIST_REMOVE(st, st_ilink); 367 SVC_ACQUIRE(xprt); 368 xprt->xp_thread = st; 369 st->st_xprt = xprt; 370 cv_signal(&st->st_cond); 371 return (TRUE); 372 } else { 373 /* 374 * See if we can create a new thread. The 375 * actual thread creation happens in 376 * svc_run_internal because our locking state 377 * is poorly defined (we are typically called 378 * from a socket upcall). Don't create more 379 * than one thread per second. 380 */ 381 if (grp->sg_state == SVCPOOL_ACTIVE 382 && grp->sg_lastcreatetime < time_uptime 383 && grp->sg_threadcount < grp->sg_maxthreads) { 384 grp->sg_state = SVCPOOL_THREADWANTED; 385 } 386 } 387 return (FALSE); 388 } 389 390 void 391 xprt_active(SVCXPRT *xprt) 392 { 393 SVCGROUP *grp = xprt->xp_group; 394 395 mtx_lock(&grp->sg_lock); 396 397 if (!xprt->xp_registered) { 398 /* 399 * Race with xprt_unregister - we lose. 400 */ 401 mtx_unlock(&grp->sg_lock); 402 return; 403 } 404 405 if (!xprt->xp_active) { 406 xprt->xp_active = TRUE; 407 if (xprt->xp_thread == NULL) { 408 if (!svc_request_space_available(xprt->xp_pool) || 409 !xprt_assignthread(xprt)) 410 TAILQ_INSERT_TAIL(&grp->sg_active, xprt, 411 xp_alink); 412 } 413 } 414 415 mtx_unlock(&grp->sg_lock); 416 } 417 418 void 419 xprt_inactive_locked(SVCXPRT *xprt) 420 { 421 SVCGROUP *grp = xprt->xp_group; 422 423 mtx_assert(&grp->sg_lock, MA_OWNED); 424 if (xprt->xp_active) { 425 if (xprt->xp_thread == NULL) 426 TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink); 427 xprt->xp_active = FALSE; 428 } 429 } 430 431 void 432 xprt_inactive(SVCXPRT *xprt) 433 { 434 SVCGROUP *grp = xprt->xp_group; 435 436 mtx_lock(&grp->sg_lock); 437 xprt_inactive_locked(xprt); 438 mtx_unlock(&grp->sg_lock); 439 } 440 441 /* 442 * Variant of xprt_inactive() for use only when sure that port is 443 * assigned to thread. For example, withing receive handlers. 444 */ 445 void 446 xprt_inactive_self(SVCXPRT *xprt) 447 { 448 449 KASSERT(xprt->xp_thread != NULL, 450 ("xprt_inactive_self(%p) with NULL xp_thread", xprt)); 451 xprt->xp_active = FALSE; 452 } 453 454 /* 455 * Add a service program to the callout list. 456 * The dispatch routine will be called when a rpc request for this 457 * program number comes in. 458 */ 459 bool_t 460 svc_reg(SVCXPRT *xprt, const rpcprog_t prog, const rpcvers_t vers, 461 void (*dispatch)(struct svc_req *, SVCXPRT *), 462 const struct netconfig *nconf) 463 { 464 SVCPOOL *pool = xprt->xp_pool; 465 struct svc_callout *s; 466 char *netid = NULL; 467 int flag = 0; 468 469 /* VARIABLES PROTECTED BY svc_lock: s, svc_head */ 470 471 if (xprt->xp_netid) { 472 netid = strdup(xprt->xp_netid, M_RPC); 473 flag = 1; 474 } else if (nconf && nconf->nc_netid) { 475 netid = strdup(nconf->nc_netid, M_RPC); 476 flag = 1; 477 } /* must have been created with svc_raw_create */ 478 if ((netid == NULL) && (flag == 1)) { 479 return (FALSE); 480 } 481 482 mtx_lock(&pool->sp_lock); 483 if ((s = svc_find(pool, prog, vers, netid)) != NULL) { 484 if (netid) 485 free(netid, M_RPC); 486 if (s->sc_dispatch == dispatch) 487 goto rpcb_it; /* he is registering another xptr */ 488 mtx_unlock(&pool->sp_lock); 489 return (FALSE); 490 } 491 s = malloc(sizeof (struct svc_callout), M_RPC, M_NOWAIT); 492 if (s == NULL) { 493 if (netid) 494 free(netid, M_RPC); 495 mtx_unlock(&pool->sp_lock); 496 return (FALSE); 497 } 498 499 s->sc_prog = prog; 500 s->sc_vers = vers; 501 s->sc_dispatch = dispatch; 502 s->sc_netid = netid; 503 TAILQ_INSERT_TAIL(&pool->sp_callouts, s, sc_link); 504 505 if ((xprt->xp_netid == NULL) && (flag == 1) && netid) 506 ((SVCXPRT *) xprt)->xp_netid = strdup(netid, M_RPC); 507 508 rpcb_it: 509 mtx_unlock(&pool->sp_lock); 510 /* now register the information with the local binder service */ 511 if (nconf) { 512 bool_t dummy; 513 struct netconfig tnc; 514 struct netbuf nb; 515 tnc = *nconf; 516 nb.buf = &xprt->xp_ltaddr; 517 nb.len = xprt->xp_ltaddr.ss_len; 518 dummy = rpcb_set(prog, vers, &tnc, &nb); 519 return (dummy); 520 } 521 return (TRUE); 522 } 523 524 /* 525 * Remove a service program from the callout list. 526 */ 527 void 528 svc_unreg(SVCPOOL *pool, const rpcprog_t prog, const rpcvers_t vers) 529 { 530 struct svc_callout *s; 531 532 /* unregister the information anyway */ 533 (void) rpcb_unset(prog, vers, NULL); 534 mtx_lock(&pool->sp_lock); 535 while ((s = svc_find(pool, prog, vers, NULL)) != NULL) { 536 TAILQ_REMOVE(&pool->sp_callouts, s, sc_link); 537 if (s->sc_netid) 538 mem_free(s->sc_netid, sizeof (s->sc_netid) + 1); 539 mem_free(s, sizeof (struct svc_callout)); 540 } 541 mtx_unlock(&pool->sp_lock); 542 } 543 544 /* 545 * Add a service connection loss program to the callout list. 546 * The dispatch routine will be called when some port in ths pool die. 547 */ 548 bool_t 549 svc_loss_reg(SVCXPRT *xprt, void (*dispatch)(SVCXPRT *)) 550 { 551 SVCPOOL *pool = xprt->xp_pool; 552 struct svc_loss_callout *s; 553 554 mtx_lock(&pool->sp_lock); 555 TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) { 556 if (s->slc_dispatch == dispatch) 557 break; 558 } 559 if (s != NULL) { 560 mtx_unlock(&pool->sp_lock); 561 return (TRUE); 562 } 563 s = malloc(sizeof (struct svc_callout), M_RPC, M_NOWAIT); 564 if (s == NULL) { 565 mtx_unlock(&pool->sp_lock); 566 return (FALSE); 567 } 568 s->slc_dispatch = dispatch; 569 TAILQ_INSERT_TAIL(&pool->sp_lcallouts, s, slc_link); 570 mtx_unlock(&pool->sp_lock); 571 return (TRUE); 572 } 573 574 /* 575 * Remove a service connection loss program from the callout list. 576 */ 577 void 578 svc_loss_unreg(SVCPOOL *pool, void (*dispatch)(SVCXPRT *)) 579 { 580 struct svc_loss_callout *s; 581 582 mtx_lock(&pool->sp_lock); 583 TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) { 584 if (s->slc_dispatch == dispatch) { 585 TAILQ_REMOVE(&pool->sp_lcallouts, s, slc_link); 586 free(s, M_RPC); 587 break; 588 } 589 } 590 mtx_unlock(&pool->sp_lock); 591 } 592 593 /* ********************** CALLOUT list related stuff ************* */ 594 595 /* 596 * Search the callout list for a program number, return the callout 597 * struct. 598 */ 599 static struct svc_callout * 600 svc_find(SVCPOOL *pool, rpcprog_t prog, rpcvers_t vers, char *netid) 601 { 602 struct svc_callout *s; 603 604 mtx_assert(&pool->sp_lock, MA_OWNED); 605 TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) { 606 if (s->sc_prog == prog && s->sc_vers == vers 607 && (netid == NULL || s->sc_netid == NULL || 608 strcmp(netid, s->sc_netid) == 0)) 609 break; 610 } 611 612 return (s); 613 } 614 615 /* ******************* REPLY GENERATION ROUTINES ************ */ 616 617 static bool_t 618 svc_sendreply_common(struct svc_req *rqstp, struct rpc_msg *rply, 619 struct mbuf *body) 620 { 621 SVCXPRT *xprt = rqstp->rq_xprt; 622 bool_t ok; 623 624 if (rqstp->rq_args) { 625 m_freem(rqstp->rq_args); 626 rqstp->rq_args = NULL; 627 } 628 629 if (xprt->xp_pool->sp_rcache) 630 replay_setreply(xprt->xp_pool->sp_rcache, 631 rply, svc_getrpccaller(rqstp), body); 632 633 if (!SVCAUTH_WRAP(&rqstp->rq_auth, &body)) 634 return (FALSE); 635 636 ok = SVC_REPLY(xprt, rply, rqstp->rq_addr, body, &rqstp->rq_reply_seq); 637 if (rqstp->rq_addr) { 638 free(rqstp->rq_addr, M_SONAME); 639 rqstp->rq_addr = NULL; 640 } 641 642 return (ok); 643 } 644 645 /* 646 * Send a reply to an rpc request 647 */ 648 bool_t 649 svc_sendreply(struct svc_req *rqstp, xdrproc_t xdr_results, void * xdr_location) 650 { 651 struct rpc_msg rply; 652 struct mbuf *m; 653 XDR xdrs; 654 bool_t ok; 655 656 rply.rm_xid = rqstp->rq_xid; 657 rply.rm_direction = REPLY; 658 rply.rm_reply.rp_stat = MSG_ACCEPTED; 659 rply.acpted_rply.ar_verf = rqstp->rq_verf; 660 rply.acpted_rply.ar_stat = SUCCESS; 661 rply.acpted_rply.ar_results.where = NULL; 662 rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void; 663 664 m = m_getcl(M_WAITOK, MT_DATA, 0); 665 xdrmbuf_create(&xdrs, m, XDR_ENCODE); 666 ok = xdr_results(&xdrs, xdr_location); 667 XDR_DESTROY(&xdrs); 668 669 if (ok) { 670 return (svc_sendreply_common(rqstp, &rply, m)); 671 } else { 672 m_freem(m); 673 return (FALSE); 674 } 675 } 676 677 bool_t 678 svc_sendreply_mbuf(struct svc_req *rqstp, struct mbuf *m) 679 { 680 struct rpc_msg rply; 681 682 rply.rm_xid = rqstp->rq_xid; 683 rply.rm_direction = REPLY; 684 rply.rm_reply.rp_stat = MSG_ACCEPTED; 685 rply.acpted_rply.ar_verf = rqstp->rq_verf; 686 rply.acpted_rply.ar_stat = SUCCESS; 687 rply.acpted_rply.ar_results.where = NULL; 688 rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void; 689 690 return (svc_sendreply_common(rqstp, &rply, m)); 691 } 692 693 /* 694 * No procedure error reply 695 */ 696 void 697 svcerr_noproc(struct svc_req *rqstp) 698 { 699 SVCXPRT *xprt = rqstp->rq_xprt; 700 struct rpc_msg rply; 701 702 rply.rm_xid = rqstp->rq_xid; 703 rply.rm_direction = REPLY; 704 rply.rm_reply.rp_stat = MSG_ACCEPTED; 705 rply.acpted_rply.ar_verf = rqstp->rq_verf; 706 rply.acpted_rply.ar_stat = PROC_UNAVAIL; 707 708 if (xprt->xp_pool->sp_rcache) 709 replay_setreply(xprt->xp_pool->sp_rcache, 710 &rply, svc_getrpccaller(rqstp), NULL); 711 712 svc_sendreply_common(rqstp, &rply, NULL); 713 } 714 715 /* 716 * Can't decode args error reply 717 */ 718 void 719 svcerr_decode(struct svc_req *rqstp) 720 { 721 SVCXPRT *xprt = rqstp->rq_xprt; 722 struct rpc_msg rply; 723 724 rply.rm_xid = rqstp->rq_xid; 725 rply.rm_direction = REPLY; 726 rply.rm_reply.rp_stat = MSG_ACCEPTED; 727 rply.acpted_rply.ar_verf = rqstp->rq_verf; 728 rply.acpted_rply.ar_stat = GARBAGE_ARGS; 729 730 if (xprt->xp_pool->sp_rcache) 731 replay_setreply(xprt->xp_pool->sp_rcache, 732 &rply, (struct sockaddr *) &xprt->xp_rtaddr, NULL); 733 734 svc_sendreply_common(rqstp, &rply, NULL); 735 } 736 737 /* 738 * Some system error 739 */ 740 void 741 svcerr_systemerr(struct svc_req *rqstp) 742 { 743 SVCXPRT *xprt = rqstp->rq_xprt; 744 struct rpc_msg rply; 745 746 rply.rm_xid = rqstp->rq_xid; 747 rply.rm_direction = REPLY; 748 rply.rm_reply.rp_stat = MSG_ACCEPTED; 749 rply.acpted_rply.ar_verf = rqstp->rq_verf; 750 rply.acpted_rply.ar_stat = SYSTEM_ERR; 751 752 if (xprt->xp_pool->sp_rcache) 753 replay_setreply(xprt->xp_pool->sp_rcache, 754 &rply, svc_getrpccaller(rqstp), NULL); 755 756 svc_sendreply_common(rqstp, &rply, NULL); 757 } 758 759 /* 760 * Authentication error reply 761 */ 762 void 763 svcerr_auth(struct svc_req *rqstp, enum auth_stat why) 764 { 765 SVCXPRT *xprt = rqstp->rq_xprt; 766 struct rpc_msg rply; 767 768 rply.rm_xid = rqstp->rq_xid; 769 rply.rm_direction = REPLY; 770 rply.rm_reply.rp_stat = MSG_DENIED; 771 rply.rjcted_rply.rj_stat = AUTH_ERROR; 772 rply.rjcted_rply.rj_why = why; 773 774 if (xprt->xp_pool->sp_rcache) 775 replay_setreply(xprt->xp_pool->sp_rcache, 776 &rply, svc_getrpccaller(rqstp), NULL); 777 778 svc_sendreply_common(rqstp, &rply, NULL); 779 } 780 781 /* 782 * Auth too weak error reply 783 */ 784 void 785 svcerr_weakauth(struct svc_req *rqstp) 786 { 787 788 svcerr_auth(rqstp, AUTH_TOOWEAK); 789 } 790 791 /* 792 * Program unavailable error reply 793 */ 794 void 795 svcerr_noprog(struct svc_req *rqstp) 796 { 797 SVCXPRT *xprt = rqstp->rq_xprt; 798 struct rpc_msg rply; 799 800 rply.rm_xid = rqstp->rq_xid; 801 rply.rm_direction = REPLY; 802 rply.rm_reply.rp_stat = MSG_ACCEPTED; 803 rply.acpted_rply.ar_verf = rqstp->rq_verf; 804 rply.acpted_rply.ar_stat = PROG_UNAVAIL; 805 806 if (xprt->xp_pool->sp_rcache) 807 replay_setreply(xprt->xp_pool->sp_rcache, 808 &rply, svc_getrpccaller(rqstp), NULL); 809 810 svc_sendreply_common(rqstp, &rply, NULL); 811 } 812 813 /* 814 * Program version mismatch error reply 815 */ 816 void 817 svcerr_progvers(struct svc_req *rqstp, rpcvers_t low_vers, rpcvers_t high_vers) 818 { 819 SVCXPRT *xprt = rqstp->rq_xprt; 820 struct rpc_msg rply; 821 822 rply.rm_xid = rqstp->rq_xid; 823 rply.rm_direction = REPLY; 824 rply.rm_reply.rp_stat = MSG_ACCEPTED; 825 rply.acpted_rply.ar_verf = rqstp->rq_verf; 826 rply.acpted_rply.ar_stat = PROG_MISMATCH; 827 rply.acpted_rply.ar_vers.low = (uint32_t)low_vers; 828 rply.acpted_rply.ar_vers.high = (uint32_t)high_vers; 829 830 if (xprt->xp_pool->sp_rcache) 831 replay_setreply(xprt->xp_pool->sp_rcache, 832 &rply, svc_getrpccaller(rqstp), NULL); 833 834 svc_sendreply_common(rqstp, &rply, NULL); 835 } 836 837 /* 838 * Allocate a new server transport structure. All fields are 839 * initialized to zero and xp_p3 is initialized to point at an 840 * extension structure to hold various flags and authentication 841 * parameters. 842 */ 843 SVCXPRT * 844 svc_xprt_alloc() 845 { 846 SVCXPRT *xprt; 847 SVCXPRT_EXT *ext; 848 849 xprt = mem_alloc(sizeof(SVCXPRT)); 850 memset(xprt, 0, sizeof(SVCXPRT)); 851 ext = mem_alloc(sizeof(SVCXPRT_EXT)); 852 memset(ext, 0, sizeof(SVCXPRT_EXT)); 853 xprt->xp_p3 = ext; 854 refcount_init(&xprt->xp_refs, 1); 855 856 return (xprt); 857 } 858 859 /* 860 * Free a server transport structure. 861 */ 862 void 863 svc_xprt_free(xprt) 864 SVCXPRT *xprt; 865 { 866 867 mem_free(xprt->xp_p3, sizeof(SVCXPRT_EXT)); 868 mem_free(xprt, sizeof(SVCXPRT)); 869 } 870 871 /* ******************* SERVER INPUT STUFF ******************* */ 872 873 /* 874 * Read RPC requests from a transport and queue them to be 875 * executed. We handle authentication and replay cache replies here. 876 * Actually dispatching the RPC is deferred till svc_executereq. 877 */ 878 static enum xprt_stat 879 svc_getreq(SVCXPRT *xprt, struct svc_req **rqstp_ret) 880 { 881 SVCPOOL *pool = xprt->xp_pool; 882 struct svc_req *r; 883 struct rpc_msg msg; 884 struct mbuf *args; 885 struct svc_loss_callout *s; 886 enum xprt_stat stat; 887 888 /* now receive msgs from xprtprt (support batch calls) */ 889 r = malloc(sizeof(*r), M_RPC, M_WAITOK|M_ZERO); 890 891 msg.rm_call.cb_cred.oa_base = r->rq_credarea; 892 msg.rm_call.cb_verf.oa_base = &r->rq_credarea[MAX_AUTH_BYTES]; 893 r->rq_clntcred = &r->rq_credarea[2*MAX_AUTH_BYTES]; 894 if (SVC_RECV(xprt, &msg, &r->rq_addr, &args)) { 895 enum auth_stat why; 896 897 /* 898 * Handle replays and authenticate before queuing the 899 * request to be executed. 900 */ 901 SVC_ACQUIRE(xprt); 902 r->rq_xprt = xprt; 903 if (pool->sp_rcache) { 904 struct rpc_msg repmsg; 905 struct mbuf *repbody; 906 enum replay_state rs; 907 rs = replay_find(pool->sp_rcache, &msg, 908 svc_getrpccaller(r), &repmsg, &repbody); 909 switch (rs) { 910 case RS_NEW: 911 break; 912 case RS_DONE: 913 SVC_REPLY(xprt, &repmsg, r->rq_addr, 914 repbody, &r->rq_reply_seq); 915 if (r->rq_addr) { 916 free(r->rq_addr, M_SONAME); 917 r->rq_addr = NULL; 918 } 919 m_freem(args); 920 goto call_done; 921 922 default: 923 m_freem(args); 924 goto call_done; 925 } 926 } 927 928 r->rq_xid = msg.rm_xid; 929 r->rq_prog = msg.rm_call.cb_prog; 930 r->rq_vers = msg.rm_call.cb_vers; 931 r->rq_proc = msg.rm_call.cb_proc; 932 r->rq_size = sizeof(*r) + m_length(args, NULL); 933 r->rq_args = args; 934 if ((why = _authenticate(r, &msg)) != AUTH_OK) { 935 /* 936 * RPCSEC_GSS uses this return code 937 * for requests that form part of its 938 * context establishment protocol and 939 * should not be dispatched to the 940 * application. 941 */ 942 if (why != RPCSEC_GSS_NODISPATCH) 943 svcerr_auth(r, why); 944 goto call_done; 945 } 946 947 if (!SVCAUTH_UNWRAP(&r->rq_auth, &r->rq_args)) { 948 svcerr_decode(r); 949 goto call_done; 950 } 951 952 /* 953 * Everything checks out, return request to caller. 954 */ 955 *rqstp_ret = r; 956 r = NULL; 957 } 958 call_done: 959 if (r) { 960 svc_freereq(r); 961 r = NULL; 962 } 963 if ((stat = SVC_STAT(xprt)) == XPRT_DIED) { 964 TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) 965 (*s->slc_dispatch)(xprt); 966 xprt_unregister(xprt); 967 } 968 969 return (stat); 970 } 971 972 static void 973 svc_executereq(struct svc_req *rqstp) 974 { 975 SVCXPRT *xprt = rqstp->rq_xprt; 976 SVCPOOL *pool = xprt->xp_pool; 977 int prog_found; 978 rpcvers_t low_vers; 979 rpcvers_t high_vers; 980 struct svc_callout *s; 981 982 /* now match message with a registered service*/ 983 prog_found = FALSE; 984 low_vers = (rpcvers_t) -1L; 985 high_vers = (rpcvers_t) 0L; 986 TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) { 987 if (s->sc_prog == rqstp->rq_prog) { 988 if (s->sc_vers == rqstp->rq_vers) { 989 /* 990 * We hand ownership of r to the 991 * dispatch method - they must call 992 * svc_freereq. 993 */ 994 (*s->sc_dispatch)(rqstp, xprt); 995 return; 996 } /* found correct version */ 997 prog_found = TRUE; 998 if (s->sc_vers < low_vers) 999 low_vers = s->sc_vers; 1000 if (s->sc_vers > high_vers) 1001 high_vers = s->sc_vers; 1002 } /* found correct program */ 1003 } 1004 1005 /* 1006 * if we got here, the program or version 1007 * is not served ... 1008 */ 1009 if (prog_found) 1010 svcerr_progvers(rqstp, low_vers, high_vers); 1011 else 1012 svcerr_noprog(rqstp); 1013 1014 svc_freereq(rqstp); 1015 } 1016 1017 static void 1018 svc_checkidle(SVCGROUP *grp) 1019 { 1020 SVCXPRT *xprt, *nxprt; 1021 time_t timo; 1022 struct svcxprt_list cleanup; 1023 1024 TAILQ_INIT(&cleanup); 1025 TAILQ_FOREACH_SAFE(xprt, &grp->sg_xlist, xp_link, nxprt) { 1026 /* 1027 * Only some transports have idle timers. Don't time 1028 * something out which is just waking up. 1029 */ 1030 if (!xprt->xp_idletimeout || xprt->xp_thread) 1031 continue; 1032 1033 timo = xprt->xp_lastactive + xprt->xp_idletimeout; 1034 if (time_uptime > timo) { 1035 xprt_unregister_locked(xprt); 1036 TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link); 1037 } 1038 } 1039 1040 mtx_unlock(&grp->sg_lock); 1041 TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) { 1042 SVC_RELEASE(xprt); 1043 } 1044 mtx_lock(&grp->sg_lock); 1045 } 1046 1047 static void 1048 svc_assign_waiting_sockets(SVCPOOL *pool) 1049 { 1050 SVCGROUP *grp; 1051 SVCXPRT *xprt; 1052 int g; 1053 1054 for (g = 0; g < pool->sp_groupcount; g++) { 1055 grp = &pool->sp_groups[g]; 1056 mtx_lock(&grp->sg_lock); 1057 while ((xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) { 1058 if (xprt_assignthread(xprt)) 1059 TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink); 1060 else 1061 break; 1062 } 1063 mtx_unlock(&grp->sg_lock); 1064 } 1065 } 1066 1067 static void 1068 svc_change_space_used(SVCPOOL *pool, long delta) 1069 { 1070 unsigned long value; 1071 1072 value = atomic_fetchadd_long(&pool->sp_space_used, delta) + delta; 1073 if (delta > 0) { 1074 if (value >= pool->sp_space_high && !pool->sp_space_throttled) { 1075 pool->sp_space_throttled = TRUE; 1076 pool->sp_space_throttle_count++; 1077 } 1078 if (value > pool->sp_space_used_highest) 1079 pool->sp_space_used_highest = value; 1080 } else { 1081 if (value < pool->sp_space_low && pool->sp_space_throttled) { 1082 pool->sp_space_throttled = FALSE; 1083 svc_assign_waiting_sockets(pool); 1084 } 1085 } 1086 } 1087 1088 static bool_t 1089 svc_request_space_available(SVCPOOL *pool) 1090 { 1091 1092 if (pool->sp_space_throttled) 1093 return (FALSE); 1094 return (TRUE); 1095 } 1096 1097 static void 1098 svc_run_internal(SVCGROUP *grp, bool_t ismaster) 1099 { 1100 SVCPOOL *pool = grp->sg_pool; 1101 SVCTHREAD *st, *stpref; 1102 SVCXPRT *xprt; 1103 enum xprt_stat stat; 1104 struct svc_req *rqstp; 1105 struct proc *p; 1106 long sz; 1107 int error; 1108 1109 st = mem_alloc(sizeof(*st)); 1110 mtx_init(&st->st_lock, "st_lock", NULL, MTX_DEF); 1111 st->st_pool = pool; 1112 st->st_xprt = NULL; 1113 STAILQ_INIT(&st->st_reqs); 1114 cv_init(&st->st_cond, "rpcsvc"); 1115 1116 mtx_lock(&grp->sg_lock); 1117 1118 /* 1119 * If we are a new thread which was spawned to cope with 1120 * increased load, set the state back to SVCPOOL_ACTIVE. 1121 */ 1122 if (grp->sg_state == SVCPOOL_THREADSTARTING) 1123 grp->sg_state = SVCPOOL_ACTIVE; 1124 1125 while (grp->sg_state != SVCPOOL_CLOSING) { 1126 /* 1127 * Create new thread if requested. 1128 */ 1129 if (grp->sg_state == SVCPOOL_THREADWANTED) { 1130 grp->sg_state = SVCPOOL_THREADSTARTING; 1131 grp->sg_lastcreatetime = time_uptime; 1132 mtx_unlock(&grp->sg_lock); 1133 svc_new_thread(grp); 1134 mtx_lock(&grp->sg_lock); 1135 continue; 1136 } 1137 1138 /* 1139 * Check for idle transports once per second. 1140 */ 1141 if (time_uptime > grp->sg_lastidlecheck) { 1142 grp->sg_lastidlecheck = time_uptime; 1143 svc_checkidle(grp); 1144 } 1145 1146 xprt = st->st_xprt; 1147 if (!xprt) { 1148 /* 1149 * Enforce maxthreads count. 1150 */ 1151 if (grp->sg_threadcount > grp->sg_maxthreads) 1152 break; 1153 1154 /* 1155 * Before sleeping, see if we can find an 1156 * active transport which isn't being serviced 1157 * by a thread. 1158 */ 1159 if (svc_request_space_available(pool) && 1160 (xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) { 1161 TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink); 1162 SVC_ACQUIRE(xprt); 1163 xprt->xp_thread = st; 1164 st->st_xprt = xprt; 1165 continue; 1166 } 1167 1168 LIST_INSERT_HEAD(&grp->sg_idlethreads, st, st_ilink); 1169 if (ismaster || (!ismaster && 1170 grp->sg_threadcount > grp->sg_minthreads)) 1171 error = cv_timedwait_sig(&st->st_cond, 1172 &grp->sg_lock, 5 * hz); 1173 else 1174 error = cv_wait_sig(&st->st_cond, 1175 &grp->sg_lock); 1176 if (st->st_xprt == NULL) 1177 LIST_REMOVE(st, st_ilink); 1178 1179 /* 1180 * Reduce worker thread count when idle. 1181 */ 1182 if (error == EWOULDBLOCK) { 1183 if (!ismaster 1184 && (grp->sg_threadcount 1185 > grp->sg_minthreads) 1186 && !st->st_xprt) 1187 break; 1188 } else if (error != 0) { 1189 KASSERT(error == EINTR || error == ERESTART, 1190 ("non-signal error %d", error)); 1191 mtx_unlock(&grp->sg_lock); 1192 p = curproc; 1193 PROC_LOCK(p); 1194 if (P_SHOULDSTOP(p) || 1195 (p->p_flag & P_TOTAL_STOP) != 0) { 1196 thread_suspend_check(0); 1197 PROC_UNLOCK(p); 1198 mtx_lock(&grp->sg_lock); 1199 } else { 1200 PROC_UNLOCK(p); 1201 svc_exit(pool); 1202 mtx_lock(&grp->sg_lock); 1203 break; 1204 } 1205 } 1206 continue; 1207 } 1208 mtx_unlock(&grp->sg_lock); 1209 1210 /* 1211 * Drain the transport socket and queue up any RPCs. 1212 */ 1213 xprt->xp_lastactive = time_uptime; 1214 do { 1215 if (!svc_request_space_available(pool)) 1216 break; 1217 rqstp = NULL; 1218 stat = svc_getreq(xprt, &rqstp); 1219 if (rqstp) { 1220 svc_change_space_used(pool, rqstp->rq_size); 1221 /* 1222 * See if the application has a preference 1223 * for some other thread. 1224 */ 1225 if (pool->sp_assign) { 1226 stpref = pool->sp_assign(st, rqstp); 1227 rqstp->rq_thread = stpref; 1228 STAILQ_INSERT_TAIL(&stpref->st_reqs, 1229 rqstp, rq_link); 1230 mtx_unlock(&stpref->st_lock); 1231 if (stpref != st) 1232 rqstp = NULL; 1233 } else { 1234 rqstp->rq_thread = st; 1235 STAILQ_INSERT_TAIL(&st->st_reqs, 1236 rqstp, rq_link); 1237 } 1238 } 1239 } while (rqstp == NULL && stat == XPRT_MOREREQS 1240 && grp->sg_state != SVCPOOL_CLOSING); 1241 1242 /* 1243 * Move this transport to the end of the active list to 1244 * ensure fairness when multiple transports are active. 1245 * If this was the last queued request, svc_getreq will end 1246 * up calling xprt_inactive to remove from the active list. 1247 */ 1248 mtx_lock(&grp->sg_lock); 1249 xprt->xp_thread = NULL; 1250 st->st_xprt = NULL; 1251 if (xprt->xp_active) { 1252 if (!svc_request_space_available(pool) || 1253 !xprt_assignthread(xprt)) 1254 TAILQ_INSERT_TAIL(&grp->sg_active, 1255 xprt, xp_alink); 1256 } 1257 mtx_unlock(&grp->sg_lock); 1258 SVC_RELEASE(xprt); 1259 1260 /* 1261 * Execute what we have queued. 1262 */ 1263 mtx_lock(&st->st_lock); 1264 while ((rqstp = STAILQ_FIRST(&st->st_reqs)) != NULL) { 1265 STAILQ_REMOVE_HEAD(&st->st_reqs, rq_link); 1266 mtx_unlock(&st->st_lock); 1267 sz = (long)rqstp->rq_size; 1268 svc_executereq(rqstp); 1269 svc_change_space_used(pool, -sz); 1270 mtx_lock(&st->st_lock); 1271 } 1272 mtx_unlock(&st->st_lock); 1273 mtx_lock(&grp->sg_lock); 1274 } 1275 1276 if (st->st_xprt) { 1277 xprt = st->st_xprt; 1278 st->st_xprt = NULL; 1279 SVC_RELEASE(xprt); 1280 } 1281 KASSERT(STAILQ_EMPTY(&st->st_reqs), ("stray reqs on exit")); 1282 mtx_destroy(&st->st_lock); 1283 cv_destroy(&st->st_cond); 1284 mem_free(st, sizeof(*st)); 1285 1286 grp->sg_threadcount--; 1287 if (!ismaster) 1288 wakeup(grp); 1289 mtx_unlock(&grp->sg_lock); 1290 } 1291 1292 static void 1293 svc_thread_start(void *arg) 1294 { 1295 1296 svc_run_internal((SVCGROUP *) arg, FALSE); 1297 kthread_exit(); 1298 } 1299 1300 static void 1301 svc_new_thread(SVCGROUP *grp) 1302 { 1303 SVCPOOL *pool = grp->sg_pool; 1304 struct thread *td; 1305 1306 grp->sg_threadcount++; 1307 kthread_add(svc_thread_start, grp, pool->sp_proc, &td, 0, 0, 1308 "%s: service", pool->sp_name); 1309 } 1310 1311 void 1312 svc_run(SVCPOOL *pool) 1313 { 1314 int g, i; 1315 struct proc *p; 1316 struct thread *td; 1317 SVCGROUP *grp; 1318 1319 p = curproc; 1320 td = curthread; 1321 snprintf(td->td_name, sizeof(td->td_name), 1322 "%s: master", pool->sp_name); 1323 pool->sp_state = SVCPOOL_ACTIVE; 1324 pool->sp_proc = p; 1325 1326 /* Choose group count based on number of threads and CPUs. */ 1327 pool->sp_groupcount = max(1, min(SVC_MAXGROUPS, 1328 min(pool->sp_maxthreads / 2, mp_ncpus) / 6)); 1329 for (g = 0; g < pool->sp_groupcount; g++) { 1330 grp = &pool->sp_groups[g]; 1331 grp->sg_minthreads = max(1, 1332 pool->sp_minthreads / pool->sp_groupcount); 1333 grp->sg_maxthreads = max(1, 1334 pool->sp_maxthreads / pool->sp_groupcount); 1335 grp->sg_lastcreatetime = time_uptime; 1336 } 1337 1338 /* Starting threads */ 1339 for (g = 0; g < pool->sp_groupcount; g++) { 1340 grp = &pool->sp_groups[g]; 1341 for (i = ((g == 0) ? 1 : 0); i < grp->sg_minthreads; i++) 1342 svc_new_thread(grp); 1343 } 1344 pool->sp_groups[0].sg_threadcount++; 1345 svc_run_internal(&pool->sp_groups[0], TRUE); 1346 1347 /* Waiting for threads to stop. */ 1348 for (g = 0; g < pool->sp_groupcount; g++) { 1349 grp = &pool->sp_groups[g]; 1350 mtx_lock(&grp->sg_lock); 1351 while (grp->sg_threadcount > 0) 1352 msleep(grp, &grp->sg_lock, 0, "svcexit", 0); 1353 mtx_unlock(&grp->sg_lock); 1354 } 1355 } 1356 1357 void 1358 svc_exit(SVCPOOL *pool) 1359 { 1360 SVCGROUP *grp; 1361 SVCTHREAD *st; 1362 int g; 1363 1364 pool->sp_state = SVCPOOL_CLOSING; 1365 for (g = 0; g < pool->sp_groupcount; g++) { 1366 grp = &pool->sp_groups[g]; 1367 mtx_lock(&grp->sg_lock); 1368 if (grp->sg_state != SVCPOOL_CLOSING) { 1369 grp->sg_state = SVCPOOL_CLOSING; 1370 LIST_FOREACH(st, &grp->sg_idlethreads, st_ilink) 1371 cv_signal(&st->st_cond); 1372 } 1373 mtx_unlock(&grp->sg_lock); 1374 } 1375 } 1376 1377 bool_t 1378 svc_getargs(struct svc_req *rqstp, xdrproc_t xargs, void *args) 1379 { 1380 struct mbuf *m; 1381 XDR xdrs; 1382 bool_t stat; 1383 1384 m = rqstp->rq_args; 1385 rqstp->rq_args = NULL; 1386 1387 xdrmbuf_create(&xdrs, m, XDR_DECODE); 1388 stat = xargs(&xdrs, args); 1389 XDR_DESTROY(&xdrs); 1390 1391 return (stat); 1392 } 1393 1394 bool_t 1395 svc_freeargs(struct svc_req *rqstp, xdrproc_t xargs, void *args) 1396 { 1397 XDR xdrs; 1398 1399 if (rqstp->rq_addr) { 1400 free(rqstp->rq_addr, M_SONAME); 1401 rqstp->rq_addr = NULL; 1402 } 1403 1404 xdrs.x_op = XDR_FREE; 1405 return (xargs(&xdrs, args)); 1406 } 1407 1408 void 1409 svc_freereq(struct svc_req *rqstp) 1410 { 1411 SVCTHREAD *st; 1412 SVCPOOL *pool; 1413 1414 st = rqstp->rq_thread; 1415 if (st) { 1416 pool = st->st_pool; 1417 if (pool->sp_done) 1418 pool->sp_done(st, rqstp); 1419 } 1420 1421 if (rqstp->rq_auth.svc_ah_ops) 1422 SVCAUTH_RELEASE(&rqstp->rq_auth); 1423 1424 if (rqstp->rq_xprt) { 1425 SVC_RELEASE(rqstp->rq_xprt); 1426 } 1427 1428 if (rqstp->rq_addr) 1429 free(rqstp->rq_addr, M_SONAME); 1430 1431 if (rqstp->rq_args) 1432 m_freem(rqstp->rq_args); 1433 1434 free(rqstp, M_RPC); 1435 } 1436