1 /* $NetBSD: svc.c,v 1.21 2000/07/06 03:10:35 christos Exp $ */ 2 3 /*- 4 * SPDX-License-Identifier: BSD-3-Clause 5 * 6 * Copyright (c) 2009, Sun Microsystems, Inc. 7 * All rights reserved. 8 * 9 * Redistribution and use in source and binary forms, with or without 10 * modification, are permitted provided that the following conditions are met: 11 * - Redistributions of source code must retain the above copyright notice, 12 * this list of conditions and the following disclaimer. 13 * - Redistributions in binary form must reproduce the above copyright notice, 14 * this list of conditions and the following disclaimer in the documentation 15 * and/or other materials provided with the distribution. 16 * - Neither the name of Sun Microsystems, Inc. nor the names of its 17 * contributors may be used to endorse or promote products derived 18 * from this software without specific prior written permission. 19 * 20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 21 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 22 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 23 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 24 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 25 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 26 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 27 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 28 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 29 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 30 * POSSIBILITY OF SUCH DAMAGE. 31 */ 32 33 #if defined(LIBC_SCCS) && !defined(lint) 34 static char *sccsid2 = "@(#)svc.c 1.44 88/02/08 Copyr 1984 Sun Micro"; 35 static char *sccsid = "@(#)svc.c 2.4 88/08/11 4.0 RPCSRC"; 36 #endif 37 #include <sys/cdefs.h> 38 __FBSDID("$FreeBSD$"); 39 40 /* 41 * svc.c, Server-side remote procedure call interface. 42 * 43 * There are two sets of procedures here. The xprt routines are 44 * for handling transport handles. The svc routines handle the 45 * list of service routines. 46 * 47 * Copyright (C) 1984, Sun Microsystems, Inc. 48 */ 49 50 #include <sys/param.h> 51 #include <sys/lock.h> 52 #include <sys/kernel.h> 53 #include <sys/kthread.h> 54 #include <sys/malloc.h> 55 #include <sys/mbuf.h> 56 #include <sys/mutex.h> 57 #include <sys/proc.h> 58 #include <sys/queue.h> 59 #include <sys/socketvar.h> 60 #include <sys/systm.h> 61 #include <sys/smp.h> 62 #include <sys/sx.h> 63 #include <sys/ucred.h> 64 65 #include <rpc/rpc.h> 66 #include <rpc/rpcb_clnt.h> 67 #include <rpc/replay.h> 68 69 #include <rpc/rpc_com.h> 70 71 #define SVC_VERSQUIET 0x0001 /* keep quiet about vers mismatch */ 72 #define version_keepquiet(xp) (SVC_EXT(xp)->xp_flags & SVC_VERSQUIET) 73 74 static struct svc_callout *svc_find(SVCPOOL *pool, rpcprog_t, rpcvers_t, 75 char *); 76 static void svc_new_thread(SVCGROUP *grp); 77 static void xprt_unregister_locked(SVCXPRT *xprt); 78 static void svc_change_space_used(SVCPOOL *pool, long delta); 79 static bool_t svc_request_space_available(SVCPOOL *pool); 80 static void svcpool_cleanup(SVCPOOL *pool); 81 82 /* *************** SVCXPRT related stuff **************** */ 83 84 static int svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS); 85 static int svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS); 86 static int svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS); 87 88 SVCPOOL* 89 svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base) 90 { 91 SVCPOOL *pool; 92 SVCGROUP *grp; 93 int g; 94 95 pool = malloc(sizeof(SVCPOOL), M_RPC, M_WAITOK|M_ZERO); 96 97 mtx_init(&pool->sp_lock, "sp_lock", NULL, MTX_DEF); 98 pool->sp_name = name; 99 pool->sp_state = SVCPOOL_INIT; 100 pool->sp_proc = NULL; 101 TAILQ_INIT(&pool->sp_callouts); 102 TAILQ_INIT(&pool->sp_lcallouts); 103 pool->sp_minthreads = 1; 104 pool->sp_maxthreads = 1; 105 pool->sp_groupcount = 1; 106 for (g = 0; g < SVC_MAXGROUPS; g++) { 107 grp = &pool->sp_groups[g]; 108 mtx_init(&grp->sg_lock, "sg_lock", NULL, MTX_DEF); 109 grp->sg_pool = pool; 110 grp->sg_state = SVCPOOL_ACTIVE; 111 TAILQ_INIT(&grp->sg_xlist); 112 TAILQ_INIT(&grp->sg_active); 113 LIST_INIT(&grp->sg_idlethreads); 114 grp->sg_minthreads = 1; 115 grp->sg_maxthreads = 1; 116 } 117 118 /* 119 * Don't use more than a quarter of mbuf clusters. Nota bene: 120 * nmbclusters is an int, but nmbclusters*MCLBYTES may overflow 121 * on LP64 architectures, so cast to u_long to avoid undefined 122 * behavior. (ILP32 architectures cannot have nmbclusters 123 * large enough to overflow for other reasons.) 124 */ 125 pool->sp_space_high = (u_long)nmbclusters * MCLBYTES / 4; 126 pool->sp_space_low = (pool->sp_space_high / 3) * 2; 127 128 sysctl_ctx_init(&pool->sp_sysctl); 129 if (sysctl_base) { 130 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO, 131 "minthreads", CTLTYPE_INT | CTLFLAG_RW | CTLFLAG_MPSAFE, 132 pool, 0, svcpool_minthread_sysctl, "I", 133 "Minimal number of threads"); 134 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO, 135 "maxthreads", CTLTYPE_INT | CTLFLAG_RW | CTLFLAG_MPSAFE, 136 pool, 0, svcpool_maxthread_sysctl, "I", 137 "Maximal number of threads"); 138 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO, 139 "threads", CTLTYPE_INT | CTLFLAG_RD | CTLFLAG_MPSAFE, 140 pool, 0, svcpool_threads_sysctl, "I", 141 "Current number of threads"); 142 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO, 143 "groups", CTLFLAG_RD, &pool->sp_groupcount, 0, 144 "Number of thread groups"); 145 146 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO, 147 "request_space_used", CTLFLAG_RD, 148 &pool->sp_space_used, 149 "Space in parsed but not handled requests."); 150 151 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO, 152 "request_space_used_highest", CTLFLAG_RD, 153 &pool->sp_space_used_highest, 154 "Highest space used since reboot."); 155 156 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO, 157 "request_space_high", CTLFLAG_RW, 158 &pool->sp_space_high, 159 "Maximum space in parsed but not handled requests."); 160 161 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO, 162 "request_space_low", CTLFLAG_RW, 163 &pool->sp_space_low, 164 "Low water mark for request space."); 165 166 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO, 167 "request_space_throttled", CTLFLAG_RD, 168 &pool->sp_space_throttled, 0, 169 "Whether nfs requests are currently throttled"); 170 171 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO, 172 "request_space_throttle_count", CTLFLAG_RD, 173 &pool->sp_space_throttle_count, 0, 174 "Count of times throttling based on request space has occurred"); 175 } 176 177 return pool; 178 } 179 180 /* 181 * Code common to svcpool_destroy() and svcpool_close(), which cleans up 182 * the pool data structures. 183 */ 184 static void 185 svcpool_cleanup(SVCPOOL *pool) 186 { 187 SVCGROUP *grp; 188 SVCXPRT *xprt, *nxprt; 189 struct svc_callout *s; 190 struct svc_loss_callout *sl; 191 struct svcxprt_list cleanup; 192 int g; 193 194 TAILQ_INIT(&cleanup); 195 196 for (g = 0; g < SVC_MAXGROUPS; g++) { 197 grp = &pool->sp_groups[g]; 198 mtx_lock(&grp->sg_lock); 199 while ((xprt = TAILQ_FIRST(&grp->sg_xlist)) != NULL) { 200 xprt_unregister_locked(xprt); 201 TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link); 202 } 203 mtx_unlock(&grp->sg_lock); 204 } 205 TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) { 206 SVC_RELEASE(xprt); 207 } 208 209 mtx_lock(&pool->sp_lock); 210 while ((s = TAILQ_FIRST(&pool->sp_callouts)) != NULL) { 211 mtx_unlock(&pool->sp_lock); 212 svc_unreg(pool, s->sc_prog, s->sc_vers); 213 mtx_lock(&pool->sp_lock); 214 } 215 while ((sl = TAILQ_FIRST(&pool->sp_lcallouts)) != NULL) { 216 mtx_unlock(&pool->sp_lock); 217 svc_loss_unreg(pool, sl->slc_dispatch); 218 mtx_lock(&pool->sp_lock); 219 } 220 mtx_unlock(&pool->sp_lock); 221 } 222 223 void 224 svcpool_destroy(SVCPOOL *pool) 225 { 226 SVCGROUP *grp; 227 int g; 228 229 svcpool_cleanup(pool); 230 231 for (g = 0; g < SVC_MAXGROUPS; g++) { 232 grp = &pool->sp_groups[g]; 233 mtx_destroy(&grp->sg_lock); 234 } 235 mtx_destroy(&pool->sp_lock); 236 237 if (pool->sp_rcache) 238 replay_freecache(pool->sp_rcache); 239 240 sysctl_ctx_free(&pool->sp_sysctl); 241 free(pool, M_RPC); 242 } 243 244 /* 245 * Similar to svcpool_destroy(), except that it does not destroy the actual 246 * data structures. As such, "pool" may be used again. 247 */ 248 void 249 svcpool_close(SVCPOOL *pool) 250 { 251 SVCGROUP *grp; 252 int g; 253 254 svcpool_cleanup(pool); 255 256 /* Now, initialize the pool's state for a fresh svc_run() call. */ 257 mtx_lock(&pool->sp_lock); 258 pool->sp_state = SVCPOOL_INIT; 259 mtx_unlock(&pool->sp_lock); 260 for (g = 0; g < SVC_MAXGROUPS; g++) { 261 grp = &pool->sp_groups[g]; 262 mtx_lock(&grp->sg_lock); 263 grp->sg_state = SVCPOOL_ACTIVE; 264 mtx_unlock(&grp->sg_lock); 265 } 266 } 267 268 /* 269 * Sysctl handler to get the present thread count on a pool 270 */ 271 static int 272 svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS) 273 { 274 SVCPOOL *pool; 275 int threads, error, g; 276 277 pool = oidp->oid_arg1; 278 threads = 0; 279 mtx_lock(&pool->sp_lock); 280 for (g = 0; g < pool->sp_groupcount; g++) 281 threads += pool->sp_groups[g].sg_threadcount; 282 mtx_unlock(&pool->sp_lock); 283 error = sysctl_handle_int(oidp, &threads, 0, req); 284 return (error); 285 } 286 287 /* 288 * Sysctl handler to set the minimum thread count on a pool 289 */ 290 static int 291 svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS) 292 { 293 SVCPOOL *pool; 294 int newminthreads, error, g; 295 296 pool = oidp->oid_arg1; 297 newminthreads = pool->sp_minthreads; 298 error = sysctl_handle_int(oidp, &newminthreads, 0, req); 299 if (error == 0 && newminthreads != pool->sp_minthreads) { 300 if (newminthreads > pool->sp_maxthreads) 301 return (EINVAL); 302 mtx_lock(&pool->sp_lock); 303 pool->sp_minthreads = newminthreads; 304 for (g = 0; g < pool->sp_groupcount; g++) { 305 pool->sp_groups[g].sg_minthreads = max(1, 306 pool->sp_minthreads / pool->sp_groupcount); 307 } 308 mtx_unlock(&pool->sp_lock); 309 } 310 return (error); 311 } 312 313 /* 314 * Sysctl handler to set the maximum thread count on a pool 315 */ 316 static int 317 svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS) 318 { 319 SVCPOOL *pool; 320 int newmaxthreads, error, g; 321 322 pool = oidp->oid_arg1; 323 newmaxthreads = pool->sp_maxthreads; 324 error = sysctl_handle_int(oidp, &newmaxthreads, 0, req); 325 if (error == 0 && newmaxthreads != pool->sp_maxthreads) { 326 if (newmaxthreads < pool->sp_minthreads) 327 return (EINVAL); 328 mtx_lock(&pool->sp_lock); 329 pool->sp_maxthreads = newmaxthreads; 330 for (g = 0; g < pool->sp_groupcount; g++) { 331 pool->sp_groups[g].sg_maxthreads = max(1, 332 pool->sp_maxthreads / pool->sp_groupcount); 333 } 334 mtx_unlock(&pool->sp_lock); 335 } 336 return (error); 337 } 338 339 /* 340 * Activate a transport handle. 341 */ 342 void 343 xprt_register(SVCXPRT *xprt) 344 { 345 SVCPOOL *pool = xprt->xp_pool; 346 SVCGROUP *grp; 347 int g; 348 349 SVC_ACQUIRE(xprt); 350 g = atomic_fetchadd_int(&pool->sp_nextgroup, 1) % pool->sp_groupcount; 351 xprt->xp_group = grp = &pool->sp_groups[g]; 352 mtx_lock(&grp->sg_lock); 353 xprt->xp_registered = TRUE; 354 xprt->xp_active = FALSE; 355 TAILQ_INSERT_TAIL(&grp->sg_xlist, xprt, xp_link); 356 mtx_unlock(&grp->sg_lock); 357 } 358 359 /* 360 * De-activate a transport handle. Note: the locked version doesn't 361 * release the transport - caller must do that after dropping the pool 362 * lock. 363 */ 364 static void 365 xprt_unregister_locked(SVCXPRT *xprt) 366 { 367 SVCGROUP *grp = xprt->xp_group; 368 369 mtx_assert(&grp->sg_lock, MA_OWNED); 370 KASSERT(xprt->xp_registered == TRUE, 371 ("xprt_unregister_locked: not registered")); 372 xprt_inactive_locked(xprt); 373 TAILQ_REMOVE(&grp->sg_xlist, xprt, xp_link); 374 xprt->xp_registered = FALSE; 375 } 376 377 void 378 xprt_unregister(SVCXPRT *xprt) 379 { 380 SVCGROUP *grp = xprt->xp_group; 381 382 mtx_lock(&grp->sg_lock); 383 if (xprt->xp_registered == FALSE) { 384 /* Already unregistered by another thread */ 385 mtx_unlock(&grp->sg_lock); 386 return; 387 } 388 xprt_unregister_locked(xprt); 389 mtx_unlock(&grp->sg_lock); 390 391 SVC_RELEASE(xprt); 392 } 393 394 /* 395 * Attempt to assign a service thread to this transport. 396 */ 397 static int 398 xprt_assignthread(SVCXPRT *xprt) 399 { 400 SVCGROUP *grp = xprt->xp_group; 401 SVCTHREAD *st; 402 403 mtx_assert(&grp->sg_lock, MA_OWNED); 404 st = LIST_FIRST(&grp->sg_idlethreads); 405 if (st) { 406 LIST_REMOVE(st, st_ilink); 407 SVC_ACQUIRE(xprt); 408 xprt->xp_thread = st; 409 st->st_xprt = xprt; 410 cv_signal(&st->st_cond); 411 return (TRUE); 412 } else { 413 /* 414 * See if we can create a new thread. The 415 * actual thread creation happens in 416 * svc_run_internal because our locking state 417 * is poorly defined (we are typically called 418 * from a socket upcall). Don't create more 419 * than one thread per second. 420 */ 421 if (grp->sg_state == SVCPOOL_ACTIVE 422 && grp->sg_lastcreatetime < time_uptime 423 && grp->sg_threadcount < grp->sg_maxthreads) { 424 grp->sg_state = SVCPOOL_THREADWANTED; 425 } 426 } 427 return (FALSE); 428 } 429 430 void 431 xprt_active(SVCXPRT *xprt) 432 { 433 SVCGROUP *grp = xprt->xp_group; 434 435 mtx_lock(&grp->sg_lock); 436 437 if (!xprt->xp_registered) { 438 /* 439 * Race with xprt_unregister - we lose. 440 */ 441 mtx_unlock(&grp->sg_lock); 442 return; 443 } 444 445 if (!xprt->xp_active) { 446 xprt->xp_active = TRUE; 447 if (xprt->xp_thread == NULL) { 448 if (!svc_request_space_available(xprt->xp_pool) || 449 !xprt_assignthread(xprt)) 450 TAILQ_INSERT_TAIL(&grp->sg_active, xprt, 451 xp_alink); 452 } 453 } 454 455 mtx_unlock(&grp->sg_lock); 456 } 457 458 void 459 xprt_inactive_locked(SVCXPRT *xprt) 460 { 461 SVCGROUP *grp = xprt->xp_group; 462 463 mtx_assert(&grp->sg_lock, MA_OWNED); 464 if (xprt->xp_active) { 465 if (xprt->xp_thread == NULL) 466 TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink); 467 xprt->xp_active = FALSE; 468 } 469 } 470 471 void 472 xprt_inactive(SVCXPRT *xprt) 473 { 474 SVCGROUP *grp = xprt->xp_group; 475 476 mtx_lock(&grp->sg_lock); 477 xprt_inactive_locked(xprt); 478 mtx_unlock(&grp->sg_lock); 479 } 480 481 /* 482 * Variant of xprt_inactive() for use only when sure that port is 483 * assigned to thread. For example, within receive handlers. 484 */ 485 void 486 xprt_inactive_self(SVCXPRT *xprt) 487 { 488 489 KASSERT(xprt->xp_thread != NULL, 490 ("xprt_inactive_self(%p) with NULL xp_thread", xprt)); 491 xprt->xp_active = FALSE; 492 } 493 494 /* 495 * Add a service program to the callout list. 496 * The dispatch routine will be called when a rpc request for this 497 * program number comes in. 498 */ 499 bool_t 500 svc_reg(SVCXPRT *xprt, const rpcprog_t prog, const rpcvers_t vers, 501 void (*dispatch)(struct svc_req *, SVCXPRT *), 502 const struct netconfig *nconf) 503 { 504 SVCPOOL *pool = xprt->xp_pool; 505 struct svc_callout *s; 506 char *netid = NULL; 507 int flag = 0; 508 509 /* VARIABLES PROTECTED BY svc_lock: s, svc_head */ 510 511 if (xprt->xp_netid) { 512 netid = strdup(xprt->xp_netid, M_RPC); 513 flag = 1; 514 } else if (nconf && nconf->nc_netid) { 515 netid = strdup(nconf->nc_netid, M_RPC); 516 flag = 1; 517 } /* must have been created with svc_raw_create */ 518 if ((netid == NULL) && (flag == 1)) { 519 return (FALSE); 520 } 521 522 mtx_lock(&pool->sp_lock); 523 if ((s = svc_find(pool, prog, vers, netid)) != NULL) { 524 if (netid) 525 free(netid, M_RPC); 526 if (s->sc_dispatch == dispatch) 527 goto rpcb_it; /* he is registering another xptr */ 528 mtx_unlock(&pool->sp_lock); 529 return (FALSE); 530 } 531 s = malloc(sizeof (struct svc_callout), M_RPC, M_NOWAIT); 532 if (s == NULL) { 533 if (netid) 534 free(netid, M_RPC); 535 mtx_unlock(&pool->sp_lock); 536 return (FALSE); 537 } 538 539 s->sc_prog = prog; 540 s->sc_vers = vers; 541 s->sc_dispatch = dispatch; 542 s->sc_netid = netid; 543 TAILQ_INSERT_TAIL(&pool->sp_callouts, s, sc_link); 544 545 if ((xprt->xp_netid == NULL) && (flag == 1) && netid) 546 ((SVCXPRT *) xprt)->xp_netid = strdup(netid, M_RPC); 547 548 rpcb_it: 549 mtx_unlock(&pool->sp_lock); 550 /* now register the information with the local binder service */ 551 if (nconf) { 552 bool_t dummy; 553 struct netconfig tnc; 554 struct netbuf nb; 555 tnc = *nconf; 556 nb.buf = &xprt->xp_ltaddr; 557 nb.len = xprt->xp_ltaddr.ss_len; 558 dummy = rpcb_set(prog, vers, &tnc, &nb); 559 return (dummy); 560 } 561 return (TRUE); 562 } 563 564 /* 565 * Remove a service program from the callout list. 566 */ 567 void 568 svc_unreg(SVCPOOL *pool, const rpcprog_t prog, const rpcvers_t vers) 569 { 570 struct svc_callout *s; 571 572 /* unregister the information anyway */ 573 (void) rpcb_unset(prog, vers, NULL); 574 mtx_lock(&pool->sp_lock); 575 while ((s = svc_find(pool, prog, vers, NULL)) != NULL) { 576 TAILQ_REMOVE(&pool->sp_callouts, s, sc_link); 577 if (s->sc_netid) 578 mem_free(s->sc_netid, sizeof (s->sc_netid) + 1); 579 mem_free(s, sizeof (struct svc_callout)); 580 } 581 mtx_unlock(&pool->sp_lock); 582 } 583 584 /* 585 * Add a service connection loss program to the callout list. 586 * The dispatch routine will be called when some port in ths pool die. 587 */ 588 bool_t 589 svc_loss_reg(SVCXPRT *xprt, void (*dispatch)(SVCXPRT *)) 590 { 591 SVCPOOL *pool = xprt->xp_pool; 592 struct svc_loss_callout *s; 593 594 mtx_lock(&pool->sp_lock); 595 TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) { 596 if (s->slc_dispatch == dispatch) 597 break; 598 } 599 if (s != NULL) { 600 mtx_unlock(&pool->sp_lock); 601 return (TRUE); 602 } 603 s = malloc(sizeof(struct svc_loss_callout), M_RPC, M_NOWAIT); 604 if (s == NULL) { 605 mtx_unlock(&pool->sp_lock); 606 return (FALSE); 607 } 608 s->slc_dispatch = dispatch; 609 TAILQ_INSERT_TAIL(&pool->sp_lcallouts, s, slc_link); 610 mtx_unlock(&pool->sp_lock); 611 return (TRUE); 612 } 613 614 /* 615 * Remove a service connection loss program from the callout list. 616 */ 617 void 618 svc_loss_unreg(SVCPOOL *pool, void (*dispatch)(SVCXPRT *)) 619 { 620 struct svc_loss_callout *s; 621 622 mtx_lock(&pool->sp_lock); 623 TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) { 624 if (s->slc_dispatch == dispatch) { 625 TAILQ_REMOVE(&pool->sp_lcallouts, s, slc_link); 626 free(s, M_RPC); 627 break; 628 } 629 } 630 mtx_unlock(&pool->sp_lock); 631 } 632 633 /* ********************** CALLOUT list related stuff ************* */ 634 635 /* 636 * Search the callout list for a program number, return the callout 637 * struct. 638 */ 639 static struct svc_callout * 640 svc_find(SVCPOOL *pool, rpcprog_t prog, rpcvers_t vers, char *netid) 641 { 642 struct svc_callout *s; 643 644 mtx_assert(&pool->sp_lock, MA_OWNED); 645 TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) { 646 if (s->sc_prog == prog && s->sc_vers == vers 647 && (netid == NULL || s->sc_netid == NULL || 648 strcmp(netid, s->sc_netid) == 0)) 649 break; 650 } 651 652 return (s); 653 } 654 655 /* ******************* REPLY GENERATION ROUTINES ************ */ 656 657 static bool_t 658 svc_sendreply_common(struct svc_req *rqstp, struct rpc_msg *rply, 659 struct mbuf *body) 660 { 661 SVCXPRT *xprt = rqstp->rq_xprt; 662 bool_t ok; 663 664 if (rqstp->rq_args) { 665 m_freem(rqstp->rq_args); 666 rqstp->rq_args = NULL; 667 } 668 669 if (xprt->xp_pool->sp_rcache) 670 replay_setreply(xprt->xp_pool->sp_rcache, 671 rply, svc_getrpccaller(rqstp), body); 672 673 if (!SVCAUTH_WRAP(&rqstp->rq_auth, &body)) 674 return (FALSE); 675 676 ok = SVC_REPLY(xprt, rply, rqstp->rq_addr, body, &rqstp->rq_reply_seq); 677 if (rqstp->rq_addr) { 678 free(rqstp->rq_addr, M_SONAME); 679 rqstp->rq_addr = NULL; 680 } 681 682 return (ok); 683 } 684 685 /* 686 * Send a reply to an rpc request 687 */ 688 bool_t 689 svc_sendreply(struct svc_req *rqstp, xdrproc_t xdr_results, void * xdr_location) 690 { 691 struct rpc_msg rply; 692 struct mbuf *m; 693 XDR xdrs; 694 bool_t ok; 695 696 rply.rm_xid = rqstp->rq_xid; 697 rply.rm_direction = REPLY; 698 rply.rm_reply.rp_stat = MSG_ACCEPTED; 699 rply.acpted_rply.ar_verf = rqstp->rq_verf; 700 rply.acpted_rply.ar_stat = SUCCESS; 701 rply.acpted_rply.ar_results.where = NULL; 702 rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void; 703 704 m = m_getcl(M_WAITOK, MT_DATA, 0); 705 xdrmbuf_create(&xdrs, m, XDR_ENCODE); 706 ok = xdr_results(&xdrs, xdr_location); 707 XDR_DESTROY(&xdrs); 708 709 if (ok) { 710 return (svc_sendreply_common(rqstp, &rply, m)); 711 } else { 712 m_freem(m); 713 return (FALSE); 714 } 715 } 716 717 bool_t 718 svc_sendreply_mbuf(struct svc_req *rqstp, struct mbuf *m) 719 { 720 struct rpc_msg rply; 721 722 rply.rm_xid = rqstp->rq_xid; 723 rply.rm_direction = REPLY; 724 rply.rm_reply.rp_stat = MSG_ACCEPTED; 725 rply.acpted_rply.ar_verf = rqstp->rq_verf; 726 rply.acpted_rply.ar_stat = SUCCESS; 727 rply.acpted_rply.ar_results.where = NULL; 728 rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void; 729 730 return (svc_sendreply_common(rqstp, &rply, m)); 731 } 732 733 /* 734 * No procedure error reply 735 */ 736 void 737 svcerr_noproc(struct svc_req *rqstp) 738 { 739 SVCXPRT *xprt = rqstp->rq_xprt; 740 struct rpc_msg rply; 741 742 rply.rm_xid = rqstp->rq_xid; 743 rply.rm_direction = REPLY; 744 rply.rm_reply.rp_stat = MSG_ACCEPTED; 745 rply.acpted_rply.ar_verf = rqstp->rq_verf; 746 rply.acpted_rply.ar_stat = PROC_UNAVAIL; 747 748 if (xprt->xp_pool->sp_rcache) 749 replay_setreply(xprt->xp_pool->sp_rcache, 750 &rply, svc_getrpccaller(rqstp), NULL); 751 752 svc_sendreply_common(rqstp, &rply, NULL); 753 } 754 755 /* 756 * Can't decode args error reply 757 */ 758 void 759 svcerr_decode(struct svc_req *rqstp) 760 { 761 SVCXPRT *xprt = rqstp->rq_xprt; 762 struct rpc_msg rply; 763 764 rply.rm_xid = rqstp->rq_xid; 765 rply.rm_direction = REPLY; 766 rply.rm_reply.rp_stat = MSG_ACCEPTED; 767 rply.acpted_rply.ar_verf = rqstp->rq_verf; 768 rply.acpted_rply.ar_stat = GARBAGE_ARGS; 769 770 if (xprt->xp_pool->sp_rcache) 771 replay_setreply(xprt->xp_pool->sp_rcache, 772 &rply, (struct sockaddr *) &xprt->xp_rtaddr, NULL); 773 774 svc_sendreply_common(rqstp, &rply, NULL); 775 } 776 777 /* 778 * Some system error 779 */ 780 void 781 svcerr_systemerr(struct svc_req *rqstp) 782 { 783 SVCXPRT *xprt = rqstp->rq_xprt; 784 struct rpc_msg rply; 785 786 rply.rm_xid = rqstp->rq_xid; 787 rply.rm_direction = REPLY; 788 rply.rm_reply.rp_stat = MSG_ACCEPTED; 789 rply.acpted_rply.ar_verf = rqstp->rq_verf; 790 rply.acpted_rply.ar_stat = SYSTEM_ERR; 791 792 if (xprt->xp_pool->sp_rcache) 793 replay_setreply(xprt->xp_pool->sp_rcache, 794 &rply, svc_getrpccaller(rqstp), NULL); 795 796 svc_sendreply_common(rqstp, &rply, NULL); 797 } 798 799 /* 800 * Authentication error reply 801 */ 802 void 803 svcerr_auth(struct svc_req *rqstp, enum auth_stat why) 804 { 805 SVCXPRT *xprt = rqstp->rq_xprt; 806 struct rpc_msg rply; 807 808 rply.rm_xid = rqstp->rq_xid; 809 rply.rm_direction = REPLY; 810 rply.rm_reply.rp_stat = MSG_DENIED; 811 rply.rjcted_rply.rj_stat = AUTH_ERROR; 812 rply.rjcted_rply.rj_why = why; 813 814 if (xprt->xp_pool->sp_rcache) 815 replay_setreply(xprt->xp_pool->sp_rcache, 816 &rply, svc_getrpccaller(rqstp), NULL); 817 818 svc_sendreply_common(rqstp, &rply, NULL); 819 } 820 821 /* 822 * Auth too weak error reply 823 */ 824 void 825 svcerr_weakauth(struct svc_req *rqstp) 826 { 827 828 svcerr_auth(rqstp, AUTH_TOOWEAK); 829 } 830 831 /* 832 * Program unavailable error reply 833 */ 834 void 835 svcerr_noprog(struct svc_req *rqstp) 836 { 837 SVCXPRT *xprt = rqstp->rq_xprt; 838 struct rpc_msg rply; 839 840 rply.rm_xid = rqstp->rq_xid; 841 rply.rm_direction = REPLY; 842 rply.rm_reply.rp_stat = MSG_ACCEPTED; 843 rply.acpted_rply.ar_verf = rqstp->rq_verf; 844 rply.acpted_rply.ar_stat = PROG_UNAVAIL; 845 846 if (xprt->xp_pool->sp_rcache) 847 replay_setreply(xprt->xp_pool->sp_rcache, 848 &rply, svc_getrpccaller(rqstp), NULL); 849 850 svc_sendreply_common(rqstp, &rply, NULL); 851 } 852 853 /* 854 * Program version mismatch error reply 855 */ 856 void 857 svcerr_progvers(struct svc_req *rqstp, rpcvers_t low_vers, rpcvers_t high_vers) 858 { 859 SVCXPRT *xprt = rqstp->rq_xprt; 860 struct rpc_msg rply; 861 862 rply.rm_xid = rqstp->rq_xid; 863 rply.rm_direction = REPLY; 864 rply.rm_reply.rp_stat = MSG_ACCEPTED; 865 rply.acpted_rply.ar_verf = rqstp->rq_verf; 866 rply.acpted_rply.ar_stat = PROG_MISMATCH; 867 rply.acpted_rply.ar_vers.low = (uint32_t)low_vers; 868 rply.acpted_rply.ar_vers.high = (uint32_t)high_vers; 869 870 if (xprt->xp_pool->sp_rcache) 871 replay_setreply(xprt->xp_pool->sp_rcache, 872 &rply, svc_getrpccaller(rqstp), NULL); 873 874 svc_sendreply_common(rqstp, &rply, NULL); 875 } 876 877 /* 878 * Allocate a new server transport structure. All fields are 879 * initialized to zero and xp_p3 is initialized to point at an 880 * extension structure to hold various flags and authentication 881 * parameters. 882 */ 883 SVCXPRT * 884 svc_xprt_alloc(void) 885 { 886 SVCXPRT *xprt; 887 SVCXPRT_EXT *ext; 888 889 xprt = mem_alloc(sizeof(SVCXPRT)); 890 ext = mem_alloc(sizeof(SVCXPRT_EXT)); 891 xprt->xp_p3 = ext; 892 refcount_init(&xprt->xp_refs, 1); 893 894 return (xprt); 895 } 896 897 /* 898 * Free a server transport structure. 899 */ 900 void 901 svc_xprt_free(SVCXPRT *xprt) 902 { 903 904 mem_free(xprt->xp_p3, sizeof(SVCXPRT_EXT)); 905 /* The size argument is ignored, so 0 is ok. */ 906 mem_free(xprt->xp_gidp, 0); 907 mem_free(xprt, sizeof(SVCXPRT)); 908 } 909 910 /* ******************* SERVER INPUT STUFF ******************* */ 911 912 /* 913 * Read RPC requests from a transport and queue them to be 914 * executed. We handle authentication and replay cache replies here. 915 * Actually dispatching the RPC is deferred till svc_executereq. 916 */ 917 static enum xprt_stat 918 svc_getreq(SVCXPRT *xprt, struct svc_req **rqstp_ret) 919 { 920 SVCPOOL *pool = xprt->xp_pool; 921 struct svc_req *r; 922 struct rpc_msg msg; 923 struct mbuf *args; 924 struct svc_loss_callout *s; 925 enum xprt_stat stat; 926 927 /* now receive msgs from xprtprt (support batch calls) */ 928 r = malloc(sizeof(*r), M_RPC, M_WAITOK|M_ZERO); 929 930 msg.rm_call.cb_cred.oa_base = r->rq_credarea; 931 msg.rm_call.cb_verf.oa_base = &r->rq_credarea[MAX_AUTH_BYTES]; 932 r->rq_clntcred = &r->rq_credarea[2*MAX_AUTH_BYTES]; 933 if (SVC_RECV(xprt, &msg, &r->rq_addr, &args)) { 934 enum auth_stat why; 935 936 /* 937 * Handle replays and authenticate before queuing the 938 * request to be executed. 939 */ 940 SVC_ACQUIRE(xprt); 941 r->rq_xprt = xprt; 942 if (pool->sp_rcache) { 943 struct rpc_msg repmsg; 944 struct mbuf *repbody; 945 enum replay_state rs; 946 rs = replay_find(pool->sp_rcache, &msg, 947 svc_getrpccaller(r), &repmsg, &repbody); 948 switch (rs) { 949 case RS_NEW: 950 break; 951 case RS_DONE: 952 SVC_REPLY(xprt, &repmsg, r->rq_addr, 953 repbody, &r->rq_reply_seq); 954 if (r->rq_addr) { 955 free(r->rq_addr, M_SONAME); 956 r->rq_addr = NULL; 957 } 958 m_freem(args); 959 goto call_done; 960 961 default: 962 m_freem(args); 963 goto call_done; 964 } 965 } 966 967 r->rq_xid = msg.rm_xid; 968 r->rq_prog = msg.rm_call.cb_prog; 969 r->rq_vers = msg.rm_call.cb_vers; 970 r->rq_proc = msg.rm_call.cb_proc; 971 r->rq_size = sizeof(*r) + m_length(args, NULL); 972 r->rq_args = args; 973 if ((why = _authenticate(r, &msg)) != AUTH_OK) { 974 /* 975 * RPCSEC_GSS uses this return code 976 * for requests that form part of its 977 * context establishment protocol and 978 * should not be dispatched to the 979 * application. 980 */ 981 if (why != RPCSEC_GSS_NODISPATCH) 982 svcerr_auth(r, why); 983 goto call_done; 984 } 985 986 if (!SVCAUTH_UNWRAP(&r->rq_auth, &r->rq_args)) { 987 svcerr_decode(r); 988 goto call_done; 989 } 990 991 /* 992 * Everything checks out, return request to caller. 993 */ 994 *rqstp_ret = r; 995 r = NULL; 996 } 997 call_done: 998 if (r) { 999 svc_freereq(r); 1000 r = NULL; 1001 } 1002 if ((stat = SVC_STAT(xprt)) == XPRT_DIED) { 1003 TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) 1004 (*s->slc_dispatch)(xprt); 1005 xprt_unregister(xprt); 1006 } 1007 1008 return (stat); 1009 } 1010 1011 static void 1012 svc_executereq(struct svc_req *rqstp) 1013 { 1014 SVCXPRT *xprt = rqstp->rq_xprt; 1015 SVCPOOL *pool = xprt->xp_pool; 1016 int prog_found; 1017 rpcvers_t low_vers; 1018 rpcvers_t high_vers; 1019 struct svc_callout *s; 1020 1021 /* now match message with a registered service*/ 1022 prog_found = FALSE; 1023 low_vers = (rpcvers_t) -1L; 1024 high_vers = (rpcvers_t) 0L; 1025 TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) { 1026 if (s->sc_prog == rqstp->rq_prog) { 1027 if (s->sc_vers == rqstp->rq_vers) { 1028 /* 1029 * We hand ownership of r to the 1030 * dispatch method - they must call 1031 * svc_freereq. 1032 */ 1033 (*s->sc_dispatch)(rqstp, xprt); 1034 return; 1035 } /* found correct version */ 1036 prog_found = TRUE; 1037 if (s->sc_vers < low_vers) 1038 low_vers = s->sc_vers; 1039 if (s->sc_vers > high_vers) 1040 high_vers = s->sc_vers; 1041 } /* found correct program */ 1042 } 1043 1044 /* 1045 * if we got here, the program or version 1046 * is not served ... 1047 */ 1048 if (prog_found) 1049 svcerr_progvers(rqstp, low_vers, high_vers); 1050 else 1051 svcerr_noprog(rqstp); 1052 1053 svc_freereq(rqstp); 1054 } 1055 1056 static void 1057 svc_checkidle(SVCGROUP *grp) 1058 { 1059 SVCXPRT *xprt, *nxprt; 1060 time_t timo; 1061 struct svcxprt_list cleanup; 1062 1063 TAILQ_INIT(&cleanup); 1064 TAILQ_FOREACH_SAFE(xprt, &grp->sg_xlist, xp_link, nxprt) { 1065 /* 1066 * Only some transports have idle timers. Don't time 1067 * something out which is just waking up. 1068 */ 1069 if (!xprt->xp_idletimeout || xprt->xp_thread) 1070 continue; 1071 1072 timo = xprt->xp_lastactive + xprt->xp_idletimeout; 1073 if (time_uptime > timo) { 1074 xprt_unregister_locked(xprt); 1075 TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link); 1076 } 1077 } 1078 1079 mtx_unlock(&grp->sg_lock); 1080 TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) { 1081 SVC_RELEASE(xprt); 1082 } 1083 mtx_lock(&grp->sg_lock); 1084 } 1085 1086 static void 1087 svc_assign_waiting_sockets(SVCPOOL *pool) 1088 { 1089 SVCGROUP *grp; 1090 SVCXPRT *xprt; 1091 int g; 1092 1093 for (g = 0; g < pool->sp_groupcount; g++) { 1094 grp = &pool->sp_groups[g]; 1095 mtx_lock(&grp->sg_lock); 1096 while ((xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) { 1097 if (xprt_assignthread(xprt)) 1098 TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink); 1099 else 1100 break; 1101 } 1102 mtx_unlock(&grp->sg_lock); 1103 } 1104 } 1105 1106 static void 1107 svc_change_space_used(SVCPOOL *pool, long delta) 1108 { 1109 unsigned long value; 1110 1111 value = atomic_fetchadd_long(&pool->sp_space_used, delta) + delta; 1112 if (delta > 0) { 1113 if (value >= pool->sp_space_high && !pool->sp_space_throttled) { 1114 pool->sp_space_throttled = TRUE; 1115 pool->sp_space_throttle_count++; 1116 } 1117 if (value > pool->sp_space_used_highest) 1118 pool->sp_space_used_highest = value; 1119 } else { 1120 if (value < pool->sp_space_low && pool->sp_space_throttled) { 1121 pool->sp_space_throttled = FALSE; 1122 svc_assign_waiting_sockets(pool); 1123 } 1124 } 1125 } 1126 1127 static bool_t 1128 svc_request_space_available(SVCPOOL *pool) 1129 { 1130 1131 if (pool->sp_space_throttled) 1132 return (FALSE); 1133 return (TRUE); 1134 } 1135 1136 static void 1137 svc_run_internal(SVCGROUP *grp, bool_t ismaster) 1138 { 1139 SVCPOOL *pool = grp->sg_pool; 1140 SVCTHREAD *st, *stpref; 1141 SVCXPRT *xprt; 1142 enum xprt_stat stat; 1143 struct svc_req *rqstp; 1144 struct proc *p; 1145 long sz; 1146 int error; 1147 1148 st = mem_alloc(sizeof(*st)); 1149 mtx_init(&st->st_lock, "st_lock", NULL, MTX_DEF); 1150 st->st_pool = pool; 1151 st->st_xprt = NULL; 1152 STAILQ_INIT(&st->st_reqs); 1153 cv_init(&st->st_cond, "rpcsvc"); 1154 1155 mtx_lock(&grp->sg_lock); 1156 1157 /* 1158 * If we are a new thread which was spawned to cope with 1159 * increased load, set the state back to SVCPOOL_ACTIVE. 1160 */ 1161 if (grp->sg_state == SVCPOOL_THREADSTARTING) 1162 grp->sg_state = SVCPOOL_ACTIVE; 1163 1164 while (grp->sg_state != SVCPOOL_CLOSING) { 1165 /* 1166 * Create new thread if requested. 1167 */ 1168 if (grp->sg_state == SVCPOOL_THREADWANTED) { 1169 grp->sg_state = SVCPOOL_THREADSTARTING; 1170 grp->sg_lastcreatetime = time_uptime; 1171 mtx_unlock(&grp->sg_lock); 1172 svc_new_thread(grp); 1173 mtx_lock(&grp->sg_lock); 1174 continue; 1175 } 1176 1177 /* 1178 * Check for idle transports once per second. 1179 */ 1180 if (time_uptime > grp->sg_lastidlecheck) { 1181 grp->sg_lastidlecheck = time_uptime; 1182 svc_checkidle(grp); 1183 } 1184 1185 xprt = st->st_xprt; 1186 if (!xprt) { 1187 /* 1188 * Enforce maxthreads count. 1189 */ 1190 if (!ismaster && grp->sg_threadcount > 1191 grp->sg_maxthreads) 1192 break; 1193 1194 /* 1195 * Before sleeping, see if we can find an 1196 * active transport which isn't being serviced 1197 * by a thread. 1198 */ 1199 if (svc_request_space_available(pool) && 1200 (xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) { 1201 TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink); 1202 SVC_ACQUIRE(xprt); 1203 xprt->xp_thread = st; 1204 st->st_xprt = xprt; 1205 continue; 1206 } 1207 1208 LIST_INSERT_HEAD(&grp->sg_idlethreads, st, st_ilink); 1209 if (ismaster || (!ismaster && 1210 grp->sg_threadcount > grp->sg_minthreads)) 1211 error = cv_timedwait_sig(&st->st_cond, 1212 &grp->sg_lock, 5 * hz); 1213 else 1214 error = cv_wait_sig(&st->st_cond, 1215 &grp->sg_lock); 1216 if (st->st_xprt == NULL) 1217 LIST_REMOVE(st, st_ilink); 1218 1219 /* 1220 * Reduce worker thread count when idle. 1221 */ 1222 if (error == EWOULDBLOCK) { 1223 if (!ismaster 1224 && (grp->sg_threadcount 1225 > grp->sg_minthreads) 1226 && !st->st_xprt) 1227 break; 1228 } else if (error != 0) { 1229 KASSERT(error == EINTR || error == ERESTART, 1230 ("non-signal error %d", error)); 1231 mtx_unlock(&grp->sg_lock); 1232 p = curproc; 1233 PROC_LOCK(p); 1234 if (P_SHOULDSTOP(p) || 1235 (p->p_flag & P_TOTAL_STOP) != 0) { 1236 thread_suspend_check(0); 1237 PROC_UNLOCK(p); 1238 mtx_lock(&grp->sg_lock); 1239 } else { 1240 PROC_UNLOCK(p); 1241 svc_exit(pool); 1242 mtx_lock(&grp->sg_lock); 1243 break; 1244 } 1245 } 1246 continue; 1247 } 1248 mtx_unlock(&grp->sg_lock); 1249 1250 /* 1251 * Drain the transport socket and queue up any RPCs. 1252 */ 1253 xprt->xp_lastactive = time_uptime; 1254 do { 1255 if (!svc_request_space_available(pool)) 1256 break; 1257 rqstp = NULL; 1258 stat = svc_getreq(xprt, &rqstp); 1259 if (rqstp) { 1260 svc_change_space_used(pool, rqstp->rq_size); 1261 /* 1262 * See if the application has a preference 1263 * for some other thread. 1264 */ 1265 if (pool->sp_assign) { 1266 stpref = pool->sp_assign(st, rqstp); 1267 rqstp->rq_thread = stpref; 1268 STAILQ_INSERT_TAIL(&stpref->st_reqs, 1269 rqstp, rq_link); 1270 mtx_unlock(&stpref->st_lock); 1271 if (stpref != st) 1272 rqstp = NULL; 1273 } else { 1274 rqstp->rq_thread = st; 1275 STAILQ_INSERT_TAIL(&st->st_reqs, 1276 rqstp, rq_link); 1277 } 1278 } 1279 } while (rqstp == NULL && stat == XPRT_MOREREQS 1280 && grp->sg_state != SVCPOOL_CLOSING); 1281 1282 /* 1283 * Move this transport to the end of the active list to 1284 * ensure fairness when multiple transports are active. 1285 * If this was the last queued request, svc_getreq will end 1286 * up calling xprt_inactive to remove from the active list. 1287 */ 1288 mtx_lock(&grp->sg_lock); 1289 xprt->xp_thread = NULL; 1290 st->st_xprt = NULL; 1291 if (xprt->xp_active) { 1292 if (!svc_request_space_available(pool) || 1293 !xprt_assignthread(xprt)) 1294 TAILQ_INSERT_TAIL(&grp->sg_active, 1295 xprt, xp_alink); 1296 } 1297 mtx_unlock(&grp->sg_lock); 1298 SVC_RELEASE(xprt); 1299 1300 /* 1301 * Execute what we have queued. 1302 */ 1303 mtx_lock(&st->st_lock); 1304 while ((rqstp = STAILQ_FIRST(&st->st_reqs)) != NULL) { 1305 STAILQ_REMOVE_HEAD(&st->st_reqs, rq_link); 1306 mtx_unlock(&st->st_lock); 1307 sz = (long)rqstp->rq_size; 1308 svc_executereq(rqstp); 1309 svc_change_space_used(pool, -sz); 1310 mtx_lock(&st->st_lock); 1311 } 1312 mtx_unlock(&st->st_lock); 1313 mtx_lock(&grp->sg_lock); 1314 } 1315 1316 if (st->st_xprt) { 1317 xprt = st->st_xprt; 1318 st->st_xprt = NULL; 1319 SVC_RELEASE(xprt); 1320 } 1321 KASSERT(STAILQ_EMPTY(&st->st_reqs), ("stray reqs on exit")); 1322 mtx_destroy(&st->st_lock); 1323 cv_destroy(&st->st_cond); 1324 mem_free(st, sizeof(*st)); 1325 1326 grp->sg_threadcount--; 1327 if (!ismaster) 1328 wakeup(grp); 1329 mtx_unlock(&grp->sg_lock); 1330 } 1331 1332 static void 1333 svc_thread_start(void *arg) 1334 { 1335 1336 svc_run_internal((SVCGROUP *) arg, FALSE); 1337 kthread_exit(); 1338 } 1339 1340 static void 1341 svc_new_thread(SVCGROUP *grp) 1342 { 1343 SVCPOOL *pool = grp->sg_pool; 1344 struct thread *td; 1345 1346 mtx_lock(&grp->sg_lock); 1347 grp->sg_threadcount++; 1348 mtx_unlock(&grp->sg_lock); 1349 kthread_add(svc_thread_start, grp, pool->sp_proc, &td, 0, 0, 1350 "%s: service", pool->sp_name); 1351 } 1352 1353 void 1354 svc_run(SVCPOOL *pool) 1355 { 1356 int g, i; 1357 struct proc *p; 1358 struct thread *td; 1359 SVCGROUP *grp; 1360 1361 p = curproc; 1362 td = curthread; 1363 snprintf(td->td_name, sizeof(td->td_name), 1364 "%s: master", pool->sp_name); 1365 pool->sp_state = SVCPOOL_ACTIVE; 1366 pool->sp_proc = p; 1367 1368 /* Choose group count based on number of threads and CPUs. */ 1369 pool->sp_groupcount = max(1, min(SVC_MAXGROUPS, 1370 min(pool->sp_maxthreads / 2, mp_ncpus) / 6)); 1371 for (g = 0; g < pool->sp_groupcount; g++) { 1372 grp = &pool->sp_groups[g]; 1373 grp->sg_minthreads = max(1, 1374 pool->sp_minthreads / pool->sp_groupcount); 1375 grp->sg_maxthreads = max(1, 1376 pool->sp_maxthreads / pool->sp_groupcount); 1377 grp->sg_lastcreatetime = time_uptime; 1378 } 1379 1380 /* Starting threads */ 1381 pool->sp_groups[0].sg_threadcount++; 1382 for (g = 0; g < pool->sp_groupcount; g++) { 1383 grp = &pool->sp_groups[g]; 1384 for (i = ((g == 0) ? 1 : 0); i < grp->sg_minthreads; i++) 1385 svc_new_thread(grp); 1386 } 1387 svc_run_internal(&pool->sp_groups[0], TRUE); 1388 1389 /* Waiting for threads to stop. */ 1390 for (g = 0; g < pool->sp_groupcount; g++) { 1391 grp = &pool->sp_groups[g]; 1392 mtx_lock(&grp->sg_lock); 1393 while (grp->sg_threadcount > 0) 1394 msleep(grp, &grp->sg_lock, 0, "svcexit", 0); 1395 mtx_unlock(&grp->sg_lock); 1396 } 1397 } 1398 1399 void 1400 svc_exit(SVCPOOL *pool) 1401 { 1402 SVCGROUP *grp; 1403 SVCTHREAD *st; 1404 int g; 1405 1406 pool->sp_state = SVCPOOL_CLOSING; 1407 for (g = 0; g < pool->sp_groupcount; g++) { 1408 grp = &pool->sp_groups[g]; 1409 mtx_lock(&grp->sg_lock); 1410 if (grp->sg_state != SVCPOOL_CLOSING) { 1411 grp->sg_state = SVCPOOL_CLOSING; 1412 LIST_FOREACH(st, &grp->sg_idlethreads, st_ilink) 1413 cv_signal(&st->st_cond); 1414 } 1415 mtx_unlock(&grp->sg_lock); 1416 } 1417 } 1418 1419 bool_t 1420 svc_getargs(struct svc_req *rqstp, xdrproc_t xargs, void *args) 1421 { 1422 struct mbuf *m; 1423 XDR xdrs; 1424 bool_t stat; 1425 1426 m = rqstp->rq_args; 1427 rqstp->rq_args = NULL; 1428 1429 xdrmbuf_create(&xdrs, m, XDR_DECODE); 1430 stat = xargs(&xdrs, args); 1431 XDR_DESTROY(&xdrs); 1432 1433 return (stat); 1434 } 1435 1436 bool_t 1437 svc_freeargs(struct svc_req *rqstp, xdrproc_t xargs, void *args) 1438 { 1439 XDR xdrs; 1440 1441 if (rqstp->rq_addr) { 1442 free(rqstp->rq_addr, M_SONAME); 1443 rqstp->rq_addr = NULL; 1444 } 1445 1446 xdrs.x_op = XDR_FREE; 1447 return (xargs(&xdrs, args)); 1448 } 1449 1450 void 1451 svc_freereq(struct svc_req *rqstp) 1452 { 1453 SVCTHREAD *st; 1454 SVCPOOL *pool; 1455 1456 st = rqstp->rq_thread; 1457 if (st) { 1458 pool = st->st_pool; 1459 if (pool->sp_done) 1460 pool->sp_done(st, rqstp); 1461 } 1462 1463 if (rqstp->rq_auth.svc_ah_ops) 1464 SVCAUTH_RELEASE(&rqstp->rq_auth); 1465 1466 if (rqstp->rq_xprt) { 1467 SVC_RELEASE(rqstp->rq_xprt); 1468 } 1469 1470 if (rqstp->rq_addr) 1471 free(rqstp->rq_addr, M_SONAME); 1472 1473 if (rqstp->rq_args) 1474 m_freem(rqstp->rq_args); 1475 1476 free(rqstp, M_RPC); 1477 } 1478