1 /* $NetBSD: svc.c,v 1.21 2000/07/06 03:10:35 christos Exp $ */ 2 3 /*- 4 * Copyright (c) 2009, Sun Microsystems, Inc. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions are met: 9 * - Redistributions of source code must retain the above copyright notice, 10 * this list of conditions and the following disclaimer. 11 * - Redistributions in binary form must reproduce the above copyright notice, 12 * this list of conditions and the following disclaimer in the documentation 13 * and/or other materials provided with the distribution. 14 * - Neither the name of Sun Microsystems, Inc. nor the names of its 15 * contributors may be used to endorse or promote products derived 16 * from this software without specific prior written permission. 17 * 18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 19 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 21 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 22 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 23 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 24 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 25 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 26 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 27 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 28 * POSSIBILITY OF SUCH DAMAGE. 29 */ 30 31 #if defined(LIBC_SCCS) && !defined(lint) 32 static char *sccsid2 = "@(#)svc.c 1.44 88/02/08 Copyr 1984 Sun Micro"; 33 static char *sccsid = "@(#)svc.c 2.4 88/08/11 4.0 RPCSRC"; 34 #endif 35 #include <sys/cdefs.h> 36 __FBSDID("$FreeBSD$"); 37 38 /* 39 * svc.c, Server-side remote procedure call interface. 40 * 41 * There are two sets of procedures here. The xprt routines are 42 * for handling transport handles. The svc routines handle the 43 * list of service routines. 44 * 45 * Copyright (C) 1984, Sun Microsystems, Inc. 46 */ 47 48 #include <sys/param.h> 49 #include <sys/lock.h> 50 #include <sys/kernel.h> 51 #include <sys/kthread.h> 52 #include <sys/malloc.h> 53 #include <sys/mbuf.h> 54 #include <sys/mutex.h> 55 #include <sys/proc.h> 56 #include <sys/queue.h> 57 #include <sys/socketvar.h> 58 #include <sys/systm.h> 59 #include <sys/smp.h> 60 #include <sys/sx.h> 61 #include <sys/ucred.h> 62 63 #include <rpc/rpc.h> 64 #include <rpc/rpcb_clnt.h> 65 #include <rpc/replay.h> 66 67 #include <rpc/rpc_com.h> 68 69 #define SVC_VERSQUIET 0x0001 /* keep quiet about vers mismatch */ 70 #define version_keepquiet(xp) (SVC_EXT(xp)->xp_flags & SVC_VERSQUIET) 71 72 static struct svc_callout *svc_find(SVCPOOL *pool, rpcprog_t, rpcvers_t, 73 char *); 74 static void svc_new_thread(SVCGROUP *grp); 75 static void xprt_unregister_locked(SVCXPRT *xprt); 76 static void svc_change_space_used(SVCPOOL *pool, long delta); 77 static bool_t svc_request_space_available(SVCPOOL *pool); 78 static void svcpool_cleanup(SVCPOOL *pool); 79 80 /* *************** SVCXPRT related stuff **************** */ 81 82 static int svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS); 83 static int svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS); 84 static int svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS); 85 86 SVCPOOL* 87 svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base) 88 { 89 SVCPOOL *pool; 90 SVCGROUP *grp; 91 int g; 92 93 pool = malloc(sizeof(SVCPOOL), M_RPC, M_WAITOK|M_ZERO); 94 95 mtx_init(&pool->sp_lock, "sp_lock", NULL, MTX_DEF); 96 pool->sp_name = name; 97 pool->sp_state = SVCPOOL_INIT; 98 pool->sp_proc = NULL; 99 TAILQ_INIT(&pool->sp_callouts); 100 TAILQ_INIT(&pool->sp_lcallouts); 101 pool->sp_minthreads = 1; 102 pool->sp_maxthreads = 1; 103 pool->sp_groupcount = 1; 104 for (g = 0; g < SVC_MAXGROUPS; g++) { 105 grp = &pool->sp_groups[g]; 106 mtx_init(&grp->sg_lock, "sg_lock", NULL, MTX_DEF); 107 grp->sg_pool = pool; 108 grp->sg_state = SVCPOOL_ACTIVE; 109 TAILQ_INIT(&grp->sg_xlist); 110 TAILQ_INIT(&grp->sg_active); 111 LIST_INIT(&grp->sg_idlethreads); 112 grp->sg_minthreads = 1; 113 grp->sg_maxthreads = 1; 114 } 115 116 /* 117 * Don't use more than a quarter of mbuf clusters. Nota bene: 118 * nmbclusters is an int, but nmbclusters*MCLBYTES may overflow 119 * on LP64 architectures, so cast to u_long to avoid undefined 120 * behavior. (ILP32 architectures cannot have nmbclusters 121 * large enough to overflow for other reasons.) 122 */ 123 pool->sp_space_high = (u_long)nmbclusters * MCLBYTES / 4; 124 pool->sp_space_low = (pool->sp_space_high / 3) * 2; 125 126 sysctl_ctx_init(&pool->sp_sysctl); 127 if (sysctl_base) { 128 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO, 129 "minthreads", CTLTYPE_INT | CTLFLAG_RW, 130 pool, 0, svcpool_minthread_sysctl, "I", 131 "Minimal number of threads"); 132 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO, 133 "maxthreads", CTLTYPE_INT | CTLFLAG_RW, 134 pool, 0, svcpool_maxthread_sysctl, "I", 135 "Maximal number of threads"); 136 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO, 137 "threads", CTLTYPE_INT | CTLFLAG_RD, 138 pool, 0, svcpool_threads_sysctl, "I", 139 "Current number of threads"); 140 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO, 141 "groups", CTLFLAG_RD, &pool->sp_groupcount, 0, 142 "Number of thread groups"); 143 144 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO, 145 "request_space_used", CTLFLAG_RD, 146 &pool->sp_space_used, 147 "Space in parsed but not handled requests."); 148 149 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO, 150 "request_space_used_highest", CTLFLAG_RD, 151 &pool->sp_space_used_highest, 152 "Highest space used since reboot."); 153 154 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO, 155 "request_space_high", CTLFLAG_RW, 156 &pool->sp_space_high, 157 "Maximum space in parsed but not handled requests."); 158 159 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO, 160 "request_space_low", CTLFLAG_RW, 161 &pool->sp_space_low, 162 "Low water mark for request space."); 163 164 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO, 165 "request_space_throttled", CTLFLAG_RD, 166 &pool->sp_space_throttled, 0, 167 "Whether nfs requests are currently throttled"); 168 169 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO, 170 "request_space_throttle_count", CTLFLAG_RD, 171 &pool->sp_space_throttle_count, 0, 172 "Count of times throttling based on request space has occurred"); 173 } 174 175 return pool; 176 } 177 178 /* 179 * Code common to svcpool_destroy() and svcpool_close(), which cleans up 180 * the pool data structures. 181 */ 182 static void 183 svcpool_cleanup(SVCPOOL *pool) 184 { 185 SVCGROUP *grp; 186 SVCXPRT *xprt, *nxprt; 187 struct svc_callout *s; 188 struct svc_loss_callout *sl; 189 struct svcxprt_list cleanup; 190 int g; 191 192 TAILQ_INIT(&cleanup); 193 194 for (g = 0; g < SVC_MAXGROUPS; g++) { 195 grp = &pool->sp_groups[g]; 196 mtx_lock(&grp->sg_lock); 197 while ((xprt = TAILQ_FIRST(&grp->sg_xlist)) != NULL) { 198 xprt_unregister_locked(xprt); 199 TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link); 200 } 201 mtx_unlock(&grp->sg_lock); 202 } 203 TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) { 204 SVC_RELEASE(xprt); 205 } 206 207 mtx_lock(&pool->sp_lock); 208 while ((s = TAILQ_FIRST(&pool->sp_callouts)) != NULL) { 209 mtx_unlock(&pool->sp_lock); 210 svc_unreg(pool, s->sc_prog, s->sc_vers); 211 mtx_lock(&pool->sp_lock); 212 } 213 while ((sl = TAILQ_FIRST(&pool->sp_lcallouts)) != NULL) { 214 mtx_unlock(&pool->sp_lock); 215 svc_loss_unreg(pool, sl->slc_dispatch); 216 mtx_lock(&pool->sp_lock); 217 } 218 mtx_unlock(&pool->sp_lock); 219 } 220 221 void 222 svcpool_destroy(SVCPOOL *pool) 223 { 224 SVCGROUP *grp; 225 int g; 226 227 svcpool_cleanup(pool); 228 229 for (g = 0; g < SVC_MAXGROUPS; g++) { 230 grp = &pool->sp_groups[g]; 231 mtx_destroy(&grp->sg_lock); 232 } 233 mtx_destroy(&pool->sp_lock); 234 235 if (pool->sp_rcache) 236 replay_freecache(pool->sp_rcache); 237 238 sysctl_ctx_free(&pool->sp_sysctl); 239 free(pool, M_RPC); 240 } 241 242 /* 243 * Similar to svcpool_destroy(), except that it does not destroy the actual 244 * data structures. As such, "pool" may be used again. 245 */ 246 void 247 svcpool_close(SVCPOOL *pool) 248 { 249 SVCGROUP *grp; 250 int g; 251 252 svcpool_cleanup(pool); 253 254 /* Now, initialize the pool's state for a fresh svc_run() call. */ 255 mtx_lock(&pool->sp_lock); 256 pool->sp_state = SVCPOOL_INIT; 257 mtx_unlock(&pool->sp_lock); 258 for (g = 0; g < SVC_MAXGROUPS; g++) { 259 grp = &pool->sp_groups[g]; 260 mtx_lock(&grp->sg_lock); 261 grp->sg_state = SVCPOOL_ACTIVE; 262 mtx_unlock(&grp->sg_lock); 263 } 264 } 265 266 /* 267 * Sysctl handler to get the present thread count on a pool 268 */ 269 static int 270 svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS) 271 { 272 SVCPOOL *pool; 273 int threads, error, g; 274 275 pool = oidp->oid_arg1; 276 threads = 0; 277 mtx_lock(&pool->sp_lock); 278 for (g = 0; g < pool->sp_groupcount; g++) 279 threads += pool->sp_groups[g].sg_threadcount; 280 mtx_unlock(&pool->sp_lock); 281 error = sysctl_handle_int(oidp, &threads, 0, req); 282 return (error); 283 } 284 285 /* 286 * Sysctl handler to set the minimum thread count on a pool 287 */ 288 static int 289 svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS) 290 { 291 SVCPOOL *pool; 292 int newminthreads, error, g; 293 294 pool = oidp->oid_arg1; 295 newminthreads = pool->sp_minthreads; 296 error = sysctl_handle_int(oidp, &newminthreads, 0, req); 297 if (error == 0 && newminthreads != pool->sp_minthreads) { 298 if (newminthreads > pool->sp_maxthreads) 299 return (EINVAL); 300 mtx_lock(&pool->sp_lock); 301 pool->sp_minthreads = newminthreads; 302 for (g = 0; g < pool->sp_groupcount; g++) { 303 pool->sp_groups[g].sg_minthreads = max(1, 304 pool->sp_minthreads / pool->sp_groupcount); 305 } 306 mtx_unlock(&pool->sp_lock); 307 } 308 return (error); 309 } 310 311 /* 312 * Sysctl handler to set the maximum thread count on a pool 313 */ 314 static int 315 svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS) 316 { 317 SVCPOOL *pool; 318 int newmaxthreads, error, g; 319 320 pool = oidp->oid_arg1; 321 newmaxthreads = pool->sp_maxthreads; 322 error = sysctl_handle_int(oidp, &newmaxthreads, 0, req); 323 if (error == 0 && newmaxthreads != pool->sp_maxthreads) { 324 if (newmaxthreads < pool->sp_minthreads) 325 return (EINVAL); 326 mtx_lock(&pool->sp_lock); 327 pool->sp_maxthreads = newmaxthreads; 328 for (g = 0; g < pool->sp_groupcount; g++) { 329 pool->sp_groups[g].sg_maxthreads = max(1, 330 pool->sp_maxthreads / pool->sp_groupcount); 331 } 332 mtx_unlock(&pool->sp_lock); 333 } 334 return (error); 335 } 336 337 /* 338 * Activate a transport handle. 339 */ 340 void 341 xprt_register(SVCXPRT *xprt) 342 { 343 SVCPOOL *pool = xprt->xp_pool; 344 SVCGROUP *grp; 345 int g; 346 347 SVC_ACQUIRE(xprt); 348 g = atomic_fetchadd_int(&pool->sp_nextgroup, 1) % pool->sp_groupcount; 349 xprt->xp_group = grp = &pool->sp_groups[g]; 350 mtx_lock(&grp->sg_lock); 351 xprt->xp_registered = TRUE; 352 xprt->xp_active = FALSE; 353 TAILQ_INSERT_TAIL(&grp->sg_xlist, xprt, xp_link); 354 mtx_unlock(&grp->sg_lock); 355 } 356 357 /* 358 * De-activate a transport handle. Note: the locked version doesn't 359 * release the transport - caller must do that after dropping the pool 360 * lock. 361 */ 362 static void 363 xprt_unregister_locked(SVCXPRT *xprt) 364 { 365 SVCGROUP *grp = xprt->xp_group; 366 367 mtx_assert(&grp->sg_lock, MA_OWNED); 368 KASSERT(xprt->xp_registered == TRUE, 369 ("xprt_unregister_locked: not registered")); 370 xprt_inactive_locked(xprt); 371 TAILQ_REMOVE(&grp->sg_xlist, xprt, xp_link); 372 xprt->xp_registered = FALSE; 373 } 374 375 void 376 xprt_unregister(SVCXPRT *xprt) 377 { 378 SVCGROUP *grp = xprt->xp_group; 379 380 mtx_lock(&grp->sg_lock); 381 if (xprt->xp_registered == FALSE) { 382 /* Already unregistered by another thread */ 383 mtx_unlock(&grp->sg_lock); 384 return; 385 } 386 xprt_unregister_locked(xprt); 387 mtx_unlock(&grp->sg_lock); 388 389 SVC_RELEASE(xprt); 390 } 391 392 /* 393 * Attempt to assign a service thread to this transport. 394 */ 395 static int 396 xprt_assignthread(SVCXPRT *xprt) 397 { 398 SVCGROUP *grp = xprt->xp_group; 399 SVCTHREAD *st; 400 401 mtx_assert(&grp->sg_lock, MA_OWNED); 402 st = LIST_FIRST(&grp->sg_idlethreads); 403 if (st) { 404 LIST_REMOVE(st, st_ilink); 405 SVC_ACQUIRE(xprt); 406 xprt->xp_thread = st; 407 st->st_xprt = xprt; 408 cv_signal(&st->st_cond); 409 return (TRUE); 410 } else { 411 /* 412 * See if we can create a new thread. The 413 * actual thread creation happens in 414 * svc_run_internal because our locking state 415 * is poorly defined (we are typically called 416 * from a socket upcall). Don't create more 417 * than one thread per second. 418 */ 419 if (grp->sg_state == SVCPOOL_ACTIVE 420 && grp->sg_lastcreatetime < time_uptime 421 && grp->sg_threadcount < grp->sg_maxthreads) { 422 grp->sg_state = SVCPOOL_THREADWANTED; 423 } 424 } 425 return (FALSE); 426 } 427 428 void 429 xprt_active(SVCXPRT *xprt) 430 { 431 SVCGROUP *grp = xprt->xp_group; 432 433 mtx_lock(&grp->sg_lock); 434 435 if (!xprt->xp_registered) { 436 /* 437 * Race with xprt_unregister - we lose. 438 */ 439 mtx_unlock(&grp->sg_lock); 440 return; 441 } 442 443 if (!xprt->xp_active) { 444 xprt->xp_active = TRUE; 445 if (xprt->xp_thread == NULL) { 446 if (!svc_request_space_available(xprt->xp_pool) || 447 !xprt_assignthread(xprt)) 448 TAILQ_INSERT_TAIL(&grp->sg_active, xprt, 449 xp_alink); 450 } 451 } 452 453 mtx_unlock(&grp->sg_lock); 454 } 455 456 void 457 xprt_inactive_locked(SVCXPRT *xprt) 458 { 459 SVCGROUP *grp = xprt->xp_group; 460 461 mtx_assert(&grp->sg_lock, MA_OWNED); 462 if (xprt->xp_active) { 463 if (xprt->xp_thread == NULL) 464 TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink); 465 xprt->xp_active = FALSE; 466 } 467 } 468 469 void 470 xprt_inactive(SVCXPRT *xprt) 471 { 472 SVCGROUP *grp = xprt->xp_group; 473 474 mtx_lock(&grp->sg_lock); 475 xprt_inactive_locked(xprt); 476 mtx_unlock(&grp->sg_lock); 477 } 478 479 /* 480 * Variant of xprt_inactive() for use only when sure that port is 481 * assigned to thread. For example, within receive handlers. 482 */ 483 void 484 xprt_inactive_self(SVCXPRT *xprt) 485 { 486 487 KASSERT(xprt->xp_thread != NULL, 488 ("xprt_inactive_self(%p) with NULL xp_thread", xprt)); 489 xprt->xp_active = FALSE; 490 } 491 492 /* 493 * Add a service program to the callout list. 494 * The dispatch routine will be called when a rpc request for this 495 * program number comes in. 496 */ 497 bool_t 498 svc_reg(SVCXPRT *xprt, const rpcprog_t prog, const rpcvers_t vers, 499 void (*dispatch)(struct svc_req *, SVCXPRT *), 500 const struct netconfig *nconf) 501 { 502 SVCPOOL *pool = xprt->xp_pool; 503 struct svc_callout *s; 504 char *netid = NULL; 505 int flag = 0; 506 507 /* VARIABLES PROTECTED BY svc_lock: s, svc_head */ 508 509 if (xprt->xp_netid) { 510 netid = strdup(xprt->xp_netid, M_RPC); 511 flag = 1; 512 } else if (nconf && nconf->nc_netid) { 513 netid = strdup(nconf->nc_netid, M_RPC); 514 flag = 1; 515 } /* must have been created with svc_raw_create */ 516 if ((netid == NULL) && (flag == 1)) { 517 return (FALSE); 518 } 519 520 mtx_lock(&pool->sp_lock); 521 if ((s = svc_find(pool, prog, vers, netid)) != NULL) { 522 if (netid) 523 free(netid, M_RPC); 524 if (s->sc_dispatch == dispatch) 525 goto rpcb_it; /* he is registering another xptr */ 526 mtx_unlock(&pool->sp_lock); 527 return (FALSE); 528 } 529 s = malloc(sizeof (struct svc_callout), M_RPC, M_NOWAIT); 530 if (s == NULL) { 531 if (netid) 532 free(netid, M_RPC); 533 mtx_unlock(&pool->sp_lock); 534 return (FALSE); 535 } 536 537 s->sc_prog = prog; 538 s->sc_vers = vers; 539 s->sc_dispatch = dispatch; 540 s->sc_netid = netid; 541 TAILQ_INSERT_TAIL(&pool->sp_callouts, s, sc_link); 542 543 if ((xprt->xp_netid == NULL) && (flag == 1) && netid) 544 ((SVCXPRT *) xprt)->xp_netid = strdup(netid, M_RPC); 545 546 rpcb_it: 547 mtx_unlock(&pool->sp_lock); 548 /* now register the information with the local binder service */ 549 if (nconf) { 550 bool_t dummy; 551 struct netconfig tnc; 552 struct netbuf nb; 553 tnc = *nconf; 554 nb.buf = &xprt->xp_ltaddr; 555 nb.len = xprt->xp_ltaddr.ss_len; 556 dummy = rpcb_set(prog, vers, &tnc, &nb); 557 return (dummy); 558 } 559 return (TRUE); 560 } 561 562 /* 563 * Remove a service program from the callout list. 564 */ 565 void 566 svc_unreg(SVCPOOL *pool, const rpcprog_t prog, const rpcvers_t vers) 567 { 568 struct svc_callout *s; 569 570 /* unregister the information anyway */ 571 (void) rpcb_unset(prog, vers, NULL); 572 mtx_lock(&pool->sp_lock); 573 while ((s = svc_find(pool, prog, vers, NULL)) != NULL) { 574 TAILQ_REMOVE(&pool->sp_callouts, s, sc_link); 575 if (s->sc_netid) 576 mem_free(s->sc_netid, sizeof (s->sc_netid) + 1); 577 mem_free(s, sizeof (struct svc_callout)); 578 } 579 mtx_unlock(&pool->sp_lock); 580 } 581 582 /* 583 * Add a service connection loss program to the callout list. 584 * The dispatch routine will be called when some port in ths pool die. 585 */ 586 bool_t 587 svc_loss_reg(SVCXPRT *xprt, void (*dispatch)(SVCXPRT *)) 588 { 589 SVCPOOL *pool = xprt->xp_pool; 590 struct svc_loss_callout *s; 591 592 mtx_lock(&pool->sp_lock); 593 TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) { 594 if (s->slc_dispatch == dispatch) 595 break; 596 } 597 if (s != NULL) { 598 mtx_unlock(&pool->sp_lock); 599 return (TRUE); 600 } 601 s = malloc(sizeof(struct svc_loss_callout), M_RPC, M_NOWAIT); 602 if (s == NULL) { 603 mtx_unlock(&pool->sp_lock); 604 return (FALSE); 605 } 606 s->slc_dispatch = dispatch; 607 TAILQ_INSERT_TAIL(&pool->sp_lcallouts, s, slc_link); 608 mtx_unlock(&pool->sp_lock); 609 return (TRUE); 610 } 611 612 /* 613 * Remove a service connection loss program from the callout list. 614 */ 615 void 616 svc_loss_unreg(SVCPOOL *pool, void (*dispatch)(SVCXPRT *)) 617 { 618 struct svc_loss_callout *s; 619 620 mtx_lock(&pool->sp_lock); 621 TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) { 622 if (s->slc_dispatch == dispatch) { 623 TAILQ_REMOVE(&pool->sp_lcallouts, s, slc_link); 624 free(s, M_RPC); 625 break; 626 } 627 } 628 mtx_unlock(&pool->sp_lock); 629 } 630 631 /* ********************** CALLOUT list related stuff ************* */ 632 633 /* 634 * Search the callout list for a program number, return the callout 635 * struct. 636 */ 637 static struct svc_callout * 638 svc_find(SVCPOOL *pool, rpcprog_t prog, rpcvers_t vers, char *netid) 639 { 640 struct svc_callout *s; 641 642 mtx_assert(&pool->sp_lock, MA_OWNED); 643 TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) { 644 if (s->sc_prog == prog && s->sc_vers == vers 645 && (netid == NULL || s->sc_netid == NULL || 646 strcmp(netid, s->sc_netid) == 0)) 647 break; 648 } 649 650 return (s); 651 } 652 653 /* ******************* REPLY GENERATION ROUTINES ************ */ 654 655 static bool_t 656 svc_sendreply_common(struct svc_req *rqstp, struct rpc_msg *rply, 657 struct mbuf *body) 658 { 659 SVCXPRT *xprt = rqstp->rq_xprt; 660 bool_t ok; 661 662 if (rqstp->rq_args) { 663 m_freem(rqstp->rq_args); 664 rqstp->rq_args = NULL; 665 } 666 667 if (xprt->xp_pool->sp_rcache) 668 replay_setreply(xprt->xp_pool->sp_rcache, 669 rply, svc_getrpccaller(rqstp), body); 670 671 if (!SVCAUTH_WRAP(&rqstp->rq_auth, &body)) 672 return (FALSE); 673 674 ok = SVC_REPLY(xprt, rply, rqstp->rq_addr, body, &rqstp->rq_reply_seq); 675 if (rqstp->rq_addr) { 676 free(rqstp->rq_addr, M_SONAME); 677 rqstp->rq_addr = NULL; 678 } 679 680 return (ok); 681 } 682 683 /* 684 * Send a reply to an rpc request 685 */ 686 bool_t 687 svc_sendreply(struct svc_req *rqstp, xdrproc_t xdr_results, void * xdr_location) 688 { 689 struct rpc_msg rply; 690 struct mbuf *m; 691 XDR xdrs; 692 bool_t ok; 693 694 rply.rm_xid = rqstp->rq_xid; 695 rply.rm_direction = REPLY; 696 rply.rm_reply.rp_stat = MSG_ACCEPTED; 697 rply.acpted_rply.ar_verf = rqstp->rq_verf; 698 rply.acpted_rply.ar_stat = SUCCESS; 699 rply.acpted_rply.ar_results.where = NULL; 700 rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void; 701 702 m = m_getcl(M_WAITOK, MT_DATA, 0); 703 xdrmbuf_create(&xdrs, m, XDR_ENCODE); 704 ok = xdr_results(&xdrs, xdr_location); 705 XDR_DESTROY(&xdrs); 706 707 if (ok) { 708 return (svc_sendreply_common(rqstp, &rply, m)); 709 } else { 710 m_freem(m); 711 return (FALSE); 712 } 713 } 714 715 bool_t 716 svc_sendreply_mbuf(struct svc_req *rqstp, struct mbuf *m) 717 { 718 struct rpc_msg rply; 719 720 rply.rm_xid = rqstp->rq_xid; 721 rply.rm_direction = REPLY; 722 rply.rm_reply.rp_stat = MSG_ACCEPTED; 723 rply.acpted_rply.ar_verf = rqstp->rq_verf; 724 rply.acpted_rply.ar_stat = SUCCESS; 725 rply.acpted_rply.ar_results.where = NULL; 726 rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void; 727 728 return (svc_sendreply_common(rqstp, &rply, m)); 729 } 730 731 /* 732 * No procedure error reply 733 */ 734 void 735 svcerr_noproc(struct svc_req *rqstp) 736 { 737 SVCXPRT *xprt = rqstp->rq_xprt; 738 struct rpc_msg rply; 739 740 rply.rm_xid = rqstp->rq_xid; 741 rply.rm_direction = REPLY; 742 rply.rm_reply.rp_stat = MSG_ACCEPTED; 743 rply.acpted_rply.ar_verf = rqstp->rq_verf; 744 rply.acpted_rply.ar_stat = PROC_UNAVAIL; 745 746 if (xprt->xp_pool->sp_rcache) 747 replay_setreply(xprt->xp_pool->sp_rcache, 748 &rply, svc_getrpccaller(rqstp), NULL); 749 750 svc_sendreply_common(rqstp, &rply, NULL); 751 } 752 753 /* 754 * Can't decode args error reply 755 */ 756 void 757 svcerr_decode(struct svc_req *rqstp) 758 { 759 SVCXPRT *xprt = rqstp->rq_xprt; 760 struct rpc_msg rply; 761 762 rply.rm_xid = rqstp->rq_xid; 763 rply.rm_direction = REPLY; 764 rply.rm_reply.rp_stat = MSG_ACCEPTED; 765 rply.acpted_rply.ar_verf = rqstp->rq_verf; 766 rply.acpted_rply.ar_stat = GARBAGE_ARGS; 767 768 if (xprt->xp_pool->sp_rcache) 769 replay_setreply(xprt->xp_pool->sp_rcache, 770 &rply, (struct sockaddr *) &xprt->xp_rtaddr, NULL); 771 772 svc_sendreply_common(rqstp, &rply, NULL); 773 } 774 775 /* 776 * Some system error 777 */ 778 void 779 svcerr_systemerr(struct svc_req *rqstp) 780 { 781 SVCXPRT *xprt = rqstp->rq_xprt; 782 struct rpc_msg rply; 783 784 rply.rm_xid = rqstp->rq_xid; 785 rply.rm_direction = REPLY; 786 rply.rm_reply.rp_stat = MSG_ACCEPTED; 787 rply.acpted_rply.ar_verf = rqstp->rq_verf; 788 rply.acpted_rply.ar_stat = SYSTEM_ERR; 789 790 if (xprt->xp_pool->sp_rcache) 791 replay_setreply(xprt->xp_pool->sp_rcache, 792 &rply, svc_getrpccaller(rqstp), NULL); 793 794 svc_sendreply_common(rqstp, &rply, NULL); 795 } 796 797 /* 798 * Authentication error reply 799 */ 800 void 801 svcerr_auth(struct svc_req *rqstp, enum auth_stat why) 802 { 803 SVCXPRT *xprt = rqstp->rq_xprt; 804 struct rpc_msg rply; 805 806 rply.rm_xid = rqstp->rq_xid; 807 rply.rm_direction = REPLY; 808 rply.rm_reply.rp_stat = MSG_DENIED; 809 rply.rjcted_rply.rj_stat = AUTH_ERROR; 810 rply.rjcted_rply.rj_why = why; 811 812 if (xprt->xp_pool->sp_rcache) 813 replay_setreply(xprt->xp_pool->sp_rcache, 814 &rply, svc_getrpccaller(rqstp), NULL); 815 816 svc_sendreply_common(rqstp, &rply, NULL); 817 } 818 819 /* 820 * Auth too weak error reply 821 */ 822 void 823 svcerr_weakauth(struct svc_req *rqstp) 824 { 825 826 svcerr_auth(rqstp, AUTH_TOOWEAK); 827 } 828 829 /* 830 * Program unavailable error reply 831 */ 832 void 833 svcerr_noprog(struct svc_req *rqstp) 834 { 835 SVCXPRT *xprt = rqstp->rq_xprt; 836 struct rpc_msg rply; 837 838 rply.rm_xid = rqstp->rq_xid; 839 rply.rm_direction = REPLY; 840 rply.rm_reply.rp_stat = MSG_ACCEPTED; 841 rply.acpted_rply.ar_verf = rqstp->rq_verf; 842 rply.acpted_rply.ar_stat = PROG_UNAVAIL; 843 844 if (xprt->xp_pool->sp_rcache) 845 replay_setreply(xprt->xp_pool->sp_rcache, 846 &rply, svc_getrpccaller(rqstp), NULL); 847 848 svc_sendreply_common(rqstp, &rply, NULL); 849 } 850 851 /* 852 * Program version mismatch error reply 853 */ 854 void 855 svcerr_progvers(struct svc_req *rqstp, rpcvers_t low_vers, rpcvers_t high_vers) 856 { 857 SVCXPRT *xprt = rqstp->rq_xprt; 858 struct rpc_msg rply; 859 860 rply.rm_xid = rqstp->rq_xid; 861 rply.rm_direction = REPLY; 862 rply.rm_reply.rp_stat = MSG_ACCEPTED; 863 rply.acpted_rply.ar_verf = rqstp->rq_verf; 864 rply.acpted_rply.ar_stat = PROG_MISMATCH; 865 rply.acpted_rply.ar_vers.low = (uint32_t)low_vers; 866 rply.acpted_rply.ar_vers.high = (uint32_t)high_vers; 867 868 if (xprt->xp_pool->sp_rcache) 869 replay_setreply(xprt->xp_pool->sp_rcache, 870 &rply, svc_getrpccaller(rqstp), NULL); 871 872 svc_sendreply_common(rqstp, &rply, NULL); 873 } 874 875 /* 876 * Allocate a new server transport structure. All fields are 877 * initialized to zero and xp_p3 is initialized to point at an 878 * extension structure to hold various flags and authentication 879 * parameters. 880 */ 881 SVCXPRT * 882 svc_xprt_alloc(void) 883 { 884 SVCXPRT *xprt; 885 SVCXPRT_EXT *ext; 886 887 xprt = mem_alloc(sizeof(SVCXPRT)); 888 ext = mem_alloc(sizeof(SVCXPRT_EXT)); 889 xprt->xp_p3 = ext; 890 refcount_init(&xprt->xp_refs, 1); 891 892 return (xprt); 893 } 894 895 /* 896 * Free a server transport structure. 897 */ 898 void 899 svc_xprt_free(SVCXPRT *xprt) 900 { 901 902 mem_free(xprt->xp_p3, sizeof(SVCXPRT_EXT)); 903 mem_free(xprt, sizeof(SVCXPRT)); 904 } 905 906 /* ******************* SERVER INPUT STUFF ******************* */ 907 908 /* 909 * Read RPC requests from a transport and queue them to be 910 * executed. We handle authentication and replay cache replies here. 911 * Actually dispatching the RPC is deferred till svc_executereq. 912 */ 913 static enum xprt_stat 914 svc_getreq(SVCXPRT *xprt, struct svc_req **rqstp_ret) 915 { 916 SVCPOOL *pool = xprt->xp_pool; 917 struct svc_req *r; 918 struct rpc_msg msg; 919 struct mbuf *args; 920 struct svc_loss_callout *s; 921 enum xprt_stat stat; 922 923 /* now receive msgs from xprtprt (support batch calls) */ 924 r = malloc(sizeof(*r), M_RPC, M_WAITOK|M_ZERO); 925 926 msg.rm_call.cb_cred.oa_base = r->rq_credarea; 927 msg.rm_call.cb_verf.oa_base = &r->rq_credarea[MAX_AUTH_BYTES]; 928 r->rq_clntcred = &r->rq_credarea[2*MAX_AUTH_BYTES]; 929 if (SVC_RECV(xprt, &msg, &r->rq_addr, &args)) { 930 enum auth_stat why; 931 932 /* 933 * Handle replays and authenticate before queuing the 934 * request to be executed. 935 */ 936 SVC_ACQUIRE(xprt); 937 r->rq_xprt = xprt; 938 if (pool->sp_rcache) { 939 struct rpc_msg repmsg; 940 struct mbuf *repbody; 941 enum replay_state rs; 942 rs = replay_find(pool->sp_rcache, &msg, 943 svc_getrpccaller(r), &repmsg, &repbody); 944 switch (rs) { 945 case RS_NEW: 946 break; 947 case RS_DONE: 948 SVC_REPLY(xprt, &repmsg, r->rq_addr, 949 repbody, &r->rq_reply_seq); 950 if (r->rq_addr) { 951 free(r->rq_addr, M_SONAME); 952 r->rq_addr = NULL; 953 } 954 m_freem(args); 955 goto call_done; 956 957 default: 958 m_freem(args); 959 goto call_done; 960 } 961 } 962 963 r->rq_xid = msg.rm_xid; 964 r->rq_prog = msg.rm_call.cb_prog; 965 r->rq_vers = msg.rm_call.cb_vers; 966 r->rq_proc = msg.rm_call.cb_proc; 967 r->rq_size = sizeof(*r) + m_length(args, NULL); 968 r->rq_args = args; 969 if ((why = _authenticate(r, &msg)) != AUTH_OK) { 970 /* 971 * RPCSEC_GSS uses this return code 972 * for requests that form part of its 973 * context establishment protocol and 974 * should not be dispatched to the 975 * application. 976 */ 977 if (why != RPCSEC_GSS_NODISPATCH) 978 svcerr_auth(r, why); 979 goto call_done; 980 } 981 982 if (!SVCAUTH_UNWRAP(&r->rq_auth, &r->rq_args)) { 983 svcerr_decode(r); 984 goto call_done; 985 } 986 987 /* 988 * Everything checks out, return request to caller. 989 */ 990 *rqstp_ret = r; 991 r = NULL; 992 } 993 call_done: 994 if (r) { 995 svc_freereq(r); 996 r = NULL; 997 } 998 if ((stat = SVC_STAT(xprt)) == XPRT_DIED) { 999 TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) 1000 (*s->slc_dispatch)(xprt); 1001 xprt_unregister(xprt); 1002 } 1003 1004 return (stat); 1005 } 1006 1007 static void 1008 svc_executereq(struct svc_req *rqstp) 1009 { 1010 SVCXPRT *xprt = rqstp->rq_xprt; 1011 SVCPOOL *pool = xprt->xp_pool; 1012 int prog_found; 1013 rpcvers_t low_vers; 1014 rpcvers_t high_vers; 1015 struct svc_callout *s; 1016 1017 /* now match message with a registered service*/ 1018 prog_found = FALSE; 1019 low_vers = (rpcvers_t) -1L; 1020 high_vers = (rpcvers_t) 0L; 1021 TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) { 1022 if (s->sc_prog == rqstp->rq_prog) { 1023 if (s->sc_vers == rqstp->rq_vers) { 1024 /* 1025 * We hand ownership of r to the 1026 * dispatch method - they must call 1027 * svc_freereq. 1028 */ 1029 (*s->sc_dispatch)(rqstp, xprt); 1030 return; 1031 } /* found correct version */ 1032 prog_found = TRUE; 1033 if (s->sc_vers < low_vers) 1034 low_vers = s->sc_vers; 1035 if (s->sc_vers > high_vers) 1036 high_vers = s->sc_vers; 1037 } /* found correct program */ 1038 } 1039 1040 /* 1041 * if we got here, the program or version 1042 * is not served ... 1043 */ 1044 if (prog_found) 1045 svcerr_progvers(rqstp, low_vers, high_vers); 1046 else 1047 svcerr_noprog(rqstp); 1048 1049 svc_freereq(rqstp); 1050 } 1051 1052 static void 1053 svc_checkidle(SVCGROUP *grp) 1054 { 1055 SVCXPRT *xprt, *nxprt; 1056 time_t timo; 1057 struct svcxprt_list cleanup; 1058 1059 TAILQ_INIT(&cleanup); 1060 TAILQ_FOREACH_SAFE(xprt, &grp->sg_xlist, xp_link, nxprt) { 1061 /* 1062 * Only some transports have idle timers. Don't time 1063 * something out which is just waking up. 1064 */ 1065 if (!xprt->xp_idletimeout || xprt->xp_thread) 1066 continue; 1067 1068 timo = xprt->xp_lastactive + xprt->xp_idletimeout; 1069 if (time_uptime > timo) { 1070 xprt_unregister_locked(xprt); 1071 TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link); 1072 } 1073 } 1074 1075 mtx_unlock(&grp->sg_lock); 1076 TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) { 1077 SVC_RELEASE(xprt); 1078 } 1079 mtx_lock(&grp->sg_lock); 1080 } 1081 1082 static void 1083 svc_assign_waiting_sockets(SVCPOOL *pool) 1084 { 1085 SVCGROUP *grp; 1086 SVCXPRT *xprt; 1087 int g; 1088 1089 for (g = 0; g < pool->sp_groupcount; g++) { 1090 grp = &pool->sp_groups[g]; 1091 mtx_lock(&grp->sg_lock); 1092 while ((xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) { 1093 if (xprt_assignthread(xprt)) 1094 TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink); 1095 else 1096 break; 1097 } 1098 mtx_unlock(&grp->sg_lock); 1099 } 1100 } 1101 1102 static void 1103 svc_change_space_used(SVCPOOL *pool, long delta) 1104 { 1105 unsigned long value; 1106 1107 value = atomic_fetchadd_long(&pool->sp_space_used, delta) + delta; 1108 if (delta > 0) { 1109 if (value >= pool->sp_space_high && !pool->sp_space_throttled) { 1110 pool->sp_space_throttled = TRUE; 1111 pool->sp_space_throttle_count++; 1112 } 1113 if (value > pool->sp_space_used_highest) 1114 pool->sp_space_used_highest = value; 1115 } else { 1116 if (value < pool->sp_space_low && pool->sp_space_throttled) { 1117 pool->sp_space_throttled = FALSE; 1118 svc_assign_waiting_sockets(pool); 1119 } 1120 } 1121 } 1122 1123 static bool_t 1124 svc_request_space_available(SVCPOOL *pool) 1125 { 1126 1127 if (pool->sp_space_throttled) 1128 return (FALSE); 1129 return (TRUE); 1130 } 1131 1132 static void 1133 svc_run_internal(SVCGROUP *grp, bool_t ismaster) 1134 { 1135 SVCPOOL *pool = grp->sg_pool; 1136 SVCTHREAD *st, *stpref; 1137 SVCXPRT *xprt; 1138 enum xprt_stat stat; 1139 struct svc_req *rqstp; 1140 struct proc *p; 1141 long sz; 1142 int error; 1143 1144 st = mem_alloc(sizeof(*st)); 1145 mtx_init(&st->st_lock, "st_lock", NULL, MTX_DEF); 1146 st->st_pool = pool; 1147 st->st_xprt = NULL; 1148 STAILQ_INIT(&st->st_reqs); 1149 cv_init(&st->st_cond, "rpcsvc"); 1150 1151 mtx_lock(&grp->sg_lock); 1152 1153 /* 1154 * If we are a new thread which was spawned to cope with 1155 * increased load, set the state back to SVCPOOL_ACTIVE. 1156 */ 1157 if (grp->sg_state == SVCPOOL_THREADSTARTING) 1158 grp->sg_state = SVCPOOL_ACTIVE; 1159 1160 while (grp->sg_state != SVCPOOL_CLOSING) { 1161 /* 1162 * Create new thread if requested. 1163 */ 1164 if (grp->sg_state == SVCPOOL_THREADWANTED) { 1165 grp->sg_state = SVCPOOL_THREADSTARTING; 1166 grp->sg_lastcreatetime = time_uptime; 1167 mtx_unlock(&grp->sg_lock); 1168 svc_new_thread(grp); 1169 mtx_lock(&grp->sg_lock); 1170 continue; 1171 } 1172 1173 /* 1174 * Check for idle transports once per second. 1175 */ 1176 if (time_uptime > grp->sg_lastidlecheck) { 1177 grp->sg_lastidlecheck = time_uptime; 1178 svc_checkidle(grp); 1179 } 1180 1181 xprt = st->st_xprt; 1182 if (!xprt) { 1183 /* 1184 * Enforce maxthreads count. 1185 */ 1186 if (grp->sg_threadcount > grp->sg_maxthreads) 1187 break; 1188 1189 /* 1190 * Before sleeping, see if we can find an 1191 * active transport which isn't being serviced 1192 * by a thread. 1193 */ 1194 if (svc_request_space_available(pool) && 1195 (xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) { 1196 TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink); 1197 SVC_ACQUIRE(xprt); 1198 xprt->xp_thread = st; 1199 st->st_xprt = xprt; 1200 continue; 1201 } 1202 1203 LIST_INSERT_HEAD(&grp->sg_idlethreads, st, st_ilink); 1204 if (ismaster || (!ismaster && 1205 grp->sg_threadcount > grp->sg_minthreads)) 1206 error = cv_timedwait_sig(&st->st_cond, 1207 &grp->sg_lock, 5 * hz); 1208 else 1209 error = cv_wait_sig(&st->st_cond, 1210 &grp->sg_lock); 1211 if (st->st_xprt == NULL) 1212 LIST_REMOVE(st, st_ilink); 1213 1214 /* 1215 * Reduce worker thread count when idle. 1216 */ 1217 if (error == EWOULDBLOCK) { 1218 if (!ismaster 1219 && (grp->sg_threadcount 1220 > grp->sg_minthreads) 1221 && !st->st_xprt) 1222 break; 1223 } else if (error != 0) { 1224 KASSERT(error == EINTR || error == ERESTART, 1225 ("non-signal error %d", error)); 1226 mtx_unlock(&grp->sg_lock); 1227 p = curproc; 1228 PROC_LOCK(p); 1229 if (P_SHOULDSTOP(p) || 1230 (p->p_flag & P_TOTAL_STOP) != 0) { 1231 thread_suspend_check(0); 1232 PROC_UNLOCK(p); 1233 mtx_lock(&grp->sg_lock); 1234 } else { 1235 PROC_UNLOCK(p); 1236 svc_exit(pool); 1237 mtx_lock(&grp->sg_lock); 1238 break; 1239 } 1240 } 1241 continue; 1242 } 1243 mtx_unlock(&grp->sg_lock); 1244 1245 /* 1246 * Drain the transport socket and queue up any RPCs. 1247 */ 1248 xprt->xp_lastactive = time_uptime; 1249 do { 1250 if (!svc_request_space_available(pool)) 1251 break; 1252 rqstp = NULL; 1253 stat = svc_getreq(xprt, &rqstp); 1254 if (rqstp) { 1255 svc_change_space_used(pool, rqstp->rq_size); 1256 /* 1257 * See if the application has a preference 1258 * for some other thread. 1259 */ 1260 if (pool->sp_assign) { 1261 stpref = pool->sp_assign(st, rqstp); 1262 rqstp->rq_thread = stpref; 1263 STAILQ_INSERT_TAIL(&stpref->st_reqs, 1264 rqstp, rq_link); 1265 mtx_unlock(&stpref->st_lock); 1266 if (stpref != st) 1267 rqstp = NULL; 1268 } else { 1269 rqstp->rq_thread = st; 1270 STAILQ_INSERT_TAIL(&st->st_reqs, 1271 rqstp, rq_link); 1272 } 1273 } 1274 } while (rqstp == NULL && stat == XPRT_MOREREQS 1275 && grp->sg_state != SVCPOOL_CLOSING); 1276 1277 /* 1278 * Move this transport to the end of the active list to 1279 * ensure fairness when multiple transports are active. 1280 * If this was the last queued request, svc_getreq will end 1281 * up calling xprt_inactive to remove from the active list. 1282 */ 1283 mtx_lock(&grp->sg_lock); 1284 xprt->xp_thread = NULL; 1285 st->st_xprt = NULL; 1286 if (xprt->xp_active) { 1287 if (!svc_request_space_available(pool) || 1288 !xprt_assignthread(xprt)) 1289 TAILQ_INSERT_TAIL(&grp->sg_active, 1290 xprt, xp_alink); 1291 } 1292 mtx_unlock(&grp->sg_lock); 1293 SVC_RELEASE(xprt); 1294 1295 /* 1296 * Execute what we have queued. 1297 */ 1298 mtx_lock(&st->st_lock); 1299 while ((rqstp = STAILQ_FIRST(&st->st_reqs)) != NULL) { 1300 STAILQ_REMOVE_HEAD(&st->st_reqs, rq_link); 1301 mtx_unlock(&st->st_lock); 1302 sz = (long)rqstp->rq_size; 1303 svc_executereq(rqstp); 1304 svc_change_space_used(pool, -sz); 1305 mtx_lock(&st->st_lock); 1306 } 1307 mtx_unlock(&st->st_lock); 1308 mtx_lock(&grp->sg_lock); 1309 } 1310 1311 if (st->st_xprt) { 1312 xprt = st->st_xprt; 1313 st->st_xprt = NULL; 1314 SVC_RELEASE(xprt); 1315 } 1316 KASSERT(STAILQ_EMPTY(&st->st_reqs), ("stray reqs on exit")); 1317 mtx_destroy(&st->st_lock); 1318 cv_destroy(&st->st_cond); 1319 mem_free(st, sizeof(*st)); 1320 1321 grp->sg_threadcount--; 1322 if (!ismaster) 1323 wakeup(grp); 1324 mtx_unlock(&grp->sg_lock); 1325 } 1326 1327 static void 1328 svc_thread_start(void *arg) 1329 { 1330 1331 svc_run_internal((SVCGROUP *) arg, FALSE); 1332 kthread_exit(); 1333 } 1334 1335 static void 1336 svc_new_thread(SVCGROUP *grp) 1337 { 1338 SVCPOOL *pool = grp->sg_pool; 1339 struct thread *td; 1340 1341 mtx_lock(&grp->sg_lock); 1342 grp->sg_threadcount++; 1343 mtx_unlock(&grp->sg_lock); 1344 kthread_add(svc_thread_start, grp, pool->sp_proc, &td, 0, 0, 1345 "%s: service", pool->sp_name); 1346 } 1347 1348 void 1349 svc_run(SVCPOOL *pool) 1350 { 1351 int g, i; 1352 struct proc *p; 1353 struct thread *td; 1354 SVCGROUP *grp; 1355 1356 p = curproc; 1357 td = curthread; 1358 snprintf(td->td_name, sizeof(td->td_name), 1359 "%s: master", pool->sp_name); 1360 pool->sp_state = SVCPOOL_ACTIVE; 1361 pool->sp_proc = p; 1362 1363 /* Choose group count based on number of threads and CPUs. */ 1364 pool->sp_groupcount = max(1, min(SVC_MAXGROUPS, 1365 min(pool->sp_maxthreads / 2, mp_ncpus) / 6)); 1366 for (g = 0; g < pool->sp_groupcount; g++) { 1367 grp = &pool->sp_groups[g]; 1368 grp->sg_minthreads = max(1, 1369 pool->sp_minthreads / pool->sp_groupcount); 1370 grp->sg_maxthreads = max(1, 1371 pool->sp_maxthreads / pool->sp_groupcount); 1372 grp->sg_lastcreatetime = time_uptime; 1373 } 1374 1375 /* Starting threads */ 1376 pool->sp_groups[0].sg_threadcount++; 1377 for (g = 0; g < pool->sp_groupcount; g++) { 1378 grp = &pool->sp_groups[g]; 1379 for (i = ((g == 0) ? 1 : 0); i < grp->sg_minthreads; i++) 1380 svc_new_thread(grp); 1381 } 1382 svc_run_internal(&pool->sp_groups[0], TRUE); 1383 1384 /* Waiting for threads to stop. */ 1385 for (g = 0; g < pool->sp_groupcount; g++) { 1386 grp = &pool->sp_groups[g]; 1387 mtx_lock(&grp->sg_lock); 1388 while (grp->sg_threadcount > 0) 1389 msleep(grp, &grp->sg_lock, 0, "svcexit", 0); 1390 mtx_unlock(&grp->sg_lock); 1391 } 1392 } 1393 1394 void 1395 svc_exit(SVCPOOL *pool) 1396 { 1397 SVCGROUP *grp; 1398 SVCTHREAD *st; 1399 int g; 1400 1401 pool->sp_state = SVCPOOL_CLOSING; 1402 for (g = 0; g < pool->sp_groupcount; g++) { 1403 grp = &pool->sp_groups[g]; 1404 mtx_lock(&grp->sg_lock); 1405 if (grp->sg_state != SVCPOOL_CLOSING) { 1406 grp->sg_state = SVCPOOL_CLOSING; 1407 LIST_FOREACH(st, &grp->sg_idlethreads, st_ilink) 1408 cv_signal(&st->st_cond); 1409 } 1410 mtx_unlock(&grp->sg_lock); 1411 } 1412 } 1413 1414 bool_t 1415 svc_getargs(struct svc_req *rqstp, xdrproc_t xargs, void *args) 1416 { 1417 struct mbuf *m; 1418 XDR xdrs; 1419 bool_t stat; 1420 1421 m = rqstp->rq_args; 1422 rqstp->rq_args = NULL; 1423 1424 xdrmbuf_create(&xdrs, m, XDR_DECODE); 1425 stat = xargs(&xdrs, args); 1426 XDR_DESTROY(&xdrs); 1427 1428 return (stat); 1429 } 1430 1431 bool_t 1432 svc_freeargs(struct svc_req *rqstp, xdrproc_t xargs, void *args) 1433 { 1434 XDR xdrs; 1435 1436 if (rqstp->rq_addr) { 1437 free(rqstp->rq_addr, M_SONAME); 1438 rqstp->rq_addr = NULL; 1439 } 1440 1441 xdrs.x_op = XDR_FREE; 1442 return (xargs(&xdrs, args)); 1443 } 1444 1445 void 1446 svc_freereq(struct svc_req *rqstp) 1447 { 1448 SVCTHREAD *st; 1449 SVCPOOL *pool; 1450 1451 st = rqstp->rq_thread; 1452 if (st) { 1453 pool = st->st_pool; 1454 if (pool->sp_done) 1455 pool->sp_done(st, rqstp); 1456 } 1457 1458 if (rqstp->rq_auth.svc_ah_ops) 1459 SVCAUTH_RELEASE(&rqstp->rq_auth); 1460 1461 if (rqstp->rq_xprt) { 1462 SVC_RELEASE(rqstp->rq_xprt); 1463 } 1464 1465 if (rqstp->rq_addr) 1466 free(rqstp->rq_addr, M_SONAME); 1467 1468 if (rqstp->rq_args) 1469 m_freem(rqstp->rq_args); 1470 1471 free(rqstp, M_RPC); 1472 } 1473