/*	$NetBSD: svc.c,v 1.21 2000/07/06 03:10:35 christos Exp $	*/

/*-
 * SPDX-License-Identifier: BSD-3-Clause
 *
 * Copyright (c) 2009, Sun Microsystems, Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 * - Redistributions of source code must retain the above copyright notice,
 *   this list of conditions and the following disclaimer.
 * - Redistributions in binary form must reproduce the above copyright notice,
 *   this list of conditions and the following disclaimer in the documentation
 *   and/or other materials provided with the distribution.
 * - Neither the name of Sun Microsystems, Inc. nor the names of its
 *   contributors may be used to endorse or promote products derived
 *   from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#if defined(LIBC_SCCS) && !defined(lint)
static char *sccsid2 = "@(#)svc.c 1.44 88/02/08 Copyr 1984 Sun Micro";
static char *sccsid = "@(#)svc.c	2.4 88/08/11 4.0 RPCSRC";
#endif
#include <sys/cdefs.h>
/*
 * svc.c, Server-side remote procedure call interface.
 *
 * There are two sets of procedures here.  The xprt routines are
 * for handling transport handles.  The svc routines handle the
 * list of service routines.
 *
 * Copyright (C) 1984, Sun Microsystems, Inc.
 */

#include <sys/param.h>
#include <sys/jail.h>
#include <sys/lock.h>
#include <sys/kernel.h>
#include <sys/kthread.h>
#include <sys/malloc.h>
#include <sys/mbuf.h>
#include <sys/mutex.h>
#include <sys/proc.h>
#include <sys/queue.h>
#include <sys/socketvar.h>
#include <sys/systm.h>
#include <sys/smp.h>
#include <sys/sx.h>
#include <sys/ucred.h>

#include <rpc/rpc.h>
#include <rpc/rpcb_clnt.h>
#include <rpc/replay.h>

#include <rpc/rpc_com.h>

#define SVC_VERSQUIET 0x0001		/* keep quiet about vers mismatch */
#define version_keepquiet(xp) (SVC_EXT(xp)->xp_flags & SVC_VERSQUIET)

static struct svc_callout *svc_find(SVCPOOL *pool, rpcprog_t, rpcvers_t,
    char *);
static void svc_new_thread(SVCGROUP *grp);
static void xprt_unregister_locked(SVCXPRT *xprt);
static void svc_change_space_used(SVCPOOL *pool, long delta);
static bool_t svc_request_space_available(SVCPOOL *pool);
static void svcpool_cleanup(SVCPOOL *pool);

/* ***************  SVCXPRT related stuff **************** */

static int svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS);
static int svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS);
static int svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS);

SVCPOOL*
svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base)
{
	SVCPOOL *pool;
	SVCGROUP *grp;
	int g;

	pool = malloc(sizeof(SVCPOOL), M_RPC, M_WAITOK|M_ZERO);

	mtx_init(&pool->sp_lock, "sp_lock", NULL, MTX_DEF);
	pool->sp_name = name;
	pool->sp_state = SVCPOOL_INIT;
	pool->sp_proc = NULL;
	TAILQ_INIT(&pool->sp_callouts);
	TAILQ_INIT(&pool->sp_lcallouts);
	pool->sp_minthreads = 1;
	pool->sp_maxthreads = 1;
	pool->sp_groupcount = 1;
	for (g = 0; g < SVC_MAXGROUPS; g++) {
		grp = &pool->sp_groups[g];
		mtx_init(&grp->sg_lock, "sg_lock", NULL, MTX_DEF);
		grp->sg_pool = pool;
		grp->sg_state = SVCPOOL_ACTIVE;
		TAILQ_INIT(&grp->sg_xlist);
		TAILQ_INIT(&grp->sg_active);
		LIST_INIT(&grp->sg_idlethreads);
		grp->sg_minthreads = 1;
		grp->sg_maxthreads = 1;
	}

	/*
	 * Don't use more than a quarter of mbuf clusters.  Nota bene:
	 * nmbclusters is an int, but nmbclusters*MCLBYTES may overflow
	 * on LP64 architectures, so cast to u_long to avoid undefined
	 * behavior.  (ILP32 architectures cannot have nmbclusters
	 * large enough to overflow for other reasons.)
	 */
	pool->sp_space_high = (u_long)nmbclusters * MCLBYTES / 4;
	pool->sp_space_low = (pool->sp_space_high / 3) * 2;
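	/*
	 * Worked example for the limits above (illustrative values only;
	 * nmbclusters is machine-dependent): with nmbclusters = 65536 and
	 * MCLBYTES = 2048 the cluster pool spans 128MB, so sp_space_high
	 * becomes 32MB and sp_space_low roughly 21MB.  Request parsing is
	 * throttled once used space passes the high mark and resumes once
	 * it drops below the low mark (see svc_change_space_used()).
	 */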

	sysctl_ctx_init(&pool->sp_sysctl);
	if (IS_DEFAULT_VNET(curvnet) && sysctl_base) {
		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "minthreads", CTLTYPE_INT | CTLFLAG_RW | CTLFLAG_MPSAFE,
		    pool, 0, svcpool_minthread_sysctl, "I",
		    "Minimal number of threads");
		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "maxthreads", CTLTYPE_INT | CTLFLAG_RW | CTLFLAG_MPSAFE,
		    pool, 0, svcpool_maxthread_sysctl, "I",
		    "Maximal number of threads");
		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "threads", CTLTYPE_INT | CTLFLAG_RD | CTLFLAG_MPSAFE,
		    pool, 0, svcpool_threads_sysctl, "I",
		    "Current number of threads");
		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "groups", CTLFLAG_RD, &pool->sp_groupcount, 0,
		    "Number of thread groups");

		SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_used", CTLFLAG_RD,
		    &pool->sp_space_used,
		    "Space in parsed but not handled requests.");

		SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_used_highest", CTLFLAG_RD,
		    &pool->sp_space_used_highest,
		    "Highest space used since reboot.");

		SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_high", CTLFLAG_RW,
		    &pool->sp_space_high,
		    "Maximum space in parsed but not handled requests.");

		SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_low", CTLFLAG_RW,
		    &pool->sp_space_low,
		    "Low water mark for request space.");

		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_throttled", CTLFLAG_RD,
		    &pool->sp_space_throttled, 0,
		    "Whether nfs requests are currently throttled");

		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_throttle_count", CTLFLAG_RD,
		    &pool->sp_space_throttle_count, 0,
		    "Count of times throttling based on request space has occurred");
	}

	return (pool);
}
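
/*
 * Example use of svcpool_create() (a minimal sketch, not lifted from an
 * in-tree consumer; the pool name, the sysctl parent node and the thread
 * limits are all hypothetical): a kernel RPC service typically creates
 * one pool at initialization time, hands it a sysctl node so the knobs
 * registered above become visible, and adjusts the thread limits before
 * calling svc_run().
 *
 *	SVCPOOL *mypool;
 *
 *	mypool = svcpool_create("mysvc",
 *	    SYSCTL_STATIC_CHILDREN(_kern_mysvc));
 *	mypool->sp_minthreads = 4;
 *	mypool->sp_maxthreads = 64;
 */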

/*
 * Code common to svcpool_destroy() and svcpool_close(), which cleans up
 * the pool data structures.
 */
static void
svcpool_cleanup(SVCPOOL *pool)
{
	SVCGROUP *grp;
	SVCXPRT *xprt, *nxprt;
	struct svc_callout *s;
	struct svc_loss_callout *sl;
	struct svcxprt_list cleanup;
	int g;

	TAILQ_INIT(&cleanup);

	for (g = 0; g < SVC_MAXGROUPS; g++) {
		grp = &pool->sp_groups[g];
		mtx_lock(&grp->sg_lock);
		while ((xprt = TAILQ_FIRST(&grp->sg_xlist)) != NULL) {
			xprt_unregister_locked(xprt);
			TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
		}
		mtx_unlock(&grp->sg_lock);
	}
	TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
		if (xprt->xp_socket != NULL)
			soshutdown(xprt->xp_socket, SHUT_WR);
		SVC_RELEASE(xprt);
	}

	mtx_lock(&pool->sp_lock);
	while ((s = TAILQ_FIRST(&pool->sp_callouts)) != NULL) {
		mtx_unlock(&pool->sp_lock);
		svc_unreg(pool, s->sc_prog, s->sc_vers);
		mtx_lock(&pool->sp_lock);
	}
	while ((sl = TAILQ_FIRST(&pool->sp_lcallouts)) != NULL) {
		mtx_unlock(&pool->sp_lock);
		svc_loss_unreg(pool, sl->slc_dispatch);
		mtx_lock(&pool->sp_lock);
	}
	mtx_unlock(&pool->sp_lock);
}

void
svcpool_destroy(SVCPOOL *pool)
{
	SVCGROUP *grp;
	int g;

	svcpool_cleanup(pool);

	for (g = 0; g < SVC_MAXGROUPS; g++) {
		grp = &pool->sp_groups[g];
		mtx_destroy(&grp->sg_lock);
	}
	mtx_destroy(&pool->sp_lock);

	if (pool->sp_rcache)
		replay_freecache(pool->sp_rcache);

	sysctl_ctx_free(&pool->sp_sysctl);
	free(pool, M_RPC);
}

/*
 * Similar to svcpool_destroy(), except that it does not destroy the actual
 * data structures.  As such, "pool" may be used again.
 */
void
svcpool_close(SVCPOOL *pool)
{
	SVCGROUP *grp;
	int g;

	svcpool_cleanup(pool);

	/* Now, initialize the pool's state for a fresh svc_run() call. */
	mtx_lock(&pool->sp_lock);
	pool->sp_state = SVCPOOL_INIT;
	mtx_unlock(&pool->sp_lock);
	for (g = 0; g < SVC_MAXGROUPS; g++) {
		grp = &pool->sp_groups[g];
		mtx_lock(&grp->sg_lock);
		grp->sg_state = SVCPOOL_ACTIVE;
		mtx_unlock(&grp->sg_lock);
	}
}

/*
 * Sysctl handler to get the present thread count on a pool
 */
static int
svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS)
{
	SVCPOOL *pool;
	int threads, error, g;

	pool = oidp->oid_arg1;
	threads = 0;
	mtx_lock(&pool->sp_lock);
	for (g = 0; g < pool->sp_groupcount; g++)
		threads += pool->sp_groups[g].sg_threadcount;
	mtx_unlock(&pool->sp_lock);
	error = sysctl_handle_int(oidp, &threads, 0, req);
	return (error);
}

/*
 * Sysctl handler to set the minimum thread count on a pool
 */
static int
svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS)
{
	SVCPOOL *pool;
	int newminthreads, error, g;

	pool = oidp->oid_arg1;
	newminthreads = pool->sp_minthreads;
	error = sysctl_handle_int(oidp, &newminthreads, 0, req);
	if (error == 0 && newminthreads != pool->sp_minthreads) {
		if (newminthreads > pool->sp_maxthreads)
			return (EINVAL);
		mtx_lock(&pool->sp_lock);
		pool->sp_minthreads = newminthreads;
		for (g = 0; g < pool->sp_groupcount; g++) {
			pool->sp_groups[g].sg_minthreads = max(1,
			    pool->sp_minthreads / pool->sp_groupcount);
		}
		mtx_unlock(&pool->sp_lock);
	}
	return (error);
}

/*
 * Sysctl handler to set the maximum thread count on a pool
 */
static int
svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS)
{
	SVCPOOL *pool;
	int newmaxthreads, error, g;

	pool = oidp->oid_arg1;
	newmaxthreads = pool->sp_maxthreads;
	error = sysctl_handle_int(oidp, &newmaxthreads, 0, req);
	if (error == 0 && newmaxthreads != pool->sp_maxthreads) {
		if (newmaxthreads < pool->sp_minthreads)
			return (EINVAL);
		mtx_lock(&pool->sp_lock);
		pool->sp_maxthreads = newmaxthreads;
		for (g = 0; g < pool->sp_groupcount; g++) {
			pool->sp_groups[g].sg_maxthreads = max(1,
			    pool->sp_maxthreads / pool->sp_groupcount);
		}
		mtx_unlock(&pool->sp_lock);
	}
	return (error);
}

/*
 * Activate a transport handle.
 */
void
xprt_register(SVCXPRT *xprt)
{
	SVCPOOL *pool = xprt->xp_pool;
	SVCGROUP *grp;
	int g;

	SVC_ACQUIRE(xprt);
	g = atomic_fetchadd_int(&pool->sp_nextgroup, 1) % pool->sp_groupcount;
	xprt->xp_group = grp = &pool->sp_groups[g];
	mtx_lock(&grp->sg_lock);
	xprt->xp_registered = TRUE;
	xprt->xp_active = FALSE;
	TAILQ_INSERT_TAIL(&grp->sg_xlist, xprt, xp_link);
	mtx_unlock(&grp->sg_lock);
}

/*
 * De-activate a transport handle.  Note: the locked version doesn't
 * release the transport - caller must do that after dropping the pool
 * lock.
 */
static void
xprt_unregister_locked(SVCXPRT *xprt)
{
	SVCGROUP *grp = xprt->xp_group;

	mtx_assert(&grp->sg_lock, MA_OWNED);
	KASSERT(xprt->xp_registered == TRUE,
	    ("xprt_unregister_locked: not registered"));
	xprt_inactive_locked(xprt);
	TAILQ_REMOVE(&grp->sg_xlist, xprt, xp_link);
	xprt->xp_registered = FALSE;
}

void
xprt_unregister(SVCXPRT *xprt)
{
	SVCGROUP *grp = xprt->xp_group;

	mtx_lock(&grp->sg_lock);
	if (xprt->xp_registered == FALSE) {
		/* Already unregistered by another thread */
		mtx_unlock(&grp->sg_lock);
		return;
	}
	xprt_unregister_locked(xprt);
	mtx_unlock(&grp->sg_lock);

	if (xprt->xp_socket != NULL)
		soshutdown(xprt->xp_socket, SHUT_WR);
	SVC_RELEASE(xprt);
}

/*
 * Attempt to assign a service thread to this transport.
 */
static int
xprt_assignthread(SVCXPRT *xprt)
{
	SVCGROUP *grp = xprt->xp_group;
	SVCTHREAD *st;

	mtx_assert(&grp->sg_lock, MA_OWNED);
	st = LIST_FIRST(&grp->sg_idlethreads);
	if (st) {
		LIST_REMOVE(st, st_ilink);
		SVC_ACQUIRE(xprt);
		xprt->xp_thread = st;
		st->st_xprt = xprt;
		cv_signal(&st->st_cond);
		return (TRUE);
	} else {
		/*
		 * See if we can create a new thread.  The actual
		 * thread creation happens in svc_run_internal
		 * because our locking state is poorly defined (we
		 * are typically called from a socket upcall).  Don't
		 * create more than one thread per second.
		 */
		if (grp->sg_state == SVCPOOL_ACTIVE
		    && grp->sg_lastcreatetime < time_uptime
		    && grp->sg_threadcount < grp->sg_maxthreads) {
			grp->sg_state = SVCPOOL_THREADWANTED;
		}
	}
	return (FALSE);
}

void
xprt_active(SVCXPRT *xprt)
{
	SVCGROUP *grp = xprt->xp_group;

	mtx_lock(&grp->sg_lock);

	if (!xprt->xp_registered) {
		/*
		 * Race with xprt_unregister - we lose.
		 */
		mtx_unlock(&grp->sg_lock);
		return;
	}

	if (!xprt->xp_active) {
		xprt->xp_active = TRUE;
		if (xprt->xp_thread == NULL) {
			if (!svc_request_space_available(xprt->xp_pool) ||
			    !xprt_assignthread(xprt))
				TAILQ_INSERT_TAIL(&grp->sg_active, xprt,
				    xp_alink);
		}
	}

	mtx_unlock(&grp->sg_lock);
}

void
xprt_inactive_locked(SVCXPRT *xprt)
{
	SVCGROUP *grp = xprt->xp_group;

	mtx_assert(&grp->sg_lock, MA_OWNED);
	if (xprt->xp_active) {
		if (xprt->xp_thread == NULL)
			TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
		xprt->xp_active = FALSE;
	}
}

void
xprt_inactive(SVCXPRT *xprt)
{
	SVCGROUP *grp = xprt->xp_group;

	mtx_lock(&grp->sg_lock);
	xprt_inactive_locked(xprt);
	mtx_unlock(&grp->sg_lock);
}

/*
 * Variant of xprt_inactive() for use only when we are sure that the
 * transport is assigned to the current thread, for example within
 * receive handlers.
 */
void
xprt_inactive_self(SVCXPRT *xprt)
{

	KASSERT(xprt->xp_thread != NULL,
	    ("xprt_inactive_self(%p) with NULL xp_thread", xprt));
	xprt->xp_active = FALSE;
}
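
/*
 * Example of how the activity machinery above is normally driven (a
 * sketch only; the upcall name and its registration are hypothetical,
 * not part of this file): a transport implementation marks its handle
 * active from a socket upcall when data arrives, and the pool then
 * assigns an idle thread or requests a new one via xprt_assignthread().
 *
 *	static int
 *	myxprt_soupcall(struct socket *so, void *arg, int waitflag)
 *	{
 *		SVCXPRT *xprt = arg;
 *
 *		xprt_active(xprt);
 *		return (SU_OK);
 *	}
 */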

/*
 * Add a service program to the callout list.
 * The dispatch routine will be called when an RPC request for this
 * program number comes in.
 */
bool_t
svc_reg(SVCXPRT *xprt, const rpcprog_t prog, const rpcvers_t vers,
    void (*dispatch)(struct svc_req *, SVCXPRT *),
    const struct netconfig *nconf)
{
	SVCPOOL *pool = xprt->xp_pool;
	struct svc_callout *s;
	char *netid = NULL;
	int flag = 0;

	/* VARIABLES PROTECTED BY svc_lock: s, svc_head */

	if (xprt->xp_netid) {
		netid = strdup(xprt->xp_netid, M_RPC);
		flag = 1;
	} else if (nconf && nconf->nc_netid) {
		netid = strdup(nconf->nc_netid, M_RPC);
		flag = 1;
	} /* must have been created with svc_raw_create */
	if ((netid == NULL) && (flag == 1)) {
		return (FALSE);
	}

	mtx_lock(&pool->sp_lock);
	if ((s = svc_find(pool, prog, vers, netid)) != NULL) {
		if (netid)
			free(netid, M_RPC);
		if (s->sc_dispatch == dispatch)
			goto rpcb_it; /* the caller is registering another xprt */
		mtx_unlock(&pool->sp_lock);
		return (FALSE);
	}
	s = malloc(sizeof (struct svc_callout), M_RPC, M_NOWAIT);
	if (s == NULL) {
		if (netid)
			free(netid, M_RPC);
		mtx_unlock(&pool->sp_lock);
		return (FALSE);
	}

	s->sc_prog = prog;
	s->sc_vers = vers;
	s->sc_dispatch = dispatch;
	s->sc_netid = netid;
	TAILQ_INSERT_TAIL(&pool->sp_callouts, s, sc_link);

	if ((xprt->xp_netid == NULL) && (flag == 1) && netid)
		((SVCXPRT *) xprt)->xp_netid = strdup(netid, M_RPC);

rpcb_it:
	mtx_unlock(&pool->sp_lock);
	/* now register the information with the local binder service */
	if (nconf) {
		bool_t dummy;
		struct netconfig tnc;
		struct netbuf nb;
		tnc = *nconf;
		nb.buf = &xprt->xp_ltaddr;
		nb.len = xprt->xp_ltaddr.ss_len;
		dummy = rpcb_set(prog, vers, &tnc, &nb);
		return (dummy);
	}
	return (TRUE);
}

/*
 * Remove a service program from the callout list.
 */
void
svc_unreg(SVCPOOL *pool, const rpcprog_t prog, const rpcvers_t vers)
{
	struct svc_callout *s;

	/* unregister the information anyway */
	(void) rpcb_unset(prog, vers, NULL);
	mtx_lock(&pool->sp_lock);
	while ((s = svc_find(pool, prog, vers, NULL)) != NULL) {
		TAILQ_REMOVE(&pool->sp_callouts, s, sc_link);
		if (s->sc_netid)
			mem_free(s->sc_netid, sizeof (s->sc_netid) + 1);
		mem_free(s, sizeof (struct svc_callout));
	}
	mtx_unlock(&pool->sp_lock);
}

/*
 * Add a service connection loss program to the callout list.
 * The dispatch routine will be called when some transport in this
 * pool dies.
 */
bool_t
svc_loss_reg(SVCXPRT *xprt, void (*dispatch)(SVCXPRT *))
{
	SVCPOOL *pool = xprt->xp_pool;
	struct svc_loss_callout *s;

	mtx_lock(&pool->sp_lock);
	TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
		if (s->slc_dispatch == dispatch)
			break;
	}
	if (s != NULL) {
		mtx_unlock(&pool->sp_lock);
		return (TRUE);
	}
	s = malloc(sizeof(struct svc_loss_callout), M_RPC, M_NOWAIT);
	if (s == NULL) {
		mtx_unlock(&pool->sp_lock);
		return (FALSE);
	}
	s->slc_dispatch = dispatch;
	TAILQ_INSERT_TAIL(&pool->sp_lcallouts, s, slc_link);
	mtx_unlock(&pool->sp_lock);
	return (TRUE);
}
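
/*
 * Example of registering a service with svc_reg() (a sketch; the program
 * number, version and dispatch routine are hypothetical): after a
 * transport has been created on this pool, the service registers one
 * dispatcher per program/version.  Passing a NULL netconfig skips the
 * rpcbind registration performed at the end of svc_reg().
 *
 *	#define MYPROG	0x20000001
 *	#define MYVERS	1
 *
 *	static void mydispatch(struct svc_req *, SVCXPRT *);
 *
 *	if (!svc_reg(xprt, MYPROG, MYVERS, mydispatch, NULL))
 *		printf("mysvc: svc_reg failed\n");
 */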

/*
 * Remove a service connection loss program from the callout list.
 */
void
svc_loss_unreg(SVCPOOL *pool, void (*dispatch)(SVCXPRT *))
{
	struct svc_loss_callout *s;

	mtx_lock(&pool->sp_lock);
	TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
		if (s->slc_dispatch == dispatch) {
			TAILQ_REMOVE(&pool->sp_lcallouts, s, slc_link);
			free(s, M_RPC);
			break;
		}
	}
	mtx_unlock(&pool->sp_lock);
}

/* ********************** CALLOUT list related stuff ************* */

/*
 * Search the callout list for a program number, return the callout
 * struct.
 */
static struct svc_callout *
svc_find(SVCPOOL *pool, rpcprog_t prog, rpcvers_t vers, char *netid)
{
	struct svc_callout *s;

	mtx_assert(&pool->sp_lock, MA_OWNED);
	TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
		if (s->sc_prog == prog && s->sc_vers == vers
		    && (netid == NULL || s->sc_netid == NULL ||
			strcmp(netid, s->sc_netid) == 0))
			break;
	}

	return (s);
}

/* ******************* REPLY GENERATION ROUTINES ************ */

static bool_t
svc_sendreply_common(struct svc_req *rqstp, struct rpc_msg *rply,
    struct mbuf *body)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	bool_t ok;

	if (rqstp->rq_args) {
		m_freem(rqstp->rq_args);
		rqstp->rq_args = NULL;
	}

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    rply, svc_getrpccaller(rqstp), body);

	if (!SVCAUTH_WRAP(&rqstp->rq_auth, &body))
		return (FALSE);

	ok = SVC_REPLY(xprt, rply, rqstp->rq_addr, body, &rqstp->rq_reply_seq);
	if (rqstp->rq_addr) {
		free(rqstp->rq_addr, M_SONAME);
		rqstp->rq_addr = NULL;
	}

	return (ok);
}

/*
 * Send a reply to an rpc request
 */
bool_t
svc_sendreply(struct svc_req *rqstp, xdrproc_t xdr_results, void *xdr_location)
{
	struct rpc_msg rply;
	struct mbuf *m;
	XDR xdrs;
	bool_t ok;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = SUCCESS;
	rply.acpted_rply.ar_results.where = NULL;
	rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;

	m = m_getcl(M_WAITOK, MT_DATA, 0);
	xdrmbuf_create(&xdrs, m, XDR_ENCODE);
	ok = xdr_results(&xdrs, xdr_location);
	XDR_DESTROY(&xdrs);

	if (ok) {
		return (svc_sendreply_common(rqstp, &rply, m));
	} else {
		m_freem(m);
		return (FALSE);
	}
}

bool_t
svc_sendreply_mbuf(struct svc_req *rqstp, struct mbuf *m)
{
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = SUCCESS;
	rply.acpted_rply.ar_results.where = NULL;
	rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;

	return (svc_sendreply_common(rqstp, &rply, m));
}

/*
 * No procedure error reply
 */
void
svcerr_noproc(struct svc_req *rqstp)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = PROC_UNAVAIL;

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, svc_getrpccaller(rqstp), NULL);

	svc_sendreply_common(rqstp, &rply, NULL);
}

/*
 * Can't decode args error reply
 */
void
svcerr_decode(struct svc_req *rqstp)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = GARBAGE_ARGS;

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, (struct sockaddr *) &xprt->xp_rtaddr, NULL);

	svc_sendreply_common(rqstp, &rply, NULL);
}

/*
 * Some system error
 */
void
svcerr_systemerr(struct svc_req *rqstp)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = SYSTEM_ERR;

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, svc_getrpccaller(rqstp), NULL);

	svc_sendreply_common(rqstp, &rply, NULL);
}

/*
 * Authentication error reply
 */
void
svcerr_auth(struct svc_req *rqstp, enum auth_stat why)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_DENIED;
	rply.rjcted_rply.rj_stat = AUTH_ERROR;
	rply.rjcted_rply.rj_why = why;

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, svc_getrpccaller(rqstp), NULL);

	svc_sendreply_common(rqstp, &rply, NULL);
}

/*
 * Auth too weak error reply
 */
void
svcerr_weakauth(struct svc_req *rqstp)
{

	svcerr_auth(rqstp, AUTH_TOOWEAK);
}

/*
 * Program unavailable error reply
 */
void
svcerr_noprog(struct svc_req *rqstp)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = PROG_UNAVAIL;

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, svc_getrpccaller(rqstp), NULL);

	svc_sendreply_common(rqstp, &rply, NULL);
}

/*
 * Program version mismatch error reply
 */
void
svcerr_progvers(struct svc_req *rqstp, rpcvers_t low_vers, rpcvers_t high_vers)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = PROG_MISMATCH;
	rply.acpted_rply.ar_vers.low = (uint32_t)low_vers;
	rply.acpted_rply.ar_vers.high = (uint32_t)high_vers;

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, svc_getrpccaller(rqstp), NULL);

	svc_sendreply_common(rqstp, &rply, NULL);
}
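
/*
 * Example dispatch routine using the reply helpers above (a sketch; the
 * argument/result types, their XDR routines, the procedure number and
 * the handler are all hypothetical): the dispatcher decodes the call
 * arguments, runs the procedure, answers with svc_sendreply() or one of
 * the svcerr_*() routines, and finally releases the request, which it
 * owns once svc_executereq() has handed it over.
 *
 *	static void
 *	mydispatch(struct svc_req *rqstp, SVCXPRT *xprt)
 *	{
 *		struct myargs args;
 *		struct myres res;
 *
 *		switch (rqstp->rq_proc) {
 *		case MYPROC_DOIT:
 *			memset(&args, 0, sizeof(args));
 *			if (!svc_getargs(rqstp, (xdrproc_t)xdr_myargs,
 *			    &args)) {
 *				svcerr_decode(rqstp);
 *				break;
 *			}
 *			myproc_doit(&args, &res);
 *			svc_sendreply(rqstp, (xdrproc_t)xdr_myres, &res);
 *			svc_freeargs(rqstp, (xdrproc_t)xdr_myargs, &args);
 *			break;
 *		default:
 *			svcerr_noproc(rqstp);
 *			break;
 *		}
 *		svc_freereq(rqstp);
 *	}
 */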

/*
 * Allocate a new server transport structure. All fields are
 * initialized to zero and xp_p3 is initialized to point at an
 * extension structure to hold various flags and authentication
 * parameters.
 */
SVCXPRT *
svc_xprt_alloc(void)
{
	SVCXPRT *xprt;
	SVCXPRT_EXT *ext;

	xprt = mem_alloc(sizeof(SVCXPRT));
	ext = mem_alloc(sizeof(SVCXPRT_EXT));
	xprt->xp_p3 = ext;
	refcount_init(&xprt->xp_refs, 1);

	return (xprt);
}

/*
 * Free a server transport structure.
 */
void
svc_xprt_free(SVCXPRT *xprt)
{

	mem_free(xprt->xp_p3, sizeof(SVCXPRT_EXT));
	/* The size argument is ignored, so 0 is ok. */
	mem_free(xprt->xp_gidp, 0);
	mem_free(xprt, sizeof(SVCXPRT));
}

/* ******************* SERVER INPUT STUFF ******************* */

/*
 * Read RPC requests from a transport and queue them to be
 * executed. We handle authentication and replay cache replies here.
 * Actually dispatching the RPC is deferred until svc_executereq().
 */
static enum xprt_stat
svc_getreq(SVCXPRT *xprt, struct svc_req **rqstp_ret)
{
	SVCPOOL *pool = xprt->xp_pool;
	struct svc_req *r;
	struct rpc_msg msg;
	struct mbuf *args;
	struct svc_loss_callout *s;
	enum xprt_stat stat;

	/* now receive msgs from xprt (support batch calls) */
	r = malloc(sizeof(*r), M_RPC, M_WAITOK|M_ZERO);

	msg.rm_call.cb_cred.oa_base = r->rq_credarea;
	msg.rm_call.cb_verf.oa_base = &r->rq_credarea[MAX_AUTH_BYTES];
	r->rq_clntcred = &r->rq_credarea[2*MAX_AUTH_BYTES];
	if (SVC_RECV(xprt, &msg, &r->rq_addr, &args)) {
		enum auth_stat why;

		/*
		 * Handle replays and authenticate before queuing the
		 * request to be executed.
		 */
		SVC_ACQUIRE(xprt);
		r->rq_xprt = xprt;
		if (pool->sp_rcache) {
			struct rpc_msg repmsg;
			struct mbuf *repbody;
			enum replay_state rs;
			rs = replay_find(pool->sp_rcache, &msg,
			    svc_getrpccaller(r), &repmsg, &repbody);
			switch (rs) {
			case RS_NEW:
				break;
			case RS_DONE:
				SVC_REPLY(xprt, &repmsg, r->rq_addr,
				    repbody, &r->rq_reply_seq);
				if (r->rq_addr) {
					free(r->rq_addr, M_SONAME);
					r->rq_addr = NULL;
				}
				m_freem(args);
				goto call_done;

			default:
				m_freem(args);
				goto call_done;
			}
		}

		r->rq_xid = msg.rm_xid;
		r->rq_prog = msg.rm_call.cb_prog;
		r->rq_vers = msg.rm_call.cb_vers;
		r->rq_proc = msg.rm_call.cb_proc;
		r->rq_size = sizeof(*r) + m_length(args, NULL);
		r->rq_args = args;
		if ((why = _authenticate(r, &msg)) != AUTH_OK) {
			/*
			 * RPCSEC_GSS uses this return code
			 * for requests that form part of its
			 * context establishment protocol and
			 * should not be dispatched to the
			 * application.
			 */
			if (why != RPCSEC_GSS_NODISPATCH)
				svcerr_auth(r, why);
			goto call_done;
		}

		if (!SVCAUTH_UNWRAP(&r->rq_auth, &r->rq_args)) {
			svcerr_decode(r);
			goto call_done;
		}

		/*
		 * Everything checks out, return request to caller.
		 */
		*rqstp_ret = r;
		r = NULL;
	}
call_done:
	if (r) {
		svc_freereq(r);
		r = NULL;
	}
	if ((stat = SVC_STAT(xprt)) == XPRT_DIED) {
		TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link)
			(*s->slc_dispatch)(xprt);
		xprt_unregister(xprt);
	}

	return (stat);
}

static void
svc_executereq(struct svc_req *rqstp)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	SVCPOOL *pool = xprt->xp_pool;
	int prog_found;
	rpcvers_t low_vers;
	rpcvers_t high_vers;
	struct svc_callout *s;

	/* now match message with a registered service */
	prog_found = FALSE;
	low_vers = (rpcvers_t) -1L;
	high_vers = (rpcvers_t) 0L;
	TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
		if (s->sc_prog == rqstp->rq_prog) {
			if (s->sc_vers == rqstp->rq_vers) {
				/*
				 * We hand ownership of r to the
				 * dispatch method - they must call
				 * svc_freereq.
				 */
				(*s->sc_dispatch)(rqstp, xprt);
				return;
			}  /* found correct version */
			prog_found = TRUE;
			if (s->sc_vers < low_vers)
				low_vers = s->sc_vers;
			if (s->sc_vers > high_vers)
				high_vers = s->sc_vers;
		}   /* found correct program */
	}

	/*
	 * if we got here, the program or version
	 * is not served ...
	 */
	if (prog_found)
		svcerr_progvers(rqstp, low_vers, high_vers);
	else
		svcerr_noprog(rqstp);

	svc_freereq(rqstp);
}

static void
svc_checkidle(SVCGROUP *grp)
{
	SVCXPRT *xprt, *nxprt;
	time_t timo;
	struct svcxprt_list cleanup;

	TAILQ_INIT(&cleanup);
	TAILQ_FOREACH_SAFE(xprt, &grp->sg_xlist, xp_link, nxprt) {
		/*
		 * Only some transports have idle timers.  Don't time
		 * something out which is just waking up.
		 */
		if (!xprt->xp_idletimeout || xprt->xp_thread)
			continue;

		timo = xprt->xp_lastactive + xprt->xp_idletimeout;
		if (time_uptime > timo) {
			xprt_unregister_locked(xprt);
			TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
		}
	}

	mtx_unlock(&grp->sg_lock);
	TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
		soshutdown(xprt->xp_socket, SHUT_WR);
		SVC_RELEASE(xprt);
	}
	mtx_lock(&grp->sg_lock);
}

static void
svc_assign_waiting_sockets(SVCPOOL *pool)
{
	SVCGROUP *grp;
	SVCXPRT *xprt;
	int g;

	for (g = 0; g < pool->sp_groupcount; g++) {
		grp = &pool->sp_groups[g];
		mtx_lock(&grp->sg_lock);
		while ((xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) {
			if (xprt_assignthread(xprt))
				TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
			else
				break;
		}
		mtx_unlock(&grp->sg_lock);
	}
}

static void
svc_change_space_used(SVCPOOL *pool, long delta)
{
	unsigned long value;

	value = atomic_fetchadd_long(&pool->sp_space_used, delta) + delta;
	if (delta > 0) {
		if (value >= pool->sp_space_high && !pool->sp_space_throttled) {
			pool->sp_space_throttled = TRUE;
			pool->sp_space_throttle_count++;
		}
		if (value > pool->sp_space_used_highest)
			pool->sp_space_used_highest = value;
	} else {
		if (value < pool->sp_space_low && pool->sp_space_throttled) {
			pool->sp_space_throttled = FALSE;
			svc_assign_waiting_sockets(pool);
		}
	}
}

static bool_t
svc_request_space_available(SVCPOOL *pool)
{

	if (pool->sp_space_throttled)
		return (FALSE);
	return (TRUE);
}

static void
svc_run_internal(SVCGROUP *grp, bool_t ismaster)
{
	SVCPOOL *pool = grp->sg_pool;
	SVCTHREAD *st, *stpref;
	SVCXPRT *xprt;
	enum xprt_stat stat;
	struct svc_req *rqstp;
	struct proc *p;
	long sz;
	int error;

	st = mem_alloc(sizeof(*st));
	mtx_init(&st->st_lock, "st_lock", NULL, MTX_DEF);
	st->st_pool = pool;
	st->st_xprt = NULL;
	STAILQ_INIT(&st->st_reqs);
	cv_init(&st->st_cond, "rpcsvc");

	mtx_lock(&grp->sg_lock);

	/*
	 * If we are a new thread which was spawned to cope with
	 * increased load, set the state back to SVCPOOL_ACTIVE.
	 */
	if (grp->sg_state == SVCPOOL_THREADSTARTING)
		grp->sg_state = SVCPOOL_ACTIVE;

	while (grp->sg_state != SVCPOOL_CLOSING) {
		/*
		 * Create new thread if requested.
		 */
		if (grp->sg_state == SVCPOOL_THREADWANTED) {
			grp->sg_state = SVCPOOL_THREADSTARTING;
			grp->sg_lastcreatetime = time_uptime;
			mtx_unlock(&grp->sg_lock);
			svc_new_thread(grp);
			mtx_lock(&grp->sg_lock);
			continue;
		}

		/*
		 * Check for idle transports once per second.
		 */
		if (time_uptime > grp->sg_lastidlecheck) {
			grp->sg_lastidlecheck = time_uptime;
			svc_checkidle(grp);
		}

		xprt = st->st_xprt;
		if (!xprt) {
			/*
			 * Enforce maxthreads count.
			 */
			if (!ismaster && grp->sg_threadcount >
			    grp->sg_maxthreads)
				break;

			/*
			 * Before sleeping, see if we can find an
			 * active transport which isn't being serviced
			 * by a thread.
			 */
			if (svc_request_space_available(pool) &&
			    (xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) {
				TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
				SVC_ACQUIRE(xprt);
				xprt->xp_thread = st;
				st->st_xprt = xprt;
				continue;
			}

			LIST_INSERT_HEAD(&grp->sg_idlethreads, st, st_ilink);
			if (ismaster || (!ismaster &&
			    grp->sg_threadcount > grp->sg_minthreads))
				error = cv_timedwait_sig(&st->st_cond,
				    &grp->sg_lock, 5 * hz);
			else
				error = cv_wait_sig(&st->st_cond,
				    &grp->sg_lock);
			if (st->st_xprt == NULL)
				LIST_REMOVE(st, st_ilink);

			/*
			 * Reduce worker thread count when idle.
			 */
			if (error == EWOULDBLOCK) {
				if (!ismaster
				    && (grp->sg_threadcount
					> grp->sg_minthreads)
				    && !st->st_xprt)
					break;
			} else if (error != 0) {
				KASSERT(error == EINTR || error == ERESTART,
				    ("non-signal error %d", error));
				mtx_unlock(&grp->sg_lock);
				p = curproc;
				PROC_LOCK(p);
				if (P_SHOULDSTOP(p) ||
				    (p->p_flag & P_TOTAL_STOP) != 0) {
					thread_suspend_check(0);
					PROC_UNLOCK(p);
					mtx_lock(&grp->sg_lock);
				} else {
					PROC_UNLOCK(p);
					svc_exit(pool);
					mtx_lock(&grp->sg_lock);
					break;
				}
			}
			continue;
		}
		mtx_unlock(&grp->sg_lock);

		/*
		 * Drain the transport socket and queue up any RPCs.
		 */
		xprt->xp_lastactive = time_uptime;
		do {
			if (!svc_request_space_available(pool))
				break;
			rqstp = NULL;
			stat = svc_getreq(xprt, &rqstp);
			if (rqstp) {
				svc_change_space_used(pool, rqstp->rq_size);
				/*
				 * See if the application has a preference
				 * for some other thread.
				 */
				if (pool->sp_assign) {
					stpref = pool->sp_assign(st, rqstp);
					rqstp->rq_thread = stpref;
					STAILQ_INSERT_TAIL(&stpref->st_reqs,
					    rqstp, rq_link);
					mtx_unlock(&stpref->st_lock);
					if (stpref != st)
						rqstp = NULL;
				} else {
					rqstp->rq_thread = st;
					STAILQ_INSERT_TAIL(&st->st_reqs,
					    rqstp, rq_link);
				}
			}
		} while (rqstp == NULL && stat == XPRT_MOREREQS
		    && grp->sg_state != SVCPOOL_CLOSING);

		/*
		 * Move this transport to the end of the active list to
		 * ensure fairness when multiple transports are active.
		 * If this was the last queued request, svc_getreq will end
		 * up calling xprt_inactive to remove from the active list.
		 */
		mtx_lock(&grp->sg_lock);
		xprt->xp_thread = NULL;
		st->st_xprt = NULL;
		if (xprt->xp_active) {
			if (!svc_request_space_available(pool) ||
			    !xprt_assignthread(xprt))
				TAILQ_INSERT_TAIL(&grp->sg_active,
				    xprt, xp_alink);
		}
		mtx_unlock(&grp->sg_lock);
		SVC_RELEASE(xprt);

		/*
		 * Execute what we have queued.
		 */
		mtx_lock(&st->st_lock);
		while ((rqstp = STAILQ_FIRST(&st->st_reqs)) != NULL) {
			STAILQ_REMOVE_HEAD(&st->st_reqs, rq_link);
			mtx_unlock(&st->st_lock);
			sz = (long)rqstp->rq_size;
			svc_executereq(rqstp);
			svc_change_space_used(pool, -sz);
			mtx_lock(&st->st_lock);
		}
		mtx_unlock(&st->st_lock);
		mtx_lock(&grp->sg_lock);
	}

	if (st->st_xprt) {
		xprt = st->st_xprt;
		st->st_xprt = NULL;
		SVC_RELEASE(xprt);
	}
	KASSERT(STAILQ_EMPTY(&st->st_reqs), ("stray reqs on exit"));
	mtx_destroy(&st->st_lock);
	cv_destroy(&st->st_cond);
	mem_free(st, sizeof(*st));

	grp->sg_threadcount--;
	if (!ismaster)
		wakeup(grp);
	mtx_unlock(&grp->sg_lock);
}

static void
svc_thread_start(void *arg)
{

	svc_run_internal((SVCGROUP *) arg, FALSE);
	kthread_exit();
}

static void
svc_new_thread(SVCGROUP *grp)
{
	SVCPOOL *pool = grp->sg_pool;
	struct thread *td;

	mtx_lock(&grp->sg_lock);
	grp->sg_threadcount++;
	mtx_unlock(&grp->sg_lock);
	kthread_add(svc_thread_start, grp, pool->sp_proc, &td, 0, 0,
	    "%s: service", pool->sp_name);
}
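
/*
 * Example of driving the pool (a sketch; the enclosing service thread
 * and shutdown path are hypothetical): the caller of svc_run() below
 * becomes the pool's master service thread and does not return until
 * svc_exit() has been called, typically from a shutdown or signal path.
 * Once svc_run() returns, the pool may be closed for reuse or destroyed.
 *
 *	svc_run(mypool);
 *	svcpool_destroy(mypool);
 */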

void
svc_run(SVCPOOL *pool)
{
	int g, i;
	struct proc *p;
	struct thread *td;
	SVCGROUP *grp;

	p = curproc;
	td = curthread;
	snprintf(td->td_name, sizeof(td->td_name),
	    "%s: master", pool->sp_name);
	pool->sp_state = SVCPOOL_ACTIVE;
	pool->sp_proc = p;

	/* Choose group count based on number of threads and CPUs. */
	pool->sp_groupcount = max(1, min(SVC_MAXGROUPS,
	    min(pool->sp_maxthreads / 2, mp_ncpus) / 6));
	for (g = 0; g < pool->sp_groupcount; g++) {
		grp = &pool->sp_groups[g];
		grp->sg_minthreads = max(1,
		    pool->sp_minthreads / pool->sp_groupcount);
		grp->sg_maxthreads = max(1,
		    pool->sp_maxthreads / pool->sp_groupcount);
		grp->sg_lastcreatetime = time_uptime;
	}

	/* Starting threads */
	pool->sp_groups[0].sg_threadcount++;
	for (g = 0; g < pool->sp_groupcount; g++) {
		grp = &pool->sp_groups[g];
		for (i = ((g == 0) ? 1 : 0); i < grp->sg_minthreads; i++)
			svc_new_thread(grp);
	}
	svc_run_internal(&pool->sp_groups[0], TRUE);

	/* Waiting for threads to stop. */
	for (g = 0; g < pool->sp_groupcount; g++) {
		grp = &pool->sp_groups[g];
		mtx_lock(&grp->sg_lock);
		while (grp->sg_threadcount > 0)
			msleep(grp, &grp->sg_lock, 0, "svcexit", 0);
		mtx_unlock(&grp->sg_lock);
	}
}

void
svc_exit(SVCPOOL *pool)
{
	SVCGROUP *grp;
	SVCTHREAD *st;
	int g;

	pool->sp_state = SVCPOOL_CLOSING;
	for (g = 0; g < pool->sp_groupcount; g++) {
		grp = &pool->sp_groups[g];
		mtx_lock(&grp->sg_lock);
		if (grp->sg_state != SVCPOOL_CLOSING) {
			grp->sg_state = SVCPOOL_CLOSING;
			LIST_FOREACH(st, &grp->sg_idlethreads, st_ilink)
				cv_signal(&st->st_cond);
		}
		mtx_unlock(&grp->sg_lock);
	}
}

bool_t
svc_getargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
{
	struct mbuf *m;
	XDR xdrs;
	bool_t stat;

	m = rqstp->rq_args;
	rqstp->rq_args = NULL;

	xdrmbuf_create(&xdrs, m, XDR_DECODE);
	stat = xargs(&xdrs, args);
	XDR_DESTROY(&xdrs);

	return (stat);
}

bool_t
svc_freeargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
{
	XDR xdrs;

	if (rqstp->rq_addr) {
		free(rqstp->rq_addr, M_SONAME);
		rqstp->rq_addr = NULL;
	}

	xdrs.x_op = XDR_FREE;
	return (xargs(&xdrs, args));
}

void
svc_freereq(struct svc_req *rqstp)
{
	SVCTHREAD *st;
	SVCPOOL *pool;

	st = rqstp->rq_thread;
	if (st) {
		pool = st->st_pool;
		if (pool->sp_done)
			pool->sp_done(st, rqstp);
	}

	if (rqstp->rq_auth.svc_ah_ops)
		SVCAUTH_RELEASE(&rqstp->rq_auth);

	if (rqstp->rq_xprt) {
		SVC_RELEASE(rqstp->rq_xprt);
	}

	if (rqstp->rq_addr)
		free(rqstp->rq_addr, M_SONAME);

	if (rqstp->rq_args)
		m_freem(rqstp->rq_args);

	free(rqstp, M_RPC);
}