1 /* $NetBSD: svc.c,v 1.21 2000/07/06 03:10:35 christos Exp $ */ 2 3 /*- 4 * Copyright (c) 2009, Sun Microsystems, Inc. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions are met: 9 * - Redistributions of source code must retain the above copyright notice, 10 * this list of conditions and the following disclaimer. 11 * - Redistributions in binary form must reproduce the above copyright notice, 12 * this list of conditions and the following disclaimer in the documentation 13 * and/or other materials provided with the distribution. 14 * - Neither the name of Sun Microsystems, Inc. nor the names of its 15 * contributors may be used to endorse or promote products derived 16 * from this software without specific prior written permission. 17 * 18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 19 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 21 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 22 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 23 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 24 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 25 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 26 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 27 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 28 * POSSIBILITY OF SUCH DAMAGE. 29 */ 30 31 #if defined(LIBC_SCCS) && !defined(lint) 32 static char *sccsid2 = "@(#)svc.c 1.44 88/02/08 Copyr 1984 Sun Micro"; 33 static char *sccsid = "@(#)svc.c 2.4 88/08/11 4.0 RPCSRC"; 34 #endif 35 #include <sys/cdefs.h> 36 __FBSDID("$FreeBSD$"); 37 38 /* 39 * svc.c, Server-side remote procedure call interface. 40 * 41 * There are two sets of procedures here. The xprt routines are 42 * for handling transport handles. The svc routines handle the 43 * list of service routines. 44 * 45 * Copyright (C) 1984, Sun Microsystems, Inc. 46 */ 47 48 #include <sys/param.h> 49 #include <sys/lock.h> 50 #include <sys/kernel.h> 51 #include <sys/kthread.h> 52 #include <sys/malloc.h> 53 #include <sys/mbuf.h> 54 #include <sys/mutex.h> 55 #include <sys/proc.h> 56 #include <sys/queue.h> 57 #include <sys/socketvar.h> 58 #include <sys/systm.h> 59 #include <sys/smp.h> 60 #include <sys/sx.h> 61 #include <sys/ucred.h> 62 63 #include <rpc/rpc.h> 64 #include <rpc/rpcb_clnt.h> 65 #include <rpc/replay.h> 66 67 #include <rpc/rpc_com.h> 68 69 #define SVC_VERSQUIET 0x0001 /* keep quiet about vers mismatch */ 70 #define version_keepquiet(xp) (SVC_EXT(xp)->xp_flags & SVC_VERSQUIET) 71 72 static struct svc_callout *svc_find(SVCPOOL *pool, rpcprog_t, rpcvers_t, 73 char *); 74 static void svc_new_thread(SVCGROUP *grp); 75 static void xprt_unregister_locked(SVCXPRT *xprt); 76 static void svc_change_space_used(SVCPOOL *pool, int delta); 77 static bool_t svc_request_space_available(SVCPOOL *pool); 78 79 /* *************** SVCXPRT related stuff **************** */ 80 81 static int svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS); 82 static int svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS); 83 static int svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS); 84 85 SVCPOOL* 86 svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base) 87 { 88 SVCPOOL *pool; 89 SVCGROUP *grp; 90 int g; 91 92 pool = malloc(sizeof(SVCPOOL), M_RPC, M_WAITOK|M_ZERO); 93 94 mtx_init(&pool->sp_lock, "sp_lock", NULL, MTX_DEF); 95 pool->sp_name = name; 96 pool->sp_state = SVCPOOL_INIT; 97 pool->sp_proc = NULL; 98 TAILQ_INIT(&pool->sp_callouts); 99 TAILQ_INIT(&pool->sp_lcallouts); 100 pool->sp_minthreads = 1; 101 pool->sp_maxthreads = 1; 102 pool->sp_groupcount = 1; 103 for (g = 0; g < SVC_MAXGROUPS; g++) { 104 grp = &pool->sp_groups[g]; 105 mtx_init(&grp->sg_lock, "sg_lock", NULL, MTX_DEF); 106 grp->sg_pool = pool; 107 grp->sg_state = SVCPOOL_ACTIVE; 108 TAILQ_INIT(&grp->sg_xlist); 109 TAILQ_INIT(&grp->sg_active); 110 LIST_INIT(&grp->sg_idlethreads); 111 grp->sg_minthreads = 1; 112 grp->sg_maxthreads = 1; 113 } 114 115 /* 116 * Don't use more than a quarter of mbuf clusters or more than 117 * 45Mb buffering requests. 118 */ 119 pool->sp_space_high = nmbclusters * MCLBYTES / 4; 120 if (pool->sp_space_high > 45 << 20) 121 pool->sp_space_high = 45 << 20; 122 pool->sp_space_low = 2 * pool->sp_space_high / 3; 123 124 sysctl_ctx_init(&pool->sp_sysctl); 125 if (sysctl_base) { 126 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO, 127 "minthreads", CTLTYPE_INT | CTLFLAG_RW, 128 pool, 0, svcpool_minthread_sysctl, "I", 129 "Minimal number of threads"); 130 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO, 131 "maxthreads", CTLTYPE_INT | CTLFLAG_RW, 132 pool, 0, svcpool_maxthread_sysctl, "I", 133 "Maximal number of threads"); 134 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO, 135 "threads", CTLTYPE_INT | CTLFLAG_RD, 136 pool, 0, svcpool_threads_sysctl, "I", 137 "Current number of threads"); 138 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO, 139 "groups", CTLFLAG_RD, &pool->sp_groupcount, 0, 140 "Number of thread groups"); 141 142 SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO, 143 "request_space_used", CTLFLAG_RD, 144 &pool->sp_space_used, 0, 145 "Space in parsed but not handled requests."); 146 147 SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO, 148 "request_space_used_highest", CTLFLAG_RD, 149 &pool->sp_space_used_highest, 0, 150 "Highest space used since reboot."); 151 152 SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO, 153 "request_space_high", CTLFLAG_RW, 154 &pool->sp_space_high, 0, 155 "Maximum space in parsed but not handled requests."); 156 157 SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO, 158 "request_space_low", CTLFLAG_RW, 159 &pool->sp_space_low, 0, 160 "Low water mark for request space."); 161 162 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO, 163 "request_space_throttled", CTLFLAG_RD, 164 &pool->sp_space_throttled, 0, 165 "Whether nfs requests are currently throttled"); 166 167 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO, 168 "request_space_throttle_count", CTLFLAG_RD, 169 &pool->sp_space_throttle_count, 0, 170 "Count of times throttling based on request space has occurred"); 171 } 172 173 return pool; 174 } 175 176 void 177 svcpool_destroy(SVCPOOL *pool) 178 { 179 SVCGROUP *grp; 180 SVCXPRT *xprt, *nxprt; 181 struct svc_callout *s; 182 struct svc_loss_callout *sl; 183 struct svcxprt_list cleanup; 184 int g; 185 186 TAILQ_INIT(&cleanup); 187 188 for (g = 0; g < SVC_MAXGROUPS; g++) { 189 grp = &pool->sp_groups[g]; 190 mtx_lock(&grp->sg_lock); 191 while ((xprt = TAILQ_FIRST(&grp->sg_xlist)) != NULL) { 192 xprt_unregister_locked(xprt); 193 TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link); 194 } 195 mtx_unlock(&grp->sg_lock); 196 } 197 TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) { 198 SVC_RELEASE(xprt); 199 } 200 201 mtx_lock(&pool->sp_lock); 202 while ((s = TAILQ_FIRST(&pool->sp_callouts)) != NULL) { 203 mtx_unlock(&pool->sp_lock); 204 svc_unreg(pool, s->sc_prog, s->sc_vers); 205 mtx_lock(&pool->sp_lock); 206 } 207 while ((sl = TAILQ_FIRST(&pool->sp_lcallouts)) != NULL) { 208 mtx_unlock(&pool->sp_lock); 209 svc_loss_unreg(pool, sl->slc_dispatch); 210 mtx_lock(&pool->sp_lock); 211 } 212 mtx_unlock(&pool->sp_lock); 213 214 for (g = 0; g < SVC_MAXGROUPS; g++) { 215 grp = &pool->sp_groups[g]; 216 mtx_destroy(&grp->sg_lock); 217 } 218 mtx_destroy(&pool->sp_lock); 219 220 if (pool->sp_rcache) 221 replay_freecache(pool->sp_rcache); 222 223 sysctl_ctx_free(&pool->sp_sysctl); 224 free(pool, M_RPC); 225 } 226 227 /* 228 * Sysctl handler to get the present thread count on a pool 229 */ 230 static int 231 svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS) 232 { 233 SVCPOOL *pool; 234 int threads, error, g; 235 236 pool = oidp->oid_arg1; 237 threads = 0; 238 mtx_lock(&pool->sp_lock); 239 for (g = 0; g < pool->sp_groupcount; g++) 240 threads += pool->sp_groups[g].sg_threadcount; 241 mtx_unlock(&pool->sp_lock); 242 error = sysctl_handle_int(oidp, &threads, 0, req); 243 return (error); 244 } 245 246 /* 247 * Sysctl handler to set the minimum thread count on a pool 248 */ 249 static int 250 svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS) 251 { 252 SVCPOOL *pool; 253 int newminthreads, error, g; 254 255 pool = oidp->oid_arg1; 256 newminthreads = pool->sp_minthreads; 257 error = sysctl_handle_int(oidp, &newminthreads, 0, req); 258 if (error == 0 && newminthreads != pool->sp_minthreads) { 259 if (newminthreads > pool->sp_maxthreads) 260 return (EINVAL); 261 mtx_lock(&pool->sp_lock); 262 pool->sp_minthreads = newminthreads; 263 for (g = 0; g < pool->sp_groupcount; g++) { 264 pool->sp_groups[g].sg_minthreads = max(1, 265 pool->sp_minthreads / pool->sp_groupcount); 266 } 267 mtx_unlock(&pool->sp_lock); 268 } 269 return (error); 270 } 271 272 /* 273 * Sysctl handler to set the maximum thread count on a pool 274 */ 275 static int 276 svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS) 277 { 278 SVCPOOL *pool; 279 int newmaxthreads, error, g; 280 281 pool = oidp->oid_arg1; 282 newmaxthreads = pool->sp_maxthreads; 283 error = sysctl_handle_int(oidp, &newmaxthreads, 0, req); 284 if (error == 0 && newmaxthreads != pool->sp_maxthreads) { 285 if (newmaxthreads < pool->sp_minthreads) 286 return (EINVAL); 287 mtx_lock(&pool->sp_lock); 288 pool->sp_maxthreads = newmaxthreads; 289 for (g = 0; g < pool->sp_groupcount; g++) { 290 pool->sp_groups[g].sg_maxthreads = max(1, 291 pool->sp_maxthreads / pool->sp_groupcount); 292 } 293 mtx_unlock(&pool->sp_lock); 294 } 295 return (error); 296 } 297 298 /* 299 * Activate a transport handle. 300 */ 301 void 302 xprt_register(SVCXPRT *xprt) 303 { 304 SVCPOOL *pool = xprt->xp_pool; 305 SVCGROUP *grp; 306 int g; 307 308 SVC_ACQUIRE(xprt); 309 g = atomic_fetchadd_int(&pool->sp_nextgroup, 1) % pool->sp_groupcount; 310 xprt->xp_group = grp = &pool->sp_groups[g]; 311 mtx_lock(&grp->sg_lock); 312 xprt->xp_registered = TRUE; 313 xprt->xp_active = FALSE; 314 TAILQ_INSERT_TAIL(&grp->sg_xlist, xprt, xp_link); 315 mtx_unlock(&grp->sg_lock); 316 } 317 318 /* 319 * De-activate a transport handle. Note: the locked version doesn't 320 * release the transport - caller must do that after dropping the pool 321 * lock. 322 */ 323 static void 324 xprt_unregister_locked(SVCXPRT *xprt) 325 { 326 SVCGROUP *grp = xprt->xp_group; 327 328 mtx_assert(&grp->sg_lock, MA_OWNED); 329 KASSERT(xprt->xp_registered == TRUE, 330 ("xprt_unregister_locked: not registered")); 331 xprt_inactive_locked(xprt); 332 TAILQ_REMOVE(&grp->sg_xlist, xprt, xp_link); 333 xprt->xp_registered = FALSE; 334 } 335 336 void 337 xprt_unregister(SVCXPRT *xprt) 338 { 339 SVCGROUP *grp = xprt->xp_group; 340 341 mtx_lock(&grp->sg_lock); 342 if (xprt->xp_registered == FALSE) { 343 /* Already unregistered by another thread */ 344 mtx_unlock(&grp->sg_lock); 345 return; 346 } 347 xprt_unregister_locked(xprt); 348 mtx_unlock(&grp->sg_lock); 349 350 SVC_RELEASE(xprt); 351 } 352 353 /* 354 * Attempt to assign a service thread to this transport. 355 */ 356 static int 357 xprt_assignthread(SVCXPRT *xprt) 358 { 359 SVCGROUP *grp = xprt->xp_group; 360 SVCTHREAD *st; 361 362 mtx_assert(&grp->sg_lock, MA_OWNED); 363 st = LIST_FIRST(&grp->sg_idlethreads); 364 if (st) { 365 LIST_REMOVE(st, st_ilink); 366 SVC_ACQUIRE(xprt); 367 xprt->xp_thread = st; 368 st->st_xprt = xprt; 369 cv_signal(&st->st_cond); 370 return (TRUE); 371 } else { 372 /* 373 * See if we can create a new thread. The 374 * actual thread creation happens in 375 * svc_run_internal because our locking state 376 * is poorly defined (we are typically called 377 * from a socket upcall). Don't create more 378 * than one thread per second. 379 */ 380 if (grp->sg_state == SVCPOOL_ACTIVE 381 && grp->sg_lastcreatetime < time_uptime 382 && grp->sg_threadcount < grp->sg_maxthreads) { 383 grp->sg_state = SVCPOOL_THREADWANTED; 384 } 385 } 386 return (FALSE); 387 } 388 389 void 390 xprt_active(SVCXPRT *xprt) 391 { 392 SVCGROUP *grp = xprt->xp_group; 393 394 mtx_lock(&grp->sg_lock); 395 396 if (!xprt->xp_registered) { 397 /* 398 * Race with xprt_unregister - we lose. 399 */ 400 mtx_unlock(&grp->sg_lock); 401 return; 402 } 403 404 if (!xprt->xp_active) { 405 xprt->xp_active = TRUE; 406 if (xprt->xp_thread == NULL) { 407 if (!svc_request_space_available(xprt->xp_pool) || 408 !xprt_assignthread(xprt)) 409 TAILQ_INSERT_TAIL(&grp->sg_active, xprt, 410 xp_alink); 411 } 412 } 413 414 mtx_unlock(&grp->sg_lock); 415 } 416 417 void 418 xprt_inactive_locked(SVCXPRT *xprt) 419 { 420 SVCGROUP *grp = xprt->xp_group; 421 422 mtx_assert(&grp->sg_lock, MA_OWNED); 423 if (xprt->xp_active) { 424 if (xprt->xp_thread == NULL) 425 TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink); 426 xprt->xp_active = FALSE; 427 } 428 } 429 430 void 431 xprt_inactive(SVCXPRT *xprt) 432 { 433 SVCGROUP *grp = xprt->xp_group; 434 435 mtx_lock(&grp->sg_lock); 436 xprt_inactive_locked(xprt); 437 mtx_unlock(&grp->sg_lock); 438 } 439 440 /* 441 * Variant of xprt_inactive() for use only when sure that port is 442 * assigned to thread. For example, withing receive handlers. 443 */ 444 void 445 xprt_inactive_self(SVCXPRT *xprt) 446 { 447 448 KASSERT(xprt->xp_thread != NULL, 449 ("xprt_inactive_self(%p) with NULL xp_thread", xprt)); 450 xprt->xp_active = FALSE; 451 } 452 453 /* 454 * Add a service program to the callout list. 455 * The dispatch routine will be called when a rpc request for this 456 * program number comes in. 457 */ 458 bool_t 459 svc_reg(SVCXPRT *xprt, const rpcprog_t prog, const rpcvers_t vers, 460 void (*dispatch)(struct svc_req *, SVCXPRT *), 461 const struct netconfig *nconf) 462 { 463 SVCPOOL *pool = xprt->xp_pool; 464 struct svc_callout *s; 465 char *netid = NULL; 466 int flag = 0; 467 468 /* VARIABLES PROTECTED BY svc_lock: s, svc_head */ 469 470 if (xprt->xp_netid) { 471 netid = strdup(xprt->xp_netid, M_RPC); 472 flag = 1; 473 } else if (nconf && nconf->nc_netid) { 474 netid = strdup(nconf->nc_netid, M_RPC); 475 flag = 1; 476 } /* must have been created with svc_raw_create */ 477 if ((netid == NULL) && (flag == 1)) { 478 return (FALSE); 479 } 480 481 mtx_lock(&pool->sp_lock); 482 if ((s = svc_find(pool, prog, vers, netid)) != NULL) { 483 if (netid) 484 free(netid, M_RPC); 485 if (s->sc_dispatch == dispatch) 486 goto rpcb_it; /* he is registering another xptr */ 487 mtx_unlock(&pool->sp_lock); 488 return (FALSE); 489 } 490 s = malloc(sizeof (struct svc_callout), M_RPC, M_NOWAIT); 491 if (s == NULL) { 492 if (netid) 493 free(netid, M_RPC); 494 mtx_unlock(&pool->sp_lock); 495 return (FALSE); 496 } 497 498 s->sc_prog = prog; 499 s->sc_vers = vers; 500 s->sc_dispatch = dispatch; 501 s->sc_netid = netid; 502 TAILQ_INSERT_TAIL(&pool->sp_callouts, s, sc_link); 503 504 if ((xprt->xp_netid == NULL) && (flag == 1) && netid) 505 ((SVCXPRT *) xprt)->xp_netid = strdup(netid, M_RPC); 506 507 rpcb_it: 508 mtx_unlock(&pool->sp_lock); 509 /* now register the information with the local binder service */ 510 if (nconf) { 511 bool_t dummy; 512 struct netconfig tnc; 513 struct netbuf nb; 514 tnc = *nconf; 515 nb.buf = &xprt->xp_ltaddr; 516 nb.len = xprt->xp_ltaddr.ss_len; 517 dummy = rpcb_set(prog, vers, &tnc, &nb); 518 return (dummy); 519 } 520 return (TRUE); 521 } 522 523 /* 524 * Remove a service program from the callout list. 525 */ 526 void 527 svc_unreg(SVCPOOL *pool, const rpcprog_t prog, const rpcvers_t vers) 528 { 529 struct svc_callout *s; 530 531 /* unregister the information anyway */ 532 (void) rpcb_unset(prog, vers, NULL); 533 mtx_lock(&pool->sp_lock); 534 while ((s = svc_find(pool, prog, vers, NULL)) != NULL) { 535 TAILQ_REMOVE(&pool->sp_callouts, s, sc_link); 536 if (s->sc_netid) 537 mem_free(s->sc_netid, sizeof (s->sc_netid) + 1); 538 mem_free(s, sizeof (struct svc_callout)); 539 } 540 mtx_unlock(&pool->sp_lock); 541 } 542 543 /* 544 * Add a service connection loss program to the callout list. 545 * The dispatch routine will be called when some port in ths pool die. 546 */ 547 bool_t 548 svc_loss_reg(SVCXPRT *xprt, void (*dispatch)(SVCXPRT *)) 549 { 550 SVCPOOL *pool = xprt->xp_pool; 551 struct svc_loss_callout *s; 552 553 mtx_lock(&pool->sp_lock); 554 TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) { 555 if (s->slc_dispatch == dispatch) 556 break; 557 } 558 if (s != NULL) { 559 mtx_unlock(&pool->sp_lock); 560 return (TRUE); 561 } 562 s = malloc(sizeof (struct svc_callout), M_RPC, M_NOWAIT); 563 if (s == NULL) { 564 mtx_unlock(&pool->sp_lock); 565 return (FALSE); 566 } 567 s->slc_dispatch = dispatch; 568 TAILQ_INSERT_TAIL(&pool->sp_lcallouts, s, slc_link); 569 mtx_unlock(&pool->sp_lock); 570 return (TRUE); 571 } 572 573 /* 574 * Remove a service connection loss program from the callout list. 575 */ 576 void 577 svc_loss_unreg(SVCPOOL *pool, void (*dispatch)(SVCXPRT *)) 578 { 579 struct svc_loss_callout *s; 580 581 mtx_lock(&pool->sp_lock); 582 TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) { 583 if (s->slc_dispatch == dispatch) { 584 TAILQ_REMOVE(&pool->sp_lcallouts, s, slc_link); 585 free(s, M_RPC); 586 break; 587 } 588 } 589 mtx_unlock(&pool->sp_lock); 590 } 591 592 /* ********************** CALLOUT list related stuff ************* */ 593 594 /* 595 * Search the callout list for a program number, return the callout 596 * struct. 597 */ 598 static struct svc_callout * 599 svc_find(SVCPOOL *pool, rpcprog_t prog, rpcvers_t vers, char *netid) 600 { 601 struct svc_callout *s; 602 603 mtx_assert(&pool->sp_lock, MA_OWNED); 604 TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) { 605 if (s->sc_prog == prog && s->sc_vers == vers 606 && (netid == NULL || s->sc_netid == NULL || 607 strcmp(netid, s->sc_netid) == 0)) 608 break; 609 } 610 611 return (s); 612 } 613 614 /* ******************* REPLY GENERATION ROUTINES ************ */ 615 616 static bool_t 617 svc_sendreply_common(struct svc_req *rqstp, struct rpc_msg *rply, 618 struct mbuf *body) 619 { 620 SVCXPRT *xprt = rqstp->rq_xprt; 621 bool_t ok; 622 623 if (rqstp->rq_args) { 624 m_freem(rqstp->rq_args); 625 rqstp->rq_args = NULL; 626 } 627 628 if (xprt->xp_pool->sp_rcache) 629 replay_setreply(xprt->xp_pool->sp_rcache, 630 rply, svc_getrpccaller(rqstp), body); 631 632 if (!SVCAUTH_WRAP(&rqstp->rq_auth, &body)) 633 return (FALSE); 634 635 ok = SVC_REPLY(xprt, rply, rqstp->rq_addr, body, &rqstp->rq_reply_seq); 636 if (rqstp->rq_addr) { 637 free(rqstp->rq_addr, M_SONAME); 638 rqstp->rq_addr = NULL; 639 } 640 641 return (ok); 642 } 643 644 /* 645 * Send a reply to an rpc request 646 */ 647 bool_t 648 svc_sendreply(struct svc_req *rqstp, xdrproc_t xdr_results, void * xdr_location) 649 { 650 struct rpc_msg rply; 651 struct mbuf *m; 652 XDR xdrs; 653 bool_t ok; 654 655 rply.rm_xid = rqstp->rq_xid; 656 rply.rm_direction = REPLY; 657 rply.rm_reply.rp_stat = MSG_ACCEPTED; 658 rply.acpted_rply.ar_verf = rqstp->rq_verf; 659 rply.acpted_rply.ar_stat = SUCCESS; 660 rply.acpted_rply.ar_results.where = NULL; 661 rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void; 662 663 m = m_getcl(M_WAITOK, MT_DATA, 0); 664 xdrmbuf_create(&xdrs, m, XDR_ENCODE); 665 ok = xdr_results(&xdrs, xdr_location); 666 XDR_DESTROY(&xdrs); 667 668 if (ok) { 669 return (svc_sendreply_common(rqstp, &rply, m)); 670 } else { 671 m_freem(m); 672 return (FALSE); 673 } 674 } 675 676 bool_t 677 svc_sendreply_mbuf(struct svc_req *rqstp, struct mbuf *m) 678 { 679 struct rpc_msg rply; 680 681 rply.rm_xid = rqstp->rq_xid; 682 rply.rm_direction = REPLY; 683 rply.rm_reply.rp_stat = MSG_ACCEPTED; 684 rply.acpted_rply.ar_verf = rqstp->rq_verf; 685 rply.acpted_rply.ar_stat = SUCCESS; 686 rply.acpted_rply.ar_results.where = NULL; 687 rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void; 688 689 return (svc_sendreply_common(rqstp, &rply, m)); 690 } 691 692 /* 693 * No procedure error reply 694 */ 695 void 696 svcerr_noproc(struct svc_req *rqstp) 697 { 698 SVCXPRT *xprt = rqstp->rq_xprt; 699 struct rpc_msg rply; 700 701 rply.rm_xid = rqstp->rq_xid; 702 rply.rm_direction = REPLY; 703 rply.rm_reply.rp_stat = MSG_ACCEPTED; 704 rply.acpted_rply.ar_verf = rqstp->rq_verf; 705 rply.acpted_rply.ar_stat = PROC_UNAVAIL; 706 707 if (xprt->xp_pool->sp_rcache) 708 replay_setreply(xprt->xp_pool->sp_rcache, 709 &rply, svc_getrpccaller(rqstp), NULL); 710 711 svc_sendreply_common(rqstp, &rply, NULL); 712 } 713 714 /* 715 * Can't decode args error reply 716 */ 717 void 718 svcerr_decode(struct svc_req *rqstp) 719 { 720 SVCXPRT *xprt = rqstp->rq_xprt; 721 struct rpc_msg rply; 722 723 rply.rm_xid = rqstp->rq_xid; 724 rply.rm_direction = REPLY; 725 rply.rm_reply.rp_stat = MSG_ACCEPTED; 726 rply.acpted_rply.ar_verf = rqstp->rq_verf; 727 rply.acpted_rply.ar_stat = GARBAGE_ARGS; 728 729 if (xprt->xp_pool->sp_rcache) 730 replay_setreply(xprt->xp_pool->sp_rcache, 731 &rply, (struct sockaddr *) &xprt->xp_rtaddr, NULL); 732 733 svc_sendreply_common(rqstp, &rply, NULL); 734 } 735 736 /* 737 * Some system error 738 */ 739 void 740 svcerr_systemerr(struct svc_req *rqstp) 741 { 742 SVCXPRT *xprt = rqstp->rq_xprt; 743 struct rpc_msg rply; 744 745 rply.rm_xid = rqstp->rq_xid; 746 rply.rm_direction = REPLY; 747 rply.rm_reply.rp_stat = MSG_ACCEPTED; 748 rply.acpted_rply.ar_verf = rqstp->rq_verf; 749 rply.acpted_rply.ar_stat = SYSTEM_ERR; 750 751 if (xprt->xp_pool->sp_rcache) 752 replay_setreply(xprt->xp_pool->sp_rcache, 753 &rply, svc_getrpccaller(rqstp), NULL); 754 755 svc_sendreply_common(rqstp, &rply, NULL); 756 } 757 758 /* 759 * Authentication error reply 760 */ 761 void 762 svcerr_auth(struct svc_req *rqstp, enum auth_stat why) 763 { 764 SVCXPRT *xprt = rqstp->rq_xprt; 765 struct rpc_msg rply; 766 767 rply.rm_xid = rqstp->rq_xid; 768 rply.rm_direction = REPLY; 769 rply.rm_reply.rp_stat = MSG_DENIED; 770 rply.rjcted_rply.rj_stat = AUTH_ERROR; 771 rply.rjcted_rply.rj_why = why; 772 773 if (xprt->xp_pool->sp_rcache) 774 replay_setreply(xprt->xp_pool->sp_rcache, 775 &rply, svc_getrpccaller(rqstp), NULL); 776 777 svc_sendreply_common(rqstp, &rply, NULL); 778 } 779 780 /* 781 * Auth too weak error reply 782 */ 783 void 784 svcerr_weakauth(struct svc_req *rqstp) 785 { 786 787 svcerr_auth(rqstp, AUTH_TOOWEAK); 788 } 789 790 /* 791 * Program unavailable error reply 792 */ 793 void 794 svcerr_noprog(struct svc_req *rqstp) 795 { 796 SVCXPRT *xprt = rqstp->rq_xprt; 797 struct rpc_msg rply; 798 799 rply.rm_xid = rqstp->rq_xid; 800 rply.rm_direction = REPLY; 801 rply.rm_reply.rp_stat = MSG_ACCEPTED; 802 rply.acpted_rply.ar_verf = rqstp->rq_verf; 803 rply.acpted_rply.ar_stat = PROG_UNAVAIL; 804 805 if (xprt->xp_pool->sp_rcache) 806 replay_setreply(xprt->xp_pool->sp_rcache, 807 &rply, svc_getrpccaller(rqstp), NULL); 808 809 svc_sendreply_common(rqstp, &rply, NULL); 810 } 811 812 /* 813 * Program version mismatch error reply 814 */ 815 void 816 svcerr_progvers(struct svc_req *rqstp, rpcvers_t low_vers, rpcvers_t high_vers) 817 { 818 SVCXPRT *xprt = rqstp->rq_xprt; 819 struct rpc_msg rply; 820 821 rply.rm_xid = rqstp->rq_xid; 822 rply.rm_direction = REPLY; 823 rply.rm_reply.rp_stat = MSG_ACCEPTED; 824 rply.acpted_rply.ar_verf = rqstp->rq_verf; 825 rply.acpted_rply.ar_stat = PROG_MISMATCH; 826 rply.acpted_rply.ar_vers.low = (uint32_t)low_vers; 827 rply.acpted_rply.ar_vers.high = (uint32_t)high_vers; 828 829 if (xprt->xp_pool->sp_rcache) 830 replay_setreply(xprt->xp_pool->sp_rcache, 831 &rply, svc_getrpccaller(rqstp), NULL); 832 833 svc_sendreply_common(rqstp, &rply, NULL); 834 } 835 836 /* 837 * Allocate a new server transport structure. All fields are 838 * initialized to zero and xp_p3 is initialized to point at an 839 * extension structure to hold various flags and authentication 840 * parameters. 841 */ 842 SVCXPRT * 843 svc_xprt_alloc() 844 { 845 SVCXPRT *xprt; 846 SVCXPRT_EXT *ext; 847 848 xprt = mem_alloc(sizeof(SVCXPRT)); 849 memset(xprt, 0, sizeof(SVCXPRT)); 850 ext = mem_alloc(sizeof(SVCXPRT_EXT)); 851 memset(ext, 0, sizeof(SVCXPRT_EXT)); 852 xprt->xp_p3 = ext; 853 refcount_init(&xprt->xp_refs, 1); 854 855 return (xprt); 856 } 857 858 /* 859 * Free a server transport structure. 860 */ 861 void 862 svc_xprt_free(xprt) 863 SVCXPRT *xprt; 864 { 865 866 mem_free(xprt->xp_p3, sizeof(SVCXPRT_EXT)); 867 mem_free(xprt, sizeof(SVCXPRT)); 868 } 869 870 /* ******************* SERVER INPUT STUFF ******************* */ 871 872 /* 873 * Read RPC requests from a transport and queue them to be 874 * executed. We handle authentication and replay cache replies here. 875 * Actually dispatching the RPC is deferred till svc_executereq. 876 */ 877 static enum xprt_stat 878 svc_getreq(SVCXPRT *xprt, struct svc_req **rqstp_ret) 879 { 880 SVCPOOL *pool = xprt->xp_pool; 881 struct svc_req *r; 882 struct rpc_msg msg; 883 struct mbuf *args; 884 struct svc_loss_callout *s; 885 enum xprt_stat stat; 886 887 /* now receive msgs from xprtprt (support batch calls) */ 888 r = malloc(sizeof(*r), M_RPC, M_WAITOK|M_ZERO); 889 890 msg.rm_call.cb_cred.oa_base = r->rq_credarea; 891 msg.rm_call.cb_verf.oa_base = &r->rq_credarea[MAX_AUTH_BYTES]; 892 r->rq_clntcred = &r->rq_credarea[2*MAX_AUTH_BYTES]; 893 if (SVC_RECV(xprt, &msg, &r->rq_addr, &args)) { 894 enum auth_stat why; 895 896 /* 897 * Handle replays and authenticate before queuing the 898 * request to be executed. 899 */ 900 SVC_ACQUIRE(xprt); 901 r->rq_xprt = xprt; 902 if (pool->sp_rcache) { 903 struct rpc_msg repmsg; 904 struct mbuf *repbody; 905 enum replay_state rs; 906 rs = replay_find(pool->sp_rcache, &msg, 907 svc_getrpccaller(r), &repmsg, &repbody); 908 switch (rs) { 909 case RS_NEW: 910 break; 911 case RS_DONE: 912 SVC_REPLY(xprt, &repmsg, r->rq_addr, 913 repbody, &r->rq_reply_seq); 914 if (r->rq_addr) { 915 free(r->rq_addr, M_SONAME); 916 r->rq_addr = NULL; 917 } 918 m_freem(args); 919 goto call_done; 920 921 default: 922 m_freem(args); 923 goto call_done; 924 } 925 } 926 927 r->rq_xid = msg.rm_xid; 928 r->rq_prog = msg.rm_call.cb_prog; 929 r->rq_vers = msg.rm_call.cb_vers; 930 r->rq_proc = msg.rm_call.cb_proc; 931 r->rq_size = sizeof(*r) + m_length(args, NULL); 932 r->rq_args = args; 933 if ((why = _authenticate(r, &msg)) != AUTH_OK) { 934 /* 935 * RPCSEC_GSS uses this return code 936 * for requests that form part of its 937 * context establishment protocol and 938 * should not be dispatched to the 939 * application. 940 */ 941 if (why != RPCSEC_GSS_NODISPATCH) 942 svcerr_auth(r, why); 943 goto call_done; 944 } 945 946 if (!SVCAUTH_UNWRAP(&r->rq_auth, &r->rq_args)) { 947 svcerr_decode(r); 948 goto call_done; 949 } 950 951 /* 952 * Everything checks out, return request to caller. 953 */ 954 *rqstp_ret = r; 955 r = NULL; 956 } 957 call_done: 958 if (r) { 959 svc_freereq(r); 960 r = NULL; 961 } 962 if ((stat = SVC_STAT(xprt)) == XPRT_DIED) { 963 TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) 964 (*s->slc_dispatch)(xprt); 965 xprt_unregister(xprt); 966 } 967 968 return (stat); 969 } 970 971 static void 972 svc_executereq(struct svc_req *rqstp) 973 { 974 SVCXPRT *xprt = rqstp->rq_xprt; 975 SVCPOOL *pool = xprt->xp_pool; 976 int prog_found; 977 rpcvers_t low_vers; 978 rpcvers_t high_vers; 979 struct svc_callout *s; 980 981 /* now match message with a registered service*/ 982 prog_found = FALSE; 983 low_vers = (rpcvers_t) -1L; 984 high_vers = (rpcvers_t) 0L; 985 TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) { 986 if (s->sc_prog == rqstp->rq_prog) { 987 if (s->sc_vers == rqstp->rq_vers) { 988 /* 989 * We hand ownership of r to the 990 * dispatch method - they must call 991 * svc_freereq. 992 */ 993 (*s->sc_dispatch)(rqstp, xprt); 994 return; 995 } /* found correct version */ 996 prog_found = TRUE; 997 if (s->sc_vers < low_vers) 998 low_vers = s->sc_vers; 999 if (s->sc_vers > high_vers) 1000 high_vers = s->sc_vers; 1001 } /* found correct program */ 1002 } 1003 1004 /* 1005 * if we got here, the program or version 1006 * is not served ... 1007 */ 1008 if (prog_found) 1009 svcerr_progvers(rqstp, low_vers, high_vers); 1010 else 1011 svcerr_noprog(rqstp); 1012 1013 svc_freereq(rqstp); 1014 } 1015 1016 static void 1017 svc_checkidle(SVCGROUP *grp) 1018 { 1019 SVCXPRT *xprt, *nxprt; 1020 time_t timo; 1021 struct svcxprt_list cleanup; 1022 1023 TAILQ_INIT(&cleanup); 1024 TAILQ_FOREACH_SAFE(xprt, &grp->sg_xlist, xp_link, nxprt) { 1025 /* 1026 * Only some transports have idle timers. Don't time 1027 * something out which is just waking up. 1028 */ 1029 if (!xprt->xp_idletimeout || xprt->xp_thread) 1030 continue; 1031 1032 timo = xprt->xp_lastactive + xprt->xp_idletimeout; 1033 if (time_uptime > timo) { 1034 xprt_unregister_locked(xprt); 1035 TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link); 1036 } 1037 } 1038 1039 mtx_unlock(&grp->sg_lock); 1040 TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) { 1041 SVC_RELEASE(xprt); 1042 } 1043 mtx_lock(&grp->sg_lock); 1044 } 1045 1046 static void 1047 svc_assign_waiting_sockets(SVCPOOL *pool) 1048 { 1049 SVCGROUP *grp; 1050 SVCXPRT *xprt; 1051 int g; 1052 1053 for (g = 0; g < pool->sp_groupcount; g++) { 1054 grp = &pool->sp_groups[g]; 1055 mtx_lock(&grp->sg_lock); 1056 while ((xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) { 1057 if (xprt_assignthread(xprt)) 1058 TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink); 1059 else 1060 break; 1061 } 1062 mtx_unlock(&grp->sg_lock); 1063 } 1064 } 1065 1066 static void 1067 svc_change_space_used(SVCPOOL *pool, int delta) 1068 { 1069 unsigned int value; 1070 1071 value = atomic_fetchadd_int(&pool->sp_space_used, delta) + delta; 1072 if (delta > 0) { 1073 if (value >= pool->sp_space_high && !pool->sp_space_throttled) { 1074 pool->sp_space_throttled = TRUE; 1075 pool->sp_space_throttle_count++; 1076 } 1077 if (value > pool->sp_space_used_highest) 1078 pool->sp_space_used_highest = value; 1079 } else { 1080 if (value < pool->sp_space_low && pool->sp_space_throttled) { 1081 pool->sp_space_throttled = FALSE; 1082 svc_assign_waiting_sockets(pool); 1083 } 1084 } 1085 } 1086 1087 static bool_t 1088 svc_request_space_available(SVCPOOL *pool) 1089 { 1090 1091 if (pool->sp_space_throttled) 1092 return (FALSE); 1093 return (TRUE); 1094 } 1095 1096 static void 1097 svc_run_internal(SVCGROUP *grp, bool_t ismaster) 1098 { 1099 SVCPOOL *pool = grp->sg_pool; 1100 SVCTHREAD *st, *stpref; 1101 SVCXPRT *xprt; 1102 enum xprt_stat stat; 1103 struct svc_req *rqstp; 1104 struct proc *p; 1105 size_t sz; 1106 int error; 1107 1108 st = mem_alloc(sizeof(*st)); 1109 mtx_init(&st->st_lock, "st_lock", NULL, MTX_DEF); 1110 st->st_pool = pool; 1111 st->st_xprt = NULL; 1112 STAILQ_INIT(&st->st_reqs); 1113 cv_init(&st->st_cond, "rpcsvc"); 1114 1115 mtx_lock(&grp->sg_lock); 1116 1117 /* 1118 * If we are a new thread which was spawned to cope with 1119 * increased load, set the state back to SVCPOOL_ACTIVE. 1120 */ 1121 if (grp->sg_state == SVCPOOL_THREADSTARTING) 1122 grp->sg_state = SVCPOOL_ACTIVE; 1123 1124 while (grp->sg_state != SVCPOOL_CLOSING) { 1125 /* 1126 * Create new thread if requested. 1127 */ 1128 if (grp->sg_state == SVCPOOL_THREADWANTED) { 1129 grp->sg_state = SVCPOOL_THREADSTARTING; 1130 grp->sg_lastcreatetime = time_uptime; 1131 mtx_unlock(&grp->sg_lock); 1132 svc_new_thread(grp); 1133 mtx_lock(&grp->sg_lock); 1134 continue; 1135 } 1136 1137 /* 1138 * Check for idle transports once per second. 1139 */ 1140 if (time_uptime > grp->sg_lastidlecheck) { 1141 grp->sg_lastidlecheck = time_uptime; 1142 svc_checkidle(grp); 1143 } 1144 1145 xprt = st->st_xprt; 1146 if (!xprt) { 1147 /* 1148 * Enforce maxthreads count. 1149 */ 1150 if (grp->sg_threadcount > grp->sg_maxthreads) 1151 break; 1152 1153 /* 1154 * Before sleeping, see if we can find an 1155 * active transport which isn't being serviced 1156 * by a thread. 1157 */ 1158 if (svc_request_space_available(pool) && 1159 (xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) { 1160 TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink); 1161 SVC_ACQUIRE(xprt); 1162 xprt->xp_thread = st; 1163 st->st_xprt = xprt; 1164 continue; 1165 } 1166 1167 LIST_INSERT_HEAD(&grp->sg_idlethreads, st, st_ilink); 1168 if (ismaster || (!ismaster && 1169 grp->sg_threadcount > grp->sg_minthreads)) 1170 error = cv_timedwait_sig(&st->st_cond, 1171 &grp->sg_lock, 5 * hz); 1172 else 1173 error = cv_wait_sig(&st->st_cond, 1174 &grp->sg_lock); 1175 if (st->st_xprt == NULL) 1176 LIST_REMOVE(st, st_ilink); 1177 1178 /* 1179 * Reduce worker thread count when idle. 1180 */ 1181 if (error == EWOULDBLOCK) { 1182 if (!ismaster 1183 && (grp->sg_threadcount 1184 > grp->sg_minthreads) 1185 && !st->st_xprt) 1186 break; 1187 } else if (error != 0) { 1188 KASSERT(error == EINTR || error == ERESTART, 1189 ("non-signal error %d", error)); 1190 mtx_unlock(&grp->sg_lock); 1191 p = curproc; 1192 PROC_LOCK(p); 1193 if (P_SHOULDSTOP(p) || 1194 (p->p_flag & P_TOTAL_STOP) != 0) { 1195 thread_suspend_check(0); 1196 PROC_UNLOCK(p); 1197 mtx_lock(&grp->sg_lock); 1198 } else { 1199 PROC_UNLOCK(p); 1200 svc_exit(pool); 1201 mtx_lock(&grp->sg_lock); 1202 break; 1203 } 1204 } 1205 continue; 1206 } 1207 mtx_unlock(&grp->sg_lock); 1208 1209 /* 1210 * Drain the transport socket and queue up any RPCs. 1211 */ 1212 xprt->xp_lastactive = time_uptime; 1213 do { 1214 if (!svc_request_space_available(pool)) 1215 break; 1216 rqstp = NULL; 1217 stat = svc_getreq(xprt, &rqstp); 1218 if (rqstp) { 1219 svc_change_space_used(pool, rqstp->rq_size); 1220 /* 1221 * See if the application has a preference 1222 * for some other thread. 1223 */ 1224 if (pool->sp_assign) { 1225 stpref = pool->sp_assign(st, rqstp); 1226 rqstp->rq_thread = stpref; 1227 STAILQ_INSERT_TAIL(&stpref->st_reqs, 1228 rqstp, rq_link); 1229 mtx_unlock(&stpref->st_lock); 1230 if (stpref != st) 1231 rqstp = NULL; 1232 } else { 1233 rqstp->rq_thread = st; 1234 STAILQ_INSERT_TAIL(&st->st_reqs, 1235 rqstp, rq_link); 1236 } 1237 } 1238 } while (rqstp == NULL && stat == XPRT_MOREREQS 1239 && grp->sg_state != SVCPOOL_CLOSING); 1240 1241 /* 1242 * Move this transport to the end of the active list to 1243 * ensure fairness when multiple transports are active. 1244 * If this was the last queued request, svc_getreq will end 1245 * up calling xprt_inactive to remove from the active list. 1246 */ 1247 mtx_lock(&grp->sg_lock); 1248 xprt->xp_thread = NULL; 1249 st->st_xprt = NULL; 1250 if (xprt->xp_active) { 1251 if (!svc_request_space_available(pool) || 1252 !xprt_assignthread(xprt)) 1253 TAILQ_INSERT_TAIL(&grp->sg_active, 1254 xprt, xp_alink); 1255 } 1256 mtx_unlock(&grp->sg_lock); 1257 SVC_RELEASE(xprt); 1258 1259 /* 1260 * Execute what we have queued. 1261 */ 1262 sz = 0; 1263 mtx_lock(&st->st_lock); 1264 while ((rqstp = STAILQ_FIRST(&st->st_reqs)) != NULL) { 1265 STAILQ_REMOVE_HEAD(&st->st_reqs, rq_link); 1266 mtx_unlock(&st->st_lock); 1267 sz += rqstp->rq_size; 1268 svc_executereq(rqstp); 1269 mtx_lock(&st->st_lock); 1270 } 1271 mtx_unlock(&st->st_lock); 1272 svc_change_space_used(pool, -sz); 1273 mtx_lock(&grp->sg_lock); 1274 } 1275 1276 if (st->st_xprt) { 1277 xprt = st->st_xprt; 1278 st->st_xprt = NULL; 1279 SVC_RELEASE(xprt); 1280 } 1281 KASSERT(STAILQ_EMPTY(&st->st_reqs), ("stray reqs on exit")); 1282 mtx_destroy(&st->st_lock); 1283 cv_destroy(&st->st_cond); 1284 mem_free(st, sizeof(*st)); 1285 1286 grp->sg_threadcount--; 1287 if (!ismaster) 1288 wakeup(grp); 1289 mtx_unlock(&grp->sg_lock); 1290 } 1291 1292 static void 1293 svc_thread_start(void *arg) 1294 { 1295 1296 svc_run_internal((SVCGROUP *) arg, FALSE); 1297 kthread_exit(); 1298 } 1299 1300 static void 1301 svc_new_thread(SVCGROUP *grp) 1302 { 1303 SVCPOOL *pool = grp->sg_pool; 1304 struct thread *td; 1305 1306 grp->sg_threadcount++; 1307 kthread_add(svc_thread_start, grp, pool->sp_proc, &td, 0, 0, 1308 "%s: service", pool->sp_name); 1309 } 1310 1311 void 1312 svc_run(SVCPOOL *pool) 1313 { 1314 int g, i; 1315 struct proc *p; 1316 struct thread *td; 1317 SVCGROUP *grp; 1318 1319 p = curproc; 1320 td = curthread; 1321 snprintf(td->td_name, sizeof(td->td_name), 1322 "%s: master", pool->sp_name); 1323 pool->sp_state = SVCPOOL_ACTIVE; 1324 pool->sp_proc = p; 1325 1326 /* Choose group count based on number of threads and CPUs. */ 1327 pool->sp_groupcount = max(1, min(SVC_MAXGROUPS, 1328 min(pool->sp_maxthreads / 2, mp_ncpus) / 6)); 1329 for (g = 0; g < pool->sp_groupcount; g++) { 1330 grp = &pool->sp_groups[g]; 1331 grp->sg_minthreads = max(1, 1332 pool->sp_minthreads / pool->sp_groupcount); 1333 grp->sg_maxthreads = max(1, 1334 pool->sp_maxthreads / pool->sp_groupcount); 1335 grp->sg_lastcreatetime = time_uptime; 1336 } 1337 1338 /* Starting threads */ 1339 for (g = 0; g < pool->sp_groupcount; g++) { 1340 grp = &pool->sp_groups[g]; 1341 for (i = ((g == 0) ? 1 : 0); i < grp->sg_minthreads; i++) 1342 svc_new_thread(grp); 1343 } 1344 pool->sp_groups[0].sg_threadcount++; 1345 svc_run_internal(&pool->sp_groups[0], TRUE); 1346 1347 /* Waiting for threads to stop. */ 1348 for (g = 0; g < pool->sp_groupcount; g++) { 1349 grp = &pool->sp_groups[g]; 1350 mtx_lock(&grp->sg_lock); 1351 while (grp->sg_threadcount > 0) 1352 msleep(grp, &grp->sg_lock, 0, "svcexit", 0); 1353 mtx_unlock(&grp->sg_lock); 1354 } 1355 } 1356 1357 void 1358 svc_exit(SVCPOOL *pool) 1359 { 1360 SVCGROUP *grp; 1361 SVCTHREAD *st; 1362 int g; 1363 1364 pool->sp_state = SVCPOOL_CLOSING; 1365 for (g = 0; g < pool->sp_groupcount; g++) { 1366 grp = &pool->sp_groups[g]; 1367 mtx_lock(&grp->sg_lock); 1368 if (grp->sg_state != SVCPOOL_CLOSING) { 1369 grp->sg_state = SVCPOOL_CLOSING; 1370 LIST_FOREACH(st, &grp->sg_idlethreads, st_ilink) 1371 cv_signal(&st->st_cond); 1372 } 1373 mtx_unlock(&grp->sg_lock); 1374 } 1375 } 1376 1377 bool_t 1378 svc_getargs(struct svc_req *rqstp, xdrproc_t xargs, void *args) 1379 { 1380 struct mbuf *m; 1381 XDR xdrs; 1382 bool_t stat; 1383 1384 m = rqstp->rq_args; 1385 rqstp->rq_args = NULL; 1386 1387 xdrmbuf_create(&xdrs, m, XDR_DECODE); 1388 stat = xargs(&xdrs, args); 1389 XDR_DESTROY(&xdrs); 1390 1391 return (stat); 1392 } 1393 1394 bool_t 1395 svc_freeargs(struct svc_req *rqstp, xdrproc_t xargs, void *args) 1396 { 1397 XDR xdrs; 1398 1399 if (rqstp->rq_addr) { 1400 free(rqstp->rq_addr, M_SONAME); 1401 rqstp->rq_addr = NULL; 1402 } 1403 1404 xdrs.x_op = XDR_FREE; 1405 return (xargs(&xdrs, args)); 1406 } 1407 1408 void 1409 svc_freereq(struct svc_req *rqstp) 1410 { 1411 SVCTHREAD *st; 1412 SVCPOOL *pool; 1413 1414 st = rqstp->rq_thread; 1415 if (st) { 1416 pool = st->st_pool; 1417 if (pool->sp_done) 1418 pool->sp_done(st, rqstp); 1419 } 1420 1421 if (rqstp->rq_auth.svc_ah_ops) 1422 SVCAUTH_RELEASE(&rqstp->rq_auth); 1423 1424 if (rqstp->rq_xprt) { 1425 SVC_RELEASE(rqstp->rq_xprt); 1426 } 1427 1428 if (rqstp->rq_addr) 1429 free(rqstp->rq_addr, M_SONAME); 1430 1431 if (rqstp->rq_args) 1432 m_freem(rqstp->rq_args); 1433 1434 free(rqstp, M_RPC); 1435 } 1436