1 /* $NetBSD: svc.c,v 1.21 2000/07/06 03:10:35 christos Exp $ */ 2 3 /*- 4 * SPDX-License-Identifier: BSD-3-Clause 5 * 6 * Copyright (c) 2009, Sun Microsystems, Inc. 7 * All rights reserved. 8 * 9 * Redistribution and use in source and binary forms, with or without 10 * modification, are permitted provided that the following conditions are met: 11 * - Redistributions of source code must retain the above copyright notice, 12 * this list of conditions and the following disclaimer. 13 * - Redistributions in binary form must reproduce the above copyright notice, 14 * this list of conditions and the following disclaimer in the documentation 15 * and/or other materials provided with the distribution. 16 * - Neither the name of Sun Microsystems, Inc. nor the names of its 17 * contributors may be used to endorse or promote products derived 18 * from this software without specific prior written permission. 19 * 20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 21 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 22 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 23 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 24 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 25 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 26 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 27 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 28 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 29 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 30 * POSSIBILITY OF SUCH DAMAGE. 31 */ 32 33 #if defined(LIBC_SCCS) && !defined(lint) 34 static char *sccsid2 = "@(#)svc.c 1.44 88/02/08 Copyr 1984 Sun Micro"; 35 static char *sccsid = "@(#)svc.c 2.4 88/08/11 4.0 RPCSRC"; 36 #endif 37 #include <sys/cdefs.h> 38 __FBSDID("$FreeBSD$"); 39 40 /* 41 * svc.c, Server-side remote procedure call interface. 42 * 43 * There are two sets of procedures here. The xprt routines are 44 * for handling transport handles. The svc routines handle the 45 * list of service routines. 46 * 47 * Copyright (C) 1984, Sun Microsystems, Inc. 48 */ 49 50 #include <sys/param.h> 51 #include <sys/lock.h> 52 #include <sys/kernel.h> 53 #include <sys/kthread.h> 54 #include <sys/malloc.h> 55 #include <sys/mbuf.h> 56 #include <sys/mutex.h> 57 #include <sys/proc.h> 58 #include <sys/queue.h> 59 #include <sys/socketvar.h> 60 #include <sys/systm.h> 61 #include <sys/smp.h> 62 #include <sys/sx.h> 63 #include <sys/ucred.h> 64 65 #include <rpc/rpc.h> 66 #include <rpc/rpcb_clnt.h> 67 #include <rpc/replay.h> 68 69 #include <rpc/rpc_com.h> 70 71 #define SVC_VERSQUIET 0x0001 /* keep quiet about vers mismatch */ 72 #define version_keepquiet(xp) (SVC_EXT(xp)->xp_flags & SVC_VERSQUIET) 73 74 static struct svc_callout *svc_find(SVCPOOL *pool, rpcprog_t, rpcvers_t, 75 char *); 76 static void svc_new_thread(SVCGROUP *grp); 77 static void xprt_unregister_locked(SVCXPRT *xprt); 78 static void svc_change_space_used(SVCPOOL *pool, long delta); 79 static bool_t svc_request_space_available(SVCPOOL *pool); 80 static void svcpool_cleanup(SVCPOOL *pool); 81 82 /* *************** SVCXPRT related stuff **************** */ 83 84 static int svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS); 85 static int svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS); 86 static int svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS); 87 88 SVCPOOL* 89 svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base) 90 { 91 SVCPOOL *pool; 92 SVCGROUP *grp; 93 int g; 94 95 pool = malloc(sizeof(SVCPOOL), M_RPC, M_WAITOK|M_ZERO); 96 97 mtx_init(&pool->sp_lock, "sp_lock", NULL, MTX_DEF); 98 pool->sp_name = name; 99 pool->sp_state = SVCPOOL_INIT; 100 pool->sp_proc = NULL; 101 TAILQ_INIT(&pool->sp_callouts); 102 TAILQ_INIT(&pool->sp_lcallouts); 103 pool->sp_minthreads = 1; 104 pool->sp_maxthreads = 1; 105 pool->sp_groupcount = 1; 106 for (g = 0; g < SVC_MAXGROUPS; g++) { 107 grp = &pool->sp_groups[g]; 108 mtx_init(&grp->sg_lock, "sg_lock", NULL, MTX_DEF); 109 grp->sg_pool = pool; 110 grp->sg_state = SVCPOOL_ACTIVE; 111 TAILQ_INIT(&grp->sg_xlist); 112 TAILQ_INIT(&grp->sg_active); 113 LIST_INIT(&grp->sg_idlethreads); 114 grp->sg_minthreads = 1; 115 grp->sg_maxthreads = 1; 116 } 117 118 /* 119 * Don't use more than a quarter of mbuf clusters. Nota bene: 120 * nmbclusters is an int, but nmbclusters*MCLBYTES may overflow 121 * on LP64 architectures, so cast to u_long to avoid undefined 122 * behavior. (ILP32 architectures cannot have nmbclusters 123 * large enough to overflow for other reasons.) 124 */ 125 pool->sp_space_high = (u_long)nmbclusters * MCLBYTES / 4; 126 pool->sp_space_low = (pool->sp_space_high / 3) * 2; 127 128 sysctl_ctx_init(&pool->sp_sysctl); 129 if (sysctl_base) { 130 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO, 131 "minthreads", CTLTYPE_INT | CTLFLAG_RW, 132 pool, 0, svcpool_minthread_sysctl, "I", 133 "Minimal number of threads"); 134 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO, 135 "maxthreads", CTLTYPE_INT | CTLFLAG_RW, 136 pool, 0, svcpool_maxthread_sysctl, "I", 137 "Maximal number of threads"); 138 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO, 139 "threads", CTLTYPE_INT | CTLFLAG_RD, 140 pool, 0, svcpool_threads_sysctl, "I", 141 "Current number of threads"); 142 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO, 143 "groups", CTLFLAG_RD, &pool->sp_groupcount, 0, 144 "Number of thread groups"); 145 146 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO, 147 "request_space_used", CTLFLAG_RD, 148 &pool->sp_space_used, 149 "Space in parsed but not handled requests."); 150 151 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO, 152 "request_space_used_highest", CTLFLAG_RD, 153 &pool->sp_space_used_highest, 154 "Highest space used since reboot."); 155 156 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO, 157 "request_space_high", CTLFLAG_RW, 158 &pool->sp_space_high, 159 "Maximum space in parsed but not handled requests."); 160 161 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO, 162 "request_space_low", CTLFLAG_RW, 163 &pool->sp_space_low, 164 "Low water mark for request space."); 165 166 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO, 167 "request_space_throttled", CTLFLAG_RD, 168 &pool->sp_space_throttled, 0, 169 "Whether nfs requests are currently throttled"); 170 171 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO, 172 "request_space_throttle_count", CTLFLAG_RD, 173 &pool->sp_space_throttle_count, 0, 174 "Count of times throttling based on request space has occurred"); 175 } 176 177 return pool; 178 } 179 180 /* 181 * Code common to svcpool_destroy() and svcpool_close(), which cleans up 182 * the pool data structures. 183 */ 184 static void 185 svcpool_cleanup(SVCPOOL *pool) 186 { 187 SVCGROUP *grp; 188 SVCXPRT *xprt, *nxprt; 189 struct svc_callout *s; 190 struct svc_loss_callout *sl; 191 struct svcxprt_list cleanup; 192 int g; 193 194 TAILQ_INIT(&cleanup); 195 196 for (g = 0; g < SVC_MAXGROUPS; g++) { 197 grp = &pool->sp_groups[g]; 198 mtx_lock(&grp->sg_lock); 199 while ((xprt = TAILQ_FIRST(&grp->sg_xlist)) != NULL) { 200 xprt_unregister_locked(xprt); 201 TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link); 202 } 203 mtx_unlock(&grp->sg_lock); 204 } 205 TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) { 206 SVC_RELEASE(xprt); 207 } 208 209 mtx_lock(&pool->sp_lock); 210 while ((s = TAILQ_FIRST(&pool->sp_callouts)) != NULL) { 211 mtx_unlock(&pool->sp_lock); 212 svc_unreg(pool, s->sc_prog, s->sc_vers); 213 mtx_lock(&pool->sp_lock); 214 } 215 while ((sl = TAILQ_FIRST(&pool->sp_lcallouts)) != NULL) { 216 mtx_unlock(&pool->sp_lock); 217 svc_loss_unreg(pool, sl->slc_dispatch); 218 mtx_lock(&pool->sp_lock); 219 } 220 mtx_unlock(&pool->sp_lock); 221 } 222 223 void 224 svcpool_destroy(SVCPOOL *pool) 225 { 226 SVCGROUP *grp; 227 int g; 228 229 svcpool_cleanup(pool); 230 231 for (g = 0; g < SVC_MAXGROUPS; g++) { 232 grp = &pool->sp_groups[g]; 233 mtx_destroy(&grp->sg_lock); 234 } 235 mtx_destroy(&pool->sp_lock); 236 237 if (pool->sp_rcache) 238 replay_freecache(pool->sp_rcache); 239 240 sysctl_ctx_free(&pool->sp_sysctl); 241 free(pool, M_RPC); 242 } 243 244 /* 245 * Similar to svcpool_destroy(), except that it does not destroy the actual 246 * data structures. As such, "pool" may be used again. 247 */ 248 void 249 svcpool_close(SVCPOOL *pool) 250 { 251 SVCGROUP *grp; 252 int g; 253 254 svcpool_cleanup(pool); 255 256 /* Now, initialize the pool's state for a fresh svc_run() call. */ 257 mtx_lock(&pool->sp_lock); 258 pool->sp_state = SVCPOOL_INIT; 259 mtx_unlock(&pool->sp_lock); 260 for (g = 0; g < SVC_MAXGROUPS; g++) { 261 grp = &pool->sp_groups[g]; 262 mtx_lock(&grp->sg_lock); 263 grp->sg_state = SVCPOOL_ACTIVE; 264 mtx_unlock(&grp->sg_lock); 265 } 266 } 267 268 /* 269 * Sysctl handler to get the present thread count on a pool 270 */ 271 static int 272 svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS) 273 { 274 SVCPOOL *pool; 275 int threads, error, g; 276 277 pool = oidp->oid_arg1; 278 threads = 0; 279 mtx_lock(&pool->sp_lock); 280 for (g = 0; g < pool->sp_groupcount; g++) 281 threads += pool->sp_groups[g].sg_threadcount; 282 mtx_unlock(&pool->sp_lock); 283 error = sysctl_handle_int(oidp, &threads, 0, req); 284 return (error); 285 } 286 287 /* 288 * Sysctl handler to set the minimum thread count on a pool 289 */ 290 static int 291 svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS) 292 { 293 SVCPOOL *pool; 294 int newminthreads, error, g; 295 296 pool = oidp->oid_arg1; 297 newminthreads = pool->sp_minthreads; 298 error = sysctl_handle_int(oidp, &newminthreads, 0, req); 299 if (error == 0 && newminthreads != pool->sp_minthreads) { 300 if (newminthreads > pool->sp_maxthreads) 301 return (EINVAL); 302 mtx_lock(&pool->sp_lock); 303 pool->sp_minthreads = newminthreads; 304 for (g = 0; g < pool->sp_groupcount; g++) { 305 pool->sp_groups[g].sg_minthreads = max(1, 306 pool->sp_minthreads / pool->sp_groupcount); 307 } 308 mtx_unlock(&pool->sp_lock); 309 } 310 return (error); 311 } 312 313 /* 314 * Sysctl handler to set the maximum thread count on a pool 315 */ 316 static int 317 svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS) 318 { 319 SVCPOOL *pool; 320 int newmaxthreads, error, g; 321 322 pool = oidp->oid_arg1; 323 newmaxthreads = pool->sp_maxthreads; 324 error = sysctl_handle_int(oidp, &newmaxthreads, 0, req); 325 if (error == 0 && newmaxthreads != pool->sp_maxthreads) { 326 if (newmaxthreads < pool->sp_minthreads) 327 return (EINVAL); 328 mtx_lock(&pool->sp_lock); 329 pool->sp_maxthreads = newmaxthreads; 330 for (g = 0; g < pool->sp_groupcount; g++) { 331 pool->sp_groups[g].sg_maxthreads = max(1, 332 pool->sp_maxthreads / pool->sp_groupcount); 333 } 334 mtx_unlock(&pool->sp_lock); 335 } 336 return (error); 337 } 338 339 /* 340 * Activate a transport handle. 341 */ 342 void 343 xprt_register(SVCXPRT *xprt) 344 { 345 SVCPOOL *pool = xprt->xp_pool; 346 SVCGROUP *grp; 347 int g; 348 349 SVC_ACQUIRE(xprt); 350 g = atomic_fetchadd_int(&pool->sp_nextgroup, 1) % pool->sp_groupcount; 351 xprt->xp_group = grp = &pool->sp_groups[g]; 352 mtx_lock(&grp->sg_lock); 353 xprt->xp_registered = TRUE; 354 xprt->xp_active = FALSE; 355 TAILQ_INSERT_TAIL(&grp->sg_xlist, xprt, xp_link); 356 mtx_unlock(&grp->sg_lock); 357 } 358 359 /* 360 * De-activate a transport handle. Note: the locked version doesn't 361 * release the transport - caller must do that after dropping the pool 362 * lock. 363 */ 364 static void 365 xprt_unregister_locked(SVCXPRT *xprt) 366 { 367 SVCGROUP *grp = xprt->xp_group; 368 369 mtx_assert(&grp->sg_lock, MA_OWNED); 370 KASSERT(xprt->xp_registered == TRUE, 371 ("xprt_unregister_locked: not registered")); 372 xprt_inactive_locked(xprt); 373 TAILQ_REMOVE(&grp->sg_xlist, xprt, xp_link); 374 xprt->xp_registered = FALSE; 375 } 376 377 void 378 xprt_unregister(SVCXPRT *xprt) 379 { 380 SVCGROUP *grp = xprt->xp_group; 381 382 mtx_lock(&grp->sg_lock); 383 if (xprt->xp_registered == FALSE) { 384 /* Already unregistered by another thread */ 385 mtx_unlock(&grp->sg_lock); 386 return; 387 } 388 xprt_unregister_locked(xprt); 389 mtx_unlock(&grp->sg_lock); 390 391 SVC_RELEASE(xprt); 392 } 393 394 /* 395 * Attempt to assign a service thread to this transport. 396 */ 397 static int 398 xprt_assignthread(SVCXPRT *xprt) 399 { 400 SVCGROUP *grp = xprt->xp_group; 401 SVCTHREAD *st; 402 403 mtx_assert(&grp->sg_lock, MA_OWNED); 404 st = LIST_FIRST(&grp->sg_idlethreads); 405 if (st) { 406 LIST_REMOVE(st, st_ilink); 407 SVC_ACQUIRE(xprt); 408 xprt->xp_thread = st; 409 st->st_xprt = xprt; 410 cv_signal(&st->st_cond); 411 return (TRUE); 412 } else { 413 /* 414 * See if we can create a new thread. The 415 * actual thread creation happens in 416 * svc_run_internal because our locking state 417 * is poorly defined (we are typically called 418 * from a socket upcall). Don't create more 419 * than one thread per second. 420 */ 421 if (grp->sg_state == SVCPOOL_ACTIVE 422 && grp->sg_lastcreatetime < time_uptime 423 && grp->sg_threadcount < grp->sg_maxthreads) { 424 grp->sg_state = SVCPOOL_THREADWANTED; 425 } 426 } 427 return (FALSE); 428 } 429 430 void 431 xprt_active(SVCXPRT *xprt) 432 { 433 SVCGROUP *grp = xprt->xp_group; 434 435 mtx_lock(&grp->sg_lock); 436 437 if (!xprt->xp_registered) { 438 /* 439 * Race with xprt_unregister - we lose. 440 */ 441 mtx_unlock(&grp->sg_lock); 442 return; 443 } 444 445 if (!xprt->xp_active) { 446 xprt->xp_active = TRUE; 447 if (xprt->xp_thread == NULL) { 448 if (!svc_request_space_available(xprt->xp_pool) || 449 !xprt_assignthread(xprt)) 450 TAILQ_INSERT_TAIL(&grp->sg_active, xprt, 451 xp_alink); 452 } 453 } 454 455 mtx_unlock(&grp->sg_lock); 456 } 457 458 void 459 xprt_inactive_locked(SVCXPRT *xprt) 460 { 461 SVCGROUP *grp = xprt->xp_group; 462 463 mtx_assert(&grp->sg_lock, MA_OWNED); 464 if (xprt->xp_active) { 465 if (xprt->xp_thread == NULL) 466 TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink); 467 xprt->xp_active = FALSE; 468 } 469 } 470 471 void 472 xprt_inactive(SVCXPRT *xprt) 473 { 474 SVCGROUP *grp = xprt->xp_group; 475 476 mtx_lock(&grp->sg_lock); 477 xprt_inactive_locked(xprt); 478 mtx_unlock(&grp->sg_lock); 479 } 480 481 /* 482 * Variant of xprt_inactive() for use only when sure that port is 483 * assigned to thread. For example, within receive handlers. 484 */ 485 void 486 xprt_inactive_self(SVCXPRT *xprt) 487 { 488 489 KASSERT(xprt->xp_thread != NULL, 490 ("xprt_inactive_self(%p) with NULL xp_thread", xprt)); 491 xprt->xp_active = FALSE; 492 } 493 494 /* 495 * Add a service program to the callout list. 496 * The dispatch routine will be called when a rpc request for this 497 * program number comes in. 498 */ 499 bool_t 500 svc_reg(SVCXPRT *xprt, const rpcprog_t prog, const rpcvers_t vers, 501 void (*dispatch)(struct svc_req *, SVCXPRT *), 502 const struct netconfig *nconf) 503 { 504 SVCPOOL *pool = xprt->xp_pool; 505 struct svc_callout *s; 506 char *netid = NULL; 507 int flag = 0; 508 509 /* VARIABLES PROTECTED BY svc_lock: s, svc_head */ 510 511 if (xprt->xp_netid) { 512 netid = strdup(xprt->xp_netid, M_RPC); 513 flag = 1; 514 } else if (nconf && nconf->nc_netid) { 515 netid = strdup(nconf->nc_netid, M_RPC); 516 flag = 1; 517 } /* must have been created with svc_raw_create */ 518 if ((netid == NULL) && (flag == 1)) { 519 return (FALSE); 520 } 521 522 mtx_lock(&pool->sp_lock); 523 if ((s = svc_find(pool, prog, vers, netid)) != NULL) { 524 if (netid) 525 free(netid, M_RPC); 526 if (s->sc_dispatch == dispatch) 527 goto rpcb_it; /* he is registering another xptr */ 528 mtx_unlock(&pool->sp_lock); 529 return (FALSE); 530 } 531 s = malloc(sizeof (struct svc_callout), M_RPC, M_NOWAIT); 532 if (s == NULL) { 533 if (netid) 534 free(netid, M_RPC); 535 mtx_unlock(&pool->sp_lock); 536 return (FALSE); 537 } 538 539 s->sc_prog = prog; 540 s->sc_vers = vers; 541 s->sc_dispatch = dispatch; 542 s->sc_netid = netid; 543 TAILQ_INSERT_TAIL(&pool->sp_callouts, s, sc_link); 544 545 if ((xprt->xp_netid == NULL) && (flag == 1) && netid) 546 ((SVCXPRT *) xprt)->xp_netid = strdup(netid, M_RPC); 547 548 rpcb_it: 549 mtx_unlock(&pool->sp_lock); 550 /* now register the information with the local binder service */ 551 if (nconf) { 552 bool_t dummy; 553 struct netconfig tnc; 554 struct netbuf nb; 555 tnc = *nconf; 556 nb.buf = &xprt->xp_ltaddr; 557 nb.len = xprt->xp_ltaddr.ss_len; 558 dummy = rpcb_set(prog, vers, &tnc, &nb); 559 return (dummy); 560 } 561 return (TRUE); 562 } 563 564 /* 565 * Remove a service program from the callout list. 566 */ 567 void 568 svc_unreg(SVCPOOL *pool, const rpcprog_t prog, const rpcvers_t vers) 569 { 570 struct svc_callout *s; 571 572 /* unregister the information anyway */ 573 (void) rpcb_unset(prog, vers, NULL); 574 mtx_lock(&pool->sp_lock); 575 while ((s = svc_find(pool, prog, vers, NULL)) != NULL) { 576 TAILQ_REMOVE(&pool->sp_callouts, s, sc_link); 577 if (s->sc_netid) 578 mem_free(s->sc_netid, sizeof (s->sc_netid) + 1); 579 mem_free(s, sizeof (struct svc_callout)); 580 } 581 mtx_unlock(&pool->sp_lock); 582 } 583 584 /* 585 * Add a service connection loss program to the callout list. 586 * The dispatch routine will be called when some port in ths pool die. 587 */ 588 bool_t 589 svc_loss_reg(SVCXPRT *xprt, void (*dispatch)(SVCXPRT *)) 590 { 591 SVCPOOL *pool = xprt->xp_pool; 592 struct svc_loss_callout *s; 593 594 mtx_lock(&pool->sp_lock); 595 TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) { 596 if (s->slc_dispatch == dispatch) 597 break; 598 } 599 if (s != NULL) { 600 mtx_unlock(&pool->sp_lock); 601 return (TRUE); 602 } 603 s = malloc(sizeof(struct svc_loss_callout), M_RPC, M_NOWAIT); 604 if (s == NULL) { 605 mtx_unlock(&pool->sp_lock); 606 return (FALSE); 607 } 608 s->slc_dispatch = dispatch; 609 TAILQ_INSERT_TAIL(&pool->sp_lcallouts, s, slc_link); 610 mtx_unlock(&pool->sp_lock); 611 return (TRUE); 612 } 613 614 /* 615 * Remove a service connection loss program from the callout list. 616 */ 617 void 618 svc_loss_unreg(SVCPOOL *pool, void (*dispatch)(SVCXPRT *)) 619 { 620 struct svc_loss_callout *s; 621 622 mtx_lock(&pool->sp_lock); 623 TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) { 624 if (s->slc_dispatch == dispatch) { 625 TAILQ_REMOVE(&pool->sp_lcallouts, s, slc_link); 626 free(s, M_RPC); 627 break; 628 } 629 } 630 mtx_unlock(&pool->sp_lock); 631 } 632 633 /* ********************** CALLOUT list related stuff ************* */ 634 635 /* 636 * Search the callout list for a program number, return the callout 637 * struct. 638 */ 639 static struct svc_callout * 640 svc_find(SVCPOOL *pool, rpcprog_t prog, rpcvers_t vers, char *netid) 641 { 642 struct svc_callout *s; 643 644 mtx_assert(&pool->sp_lock, MA_OWNED); 645 TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) { 646 if (s->sc_prog == prog && s->sc_vers == vers 647 && (netid == NULL || s->sc_netid == NULL || 648 strcmp(netid, s->sc_netid) == 0)) 649 break; 650 } 651 652 return (s); 653 } 654 655 /* ******************* REPLY GENERATION ROUTINES ************ */ 656 657 static bool_t 658 svc_sendreply_common(struct svc_req *rqstp, struct rpc_msg *rply, 659 struct mbuf *body) 660 { 661 SVCXPRT *xprt = rqstp->rq_xprt; 662 bool_t ok; 663 664 if (rqstp->rq_args) { 665 m_freem(rqstp->rq_args); 666 rqstp->rq_args = NULL; 667 } 668 669 if (xprt->xp_pool->sp_rcache) 670 replay_setreply(xprt->xp_pool->sp_rcache, 671 rply, svc_getrpccaller(rqstp), body); 672 673 if (!SVCAUTH_WRAP(&rqstp->rq_auth, &body)) 674 return (FALSE); 675 676 ok = SVC_REPLY(xprt, rply, rqstp->rq_addr, body, &rqstp->rq_reply_seq); 677 if (rqstp->rq_addr) { 678 free(rqstp->rq_addr, M_SONAME); 679 rqstp->rq_addr = NULL; 680 } 681 682 return (ok); 683 } 684 685 /* 686 * Send a reply to an rpc request 687 */ 688 bool_t 689 svc_sendreply(struct svc_req *rqstp, xdrproc_t xdr_results, void * xdr_location) 690 { 691 struct rpc_msg rply; 692 struct mbuf *m; 693 XDR xdrs; 694 bool_t ok; 695 696 rply.rm_xid = rqstp->rq_xid; 697 rply.rm_direction = REPLY; 698 rply.rm_reply.rp_stat = MSG_ACCEPTED; 699 rply.acpted_rply.ar_verf = rqstp->rq_verf; 700 rply.acpted_rply.ar_stat = SUCCESS; 701 rply.acpted_rply.ar_results.where = NULL; 702 rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void; 703 704 m = m_getcl(M_WAITOK, MT_DATA, 0); 705 xdrmbuf_create(&xdrs, m, XDR_ENCODE); 706 ok = xdr_results(&xdrs, xdr_location); 707 XDR_DESTROY(&xdrs); 708 709 if (ok) { 710 return (svc_sendreply_common(rqstp, &rply, m)); 711 } else { 712 m_freem(m); 713 return (FALSE); 714 } 715 } 716 717 bool_t 718 svc_sendreply_mbuf(struct svc_req *rqstp, struct mbuf *m) 719 { 720 struct rpc_msg rply; 721 722 rply.rm_xid = rqstp->rq_xid; 723 rply.rm_direction = REPLY; 724 rply.rm_reply.rp_stat = MSG_ACCEPTED; 725 rply.acpted_rply.ar_verf = rqstp->rq_verf; 726 rply.acpted_rply.ar_stat = SUCCESS; 727 rply.acpted_rply.ar_results.where = NULL; 728 rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void; 729 730 return (svc_sendreply_common(rqstp, &rply, m)); 731 } 732 733 /* 734 * No procedure error reply 735 */ 736 void 737 svcerr_noproc(struct svc_req *rqstp) 738 { 739 SVCXPRT *xprt = rqstp->rq_xprt; 740 struct rpc_msg rply; 741 742 rply.rm_xid = rqstp->rq_xid; 743 rply.rm_direction = REPLY; 744 rply.rm_reply.rp_stat = MSG_ACCEPTED; 745 rply.acpted_rply.ar_verf = rqstp->rq_verf; 746 rply.acpted_rply.ar_stat = PROC_UNAVAIL; 747 748 if (xprt->xp_pool->sp_rcache) 749 replay_setreply(xprt->xp_pool->sp_rcache, 750 &rply, svc_getrpccaller(rqstp), NULL); 751 752 svc_sendreply_common(rqstp, &rply, NULL); 753 } 754 755 /* 756 * Can't decode args error reply 757 */ 758 void 759 svcerr_decode(struct svc_req *rqstp) 760 { 761 SVCXPRT *xprt = rqstp->rq_xprt; 762 struct rpc_msg rply; 763 764 rply.rm_xid = rqstp->rq_xid; 765 rply.rm_direction = REPLY; 766 rply.rm_reply.rp_stat = MSG_ACCEPTED; 767 rply.acpted_rply.ar_verf = rqstp->rq_verf; 768 rply.acpted_rply.ar_stat = GARBAGE_ARGS; 769 770 if (xprt->xp_pool->sp_rcache) 771 replay_setreply(xprt->xp_pool->sp_rcache, 772 &rply, (struct sockaddr *) &xprt->xp_rtaddr, NULL); 773 774 svc_sendreply_common(rqstp, &rply, NULL); 775 } 776 777 /* 778 * Some system error 779 */ 780 void 781 svcerr_systemerr(struct svc_req *rqstp) 782 { 783 SVCXPRT *xprt = rqstp->rq_xprt; 784 struct rpc_msg rply; 785 786 rply.rm_xid = rqstp->rq_xid; 787 rply.rm_direction = REPLY; 788 rply.rm_reply.rp_stat = MSG_ACCEPTED; 789 rply.acpted_rply.ar_verf = rqstp->rq_verf; 790 rply.acpted_rply.ar_stat = SYSTEM_ERR; 791 792 if (xprt->xp_pool->sp_rcache) 793 replay_setreply(xprt->xp_pool->sp_rcache, 794 &rply, svc_getrpccaller(rqstp), NULL); 795 796 svc_sendreply_common(rqstp, &rply, NULL); 797 } 798 799 /* 800 * Authentication error reply 801 */ 802 void 803 svcerr_auth(struct svc_req *rqstp, enum auth_stat why) 804 { 805 SVCXPRT *xprt = rqstp->rq_xprt; 806 struct rpc_msg rply; 807 808 rply.rm_xid = rqstp->rq_xid; 809 rply.rm_direction = REPLY; 810 rply.rm_reply.rp_stat = MSG_DENIED; 811 rply.rjcted_rply.rj_stat = AUTH_ERROR; 812 rply.rjcted_rply.rj_why = why; 813 814 if (xprt->xp_pool->sp_rcache) 815 replay_setreply(xprt->xp_pool->sp_rcache, 816 &rply, svc_getrpccaller(rqstp), NULL); 817 818 svc_sendreply_common(rqstp, &rply, NULL); 819 } 820 821 /* 822 * Auth too weak error reply 823 */ 824 void 825 svcerr_weakauth(struct svc_req *rqstp) 826 { 827 828 svcerr_auth(rqstp, AUTH_TOOWEAK); 829 } 830 831 /* 832 * Program unavailable error reply 833 */ 834 void 835 svcerr_noprog(struct svc_req *rqstp) 836 { 837 SVCXPRT *xprt = rqstp->rq_xprt; 838 struct rpc_msg rply; 839 840 rply.rm_xid = rqstp->rq_xid; 841 rply.rm_direction = REPLY; 842 rply.rm_reply.rp_stat = MSG_ACCEPTED; 843 rply.acpted_rply.ar_verf = rqstp->rq_verf; 844 rply.acpted_rply.ar_stat = PROG_UNAVAIL; 845 846 if (xprt->xp_pool->sp_rcache) 847 replay_setreply(xprt->xp_pool->sp_rcache, 848 &rply, svc_getrpccaller(rqstp), NULL); 849 850 svc_sendreply_common(rqstp, &rply, NULL); 851 } 852 853 /* 854 * Program version mismatch error reply 855 */ 856 void 857 svcerr_progvers(struct svc_req *rqstp, rpcvers_t low_vers, rpcvers_t high_vers) 858 { 859 SVCXPRT *xprt = rqstp->rq_xprt; 860 struct rpc_msg rply; 861 862 rply.rm_xid = rqstp->rq_xid; 863 rply.rm_direction = REPLY; 864 rply.rm_reply.rp_stat = MSG_ACCEPTED; 865 rply.acpted_rply.ar_verf = rqstp->rq_verf; 866 rply.acpted_rply.ar_stat = PROG_MISMATCH; 867 rply.acpted_rply.ar_vers.low = (uint32_t)low_vers; 868 rply.acpted_rply.ar_vers.high = (uint32_t)high_vers; 869 870 if (xprt->xp_pool->sp_rcache) 871 replay_setreply(xprt->xp_pool->sp_rcache, 872 &rply, svc_getrpccaller(rqstp), NULL); 873 874 svc_sendreply_common(rqstp, &rply, NULL); 875 } 876 877 /* 878 * Allocate a new server transport structure. All fields are 879 * initialized to zero and xp_p3 is initialized to point at an 880 * extension structure to hold various flags and authentication 881 * parameters. 882 */ 883 SVCXPRT * 884 svc_xprt_alloc(void) 885 { 886 SVCXPRT *xprt; 887 SVCXPRT_EXT *ext; 888 889 xprt = mem_alloc(sizeof(SVCXPRT)); 890 ext = mem_alloc(sizeof(SVCXPRT_EXT)); 891 xprt->xp_p3 = ext; 892 refcount_init(&xprt->xp_refs, 1); 893 894 return (xprt); 895 } 896 897 /* 898 * Free a server transport structure. 899 */ 900 void 901 svc_xprt_free(SVCXPRT *xprt) 902 { 903 904 mem_free(xprt->xp_p3, sizeof(SVCXPRT_EXT)); 905 mem_free(xprt, sizeof(SVCXPRT)); 906 } 907 908 /* ******************* SERVER INPUT STUFF ******************* */ 909 910 /* 911 * Read RPC requests from a transport and queue them to be 912 * executed. We handle authentication and replay cache replies here. 913 * Actually dispatching the RPC is deferred till svc_executereq. 914 */ 915 static enum xprt_stat 916 svc_getreq(SVCXPRT *xprt, struct svc_req **rqstp_ret) 917 { 918 SVCPOOL *pool = xprt->xp_pool; 919 struct svc_req *r; 920 struct rpc_msg msg; 921 struct mbuf *args; 922 struct svc_loss_callout *s; 923 enum xprt_stat stat; 924 925 /* now receive msgs from xprtprt (support batch calls) */ 926 r = malloc(sizeof(*r), M_RPC, M_WAITOK|M_ZERO); 927 928 msg.rm_call.cb_cred.oa_base = r->rq_credarea; 929 msg.rm_call.cb_verf.oa_base = &r->rq_credarea[MAX_AUTH_BYTES]; 930 r->rq_clntcred = &r->rq_credarea[2*MAX_AUTH_BYTES]; 931 if (SVC_RECV(xprt, &msg, &r->rq_addr, &args)) { 932 enum auth_stat why; 933 934 /* 935 * Handle replays and authenticate before queuing the 936 * request to be executed. 937 */ 938 SVC_ACQUIRE(xprt); 939 r->rq_xprt = xprt; 940 if (pool->sp_rcache) { 941 struct rpc_msg repmsg; 942 struct mbuf *repbody; 943 enum replay_state rs; 944 rs = replay_find(pool->sp_rcache, &msg, 945 svc_getrpccaller(r), &repmsg, &repbody); 946 switch (rs) { 947 case RS_NEW: 948 break; 949 case RS_DONE: 950 SVC_REPLY(xprt, &repmsg, r->rq_addr, 951 repbody, &r->rq_reply_seq); 952 if (r->rq_addr) { 953 free(r->rq_addr, M_SONAME); 954 r->rq_addr = NULL; 955 } 956 m_freem(args); 957 goto call_done; 958 959 default: 960 m_freem(args); 961 goto call_done; 962 } 963 } 964 965 r->rq_xid = msg.rm_xid; 966 r->rq_prog = msg.rm_call.cb_prog; 967 r->rq_vers = msg.rm_call.cb_vers; 968 r->rq_proc = msg.rm_call.cb_proc; 969 r->rq_size = sizeof(*r) + m_length(args, NULL); 970 r->rq_args = args; 971 if ((why = _authenticate(r, &msg)) != AUTH_OK) { 972 /* 973 * RPCSEC_GSS uses this return code 974 * for requests that form part of its 975 * context establishment protocol and 976 * should not be dispatched to the 977 * application. 978 */ 979 if (why != RPCSEC_GSS_NODISPATCH) 980 svcerr_auth(r, why); 981 goto call_done; 982 } 983 984 if (!SVCAUTH_UNWRAP(&r->rq_auth, &r->rq_args)) { 985 svcerr_decode(r); 986 goto call_done; 987 } 988 989 /* 990 * Everything checks out, return request to caller. 991 */ 992 *rqstp_ret = r; 993 r = NULL; 994 } 995 call_done: 996 if (r) { 997 svc_freereq(r); 998 r = NULL; 999 } 1000 if ((stat = SVC_STAT(xprt)) == XPRT_DIED) { 1001 TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) 1002 (*s->slc_dispatch)(xprt); 1003 xprt_unregister(xprt); 1004 } 1005 1006 return (stat); 1007 } 1008 1009 static void 1010 svc_executereq(struct svc_req *rqstp) 1011 { 1012 SVCXPRT *xprt = rqstp->rq_xprt; 1013 SVCPOOL *pool = xprt->xp_pool; 1014 int prog_found; 1015 rpcvers_t low_vers; 1016 rpcvers_t high_vers; 1017 struct svc_callout *s; 1018 1019 /* now match message with a registered service*/ 1020 prog_found = FALSE; 1021 low_vers = (rpcvers_t) -1L; 1022 high_vers = (rpcvers_t) 0L; 1023 TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) { 1024 if (s->sc_prog == rqstp->rq_prog) { 1025 if (s->sc_vers == rqstp->rq_vers) { 1026 /* 1027 * We hand ownership of r to the 1028 * dispatch method - they must call 1029 * svc_freereq. 1030 */ 1031 (*s->sc_dispatch)(rqstp, xprt); 1032 return; 1033 } /* found correct version */ 1034 prog_found = TRUE; 1035 if (s->sc_vers < low_vers) 1036 low_vers = s->sc_vers; 1037 if (s->sc_vers > high_vers) 1038 high_vers = s->sc_vers; 1039 } /* found correct program */ 1040 } 1041 1042 /* 1043 * if we got here, the program or version 1044 * is not served ... 1045 */ 1046 if (prog_found) 1047 svcerr_progvers(rqstp, low_vers, high_vers); 1048 else 1049 svcerr_noprog(rqstp); 1050 1051 svc_freereq(rqstp); 1052 } 1053 1054 static void 1055 svc_checkidle(SVCGROUP *grp) 1056 { 1057 SVCXPRT *xprt, *nxprt; 1058 time_t timo; 1059 struct svcxprt_list cleanup; 1060 1061 TAILQ_INIT(&cleanup); 1062 TAILQ_FOREACH_SAFE(xprt, &grp->sg_xlist, xp_link, nxprt) { 1063 /* 1064 * Only some transports have idle timers. Don't time 1065 * something out which is just waking up. 1066 */ 1067 if (!xprt->xp_idletimeout || xprt->xp_thread) 1068 continue; 1069 1070 timo = xprt->xp_lastactive + xprt->xp_idletimeout; 1071 if (time_uptime > timo) { 1072 xprt_unregister_locked(xprt); 1073 TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link); 1074 } 1075 } 1076 1077 mtx_unlock(&grp->sg_lock); 1078 TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) { 1079 SVC_RELEASE(xprt); 1080 } 1081 mtx_lock(&grp->sg_lock); 1082 } 1083 1084 static void 1085 svc_assign_waiting_sockets(SVCPOOL *pool) 1086 { 1087 SVCGROUP *grp; 1088 SVCXPRT *xprt; 1089 int g; 1090 1091 for (g = 0; g < pool->sp_groupcount; g++) { 1092 grp = &pool->sp_groups[g]; 1093 mtx_lock(&grp->sg_lock); 1094 while ((xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) { 1095 if (xprt_assignthread(xprt)) 1096 TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink); 1097 else 1098 break; 1099 } 1100 mtx_unlock(&grp->sg_lock); 1101 } 1102 } 1103 1104 static void 1105 svc_change_space_used(SVCPOOL *pool, long delta) 1106 { 1107 unsigned long value; 1108 1109 value = atomic_fetchadd_long(&pool->sp_space_used, delta) + delta; 1110 if (delta > 0) { 1111 if (value >= pool->sp_space_high && !pool->sp_space_throttled) { 1112 pool->sp_space_throttled = TRUE; 1113 pool->sp_space_throttle_count++; 1114 } 1115 if (value > pool->sp_space_used_highest) 1116 pool->sp_space_used_highest = value; 1117 } else { 1118 if (value < pool->sp_space_low && pool->sp_space_throttled) { 1119 pool->sp_space_throttled = FALSE; 1120 svc_assign_waiting_sockets(pool); 1121 } 1122 } 1123 } 1124 1125 static bool_t 1126 svc_request_space_available(SVCPOOL *pool) 1127 { 1128 1129 if (pool->sp_space_throttled) 1130 return (FALSE); 1131 return (TRUE); 1132 } 1133 1134 static void 1135 svc_run_internal(SVCGROUP *grp, bool_t ismaster) 1136 { 1137 SVCPOOL *pool = grp->sg_pool; 1138 SVCTHREAD *st, *stpref; 1139 SVCXPRT *xprt; 1140 enum xprt_stat stat; 1141 struct svc_req *rqstp; 1142 struct proc *p; 1143 long sz; 1144 int error; 1145 1146 st = mem_alloc(sizeof(*st)); 1147 mtx_init(&st->st_lock, "st_lock", NULL, MTX_DEF); 1148 st->st_pool = pool; 1149 st->st_xprt = NULL; 1150 STAILQ_INIT(&st->st_reqs); 1151 cv_init(&st->st_cond, "rpcsvc"); 1152 1153 mtx_lock(&grp->sg_lock); 1154 1155 /* 1156 * If we are a new thread which was spawned to cope with 1157 * increased load, set the state back to SVCPOOL_ACTIVE. 1158 */ 1159 if (grp->sg_state == SVCPOOL_THREADSTARTING) 1160 grp->sg_state = SVCPOOL_ACTIVE; 1161 1162 while (grp->sg_state != SVCPOOL_CLOSING) { 1163 /* 1164 * Create new thread if requested. 1165 */ 1166 if (grp->sg_state == SVCPOOL_THREADWANTED) { 1167 grp->sg_state = SVCPOOL_THREADSTARTING; 1168 grp->sg_lastcreatetime = time_uptime; 1169 mtx_unlock(&grp->sg_lock); 1170 svc_new_thread(grp); 1171 mtx_lock(&grp->sg_lock); 1172 continue; 1173 } 1174 1175 /* 1176 * Check for idle transports once per second. 1177 */ 1178 if (time_uptime > grp->sg_lastidlecheck) { 1179 grp->sg_lastidlecheck = time_uptime; 1180 svc_checkidle(grp); 1181 } 1182 1183 xprt = st->st_xprt; 1184 if (!xprt) { 1185 /* 1186 * Enforce maxthreads count. 1187 */ 1188 if (grp->sg_threadcount > grp->sg_maxthreads) 1189 break; 1190 1191 /* 1192 * Before sleeping, see if we can find an 1193 * active transport which isn't being serviced 1194 * by a thread. 1195 */ 1196 if (svc_request_space_available(pool) && 1197 (xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) { 1198 TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink); 1199 SVC_ACQUIRE(xprt); 1200 xprt->xp_thread = st; 1201 st->st_xprt = xprt; 1202 continue; 1203 } 1204 1205 LIST_INSERT_HEAD(&grp->sg_idlethreads, st, st_ilink); 1206 if (ismaster || (!ismaster && 1207 grp->sg_threadcount > grp->sg_minthreads)) 1208 error = cv_timedwait_sig(&st->st_cond, 1209 &grp->sg_lock, 5 * hz); 1210 else 1211 error = cv_wait_sig(&st->st_cond, 1212 &grp->sg_lock); 1213 if (st->st_xprt == NULL) 1214 LIST_REMOVE(st, st_ilink); 1215 1216 /* 1217 * Reduce worker thread count when idle. 1218 */ 1219 if (error == EWOULDBLOCK) { 1220 if (!ismaster 1221 && (grp->sg_threadcount 1222 > grp->sg_minthreads) 1223 && !st->st_xprt) 1224 break; 1225 } else if (error != 0) { 1226 KASSERT(error == EINTR || error == ERESTART, 1227 ("non-signal error %d", error)); 1228 mtx_unlock(&grp->sg_lock); 1229 p = curproc; 1230 PROC_LOCK(p); 1231 if (P_SHOULDSTOP(p) || 1232 (p->p_flag & P_TOTAL_STOP) != 0) { 1233 thread_suspend_check(0); 1234 PROC_UNLOCK(p); 1235 mtx_lock(&grp->sg_lock); 1236 } else { 1237 PROC_UNLOCK(p); 1238 svc_exit(pool); 1239 mtx_lock(&grp->sg_lock); 1240 break; 1241 } 1242 } 1243 continue; 1244 } 1245 mtx_unlock(&grp->sg_lock); 1246 1247 /* 1248 * Drain the transport socket and queue up any RPCs. 1249 */ 1250 xprt->xp_lastactive = time_uptime; 1251 do { 1252 if (!svc_request_space_available(pool)) 1253 break; 1254 rqstp = NULL; 1255 stat = svc_getreq(xprt, &rqstp); 1256 if (rqstp) { 1257 svc_change_space_used(pool, rqstp->rq_size); 1258 /* 1259 * See if the application has a preference 1260 * for some other thread. 1261 */ 1262 if (pool->sp_assign) { 1263 stpref = pool->sp_assign(st, rqstp); 1264 rqstp->rq_thread = stpref; 1265 STAILQ_INSERT_TAIL(&stpref->st_reqs, 1266 rqstp, rq_link); 1267 mtx_unlock(&stpref->st_lock); 1268 if (stpref != st) 1269 rqstp = NULL; 1270 } else { 1271 rqstp->rq_thread = st; 1272 STAILQ_INSERT_TAIL(&st->st_reqs, 1273 rqstp, rq_link); 1274 } 1275 } 1276 } while (rqstp == NULL && stat == XPRT_MOREREQS 1277 && grp->sg_state != SVCPOOL_CLOSING); 1278 1279 /* 1280 * Move this transport to the end of the active list to 1281 * ensure fairness when multiple transports are active. 1282 * If this was the last queued request, svc_getreq will end 1283 * up calling xprt_inactive to remove from the active list. 1284 */ 1285 mtx_lock(&grp->sg_lock); 1286 xprt->xp_thread = NULL; 1287 st->st_xprt = NULL; 1288 if (xprt->xp_active) { 1289 if (!svc_request_space_available(pool) || 1290 !xprt_assignthread(xprt)) 1291 TAILQ_INSERT_TAIL(&grp->sg_active, 1292 xprt, xp_alink); 1293 } 1294 mtx_unlock(&grp->sg_lock); 1295 SVC_RELEASE(xprt); 1296 1297 /* 1298 * Execute what we have queued. 1299 */ 1300 mtx_lock(&st->st_lock); 1301 while ((rqstp = STAILQ_FIRST(&st->st_reqs)) != NULL) { 1302 STAILQ_REMOVE_HEAD(&st->st_reqs, rq_link); 1303 mtx_unlock(&st->st_lock); 1304 sz = (long)rqstp->rq_size; 1305 svc_executereq(rqstp); 1306 svc_change_space_used(pool, -sz); 1307 mtx_lock(&st->st_lock); 1308 } 1309 mtx_unlock(&st->st_lock); 1310 mtx_lock(&grp->sg_lock); 1311 } 1312 1313 if (st->st_xprt) { 1314 xprt = st->st_xprt; 1315 st->st_xprt = NULL; 1316 SVC_RELEASE(xprt); 1317 } 1318 KASSERT(STAILQ_EMPTY(&st->st_reqs), ("stray reqs on exit")); 1319 mtx_destroy(&st->st_lock); 1320 cv_destroy(&st->st_cond); 1321 mem_free(st, sizeof(*st)); 1322 1323 grp->sg_threadcount--; 1324 if (!ismaster) 1325 wakeup(grp); 1326 mtx_unlock(&grp->sg_lock); 1327 } 1328 1329 static void 1330 svc_thread_start(void *arg) 1331 { 1332 1333 svc_run_internal((SVCGROUP *) arg, FALSE); 1334 kthread_exit(); 1335 } 1336 1337 static void 1338 svc_new_thread(SVCGROUP *grp) 1339 { 1340 SVCPOOL *pool = grp->sg_pool; 1341 struct thread *td; 1342 1343 mtx_lock(&grp->sg_lock); 1344 grp->sg_threadcount++; 1345 mtx_unlock(&grp->sg_lock); 1346 kthread_add(svc_thread_start, grp, pool->sp_proc, &td, 0, 0, 1347 "%s: service", pool->sp_name); 1348 } 1349 1350 void 1351 svc_run(SVCPOOL *pool) 1352 { 1353 int g, i; 1354 struct proc *p; 1355 struct thread *td; 1356 SVCGROUP *grp; 1357 1358 p = curproc; 1359 td = curthread; 1360 snprintf(td->td_name, sizeof(td->td_name), 1361 "%s: master", pool->sp_name); 1362 pool->sp_state = SVCPOOL_ACTIVE; 1363 pool->sp_proc = p; 1364 1365 /* Choose group count based on number of threads and CPUs. */ 1366 pool->sp_groupcount = max(1, min(SVC_MAXGROUPS, 1367 min(pool->sp_maxthreads / 2, mp_ncpus) / 6)); 1368 for (g = 0; g < pool->sp_groupcount; g++) { 1369 grp = &pool->sp_groups[g]; 1370 grp->sg_minthreads = max(1, 1371 pool->sp_minthreads / pool->sp_groupcount); 1372 grp->sg_maxthreads = max(1, 1373 pool->sp_maxthreads / pool->sp_groupcount); 1374 grp->sg_lastcreatetime = time_uptime; 1375 } 1376 1377 /* Starting threads */ 1378 pool->sp_groups[0].sg_threadcount++; 1379 for (g = 0; g < pool->sp_groupcount; g++) { 1380 grp = &pool->sp_groups[g]; 1381 for (i = ((g == 0) ? 1 : 0); i < grp->sg_minthreads; i++) 1382 svc_new_thread(grp); 1383 } 1384 svc_run_internal(&pool->sp_groups[0], TRUE); 1385 1386 /* Waiting for threads to stop. */ 1387 for (g = 0; g < pool->sp_groupcount; g++) { 1388 grp = &pool->sp_groups[g]; 1389 mtx_lock(&grp->sg_lock); 1390 while (grp->sg_threadcount > 0) 1391 msleep(grp, &grp->sg_lock, 0, "svcexit", 0); 1392 mtx_unlock(&grp->sg_lock); 1393 } 1394 } 1395 1396 void 1397 svc_exit(SVCPOOL *pool) 1398 { 1399 SVCGROUP *grp; 1400 SVCTHREAD *st; 1401 int g; 1402 1403 pool->sp_state = SVCPOOL_CLOSING; 1404 for (g = 0; g < pool->sp_groupcount; g++) { 1405 grp = &pool->sp_groups[g]; 1406 mtx_lock(&grp->sg_lock); 1407 if (grp->sg_state != SVCPOOL_CLOSING) { 1408 grp->sg_state = SVCPOOL_CLOSING; 1409 LIST_FOREACH(st, &grp->sg_idlethreads, st_ilink) 1410 cv_signal(&st->st_cond); 1411 } 1412 mtx_unlock(&grp->sg_lock); 1413 } 1414 } 1415 1416 bool_t 1417 svc_getargs(struct svc_req *rqstp, xdrproc_t xargs, void *args) 1418 { 1419 struct mbuf *m; 1420 XDR xdrs; 1421 bool_t stat; 1422 1423 m = rqstp->rq_args; 1424 rqstp->rq_args = NULL; 1425 1426 xdrmbuf_create(&xdrs, m, XDR_DECODE); 1427 stat = xargs(&xdrs, args); 1428 XDR_DESTROY(&xdrs); 1429 1430 return (stat); 1431 } 1432 1433 bool_t 1434 svc_freeargs(struct svc_req *rqstp, xdrproc_t xargs, void *args) 1435 { 1436 XDR xdrs; 1437 1438 if (rqstp->rq_addr) { 1439 free(rqstp->rq_addr, M_SONAME); 1440 rqstp->rq_addr = NULL; 1441 } 1442 1443 xdrs.x_op = XDR_FREE; 1444 return (xargs(&xdrs, args)); 1445 } 1446 1447 void 1448 svc_freereq(struct svc_req *rqstp) 1449 { 1450 SVCTHREAD *st; 1451 SVCPOOL *pool; 1452 1453 st = rqstp->rq_thread; 1454 if (st) { 1455 pool = st->st_pool; 1456 if (pool->sp_done) 1457 pool->sp_done(st, rqstp); 1458 } 1459 1460 if (rqstp->rq_auth.svc_ah_ops) 1461 SVCAUTH_RELEASE(&rqstp->rq_auth); 1462 1463 if (rqstp->rq_xprt) { 1464 SVC_RELEASE(rqstp->rq_xprt); 1465 } 1466 1467 if (rqstp->rq_addr) 1468 free(rqstp->rq_addr, M_SONAME); 1469 1470 if (rqstp->rq_args) 1471 m_freem(rqstp->rq_args); 1472 1473 free(rqstp, M_RPC); 1474 } 1475