1 /* $NetBSD: svc.c,v 1.21 2000/07/06 03:10:35 christos Exp $ */ 2 3 /*- 4 * SPDX-License-Identifier: BSD-3-Clause 5 * 6 * Copyright (c) 2009, Sun Microsystems, Inc. 7 * All rights reserved. 8 * 9 * Redistribution and use in source and binary forms, with or without 10 * modification, are permitted provided that the following conditions are met: 11 * - Redistributions of source code must retain the above copyright notice, 12 * this list of conditions and the following disclaimer. 13 * - Redistributions in binary form must reproduce the above copyright notice, 14 * this list of conditions and the following disclaimer in the documentation 15 * and/or other materials provided with the distribution. 16 * - Neither the name of Sun Microsystems, Inc. nor the names of its 17 * contributors may be used to endorse or promote products derived 18 * from this software without specific prior written permission. 19 * 20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 21 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 22 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 23 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 24 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 25 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 26 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 27 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 28 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 29 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 30 * POSSIBILITY OF SUCH DAMAGE. 31 */ 32 33 #include <sys/cdefs.h> 34 /* 35 * svc.c, Server-side remote procedure call interface. 36 * 37 * There are two sets of procedures here. The xprt routines are 38 * for handling transport handles. The svc routines handle the 39 * list of service routines. 40 * 41 * Copyright (C) 1984, Sun Microsystems, Inc. 42 */ 43 44 #include <sys/param.h> 45 #include <sys/jail.h> 46 #include <sys/lock.h> 47 #include <sys/kernel.h> 48 #include <sys/kthread.h> 49 #include <sys/malloc.h> 50 #include <sys/mbuf.h> 51 #include <sys/mutex.h> 52 #include <sys/proc.h> 53 #include <sys/queue.h> 54 #include <sys/socketvar.h> 55 #include <sys/systm.h> 56 #include <sys/smp.h> 57 #include <sys/sx.h> 58 #include <sys/ucred.h> 59 60 #include <rpc/rpc.h> 61 #include <rpc/rpcb_clnt.h> 62 #include <rpc/replay.h> 63 64 #include <rpc/rpc_com.h> 65 66 #define SVC_VERSQUIET 0x0001 /* keep quiet about vers mismatch */ 67 #define version_keepquiet(xp) (SVC_EXT(xp)->xp_flags & SVC_VERSQUIET) 68 69 static struct svc_callout *svc_find(SVCPOOL *pool, rpcprog_t, rpcvers_t, 70 char *); 71 static void svc_new_thread(SVCGROUP *grp); 72 static void xprt_unregister_locked(SVCXPRT *xprt); 73 static void svc_change_space_used(SVCPOOL *pool, long delta); 74 static bool_t svc_request_space_available(SVCPOOL *pool); 75 static void svcpool_cleanup(SVCPOOL *pool); 76 77 /* *************** SVCXPRT related stuff **************** */ 78 79 static int svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS); 80 static int svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS); 81 static int svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS); 82 83 SVCPOOL* 84 svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base) 85 { 86 SVCPOOL *pool; 87 SVCGROUP *grp; 88 int g; 89 90 pool = malloc(sizeof(SVCPOOL), M_RPC, M_WAITOK|M_ZERO); 91 92 mtx_init(&pool->sp_lock, "sp_lock", NULL, MTX_DEF); 93 pool->sp_name = name; 94 pool->sp_state = SVCPOOL_INIT; 95 pool->sp_proc = NULL; 96 TAILQ_INIT(&pool->sp_callouts); 97 TAILQ_INIT(&pool->sp_lcallouts); 98 pool->sp_minthreads = 1; 99 pool->sp_maxthreads = 1; 100 pool->sp_groupcount = 1; 101 for (g = 0; g < SVC_MAXGROUPS; g++) { 102 grp = &pool->sp_groups[g]; 103 mtx_init(&grp->sg_lock, "sg_lock", NULL, MTX_DEF); 104 grp->sg_pool = pool; 105 grp->sg_state = SVCPOOL_ACTIVE; 106 TAILQ_INIT(&grp->sg_xlist); 107 TAILQ_INIT(&grp->sg_active); 108 LIST_INIT(&grp->sg_idlethreads); 109 grp->sg_minthreads = 1; 110 grp->sg_maxthreads = 1; 111 } 112 113 /* 114 * Don't use more than a quarter of mbuf clusters. Nota bene: 115 * nmbclusters is an int, but nmbclusters*MCLBYTES may overflow 116 * on LP64 architectures, so cast to u_long to avoid undefined 117 * behavior. (ILP32 architectures cannot have nmbclusters 118 * large enough to overflow for other reasons.) 119 */ 120 pool->sp_space_high = (u_long)nmbclusters * MCLBYTES / 4; 121 pool->sp_space_low = (pool->sp_space_high / 3) * 2; 122 123 sysctl_ctx_init(&pool->sp_sysctl); 124 if (IS_DEFAULT_VNET(curvnet) && sysctl_base) { 125 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO, 126 "minthreads", CTLTYPE_INT | CTLFLAG_RW | CTLFLAG_MPSAFE, 127 pool, 0, svcpool_minthread_sysctl, "I", 128 "Minimal number of threads"); 129 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO, 130 "maxthreads", CTLTYPE_INT | CTLFLAG_RW | CTLFLAG_MPSAFE, 131 pool, 0, svcpool_maxthread_sysctl, "I", 132 "Maximal number of threads"); 133 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO, 134 "threads", CTLTYPE_INT | CTLFLAG_RD | CTLFLAG_MPSAFE, 135 pool, 0, svcpool_threads_sysctl, "I", 136 "Current number of threads"); 137 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO, 138 "groups", CTLFLAG_RD, &pool->sp_groupcount, 0, 139 "Number of thread groups"); 140 141 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO, 142 "request_space_used", CTLFLAG_RD, 143 &pool->sp_space_used, 144 "Space in parsed but not handled requests."); 145 146 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO, 147 "request_space_used_highest", CTLFLAG_RD, 148 &pool->sp_space_used_highest, 149 "Highest space used since reboot."); 150 151 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO, 152 "request_space_high", CTLFLAG_RW, 153 &pool->sp_space_high, 154 "Maximum space in parsed but not handled requests."); 155 156 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO, 157 "request_space_low", CTLFLAG_RW, 158 &pool->sp_space_low, 159 "Low water mark for request space."); 160 161 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO, 162 "request_space_throttled", CTLFLAG_RD, 163 &pool->sp_space_throttled, 0, 164 "Whether nfs requests are currently throttled"); 165 166 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO, 167 "request_space_throttle_count", CTLFLAG_RD, 168 &pool->sp_space_throttle_count, 0, 169 "Count of times throttling based on request space has occurred"); 170 } 171 172 return pool; 173 } 174 175 /* 176 * Code common to svcpool_destroy() and svcpool_close(), which cleans up 177 * the pool data structures. 178 */ 179 static void 180 svcpool_cleanup(SVCPOOL *pool) 181 { 182 SVCGROUP *grp; 183 SVCXPRT *xprt, *nxprt; 184 struct svc_callout *s; 185 struct svc_loss_callout *sl; 186 struct svcxprt_list cleanup; 187 int g; 188 189 TAILQ_INIT(&cleanup); 190 191 for (g = 0; g < SVC_MAXGROUPS; g++) { 192 grp = &pool->sp_groups[g]; 193 mtx_lock(&grp->sg_lock); 194 while ((xprt = TAILQ_FIRST(&grp->sg_xlist)) != NULL) { 195 xprt_unregister_locked(xprt); 196 TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link); 197 } 198 mtx_unlock(&grp->sg_lock); 199 } 200 TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) { 201 if (xprt->xp_socket != NULL) 202 soshutdown(xprt->xp_socket, SHUT_WR); 203 SVC_RELEASE(xprt); 204 } 205 206 mtx_lock(&pool->sp_lock); 207 while ((s = TAILQ_FIRST(&pool->sp_callouts)) != NULL) { 208 mtx_unlock(&pool->sp_lock); 209 svc_unreg(pool, s->sc_prog, s->sc_vers); 210 mtx_lock(&pool->sp_lock); 211 } 212 while ((sl = TAILQ_FIRST(&pool->sp_lcallouts)) != NULL) { 213 mtx_unlock(&pool->sp_lock); 214 svc_loss_unreg(pool, sl->slc_dispatch); 215 mtx_lock(&pool->sp_lock); 216 } 217 mtx_unlock(&pool->sp_lock); 218 } 219 220 void 221 svcpool_destroy(SVCPOOL *pool) 222 { 223 SVCGROUP *grp; 224 int g; 225 226 svcpool_cleanup(pool); 227 228 for (g = 0; g < SVC_MAXGROUPS; g++) { 229 grp = &pool->sp_groups[g]; 230 mtx_destroy(&grp->sg_lock); 231 } 232 mtx_destroy(&pool->sp_lock); 233 234 if (pool->sp_rcache) 235 replay_freecache(pool->sp_rcache); 236 237 sysctl_ctx_free(&pool->sp_sysctl); 238 free(pool, M_RPC); 239 } 240 241 /* 242 * Similar to svcpool_destroy(), except that it does not destroy the actual 243 * data structures. As such, "pool" may be used again. 244 */ 245 void 246 svcpool_close(SVCPOOL *pool) 247 { 248 SVCGROUP *grp; 249 int g; 250 251 svcpool_cleanup(pool); 252 253 /* Now, initialize the pool's state for a fresh svc_run() call. */ 254 mtx_lock(&pool->sp_lock); 255 pool->sp_state = SVCPOOL_INIT; 256 mtx_unlock(&pool->sp_lock); 257 for (g = 0; g < SVC_MAXGROUPS; g++) { 258 grp = &pool->sp_groups[g]; 259 mtx_lock(&grp->sg_lock); 260 grp->sg_state = SVCPOOL_ACTIVE; 261 mtx_unlock(&grp->sg_lock); 262 } 263 } 264 265 /* 266 * Sysctl handler to get the present thread count on a pool 267 */ 268 static int 269 svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS) 270 { 271 SVCPOOL *pool; 272 int threads, error, g; 273 274 pool = oidp->oid_arg1; 275 threads = 0; 276 mtx_lock(&pool->sp_lock); 277 for (g = 0; g < pool->sp_groupcount; g++) 278 threads += pool->sp_groups[g].sg_threadcount; 279 mtx_unlock(&pool->sp_lock); 280 error = sysctl_handle_int(oidp, &threads, 0, req); 281 return (error); 282 } 283 284 /* 285 * Sysctl handler to set the minimum thread count on a pool 286 */ 287 static int 288 svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS) 289 { 290 SVCPOOL *pool; 291 int newminthreads, error, g; 292 293 pool = oidp->oid_arg1; 294 newminthreads = pool->sp_minthreads; 295 error = sysctl_handle_int(oidp, &newminthreads, 0, req); 296 if (error == 0 && newminthreads != pool->sp_minthreads) { 297 if (newminthreads > pool->sp_maxthreads) 298 return (EINVAL); 299 mtx_lock(&pool->sp_lock); 300 pool->sp_minthreads = newminthreads; 301 for (g = 0; g < pool->sp_groupcount; g++) { 302 pool->sp_groups[g].sg_minthreads = max(1, 303 pool->sp_minthreads / pool->sp_groupcount); 304 } 305 mtx_unlock(&pool->sp_lock); 306 } 307 return (error); 308 } 309 310 /* 311 * Sysctl handler to set the maximum thread count on a pool 312 */ 313 static int 314 svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS) 315 { 316 SVCPOOL *pool; 317 int newmaxthreads, error, g; 318 319 pool = oidp->oid_arg1; 320 newmaxthreads = pool->sp_maxthreads; 321 error = sysctl_handle_int(oidp, &newmaxthreads, 0, req); 322 if (error == 0 && newmaxthreads != pool->sp_maxthreads) { 323 if (newmaxthreads < pool->sp_minthreads) 324 return (EINVAL); 325 mtx_lock(&pool->sp_lock); 326 pool->sp_maxthreads = newmaxthreads; 327 for (g = 0; g < pool->sp_groupcount; g++) { 328 pool->sp_groups[g].sg_maxthreads = max(1, 329 pool->sp_maxthreads / pool->sp_groupcount); 330 } 331 mtx_unlock(&pool->sp_lock); 332 } 333 return (error); 334 } 335 336 /* 337 * Activate a transport handle. 338 */ 339 void 340 xprt_register(SVCXPRT *xprt) 341 { 342 SVCPOOL *pool = xprt->xp_pool; 343 SVCGROUP *grp; 344 int g; 345 346 SVC_ACQUIRE(xprt); 347 g = atomic_fetchadd_int(&pool->sp_nextgroup, 1) % pool->sp_groupcount; 348 xprt->xp_group = grp = &pool->sp_groups[g]; 349 mtx_lock(&grp->sg_lock); 350 xprt->xp_registered = TRUE; 351 xprt->xp_active = FALSE; 352 TAILQ_INSERT_TAIL(&grp->sg_xlist, xprt, xp_link); 353 mtx_unlock(&grp->sg_lock); 354 } 355 356 /* 357 * De-activate a transport handle. Note: the locked version doesn't 358 * release the transport - caller must do that after dropping the pool 359 * lock. 360 */ 361 static void 362 xprt_unregister_locked(SVCXPRT *xprt) 363 { 364 SVCGROUP *grp = xprt->xp_group; 365 366 mtx_assert(&grp->sg_lock, MA_OWNED); 367 KASSERT(xprt->xp_registered == TRUE, 368 ("xprt_unregister_locked: not registered")); 369 xprt_inactive_locked(xprt); 370 TAILQ_REMOVE(&grp->sg_xlist, xprt, xp_link); 371 xprt->xp_registered = FALSE; 372 } 373 374 void 375 xprt_unregister(SVCXPRT *xprt) 376 { 377 SVCGROUP *grp = xprt->xp_group; 378 379 mtx_lock(&grp->sg_lock); 380 if (xprt->xp_registered == FALSE) { 381 /* Already unregistered by another thread */ 382 mtx_unlock(&grp->sg_lock); 383 return; 384 } 385 xprt_unregister_locked(xprt); 386 mtx_unlock(&grp->sg_lock); 387 388 if (xprt->xp_socket != NULL) 389 soshutdown(xprt->xp_socket, SHUT_WR); 390 SVC_RELEASE(xprt); 391 } 392 393 /* 394 * Attempt to assign a service thread to this transport. 395 */ 396 static int 397 xprt_assignthread(SVCXPRT *xprt) 398 { 399 SVCGROUP *grp = xprt->xp_group; 400 SVCTHREAD *st; 401 402 mtx_assert(&grp->sg_lock, MA_OWNED); 403 st = LIST_FIRST(&grp->sg_idlethreads); 404 if (st) { 405 LIST_REMOVE(st, st_ilink); 406 SVC_ACQUIRE(xprt); 407 xprt->xp_thread = st; 408 st->st_xprt = xprt; 409 cv_signal(&st->st_cond); 410 return (TRUE); 411 } else { 412 /* 413 * See if we can create a new thread. The 414 * actual thread creation happens in 415 * svc_run_internal because our locking state 416 * is poorly defined (we are typically called 417 * from a socket upcall). Don't create more 418 * than one thread per second. 419 */ 420 if (grp->sg_state == SVCPOOL_ACTIVE 421 && grp->sg_lastcreatetime < time_uptime 422 && grp->sg_threadcount < grp->sg_maxthreads) { 423 grp->sg_state = SVCPOOL_THREADWANTED; 424 } 425 } 426 return (FALSE); 427 } 428 429 void 430 xprt_active(SVCXPRT *xprt) 431 { 432 SVCGROUP *grp = xprt->xp_group; 433 434 mtx_lock(&grp->sg_lock); 435 436 if (!xprt->xp_registered) { 437 /* 438 * Race with xprt_unregister - we lose. 439 */ 440 mtx_unlock(&grp->sg_lock); 441 return; 442 } 443 444 if (!xprt->xp_active) { 445 xprt->xp_active = TRUE; 446 if (xprt->xp_thread == NULL) { 447 if (!svc_request_space_available(xprt->xp_pool) || 448 !xprt_assignthread(xprt)) 449 TAILQ_INSERT_TAIL(&grp->sg_active, xprt, 450 xp_alink); 451 } 452 } 453 454 mtx_unlock(&grp->sg_lock); 455 } 456 457 void 458 xprt_inactive_locked(SVCXPRT *xprt) 459 { 460 SVCGROUP *grp = xprt->xp_group; 461 462 mtx_assert(&grp->sg_lock, MA_OWNED); 463 if (xprt->xp_active) { 464 if (xprt->xp_thread == NULL) 465 TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink); 466 xprt->xp_active = FALSE; 467 } 468 } 469 470 void 471 xprt_inactive(SVCXPRT *xprt) 472 { 473 SVCGROUP *grp = xprt->xp_group; 474 475 mtx_lock(&grp->sg_lock); 476 xprt_inactive_locked(xprt); 477 mtx_unlock(&grp->sg_lock); 478 } 479 480 /* 481 * Variant of xprt_inactive() for use only when sure that port is 482 * assigned to thread. For example, within receive handlers. 483 */ 484 void 485 xprt_inactive_self(SVCXPRT *xprt) 486 { 487 488 KASSERT(xprt->xp_thread != NULL, 489 ("xprt_inactive_self(%p) with NULL xp_thread", xprt)); 490 xprt->xp_active = FALSE; 491 } 492 493 /* 494 * Add a service program to the callout list. 495 * The dispatch routine will be called when a rpc request for this 496 * program number comes in. 497 */ 498 bool_t 499 svc_reg(SVCXPRT *xprt, const rpcprog_t prog, const rpcvers_t vers, 500 void (*dispatch)(struct svc_req *, SVCXPRT *), 501 const struct netconfig *nconf) 502 { 503 SVCPOOL *pool = xprt->xp_pool; 504 struct svc_callout *s; 505 char *netid = NULL; 506 int flag = 0; 507 508 /* VARIABLES PROTECTED BY svc_lock: s, svc_head */ 509 510 if (xprt->xp_netid) { 511 netid = strdup(xprt->xp_netid, M_RPC); 512 flag = 1; 513 } else if (nconf && nconf->nc_netid) { 514 netid = strdup(nconf->nc_netid, M_RPC); 515 flag = 1; 516 } /* must have been created with svc_raw_create */ 517 if ((netid == NULL) && (flag == 1)) { 518 return (FALSE); 519 } 520 521 mtx_lock(&pool->sp_lock); 522 if ((s = svc_find(pool, prog, vers, netid)) != NULL) { 523 if (netid) 524 free(netid, M_RPC); 525 if (s->sc_dispatch == dispatch) 526 goto rpcb_it; /* he is registering another xptr */ 527 mtx_unlock(&pool->sp_lock); 528 return (FALSE); 529 } 530 s = malloc(sizeof (struct svc_callout), M_RPC, M_NOWAIT); 531 if (s == NULL) { 532 if (netid) 533 free(netid, M_RPC); 534 mtx_unlock(&pool->sp_lock); 535 return (FALSE); 536 } 537 538 s->sc_prog = prog; 539 s->sc_vers = vers; 540 s->sc_dispatch = dispatch; 541 s->sc_netid = netid; 542 TAILQ_INSERT_TAIL(&pool->sp_callouts, s, sc_link); 543 544 if ((xprt->xp_netid == NULL) && (flag == 1) && netid) 545 ((SVCXPRT *) xprt)->xp_netid = strdup(netid, M_RPC); 546 547 rpcb_it: 548 mtx_unlock(&pool->sp_lock); 549 /* now register the information with the local binder service */ 550 if (nconf) { 551 bool_t dummy; 552 struct netconfig tnc; 553 struct netbuf nb; 554 tnc = *nconf; 555 nb.buf = &xprt->xp_ltaddr; 556 nb.len = xprt->xp_ltaddr.ss_len; 557 dummy = rpcb_set(prog, vers, &tnc, &nb); 558 return (dummy); 559 } 560 return (TRUE); 561 } 562 563 /* 564 * Remove a service program from the callout list. 565 */ 566 void 567 svc_unreg(SVCPOOL *pool, const rpcprog_t prog, const rpcvers_t vers) 568 { 569 struct svc_callout *s; 570 571 /* unregister the information anyway */ 572 (void) rpcb_unset(prog, vers, NULL); 573 mtx_lock(&pool->sp_lock); 574 while ((s = svc_find(pool, prog, vers, NULL)) != NULL) { 575 TAILQ_REMOVE(&pool->sp_callouts, s, sc_link); 576 if (s->sc_netid) 577 mem_free(s->sc_netid, sizeof (s->sc_netid) + 1); 578 mem_free(s, sizeof (struct svc_callout)); 579 } 580 mtx_unlock(&pool->sp_lock); 581 } 582 583 /* 584 * Add a service connection loss program to the callout list. 585 * The dispatch routine will be called when some port in ths pool die. 586 */ 587 bool_t 588 svc_loss_reg(SVCXPRT *xprt, void (*dispatch)(SVCXPRT *)) 589 { 590 SVCPOOL *pool = xprt->xp_pool; 591 struct svc_loss_callout *s; 592 593 mtx_lock(&pool->sp_lock); 594 TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) { 595 if (s->slc_dispatch == dispatch) 596 break; 597 } 598 if (s != NULL) { 599 mtx_unlock(&pool->sp_lock); 600 return (TRUE); 601 } 602 s = malloc(sizeof(struct svc_loss_callout), M_RPC, M_NOWAIT); 603 if (s == NULL) { 604 mtx_unlock(&pool->sp_lock); 605 return (FALSE); 606 } 607 s->slc_dispatch = dispatch; 608 TAILQ_INSERT_TAIL(&pool->sp_lcallouts, s, slc_link); 609 mtx_unlock(&pool->sp_lock); 610 return (TRUE); 611 } 612 613 /* 614 * Remove a service connection loss program from the callout list. 615 */ 616 void 617 svc_loss_unreg(SVCPOOL *pool, void (*dispatch)(SVCXPRT *)) 618 { 619 struct svc_loss_callout *s; 620 621 mtx_lock(&pool->sp_lock); 622 TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) { 623 if (s->slc_dispatch == dispatch) { 624 TAILQ_REMOVE(&pool->sp_lcallouts, s, slc_link); 625 free(s, M_RPC); 626 break; 627 } 628 } 629 mtx_unlock(&pool->sp_lock); 630 } 631 632 /* ********************** CALLOUT list related stuff ************* */ 633 634 /* 635 * Search the callout list for a program number, return the callout 636 * struct. 637 */ 638 static struct svc_callout * 639 svc_find(SVCPOOL *pool, rpcprog_t prog, rpcvers_t vers, char *netid) 640 { 641 struct svc_callout *s; 642 643 mtx_assert(&pool->sp_lock, MA_OWNED); 644 TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) { 645 if (s->sc_prog == prog && s->sc_vers == vers 646 && (netid == NULL || s->sc_netid == NULL || 647 strcmp(netid, s->sc_netid) == 0)) 648 break; 649 } 650 651 return (s); 652 } 653 654 /* ******************* REPLY GENERATION ROUTINES ************ */ 655 656 static bool_t 657 svc_sendreply_common(struct svc_req *rqstp, struct rpc_msg *rply, 658 struct mbuf *body) 659 { 660 SVCXPRT *xprt = rqstp->rq_xprt; 661 bool_t ok; 662 663 if (rqstp->rq_args) { 664 m_freem(rqstp->rq_args); 665 rqstp->rq_args = NULL; 666 } 667 668 if (xprt->xp_pool->sp_rcache) 669 replay_setreply(xprt->xp_pool->sp_rcache, 670 rply, svc_getrpccaller(rqstp), body); 671 672 if (!SVCAUTH_WRAP(&rqstp->rq_auth, &body)) 673 return (FALSE); 674 675 ok = SVC_REPLY(xprt, rply, rqstp->rq_addr, body, &rqstp->rq_reply_seq); 676 if (rqstp->rq_addr) { 677 free(rqstp->rq_addr, M_SONAME); 678 rqstp->rq_addr = NULL; 679 } 680 681 return (ok); 682 } 683 684 /* 685 * Send a reply to an rpc request 686 */ 687 bool_t 688 svc_sendreply(struct svc_req *rqstp, xdrproc_t xdr_results, void * xdr_location) 689 { 690 struct rpc_msg rply; 691 struct mbuf *m; 692 XDR xdrs; 693 bool_t ok; 694 695 rply.rm_xid = rqstp->rq_xid; 696 rply.rm_direction = REPLY; 697 rply.rm_reply.rp_stat = MSG_ACCEPTED; 698 rply.acpted_rply.ar_verf = rqstp->rq_verf; 699 rply.acpted_rply.ar_stat = SUCCESS; 700 rply.acpted_rply.ar_results.where = NULL; 701 rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void; 702 703 m = m_getcl(M_WAITOK, MT_DATA, 0); 704 xdrmbuf_create(&xdrs, m, XDR_ENCODE); 705 ok = xdr_results(&xdrs, xdr_location); 706 XDR_DESTROY(&xdrs); 707 708 if (ok) { 709 return (svc_sendreply_common(rqstp, &rply, m)); 710 } else { 711 m_freem(m); 712 return (FALSE); 713 } 714 } 715 716 bool_t 717 svc_sendreply_mbuf(struct svc_req *rqstp, struct mbuf *m) 718 { 719 struct rpc_msg rply; 720 721 rply.rm_xid = rqstp->rq_xid; 722 rply.rm_direction = REPLY; 723 rply.rm_reply.rp_stat = MSG_ACCEPTED; 724 rply.acpted_rply.ar_verf = rqstp->rq_verf; 725 rply.acpted_rply.ar_stat = SUCCESS; 726 rply.acpted_rply.ar_results.where = NULL; 727 rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void; 728 729 return (svc_sendreply_common(rqstp, &rply, m)); 730 } 731 732 /* 733 * No procedure error reply 734 */ 735 void 736 svcerr_noproc(struct svc_req *rqstp) 737 { 738 SVCXPRT *xprt = rqstp->rq_xprt; 739 struct rpc_msg rply; 740 741 rply.rm_xid = rqstp->rq_xid; 742 rply.rm_direction = REPLY; 743 rply.rm_reply.rp_stat = MSG_ACCEPTED; 744 rply.acpted_rply.ar_verf = rqstp->rq_verf; 745 rply.acpted_rply.ar_stat = PROC_UNAVAIL; 746 747 if (xprt->xp_pool->sp_rcache) 748 replay_setreply(xprt->xp_pool->sp_rcache, 749 &rply, svc_getrpccaller(rqstp), NULL); 750 751 svc_sendreply_common(rqstp, &rply, NULL); 752 } 753 754 /* 755 * Can't decode args error reply 756 */ 757 void 758 svcerr_decode(struct svc_req *rqstp) 759 { 760 SVCXPRT *xprt = rqstp->rq_xprt; 761 struct rpc_msg rply; 762 763 rply.rm_xid = rqstp->rq_xid; 764 rply.rm_direction = REPLY; 765 rply.rm_reply.rp_stat = MSG_ACCEPTED; 766 rply.acpted_rply.ar_verf = rqstp->rq_verf; 767 rply.acpted_rply.ar_stat = GARBAGE_ARGS; 768 769 if (xprt->xp_pool->sp_rcache) 770 replay_setreply(xprt->xp_pool->sp_rcache, 771 &rply, (struct sockaddr *) &xprt->xp_rtaddr, NULL); 772 773 svc_sendreply_common(rqstp, &rply, NULL); 774 } 775 776 /* 777 * Some system error 778 */ 779 void 780 svcerr_systemerr(struct svc_req *rqstp) 781 { 782 SVCXPRT *xprt = rqstp->rq_xprt; 783 struct rpc_msg rply; 784 785 rply.rm_xid = rqstp->rq_xid; 786 rply.rm_direction = REPLY; 787 rply.rm_reply.rp_stat = MSG_ACCEPTED; 788 rply.acpted_rply.ar_verf = rqstp->rq_verf; 789 rply.acpted_rply.ar_stat = SYSTEM_ERR; 790 791 if (xprt->xp_pool->sp_rcache) 792 replay_setreply(xprt->xp_pool->sp_rcache, 793 &rply, svc_getrpccaller(rqstp), NULL); 794 795 svc_sendreply_common(rqstp, &rply, NULL); 796 } 797 798 /* 799 * Authentication error reply 800 */ 801 void 802 svcerr_auth(struct svc_req *rqstp, enum auth_stat why) 803 { 804 SVCXPRT *xprt = rqstp->rq_xprt; 805 struct rpc_msg rply; 806 807 rply.rm_xid = rqstp->rq_xid; 808 rply.rm_direction = REPLY; 809 rply.rm_reply.rp_stat = MSG_DENIED; 810 rply.rjcted_rply.rj_stat = AUTH_ERROR; 811 rply.rjcted_rply.rj_why = why; 812 813 if (xprt->xp_pool->sp_rcache) 814 replay_setreply(xprt->xp_pool->sp_rcache, 815 &rply, svc_getrpccaller(rqstp), NULL); 816 817 svc_sendreply_common(rqstp, &rply, NULL); 818 } 819 820 /* 821 * Auth too weak error reply 822 */ 823 void 824 svcerr_weakauth(struct svc_req *rqstp) 825 { 826 827 svcerr_auth(rqstp, AUTH_TOOWEAK); 828 } 829 830 /* 831 * Program unavailable error reply 832 */ 833 void 834 svcerr_noprog(struct svc_req *rqstp) 835 { 836 SVCXPRT *xprt = rqstp->rq_xprt; 837 struct rpc_msg rply; 838 839 rply.rm_xid = rqstp->rq_xid; 840 rply.rm_direction = REPLY; 841 rply.rm_reply.rp_stat = MSG_ACCEPTED; 842 rply.acpted_rply.ar_verf = rqstp->rq_verf; 843 rply.acpted_rply.ar_stat = PROG_UNAVAIL; 844 845 if (xprt->xp_pool->sp_rcache) 846 replay_setreply(xprt->xp_pool->sp_rcache, 847 &rply, svc_getrpccaller(rqstp), NULL); 848 849 svc_sendreply_common(rqstp, &rply, NULL); 850 } 851 852 /* 853 * Program version mismatch error reply 854 */ 855 void 856 svcerr_progvers(struct svc_req *rqstp, rpcvers_t low_vers, rpcvers_t high_vers) 857 { 858 SVCXPRT *xprt = rqstp->rq_xprt; 859 struct rpc_msg rply; 860 861 rply.rm_xid = rqstp->rq_xid; 862 rply.rm_direction = REPLY; 863 rply.rm_reply.rp_stat = MSG_ACCEPTED; 864 rply.acpted_rply.ar_verf = rqstp->rq_verf; 865 rply.acpted_rply.ar_stat = PROG_MISMATCH; 866 rply.acpted_rply.ar_vers.low = (uint32_t)low_vers; 867 rply.acpted_rply.ar_vers.high = (uint32_t)high_vers; 868 869 if (xprt->xp_pool->sp_rcache) 870 replay_setreply(xprt->xp_pool->sp_rcache, 871 &rply, svc_getrpccaller(rqstp), NULL); 872 873 svc_sendreply_common(rqstp, &rply, NULL); 874 } 875 876 /* 877 * Allocate a new server transport structure. All fields are 878 * initialized to zero and xp_p3 is initialized to point at an 879 * extension structure to hold various flags and authentication 880 * parameters. 881 */ 882 SVCXPRT * 883 svc_xprt_alloc(void) 884 { 885 SVCXPRT *xprt; 886 SVCXPRT_EXT *ext; 887 888 xprt = mem_alloc(sizeof(SVCXPRT)); 889 ext = mem_alloc(sizeof(SVCXPRT_EXT)); 890 xprt->xp_p3 = ext; 891 refcount_init(&xprt->xp_refs, 1); 892 893 return (xprt); 894 } 895 896 /* 897 * Free a server transport structure. 898 */ 899 void 900 svc_xprt_free(SVCXPRT *xprt) 901 { 902 903 mem_free(xprt->xp_p3, sizeof(SVCXPRT_EXT)); 904 /* The size argument is ignored, so 0 is ok. */ 905 mem_free(xprt->xp_gidp, 0); 906 mem_free(xprt, sizeof(SVCXPRT)); 907 } 908 909 /* ******************* SERVER INPUT STUFF ******************* */ 910 911 /* 912 * Read RPC requests from a transport and queue them to be 913 * executed. We handle authentication and replay cache replies here. 914 * Actually dispatching the RPC is deferred till svc_executereq. 915 */ 916 static enum xprt_stat 917 svc_getreq(SVCXPRT *xprt, struct svc_req **rqstp_ret) 918 { 919 SVCPOOL *pool = xprt->xp_pool; 920 struct svc_req *r; 921 struct rpc_msg msg; 922 struct mbuf *args; 923 struct svc_loss_callout *s; 924 enum xprt_stat stat; 925 926 /* now receive msgs from xprtprt (support batch calls) */ 927 r = malloc(sizeof(*r), M_RPC, M_WAITOK|M_ZERO); 928 929 msg.rm_call.cb_cred.oa_base = r->rq_credarea; 930 msg.rm_call.cb_verf.oa_base = &r->rq_credarea[MAX_AUTH_BYTES]; 931 r->rq_clntcred = &r->rq_credarea[2*MAX_AUTH_BYTES]; 932 if (SVC_RECV(xprt, &msg, &r->rq_addr, &args)) { 933 enum auth_stat why; 934 935 /* 936 * Handle replays and authenticate before queuing the 937 * request to be executed. 938 */ 939 SVC_ACQUIRE(xprt); 940 r->rq_xprt = xprt; 941 if (pool->sp_rcache) { 942 struct rpc_msg repmsg; 943 struct mbuf *repbody; 944 enum replay_state rs; 945 rs = replay_find(pool->sp_rcache, &msg, 946 svc_getrpccaller(r), &repmsg, &repbody); 947 switch (rs) { 948 case RS_NEW: 949 break; 950 case RS_DONE: 951 SVC_REPLY(xprt, &repmsg, r->rq_addr, 952 repbody, &r->rq_reply_seq); 953 if (r->rq_addr) { 954 free(r->rq_addr, M_SONAME); 955 r->rq_addr = NULL; 956 } 957 m_freem(args); 958 goto call_done; 959 960 default: 961 m_freem(args); 962 goto call_done; 963 } 964 } 965 966 r->rq_xid = msg.rm_xid; 967 r->rq_prog = msg.rm_call.cb_prog; 968 r->rq_vers = msg.rm_call.cb_vers; 969 r->rq_proc = msg.rm_call.cb_proc; 970 r->rq_size = sizeof(*r) + m_length(args, NULL); 971 r->rq_args = args; 972 if ((why = _authenticate(r, &msg)) != AUTH_OK) { 973 /* 974 * RPCSEC_GSS uses this return code 975 * for requests that form part of its 976 * context establishment protocol and 977 * should not be dispatched to the 978 * application. 979 */ 980 if (why != RPCSEC_GSS_NODISPATCH) 981 svcerr_auth(r, why); 982 goto call_done; 983 } 984 985 if (!SVCAUTH_UNWRAP(&r->rq_auth, &r->rq_args)) { 986 svcerr_decode(r); 987 goto call_done; 988 } 989 990 /* 991 * Everything checks out, return request to caller. 992 */ 993 *rqstp_ret = r; 994 r = NULL; 995 } 996 call_done: 997 if (r) { 998 svc_freereq(r); 999 r = NULL; 1000 } 1001 if ((stat = SVC_STAT(xprt)) == XPRT_DIED) { 1002 TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) 1003 (*s->slc_dispatch)(xprt); 1004 xprt_unregister(xprt); 1005 } 1006 1007 return (stat); 1008 } 1009 1010 static void 1011 svc_executereq(struct svc_req *rqstp) 1012 { 1013 SVCXPRT *xprt = rqstp->rq_xprt; 1014 SVCPOOL *pool = xprt->xp_pool; 1015 int prog_found; 1016 rpcvers_t low_vers; 1017 rpcvers_t high_vers; 1018 struct svc_callout *s; 1019 1020 /* now match message with a registered service*/ 1021 prog_found = FALSE; 1022 low_vers = (rpcvers_t) -1L; 1023 high_vers = (rpcvers_t) 0L; 1024 TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) { 1025 if (s->sc_prog == rqstp->rq_prog) { 1026 if (s->sc_vers == rqstp->rq_vers) { 1027 /* 1028 * We hand ownership of r to the 1029 * dispatch method - they must call 1030 * svc_freereq. 1031 */ 1032 (*s->sc_dispatch)(rqstp, xprt); 1033 return; 1034 } /* found correct version */ 1035 prog_found = TRUE; 1036 if (s->sc_vers < low_vers) 1037 low_vers = s->sc_vers; 1038 if (s->sc_vers > high_vers) 1039 high_vers = s->sc_vers; 1040 } /* found correct program */ 1041 } 1042 1043 /* 1044 * if we got here, the program or version 1045 * is not served ... 1046 */ 1047 if (prog_found) 1048 svcerr_progvers(rqstp, low_vers, high_vers); 1049 else 1050 svcerr_noprog(rqstp); 1051 1052 svc_freereq(rqstp); 1053 } 1054 1055 static void 1056 svc_checkidle(SVCGROUP *grp) 1057 { 1058 SVCXPRT *xprt, *nxprt; 1059 time_t timo; 1060 struct svcxprt_list cleanup; 1061 1062 TAILQ_INIT(&cleanup); 1063 TAILQ_FOREACH_SAFE(xprt, &grp->sg_xlist, xp_link, nxprt) { 1064 /* 1065 * Only some transports have idle timers. Don't time 1066 * something out which is just waking up. 1067 */ 1068 if (!xprt->xp_idletimeout || xprt->xp_thread) 1069 continue; 1070 1071 timo = xprt->xp_lastactive + xprt->xp_idletimeout; 1072 if (time_uptime > timo) { 1073 xprt_unregister_locked(xprt); 1074 TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link); 1075 } 1076 } 1077 1078 mtx_unlock(&grp->sg_lock); 1079 TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) { 1080 soshutdown(xprt->xp_socket, SHUT_WR); 1081 SVC_RELEASE(xprt); 1082 } 1083 mtx_lock(&grp->sg_lock); 1084 } 1085 1086 static void 1087 svc_assign_waiting_sockets(SVCPOOL *pool) 1088 { 1089 SVCGROUP *grp; 1090 SVCXPRT *xprt; 1091 int g; 1092 1093 for (g = 0; g < pool->sp_groupcount; g++) { 1094 grp = &pool->sp_groups[g]; 1095 mtx_lock(&grp->sg_lock); 1096 while ((xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) { 1097 if (xprt_assignthread(xprt)) 1098 TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink); 1099 else 1100 break; 1101 } 1102 mtx_unlock(&grp->sg_lock); 1103 } 1104 } 1105 1106 static void 1107 svc_change_space_used(SVCPOOL *pool, long delta) 1108 { 1109 unsigned long value; 1110 1111 value = atomic_fetchadd_long(&pool->sp_space_used, delta) + delta; 1112 if (delta > 0) { 1113 if (value >= pool->sp_space_high && !pool->sp_space_throttled) { 1114 pool->sp_space_throttled = TRUE; 1115 pool->sp_space_throttle_count++; 1116 } 1117 if (value > pool->sp_space_used_highest) 1118 pool->sp_space_used_highest = value; 1119 } else { 1120 if (value < pool->sp_space_low && pool->sp_space_throttled) { 1121 pool->sp_space_throttled = FALSE; 1122 svc_assign_waiting_sockets(pool); 1123 } 1124 } 1125 } 1126 1127 static bool_t 1128 svc_request_space_available(SVCPOOL *pool) 1129 { 1130 1131 if (pool->sp_space_throttled) 1132 return (FALSE); 1133 return (TRUE); 1134 } 1135 1136 static void 1137 svc_run_internal(SVCGROUP *grp, bool_t ismaster) 1138 { 1139 SVCPOOL *pool = grp->sg_pool; 1140 SVCTHREAD *st, *stpref; 1141 SVCXPRT *xprt; 1142 enum xprt_stat stat; 1143 struct svc_req *rqstp; 1144 struct proc *p; 1145 long sz; 1146 int error; 1147 1148 st = mem_alloc(sizeof(*st)); 1149 mtx_init(&st->st_lock, "st_lock", NULL, MTX_DEF); 1150 st->st_pool = pool; 1151 st->st_xprt = NULL; 1152 STAILQ_INIT(&st->st_reqs); 1153 cv_init(&st->st_cond, "rpcsvc"); 1154 1155 mtx_lock(&grp->sg_lock); 1156 1157 /* 1158 * If we are a new thread which was spawned to cope with 1159 * increased load, set the state back to SVCPOOL_ACTIVE. 1160 */ 1161 if (grp->sg_state == SVCPOOL_THREADSTARTING) 1162 grp->sg_state = SVCPOOL_ACTIVE; 1163 1164 while (grp->sg_state != SVCPOOL_CLOSING) { 1165 /* 1166 * Create new thread if requested. 1167 */ 1168 if (grp->sg_state == SVCPOOL_THREADWANTED) { 1169 grp->sg_state = SVCPOOL_THREADSTARTING; 1170 grp->sg_lastcreatetime = time_uptime; 1171 mtx_unlock(&grp->sg_lock); 1172 svc_new_thread(grp); 1173 mtx_lock(&grp->sg_lock); 1174 continue; 1175 } 1176 1177 /* 1178 * Check for idle transports once per second. 1179 */ 1180 if (time_uptime > grp->sg_lastidlecheck) { 1181 grp->sg_lastidlecheck = time_uptime; 1182 svc_checkidle(grp); 1183 } 1184 1185 xprt = st->st_xprt; 1186 if (!xprt) { 1187 /* 1188 * Enforce maxthreads count. 1189 */ 1190 if (!ismaster && grp->sg_threadcount > 1191 grp->sg_maxthreads) 1192 break; 1193 1194 /* 1195 * Before sleeping, see if we can find an 1196 * active transport which isn't being serviced 1197 * by a thread. 1198 */ 1199 if (svc_request_space_available(pool) && 1200 (xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) { 1201 TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink); 1202 SVC_ACQUIRE(xprt); 1203 xprt->xp_thread = st; 1204 st->st_xprt = xprt; 1205 continue; 1206 } 1207 1208 LIST_INSERT_HEAD(&grp->sg_idlethreads, st, st_ilink); 1209 if (ismaster || (!ismaster && 1210 grp->sg_threadcount > grp->sg_minthreads)) 1211 error = cv_timedwait_sig(&st->st_cond, 1212 &grp->sg_lock, 5 * hz); 1213 else 1214 error = cv_wait_sig(&st->st_cond, 1215 &grp->sg_lock); 1216 if (st->st_xprt == NULL) 1217 LIST_REMOVE(st, st_ilink); 1218 1219 /* 1220 * Reduce worker thread count when idle. 1221 */ 1222 if (error == EWOULDBLOCK) { 1223 if (!ismaster 1224 && (grp->sg_threadcount 1225 > grp->sg_minthreads) 1226 && !st->st_xprt) 1227 break; 1228 } else if (error != 0) { 1229 KASSERT(error == EINTR || error == ERESTART, 1230 ("non-signal error %d", error)); 1231 mtx_unlock(&grp->sg_lock); 1232 p = curproc; 1233 PROC_LOCK(p); 1234 if (P_SHOULDSTOP(p) || 1235 (p->p_flag & P_TOTAL_STOP) != 0) { 1236 thread_suspend_check(0); 1237 PROC_UNLOCK(p); 1238 mtx_lock(&grp->sg_lock); 1239 } else { 1240 PROC_UNLOCK(p); 1241 svc_exit(pool); 1242 mtx_lock(&grp->sg_lock); 1243 break; 1244 } 1245 } 1246 continue; 1247 } 1248 mtx_unlock(&grp->sg_lock); 1249 1250 /* 1251 * Drain the transport socket and queue up any RPCs. 1252 */ 1253 xprt->xp_lastactive = time_uptime; 1254 do { 1255 if (!svc_request_space_available(pool)) 1256 break; 1257 rqstp = NULL; 1258 stat = svc_getreq(xprt, &rqstp); 1259 if (rqstp) { 1260 svc_change_space_used(pool, rqstp->rq_size); 1261 /* 1262 * See if the application has a preference 1263 * for some other thread. 1264 */ 1265 if (pool->sp_assign) { 1266 stpref = pool->sp_assign(st, rqstp); 1267 rqstp->rq_thread = stpref; 1268 STAILQ_INSERT_TAIL(&stpref->st_reqs, 1269 rqstp, rq_link); 1270 mtx_unlock(&stpref->st_lock); 1271 if (stpref != st) 1272 rqstp = NULL; 1273 } else { 1274 rqstp->rq_thread = st; 1275 STAILQ_INSERT_TAIL(&st->st_reqs, 1276 rqstp, rq_link); 1277 } 1278 } 1279 } while (rqstp == NULL && stat == XPRT_MOREREQS 1280 && grp->sg_state != SVCPOOL_CLOSING); 1281 1282 /* 1283 * Move this transport to the end of the active list to 1284 * ensure fairness when multiple transports are active. 1285 * If this was the last queued request, svc_getreq will end 1286 * up calling xprt_inactive to remove from the active list. 1287 */ 1288 mtx_lock(&grp->sg_lock); 1289 xprt->xp_thread = NULL; 1290 st->st_xprt = NULL; 1291 if (xprt->xp_active) { 1292 if (!svc_request_space_available(pool) || 1293 !xprt_assignthread(xprt)) 1294 TAILQ_INSERT_TAIL(&grp->sg_active, 1295 xprt, xp_alink); 1296 } 1297 mtx_unlock(&grp->sg_lock); 1298 SVC_RELEASE(xprt); 1299 1300 /* 1301 * Execute what we have queued. 1302 */ 1303 mtx_lock(&st->st_lock); 1304 while ((rqstp = STAILQ_FIRST(&st->st_reqs)) != NULL) { 1305 STAILQ_REMOVE_HEAD(&st->st_reqs, rq_link); 1306 mtx_unlock(&st->st_lock); 1307 sz = (long)rqstp->rq_size; 1308 svc_executereq(rqstp); 1309 svc_change_space_used(pool, -sz); 1310 mtx_lock(&st->st_lock); 1311 } 1312 mtx_unlock(&st->st_lock); 1313 mtx_lock(&grp->sg_lock); 1314 } 1315 1316 if (st->st_xprt) { 1317 xprt = st->st_xprt; 1318 st->st_xprt = NULL; 1319 SVC_RELEASE(xprt); 1320 } 1321 KASSERT(STAILQ_EMPTY(&st->st_reqs), ("stray reqs on exit")); 1322 mtx_destroy(&st->st_lock); 1323 cv_destroy(&st->st_cond); 1324 mem_free(st, sizeof(*st)); 1325 1326 grp->sg_threadcount--; 1327 if (!ismaster) 1328 wakeup(grp); 1329 mtx_unlock(&grp->sg_lock); 1330 } 1331 1332 static void 1333 svc_thread_start(void *arg) 1334 { 1335 1336 svc_run_internal((SVCGROUP *) arg, FALSE); 1337 kthread_exit(); 1338 } 1339 1340 static void 1341 svc_new_thread(SVCGROUP *grp) 1342 { 1343 SVCPOOL *pool = grp->sg_pool; 1344 struct thread *td; 1345 1346 mtx_lock(&grp->sg_lock); 1347 grp->sg_threadcount++; 1348 mtx_unlock(&grp->sg_lock); 1349 kthread_add(svc_thread_start, grp, pool->sp_proc, &td, 0, 0, 1350 "%s: service", pool->sp_name); 1351 } 1352 1353 void 1354 svc_run(SVCPOOL *pool) 1355 { 1356 int g, i; 1357 struct proc *p; 1358 struct thread *td; 1359 SVCGROUP *grp; 1360 1361 p = curproc; 1362 td = curthread; 1363 snprintf(td->td_name, sizeof(td->td_name), 1364 "%s: master", pool->sp_name); 1365 pool->sp_state = SVCPOOL_ACTIVE; 1366 pool->sp_proc = p; 1367 1368 /* Choose group count based on number of threads and CPUs. */ 1369 pool->sp_groupcount = max(1, min(SVC_MAXGROUPS, 1370 min(pool->sp_maxthreads / 2, mp_ncpus) / 6)); 1371 for (g = 0; g < pool->sp_groupcount; g++) { 1372 grp = &pool->sp_groups[g]; 1373 grp->sg_minthreads = max(1, 1374 pool->sp_minthreads / pool->sp_groupcount); 1375 grp->sg_maxthreads = max(1, 1376 pool->sp_maxthreads / pool->sp_groupcount); 1377 grp->sg_lastcreatetime = time_uptime; 1378 } 1379 1380 /* Starting threads */ 1381 pool->sp_groups[0].sg_threadcount++; 1382 for (g = 0; g < pool->sp_groupcount; g++) { 1383 grp = &pool->sp_groups[g]; 1384 for (i = ((g == 0) ? 1 : 0); i < grp->sg_minthreads; i++) 1385 svc_new_thread(grp); 1386 } 1387 svc_run_internal(&pool->sp_groups[0], TRUE); 1388 1389 /* Waiting for threads to stop. */ 1390 for (g = 0; g < pool->sp_groupcount; g++) { 1391 grp = &pool->sp_groups[g]; 1392 mtx_lock(&grp->sg_lock); 1393 while (grp->sg_threadcount > 0) 1394 msleep(grp, &grp->sg_lock, 0, "svcexit", 0); 1395 mtx_unlock(&grp->sg_lock); 1396 } 1397 } 1398 1399 void 1400 svc_exit(SVCPOOL *pool) 1401 { 1402 SVCGROUP *grp; 1403 SVCTHREAD *st; 1404 int g; 1405 1406 pool->sp_state = SVCPOOL_CLOSING; 1407 for (g = 0; g < pool->sp_groupcount; g++) { 1408 grp = &pool->sp_groups[g]; 1409 mtx_lock(&grp->sg_lock); 1410 if (grp->sg_state != SVCPOOL_CLOSING) { 1411 grp->sg_state = SVCPOOL_CLOSING; 1412 LIST_FOREACH(st, &grp->sg_idlethreads, st_ilink) 1413 cv_signal(&st->st_cond); 1414 } 1415 mtx_unlock(&grp->sg_lock); 1416 } 1417 } 1418 1419 bool_t 1420 svc_getargs(struct svc_req *rqstp, xdrproc_t xargs, void *args) 1421 { 1422 struct mbuf *m; 1423 XDR xdrs; 1424 bool_t stat; 1425 1426 m = rqstp->rq_args; 1427 rqstp->rq_args = NULL; 1428 1429 xdrmbuf_create(&xdrs, m, XDR_DECODE); 1430 stat = xargs(&xdrs, args); 1431 XDR_DESTROY(&xdrs); 1432 1433 return (stat); 1434 } 1435 1436 bool_t 1437 svc_freeargs(struct svc_req *rqstp, xdrproc_t xargs, void *args) 1438 { 1439 XDR xdrs; 1440 1441 if (rqstp->rq_addr) { 1442 free(rqstp->rq_addr, M_SONAME); 1443 rqstp->rq_addr = NULL; 1444 } 1445 1446 xdrs.x_op = XDR_FREE; 1447 return (xargs(&xdrs, args)); 1448 } 1449 1450 void 1451 svc_freereq(struct svc_req *rqstp) 1452 { 1453 SVCTHREAD *st; 1454 SVCPOOL *pool; 1455 1456 st = rqstp->rq_thread; 1457 if (st) { 1458 pool = st->st_pool; 1459 if (pool->sp_done) 1460 pool->sp_done(st, rqstp); 1461 } 1462 1463 if (rqstp->rq_auth.svc_ah_ops) 1464 SVCAUTH_RELEASE(&rqstp->rq_auth); 1465 1466 if (rqstp->rq_xprt) { 1467 SVC_RELEASE(rqstp->rq_xprt); 1468 } 1469 1470 if (rqstp->rq_addr) 1471 free(rqstp->rq_addr, M_SONAME); 1472 1473 if (rqstp->rq_args) 1474 m_freem(rqstp->rq_args); 1475 1476 free(rqstp, M_RPC); 1477 } 1478