1 /* $NetBSD: svc.c,v 1.21 2000/07/06 03:10:35 christos Exp $ */ 2 3 /*- 4 * SPDX-License-Identifier: BSD-3-Clause 5 * 6 * Copyright (c) 2009, Sun Microsystems, Inc. 7 * All rights reserved. 8 * 9 * Redistribution and use in source and binary forms, with or without 10 * modification, are permitted provided that the following conditions are met: 11 * - Redistributions of source code must retain the above copyright notice, 12 * this list of conditions and the following disclaimer. 13 * - Redistributions in binary form must reproduce the above copyright notice, 14 * this list of conditions and the following disclaimer in the documentation 15 * and/or other materials provided with the distribution. 16 * - Neither the name of Sun Microsystems, Inc. nor the names of its 17 * contributors may be used to endorse or promote products derived 18 * from this software without specific prior written permission. 19 * 20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 21 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 22 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 23 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 24 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 25 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 26 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 27 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 28 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 29 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 30 * POSSIBILITY OF SUCH DAMAGE. 31 */ 32 33 #include <sys/cdefs.h> 34 /* 35 * svc.c, Server-side remote procedure call interface. 36 * 37 * There are two sets of procedures here. The xprt routines are 38 * for handling transport handles. The svc routines handle the 39 * list of service routines. 
 *
 * Copyright (C) 1984, Sun Microsystems, Inc.
 */

#include <sys/param.h>
#include <sys/jail.h>
#include <sys/lock.h>
#include <sys/kernel.h>
#include <sys/kthread.h>
#include <sys/malloc.h>
#include <sys/mbuf.h>
#include <sys/mutex.h>
#include <sys/proc.h>
#include <sys/protosw.h>
#include <sys/queue.h>
#include <sys/socketvar.h>
#include <sys/systm.h>
#include <sys/smp.h>
#include <sys/sx.h>
#include <sys/ucred.h>

#include <netinet/tcp.h>

#include <rpc/rpc.h>
#include <rpc/rpcb_clnt.h>
#include <rpc/replay.h>

#include <rpc/rpc_com.h>

#define SVC_VERSQUIET 0x0001		/* keep quiet about vers mismatch */
#define version_keepquiet(xp) (SVC_EXT(xp)->xp_flags & SVC_VERSQUIET)

static struct svc_callout *svc_find(SVCPOOL *pool, rpcprog_t, rpcvers_t,
    char *);
static void svc_new_thread(SVCGROUP *grp);
static void xprt_unregister_locked(SVCXPRT *xprt);
static void svc_change_space_used(SVCPOOL *pool, long delta);
static bool_t svc_request_space_available(SVCPOOL *pool);
static void svcpool_cleanup(SVCPOOL *pool);

/* ***************  SVCXPRT related stuff **************** */

static int svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS);
static int svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS);
static int svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS);

/*
 * Allocate and initialise a service pool.
 *
 * The pool is allocated zeroed with M_WAITOK.  The "name" pointer is
 * stored as-is (not copied).  Every one of the SVC_MAXGROUPS thread
 * groups is initialised, although sp_groupcount starts at 1, and all
 * thread limits (pool-wide and per-group) start at 1.
 *
 * When running in the default vnet and "sysctl_base" is non-NULL, the
 * pool's tunables and statistics are published under that sysctl node.
 *
 * Returns the new pool.
 */
SVCPOOL*
svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base)
{
	SVCPOOL *pool;
	SVCGROUP *grp;
	int g;

	pool = malloc(sizeof(SVCPOOL), M_RPC, M_WAITOK|M_ZERO);

	mtx_init(&pool->sp_lock, "sp_lock", NULL, MTX_DEF);
	pool->sp_name = name;
	pool->sp_state = SVCPOOL_INIT;
	pool->sp_proc = NULL;
	TAILQ_INIT(&pool->sp_callouts);
	TAILQ_INIT(&pool->sp_lcallouts);
	pool->sp_minthreads = 1;
	pool->sp_maxthreads = 1;
	pool->sp_groupcount = 1;
	for (g = 0; g < SVC_MAXGROUPS; g++) {
		grp = &pool->sp_groups[g];
		mtx_init(&grp->sg_lock, "sg_lock", NULL, MTX_DEF);
		grp->sg_pool = pool;
		grp->sg_state = SVCPOOL_ACTIVE;
		TAILQ_INIT(&grp->sg_xlist);
		TAILQ_INIT(&grp->sg_active);
		LIST_INIT(&grp->sg_idlethreads);
		grp->sg_minthreads = 1;
		grp->sg_maxthreads = 1;
	}

	/*
	 * Don't use more than a quarter of mbuf clusters.  Nota bene:
	 * nmbclusters is an int, but nmbclusters*MCLBYTES may overflow
	 * on LP64 architectures, so cast to u_long to avoid undefined
	 * behavior.  (ILP32 architectures cannot have nmbclusters
	 * large enough to overflow for other reasons.)
	 */
	pool->sp_space_high = (u_long)nmbclusters * MCLBYTES / 4;
	/* Low water mark is two thirds of the high water mark. */
	pool->sp_space_low = (pool->sp_space_high / 3) * 2;

	sysctl_ctx_init(&pool->sp_sysctl);
	if (IS_DEFAULT_VNET(curvnet) && sysctl_base) {
		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "minthreads", CTLTYPE_INT | CTLFLAG_RW | CTLFLAG_MPSAFE,
		    pool, 0, svcpool_minthread_sysctl, "I",
		    "Minimal number of threads");
		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "maxthreads", CTLTYPE_INT | CTLFLAG_RW | CTLFLAG_MPSAFE,
		    pool, 0, svcpool_maxthread_sysctl, "I",
		    "Maximal number of threads");
		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "threads", CTLTYPE_INT | CTLFLAG_RD | CTLFLAG_MPSAFE,
		    pool, 0, svcpool_threads_sysctl, "I",
		    "Current number of threads");
		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "groups", CTLFLAG_RD, &pool->sp_groupcount, 0,
		    "Number of thread groups");

		SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_used", CTLFLAG_RD,
		    &pool->sp_space_used,
		    "Space in parsed but not handled requests.");

		SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_used_highest", CTLFLAG_RD,
		    &pool->sp_space_used_highest,
		    "Highest space used since reboot.");

		SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_high", CTLFLAG_RW,
		    &pool->sp_space_high,
		    "Maximum space in parsed but not handled requests.");

		SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_low", CTLFLAG_RW,
		    &pool->sp_space_low,
		    "Low water mark for request space.");

		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_throttled", CTLFLAG_RD,
		    &pool->sp_space_throttled, 0,
		    "Whether nfs requests are currently throttled");

		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_throttle_count", CTLFLAG_RD,
		    &pool->sp_space_throttle_count, 0,
		    "Count of times throttling based on request space has occurred");
	}

	return pool;
}

/*
 * Code common to svcpool_destroy() and svcpool_close(), which cleans up
 * the pool data structures.
 *
 * All transports of all groups are unregistered, then every registered
 * service callout and connection-loss callout is removed.
 */
static void
svcpool_cleanup(SVCPOOL *pool)
{
	SVCGROUP *grp;
	SVCXPRT *xprt, *nxprt;
	struct svc_callout *s;
	struct svc_loss_callout *sl;
	struct svcxprt_list cleanup;
	int g;

	TAILQ_INIT(&cleanup);

	/*
	 * Unlink the transports under each group lock, collecting them on
	 * a private list so the shutdown/release below runs unlocked.
	 */
	for (g = 0; g < SVC_MAXGROUPS; g++) {
		grp = &pool->sp_groups[g];
		mtx_lock(&grp->sg_lock);
		while ((xprt = TAILQ_FIRST(&grp->sg_xlist)) != NULL) {
			xprt_unregister_locked(xprt);
			TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
		}
		mtx_unlock(&grp->sg_lock);
	}
	TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
		if (xprt->xp_socket != NULL)
			soshutdown(xprt->xp_socket, SHUT_WR);
		SVC_RELEASE(xprt);
	}

	/*
	 * svc_unreg() and svc_loss_unreg() take sp_lock themselves, so it
	 * is dropped around each call and re-taken to fetch the next entry.
	 */
	mtx_lock(&pool->sp_lock);
	while ((s = TAILQ_FIRST(&pool->sp_callouts)) != NULL) {
		mtx_unlock(&pool->sp_lock);
		svc_unreg(pool, s->sc_prog, s->sc_vers);
		mtx_lock(&pool->sp_lock);
	}
	while ((sl = TAILQ_FIRST(&pool->sp_lcallouts)) != NULL) {
		mtx_unlock(&pool->sp_lock);
		svc_loss_unreg(pool, sl->slc_dispatch);
		mtx_lock(&pool->sp_lock);
	}
	mtx_unlock(&pool->sp_lock);
}

/*
 * Tear down a pool completely: clean it up, destroy all group locks and
 * the pool lock, free the replay cache (if any) and the sysctl context,
 * then free the pool itself.  "pool" must not be used afterwards.
 */
void
svcpool_destroy(SVCPOOL *pool)
{
	SVCGROUP *grp;
	int g;

	svcpool_cleanup(pool);

	for (g = 0; g < SVC_MAXGROUPS; g++) {
		grp = &pool->sp_groups[g];
		mtx_destroy(&grp->sg_lock);
	}
	mtx_destroy(&pool->sp_lock);

	if (pool->sp_rcache)
		replay_freecache(pool->sp_rcache);

	sysctl_ctx_free(&pool->sp_sysctl);
	free(pool, M_RPC);
}

/*
 * Similar to svcpool_destroy(), except that it does not destroy the actual
 * data structures. As such, "pool" may be used again.
 */
void
svcpool_close(SVCPOOL *pool)
{
	SVCGROUP *grp;
	int g;

	svcpool_cleanup(pool);

	/* Now, initialize the pool's state for a fresh svc_run() call. */
	mtx_lock(&pool->sp_lock);
	pool->sp_state = SVCPOOL_INIT;
	mtx_unlock(&pool->sp_lock);
	for (g = 0; g < SVC_MAXGROUPS; g++) {
		grp = &pool->sp_groups[g];
		mtx_lock(&grp->sg_lock);
		grp->sg_state = SVCPOOL_ACTIVE;
		mtx_unlock(&grp->sg_lock);
	}
}

/*
 * Sysctl handler to get the present thread count on a pool
 *
 * The value reported is the sum of sg_threadcount over the first
 * sp_groupcount groups, read while holding sp_lock.
 */
static int
svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS)
{
	SVCPOOL *pool;
	int threads, error, g;

	pool = oidp->oid_arg1;
	threads = 0;
	mtx_lock(&pool->sp_lock);
	for (g = 0; g < pool->sp_groupcount; g++)
		threads += pool->sp_groups[g].sg_threadcount;
	mtx_unlock(&pool->sp_lock);
	error = sysctl_handle_int(oidp, &threads, 0, req);
	return (error);
}

/*
 * Sysctl handler to set the minimum thread count on a pool
 *
 * A new value greater than sp_maxthreads is rejected with EINVAL.
 * Otherwise the pool-wide minimum is stored and each active group gets
 * an equal share of it, but never less than 1.
 *
 * NOTE(review): the current value is read and the new one validated
 * before sp_lock is taken, and no lower bound is checked here.
 */
static int
svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS)
{
	SVCPOOL *pool;
	int newminthreads, error, g;

	pool = oidp->oid_arg1;
	newminthreads = pool->sp_minthreads;
	error = sysctl_handle_int(oidp, &newminthreads, 0, req);
	if (error == 0 && newminthreads != pool->sp_minthreads) {
		if (newminthreads > pool->sp_maxthreads)
			return (EINVAL);
		mtx_lock(&pool->sp_lock);
		pool->sp_minthreads = newminthreads;
		for (g = 0; g < pool->sp_groupcount; g++) {
			pool->sp_groups[g].sg_minthreads = max(1,
			    pool->sp_minthreads / pool->sp_groupcount);
		}
		mtx_unlock(&pool->sp_lock);
	}
	return (error);
}

/*
 * Sysctl handler to set the maximum thread count on a pool
 *
 * A new value smaller than sp_minthreads is rejected with EINVAL.
 * Otherwise the pool-wide maximum is stored and each active group gets
 * an equal share of it, but never less than 1.
 *
 * NOTE(review): the current value is read and the new one validated
 * before sp_lock is taken.
 */
static int
svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS)
{
	SVCPOOL *pool;
	int newmaxthreads, error, g;

	pool = oidp->oid_arg1;
	newmaxthreads = pool->sp_maxthreads;
	error = sysctl_handle_int(oidp, &newmaxthreads, 0, req);
	if (error == 0 && newmaxthreads != pool->sp_maxthreads) {
		if (newmaxthreads < pool->sp_minthreads)
			return (EINVAL);
		mtx_lock(&pool->sp_lock);
		pool->sp_maxthreads = newmaxthreads;
		for (g = 0; g < pool->sp_groupcount; g++) {
			pool->sp_groups[g].sg_maxthreads = max(1,
			    pool->sp_maxthreads / pool->sp_groupcount);
		}
		mtx_unlock(&pool->sp_lock);
	}
	return (error);
}

/*
 * Activate a transport handle.
 *
 * Takes a reference on the transport, assigns it to a group chosen by
 * incrementing sp_nextgroup modulo sp_groupcount, and appends it to
 * that group's transport list, marked registered and not active.
 */
void
xprt_register(SVCXPRT *xprt)
{
	SVCPOOL *pool = xprt->xp_pool;
	SVCGROUP *grp;
	int g;

	SVC_ACQUIRE(xprt);
	g = atomic_fetchadd_int(&pool->sp_nextgroup, 1) % pool->sp_groupcount;
	xprt->xp_group = grp = &pool->sp_groups[g];
	mtx_lock(&grp->sg_lock);
	xprt->xp_registered = TRUE;
	xprt->xp_active = FALSE;
	TAILQ_INSERT_TAIL(&grp->sg_xlist, xprt, xp_link);
	mtx_unlock(&grp->sg_lock);
}

/*
 * De-activate a transport handle. Note: the locked version doesn't
 * release the transport - caller must do that after dropping the pool
 * lock.
363 */ 364 static void 365 xprt_unregister_locked(SVCXPRT *xprt) 366 { 367 SVCGROUP *grp = xprt->xp_group; 368 369 mtx_assert(&grp->sg_lock, MA_OWNED); 370 KASSERT(xprt->xp_registered == TRUE, 371 ("xprt_unregister_locked: not registered")); 372 xprt_inactive_locked(xprt); 373 TAILQ_REMOVE(&grp->sg_xlist, xprt, xp_link); 374 xprt->xp_registered = FALSE; 375 } 376 377 void 378 xprt_unregister(SVCXPRT *xprt) 379 { 380 SVCGROUP *grp = xprt->xp_group; 381 382 mtx_lock(&grp->sg_lock); 383 if (xprt->xp_registered == FALSE) { 384 /* Already unregistered by another thread */ 385 mtx_unlock(&grp->sg_lock); 386 return; 387 } 388 xprt_unregister_locked(xprt); 389 mtx_unlock(&grp->sg_lock); 390 391 if (xprt->xp_socket != NULL) 392 soshutdown(xprt->xp_socket, SHUT_WR); 393 SVC_RELEASE(xprt); 394 } 395 396 /* 397 * Attempt to assign a service thread to this transport. 398 */ 399 static int 400 xprt_assignthread(SVCXPRT *xprt) 401 { 402 SVCGROUP *grp = xprt->xp_group; 403 SVCTHREAD *st; 404 405 mtx_assert(&grp->sg_lock, MA_OWNED); 406 st = LIST_FIRST(&grp->sg_idlethreads); 407 if (st) { 408 LIST_REMOVE(st, st_ilink); 409 SVC_ACQUIRE(xprt); 410 xprt->xp_thread = st; 411 st->st_xprt = xprt; 412 cv_signal(&st->st_cond); 413 return (TRUE); 414 } else { 415 /* 416 * See if we can create a new thread. The 417 * actual thread creation happens in 418 * svc_run_internal because our locking state 419 * is poorly defined (we are typically called 420 * from a socket upcall). Don't create more 421 * than one thread per second. 422 */ 423 if (grp->sg_state == SVCPOOL_ACTIVE 424 && grp->sg_lastcreatetime < time_uptime 425 && grp->sg_threadcount < grp->sg_maxthreads) { 426 grp->sg_state = SVCPOOL_THREADWANTED; 427 } 428 } 429 return (FALSE); 430 } 431 432 void 433 xprt_active(SVCXPRT *xprt) 434 { 435 SVCGROUP *grp = xprt->xp_group; 436 437 mtx_lock(&grp->sg_lock); 438 439 if (!xprt->xp_registered) { 440 /* 441 * Race with xprt_unregister - we lose. 
442 */ 443 mtx_unlock(&grp->sg_lock); 444 return; 445 } 446 447 if (!xprt->xp_active) { 448 xprt->xp_active = TRUE; 449 if (xprt->xp_thread == NULL) { 450 if (!svc_request_space_available(xprt->xp_pool) || 451 !xprt_assignthread(xprt)) 452 TAILQ_INSERT_TAIL(&grp->sg_active, xprt, 453 xp_alink); 454 } 455 } 456 457 mtx_unlock(&grp->sg_lock); 458 } 459 460 void 461 xprt_inactive_locked(SVCXPRT *xprt) 462 { 463 SVCGROUP *grp = xprt->xp_group; 464 465 mtx_assert(&grp->sg_lock, MA_OWNED); 466 if (xprt->xp_active) { 467 if (xprt->xp_thread == NULL) 468 TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink); 469 xprt->xp_active = FALSE; 470 } 471 } 472 473 void 474 xprt_inactive(SVCXPRT *xprt) 475 { 476 SVCGROUP *grp = xprt->xp_group; 477 478 mtx_lock(&grp->sg_lock); 479 xprt_inactive_locked(xprt); 480 mtx_unlock(&grp->sg_lock); 481 } 482 483 /* 484 * Variant of xprt_inactive() for use only when sure that port is 485 * assigned to thread. For example, within receive handlers. 486 */ 487 void 488 xprt_inactive_self(SVCXPRT *xprt) 489 { 490 491 KASSERT(xprt->xp_thread != NULL, 492 ("xprt_inactive_self(%p) with NULL xp_thread", xprt)); 493 xprt->xp_active = FALSE; 494 } 495 496 /* 497 * Add a service program to the callout list. 498 * The dispatch routine will be called when a rpc request for this 499 * program number comes in. 
500 */ 501 bool_t 502 svc_reg(SVCXPRT *xprt, const rpcprog_t prog, const rpcvers_t vers, 503 void (*dispatch)(struct svc_req *, SVCXPRT *), 504 const struct netconfig *nconf) 505 { 506 SVCPOOL *pool = xprt->xp_pool; 507 struct svc_callout *s; 508 char *netid = NULL; 509 int flag = 0; 510 511 /* VARIABLES PROTECTED BY svc_lock: s, svc_head */ 512 513 if (xprt->xp_netid) { 514 netid = strdup(xprt->xp_netid, M_RPC); 515 flag = 1; 516 } else if (nconf && nconf->nc_netid) { 517 netid = strdup(nconf->nc_netid, M_RPC); 518 flag = 1; 519 } /* must have been created with svc_raw_create */ 520 if ((netid == NULL) && (flag == 1)) { 521 return (FALSE); 522 } 523 524 mtx_lock(&pool->sp_lock); 525 if ((s = svc_find(pool, prog, vers, netid)) != NULL) { 526 if (netid) 527 free(netid, M_RPC); 528 if (s->sc_dispatch == dispatch) 529 goto rpcb_it; /* he is registering another xptr */ 530 mtx_unlock(&pool->sp_lock); 531 return (FALSE); 532 } 533 s = malloc(sizeof (struct svc_callout), M_RPC, M_NOWAIT); 534 if (s == NULL) { 535 if (netid) 536 free(netid, M_RPC); 537 mtx_unlock(&pool->sp_lock); 538 return (FALSE); 539 } 540 541 s->sc_prog = prog; 542 s->sc_vers = vers; 543 s->sc_dispatch = dispatch; 544 s->sc_netid = netid; 545 TAILQ_INSERT_TAIL(&pool->sp_callouts, s, sc_link); 546 547 if ((xprt->xp_netid == NULL) && (flag == 1) && netid) 548 ((SVCXPRT *) xprt)->xp_netid = strdup(netid, M_RPC); 549 550 rpcb_it: 551 mtx_unlock(&pool->sp_lock); 552 /* now register the information with the local binder service */ 553 if (nconf) { 554 bool_t dummy; 555 struct netconfig tnc; 556 struct netbuf nb; 557 tnc = *nconf; 558 nb.buf = &xprt->xp_ltaddr; 559 nb.len = xprt->xp_ltaddr.ss_len; 560 dummy = rpcb_set(prog, vers, &tnc, &nb); 561 return (dummy); 562 } 563 return (TRUE); 564 } 565 566 /* 567 * Remove a service program from the callout list. 
568 */ 569 void 570 svc_unreg(SVCPOOL *pool, const rpcprog_t prog, const rpcvers_t vers) 571 { 572 struct svc_callout *s; 573 574 /* unregister the information anyway */ 575 (void) rpcb_unset(prog, vers, NULL); 576 mtx_lock(&pool->sp_lock); 577 while ((s = svc_find(pool, prog, vers, NULL)) != NULL) { 578 TAILQ_REMOVE(&pool->sp_callouts, s, sc_link); 579 if (s->sc_netid) 580 mem_free(s->sc_netid, sizeof (s->sc_netid) + 1); 581 mem_free(s, sizeof (struct svc_callout)); 582 } 583 mtx_unlock(&pool->sp_lock); 584 } 585 586 /* 587 * Add a service connection loss program to the callout list. 588 * The dispatch routine will be called when some port in ths pool die. 589 */ 590 bool_t 591 svc_loss_reg(SVCXPRT *xprt, void (*dispatch)(SVCXPRT *)) 592 { 593 SVCPOOL *pool = xprt->xp_pool; 594 struct svc_loss_callout *s; 595 596 mtx_lock(&pool->sp_lock); 597 TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) { 598 if (s->slc_dispatch == dispatch) 599 break; 600 } 601 if (s != NULL) { 602 mtx_unlock(&pool->sp_lock); 603 return (TRUE); 604 } 605 s = malloc(sizeof(struct svc_loss_callout), M_RPC, M_NOWAIT); 606 if (s == NULL) { 607 mtx_unlock(&pool->sp_lock); 608 return (FALSE); 609 } 610 s->slc_dispatch = dispatch; 611 TAILQ_INSERT_TAIL(&pool->sp_lcallouts, s, slc_link); 612 mtx_unlock(&pool->sp_lock); 613 return (TRUE); 614 } 615 616 /* 617 * Remove a service connection loss program from the callout list. 618 */ 619 void 620 svc_loss_unreg(SVCPOOL *pool, void (*dispatch)(SVCXPRT *)) 621 { 622 struct svc_loss_callout *s; 623 624 mtx_lock(&pool->sp_lock); 625 TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) { 626 if (s->slc_dispatch == dispatch) { 627 TAILQ_REMOVE(&pool->sp_lcallouts, s, slc_link); 628 free(s, M_RPC); 629 break; 630 } 631 } 632 mtx_unlock(&pool->sp_lock); 633 } 634 635 /* ********************** CALLOUT list related stuff ************* */ 636 637 /* 638 * Search the callout list for a program number, return the callout 639 * struct. 
640 */ 641 static struct svc_callout * 642 svc_find(SVCPOOL *pool, rpcprog_t prog, rpcvers_t vers, char *netid) 643 { 644 struct svc_callout *s; 645 646 mtx_assert(&pool->sp_lock, MA_OWNED); 647 TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) { 648 if (s->sc_prog == prog && s->sc_vers == vers 649 && (netid == NULL || s->sc_netid == NULL || 650 strcmp(netid, s->sc_netid) == 0)) 651 break; 652 } 653 654 return (s); 655 } 656 657 /* ******************* REPLY GENERATION ROUTINES ************ */ 658 659 static bool_t 660 svc_sendreply_common(struct svc_req *rqstp, struct rpc_msg *rply, 661 struct mbuf *body) 662 { 663 SVCXPRT *xprt = rqstp->rq_xprt; 664 bool_t ok; 665 666 if (rqstp->rq_args) { 667 m_freem(rqstp->rq_args); 668 rqstp->rq_args = NULL; 669 } 670 671 if (xprt->xp_pool->sp_rcache) 672 replay_setreply(xprt->xp_pool->sp_rcache, 673 rply, svc_getrpccaller(rqstp), body); 674 675 if (!SVCAUTH_WRAP(&rqstp->rq_auth, &body)) 676 return (FALSE); 677 678 ok = SVC_REPLY(xprt, rply, rqstp->rq_addr, body, &rqstp->rq_reply_seq); 679 if (rqstp->rq_addr) { 680 free(rqstp->rq_addr, M_SONAME); 681 rqstp->rq_addr = NULL; 682 } 683 684 return (ok); 685 } 686 687 /* 688 * Send a reply to an rpc request 689 */ 690 bool_t 691 svc_sendreply(struct svc_req *rqstp, xdrproc_t xdr_results, void * xdr_location) 692 { 693 struct rpc_msg rply; 694 struct mbuf *m; 695 XDR xdrs; 696 bool_t ok; 697 698 rply.rm_xid = rqstp->rq_xid; 699 rply.rm_direction = REPLY; 700 rply.rm_reply.rp_stat = MSG_ACCEPTED; 701 rply.acpted_rply.ar_verf = rqstp->rq_verf; 702 rply.acpted_rply.ar_stat = SUCCESS; 703 rply.acpted_rply.ar_results.where = NULL; 704 rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void; 705 706 m = m_getcl(M_WAITOK, MT_DATA, 0); 707 xdrmbuf_create(&xdrs, m, XDR_ENCODE); 708 ok = xdr_results(&xdrs, xdr_location); 709 XDR_DESTROY(&xdrs); 710 711 if (ok) { 712 return (svc_sendreply_common(rqstp, &rply, m)); 713 } else { 714 m_freem(m); 715 return (FALSE); 716 } 717 } 718 719 bool_t 720 
svc_sendreply_mbuf(struct svc_req *rqstp, struct mbuf *m) 721 { 722 struct rpc_msg rply; 723 724 rply.rm_xid = rqstp->rq_xid; 725 rply.rm_direction = REPLY; 726 rply.rm_reply.rp_stat = MSG_ACCEPTED; 727 rply.acpted_rply.ar_verf = rqstp->rq_verf; 728 rply.acpted_rply.ar_stat = SUCCESS; 729 rply.acpted_rply.ar_results.where = NULL; 730 rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void; 731 732 return (svc_sendreply_common(rqstp, &rply, m)); 733 } 734 735 /* 736 * No procedure error reply 737 */ 738 void 739 svcerr_noproc(struct svc_req *rqstp) 740 { 741 SVCXPRT *xprt = rqstp->rq_xprt; 742 struct rpc_msg rply; 743 744 rply.rm_xid = rqstp->rq_xid; 745 rply.rm_direction = REPLY; 746 rply.rm_reply.rp_stat = MSG_ACCEPTED; 747 rply.acpted_rply.ar_verf = rqstp->rq_verf; 748 rply.acpted_rply.ar_stat = PROC_UNAVAIL; 749 750 if (xprt->xp_pool->sp_rcache) 751 replay_setreply(xprt->xp_pool->sp_rcache, 752 &rply, svc_getrpccaller(rqstp), NULL); 753 754 svc_sendreply_common(rqstp, &rply, NULL); 755 } 756 757 /* 758 * Can't decode args error reply 759 */ 760 void 761 svcerr_decode(struct svc_req *rqstp) 762 { 763 SVCXPRT *xprt = rqstp->rq_xprt; 764 struct rpc_msg rply; 765 766 rply.rm_xid = rqstp->rq_xid; 767 rply.rm_direction = REPLY; 768 rply.rm_reply.rp_stat = MSG_ACCEPTED; 769 rply.acpted_rply.ar_verf = rqstp->rq_verf; 770 rply.acpted_rply.ar_stat = GARBAGE_ARGS; 771 772 if (xprt->xp_pool->sp_rcache) 773 replay_setreply(xprt->xp_pool->sp_rcache, 774 &rply, (struct sockaddr *) &xprt->xp_rtaddr, NULL); 775 776 svc_sendreply_common(rqstp, &rply, NULL); 777 } 778 779 /* 780 * Some system error 781 */ 782 void 783 svcerr_systemerr(struct svc_req *rqstp) 784 { 785 SVCXPRT *xprt = rqstp->rq_xprt; 786 struct rpc_msg rply; 787 788 rply.rm_xid = rqstp->rq_xid; 789 rply.rm_direction = REPLY; 790 rply.rm_reply.rp_stat = MSG_ACCEPTED; 791 rply.acpted_rply.ar_verf = rqstp->rq_verf; 792 rply.acpted_rply.ar_stat = SYSTEM_ERR; 793 794 if (xprt->xp_pool->sp_rcache) 795 
replay_setreply(xprt->xp_pool->sp_rcache, 796 &rply, svc_getrpccaller(rqstp), NULL); 797 798 svc_sendreply_common(rqstp, &rply, NULL); 799 } 800 801 /* 802 * Authentication error reply 803 */ 804 void 805 svcerr_auth(struct svc_req *rqstp, enum auth_stat why) 806 { 807 SVCXPRT *xprt = rqstp->rq_xprt; 808 struct rpc_msg rply; 809 810 rply.rm_xid = rqstp->rq_xid; 811 rply.rm_direction = REPLY; 812 rply.rm_reply.rp_stat = MSG_DENIED; 813 rply.rjcted_rply.rj_stat = AUTH_ERROR; 814 rply.rjcted_rply.rj_why = why; 815 816 if (xprt->xp_pool->sp_rcache) 817 replay_setreply(xprt->xp_pool->sp_rcache, 818 &rply, svc_getrpccaller(rqstp), NULL); 819 820 svc_sendreply_common(rqstp, &rply, NULL); 821 } 822 823 /* 824 * Auth too weak error reply 825 */ 826 void 827 svcerr_weakauth(struct svc_req *rqstp) 828 { 829 830 svcerr_auth(rqstp, AUTH_TOOWEAK); 831 } 832 833 /* 834 * Program unavailable error reply 835 */ 836 void 837 svcerr_noprog(struct svc_req *rqstp) 838 { 839 SVCXPRT *xprt = rqstp->rq_xprt; 840 struct rpc_msg rply; 841 842 rply.rm_xid = rqstp->rq_xid; 843 rply.rm_direction = REPLY; 844 rply.rm_reply.rp_stat = MSG_ACCEPTED; 845 rply.acpted_rply.ar_verf = rqstp->rq_verf; 846 rply.acpted_rply.ar_stat = PROG_UNAVAIL; 847 848 if (xprt->xp_pool->sp_rcache) 849 replay_setreply(xprt->xp_pool->sp_rcache, 850 &rply, svc_getrpccaller(rqstp), NULL); 851 852 svc_sendreply_common(rqstp, &rply, NULL); 853 } 854 855 /* 856 * Program version mismatch error reply 857 */ 858 void 859 svcerr_progvers(struct svc_req *rqstp, rpcvers_t low_vers, rpcvers_t high_vers) 860 { 861 SVCXPRT *xprt = rqstp->rq_xprt; 862 struct rpc_msg rply; 863 864 rply.rm_xid = rqstp->rq_xid; 865 rply.rm_direction = REPLY; 866 rply.rm_reply.rp_stat = MSG_ACCEPTED; 867 rply.acpted_rply.ar_verf = rqstp->rq_verf; 868 rply.acpted_rply.ar_stat = PROG_MISMATCH; 869 rply.acpted_rply.ar_vers.low = (uint32_t)low_vers; 870 rply.acpted_rply.ar_vers.high = (uint32_t)high_vers; 871 872 if (xprt->xp_pool->sp_rcache) 873 
replay_setreply(xprt->xp_pool->sp_rcache, 874 &rply, svc_getrpccaller(rqstp), NULL); 875 876 svc_sendreply_common(rqstp, &rply, NULL); 877 } 878 879 /* 880 * Allocate a new server transport structure. All fields are 881 * initialized to zero and xp_p3 is initialized to point at an 882 * extension structure to hold various flags and authentication 883 * parameters. 884 */ 885 SVCXPRT * 886 svc_xprt_alloc(void) 887 { 888 SVCXPRT *xprt; 889 SVCXPRT_EXT *ext; 890 891 xprt = mem_alloc(sizeof(SVCXPRT)); 892 ext = mem_alloc(sizeof(SVCXPRT_EXT)); 893 xprt->xp_p3 = ext; 894 refcount_init(&xprt->xp_refs, 1); 895 896 return (xprt); 897 } 898 899 /* 900 * Free a server transport structure. 901 */ 902 void 903 svc_xprt_free(SVCXPRT *xprt) 904 { 905 906 mem_free(xprt->xp_p3, sizeof(SVCXPRT_EXT)); 907 /* The size argument is ignored, so 0 is ok. */ 908 mem_free(xprt->xp_gidp, 0); 909 mem_free(xprt, sizeof(SVCXPRT)); 910 } 911 912 /* ******************* SERVER INPUT STUFF ******************* */ 913 914 /* 915 * Read RPC requests from a transport and queue them to be 916 * executed. We handle authentication and replay cache replies here. 917 * Actually dispatching the RPC is deferred till svc_executereq. 918 */ 919 static enum xprt_stat 920 svc_getreq(SVCXPRT *xprt, struct svc_req **rqstp_ret) 921 { 922 SVCPOOL *pool = xprt->xp_pool; 923 struct svc_req *r; 924 struct rpc_msg msg; 925 struct mbuf *args; 926 struct svc_loss_callout *s; 927 enum xprt_stat stat; 928 929 /* now receive msgs from xprtprt (support batch calls) */ 930 r = malloc(sizeof(*r), M_RPC, M_WAITOK|M_ZERO); 931 932 msg.rm_call.cb_cred.oa_base = r->rq_credarea; 933 msg.rm_call.cb_verf.oa_base = &r->rq_credarea[MAX_AUTH_BYTES]; 934 r->rq_clntcred = &r->rq_credarea[2*MAX_AUTH_BYTES]; 935 if (SVC_RECV(xprt, &msg, &r->rq_addr, &args)) { 936 enum auth_stat why; 937 938 /* 939 * Handle replays and authenticate before queuing the 940 * request to be executed. 
941 */ 942 SVC_ACQUIRE(xprt); 943 r->rq_xprt = xprt; 944 if (pool->sp_rcache) { 945 struct rpc_msg repmsg; 946 struct mbuf *repbody; 947 enum replay_state rs; 948 rs = replay_find(pool->sp_rcache, &msg, 949 svc_getrpccaller(r), &repmsg, &repbody); 950 switch (rs) { 951 case RS_NEW: 952 break; 953 case RS_DONE: 954 SVC_REPLY(xprt, &repmsg, r->rq_addr, 955 repbody, &r->rq_reply_seq); 956 if (r->rq_addr) { 957 free(r->rq_addr, M_SONAME); 958 r->rq_addr = NULL; 959 } 960 m_freem(args); 961 goto call_done; 962 963 default: 964 m_freem(args); 965 goto call_done; 966 } 967 } 968 969 r->rq_xid = msg.rm_xid; 970 r->rq_prog = msg.rm_call.cb_prog; 971 r->rq_vers = msg.rm_call.cb_vers; 972 r->rq_proc = msg.rm_call.cb_proc; 973 r->rq_size = sizeof(*r) + m_length(args, NULL); 974 r->rq_args = args; 975 if ((why = _authenticate(r, &msg)) != AUTH_OK) { 976 /* 977 * RPCSEC_GSS uses this return code 978 * for requests that form part of its 979 * context establishment protocol and 980 * should not be dispatched to the 981 * application. 982 */ 983 if (why != RPCSEC_GSS_NODISPATCH) 984 svcerr_auth(r, why); 985 goto call_done; 986 } 987 988 if (!SVCAUTH_UNWRAP(&r->rq_auth, &r->rq_args)) { 989 svcerr_decode(r); 990 goto call_done; 991 } 992 993 /* 994 * Defer enabling DDP until the first non-NULLPROC RPC 995 * is received to allow STARTTLS authentication to 996 * enable TLS offload first. 997 */ 998 if (xprt->xp_doneddp == 0 && r->rq_proc != NULLPROC && 999 atomic_cmpset_int(&xprt->xp_doneddp, 0, 1)) { 1000 if (xprt->xp_socket->so_proto->pr_protocol == 1001 IPPROTO_TCP) { 1002 int optval = 1; 1003 1004 (void)so_setsockopt(xprt->xp_socket, 1005 IPPROTO_TCP, TCP_USE_DDP, &optval, 1006 sizeof(optval)); 1007 } 1008 } 1009 1010 /* 1011 * Everything checks out, return request to caller. 
1012 */ 1013 *rqstp_ret = r; 1014 r = NULL; 1015 } 1016 call_done: 1017 if (r) { 1018 svc_freereq(r); 1019 r = NULL; 1020 } 1021 if ((stat = SVC_STAT(xprt)) == XPRT_DIED) { 1022 TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) 1023 (*s->slc_dispatch)(xprt); 1024 xprt_unregister(xprt); 1025 } 1026 1027 return (stat); 1028 } 1029 1030 static void 1031 svc_executereq(struct svc_req *rqstp) 1032 { 1033 SVCXPRT *xprt = rqstp->rq_xprt; 1034 SVCPOOL *pool = xprt->xp_pool; 1035 int prog_found; 1036 rpcvers_t low_vers; 1037 rpcvers_t high_vers; 1038 struct svc_callout *s; 1039 1040 /* now match message with a registered service*/ 1041 prog_found = FALSE; 1042 low_vers = (rpcvers_t) -1L; 1043 high_vers = (rpcvers_t) 0L; 1044 TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) { 1045 if (s->sc_prog == rqstp->rq_prog) { 1046 if (s->sc_vers == rqstp->rq_vers) { 1047 /* 1048 * We hand ownership of r to the 1049 * dispatch method - they must call 1050 * svc_freereq. 1051 */ 1052 (*s->sc_dispatch)(rqstp, xprt); 1053 return; 1054 } /* found correct version */ 1055 prog_found = TRUE; 1056 if (s->sc_vers < low_vers) 1057 low_vers = s->sc_vers; 1058 if (s->sc_vers > high_vers) 1059 high_vers = s->sc_vers; 1060 } /* found correct program */ 1061 } 1062 1063 /* 1064 * if we got here, the program or version 1065 * is not served ... 1066 */ 1067 if (prog_found) 1068 svcerr_progvers(rqstp, low_vers, high_vers); 1069 else 1070 svcerr_noprog(rqstp); 1071 1072 svc_freereq(rqstp); 1073 } 1074 1075 static void 1076 svc_checkidle(SVCGROUP *grp) 1077 { 1078 SVCXPRT *xprt, *nxprt; 1079 time_t timo; 1080 struct svcxprt_list cleanup; 1081 1082 TAILQ_INIT(&cleanup); 1083 TAILQ_FOREACH_SAFE(xprt, &grp->sg_xlist, xp_link, nxprt) { 1084 /* 1085 * Only some transports have idle timers. Don't time 1086 * something out which is just waking up. 
1087 */ 1088 if (!xprt->xp_idletimeout || xprt->xp_thread) 1089 continue; 1090 1091 timo = xprt->xp_lastactive + xprt->xp_idletimeout; 1092 if (time_uptime > timo) { 1093 xprt_unregister_locked(xprt); 1094 TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link); 1095 } 1096 } 1097 1098 mtx_unlock(&grp->sg_lock); 1099 TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) { 1100 soshutdown(xprt->xp_socket, SHUT_WR); 1101 SVC_RELEASE(xprt); 1102 } 1103 mtx_lock(&grp->sg_lock); 1104 } 1105 1106 static void 1107 svc_assign_waiting_sockets(SVCPOOL *pool) 1108 { 1109 SVCGROUP *grp; 1110 SVCXPRT *xprt; 1111 int g; 1112 1113 for (g = 0; g < pool->sp_groupcount; g++) { 1114 grp = &pool->sp_groups[g]; 1115 mtx_lock(&grp->sg_lock); 1116 while ((xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) { 1117 if (xprt_assignthread(xprt)) 1118 TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink); 1119 else 1120 break; 1121 } 1122 mtx_unlock(&grp->sg_lock); 1123 } 1124 } 1125 1126 static void 1127 svc_change_space_used(SVCPOOL *pool, long delta) 1128 { 1129 unsigned long value; 1130 1131 value = atomic_fetchadd_long(&pool->sp_space_used, delta) + delta; 1132 if (delta > 0) { 1133 if (value >= pool->sp_space_high && !pool->sp_space_throttled) { 1134 pool->sp_space_throttled = TRUE; 1135 pool->sp_space_throttle_count++; 1136 } 1137 if (value > pool->sp_space_used_highest) 1138 pool->sp_space_used_highest = value; 1139 } else { 1140 if (value < pool->sp_space_low && pool->sp_space_throttled) { 1141 pool->sp_space_throttled = FALSE; 1142 svc_assign_waiting_sockets(pool); 1143 } 1144 } 1145 } 1146 1147 static bool_t 1148 svc_request_space_available(SVCPOOL *pool) 1149 { 1150 1151 if (pool->sp_space_throttled) 1152 return (FALSE); 1153 return (TRUE); 1154 } 1155 1156 static void 1157 svc_run_internal(SVCGROUP *grp, bool_t ismaster) 1158 { 1159 SVCPOOL *pool = grp->sg_pool; 1160 SVCTHREAD *st, *stpref; 1161 SVCXPRT *xprt; 1162 enum xprt_stat stat; 1163 struct svc_req *rqstp; 1164 struct proc *p; 1165 long sz; 1166 int 
error; 1167 1168 st = mem_alloc(sizeof(*st)); 1169 mtx_init(&st->st_lock, "st_lock", NULL, MTX_DEF); 1170 st->st_pool = pool; 1171 st->st_xprt = NULL; 1172 STAILQ_INIT(&st->st_reqs); 1173 cv_init(&st->st_cond, "rpcsvc"); 1174 1175 mtx_lock(&grp->sg_lock); 1176 1177 /* 1178 * If we are a new thread which was spawned to cope with 1179 * increased load, set the state back to SVCPOOL_ACTIVE. 1180 */ 1181 if (grp->sg_state == SVCPOOL_THREADSTARTING) 1182 grp->sg_state = SVCPOOL_ACTIVE; 1183 1184 while (grp->sg_state != SVCPOOL_CLOSING) { 1185 /* 1186 * Create new thread if requested. 1187 */ 1188 if (grp->sg_state == SVCPOOL_THREADWANTED) { 1189 grp->sg_state = SVCPOOL_THREADSTARTING; 1190 grp->sg_lastcreatetime = time_uptime; 1191 mtx_unlock(&grp->sg_lock); 1192 svc_new_thread(grp); 1193 mtx_lock(&grp->sg_lock); 1194 continue; 1195 } 1196 1197 /* 1198 * Check for idle transports once per second. 1199 */ 1200 if (time_uptime > grp->sg_lastidlecheck) { 1201 grp->sg_lastidlecheck = time_uptime; 1202 svc_checkidle(grp); 1203 } 1204 1205 xprt = st->st_xprt; 1206 if (!xprt) { 1207 /* 1208 * Enforce maxthreads count. 1209 */ 1210 if (!ismaster && grp->sg_threadcount > 1211 grp->sg_maxthreads) 1212 break; 1213 1214 /* 1215 * Before sleeping, see if we can find an 1216 * active transport which isn't being serviced 1217 * by a thread. 
1218 */ 1219 if (svc_request_space_available(pool) && 1220 (xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) { 1221 TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink); 1222 SVC_ACQUIRE(xprt); 1223 xprt->xp_thread = st; 1224 st->st_xprt = xprt; 1225 continue; 1226 } 1227 1228 LIST_INSERT_HEAD(&grp->sg_idlethreads, st, st_ilink); 1229 if (ismaster || (!ismaster && 1230 grp->sg_threadcount > grp->sg_minthreads)) 1231 error = cv_timedwait_sig(&st->st_cond, 1232 &grp->sg_lock, 5 * hz); 1233 else 1234 error = cv_wait_sig(&st->st_cond, 1235 &grp->sg_lock); 1236 if (st->st_xprt == NULL) 1237 LIST_REMOVE(st, st_ilink); 1238 1239 /* 1240 * Reduce worker thread count when idle. 1241 */ 1242 if (error == EWOULDBLOCK) { 1243 if (!ismaster 1244 && (grp->sg_threadcount 1245 > grp->sg_minthreads) 1246 && !st->st_xprt) 1247 break; 1248 } else if (error != 0) { 1249 KASSERT(error == EINTR || error == ERESTART, 1250 ("non-signal error %d", error)); 1251 mtx_unlock(&grp->sg_lock); 1252 p = curproc; 1253 PROC_LOCK(p); 1254 if (P_SHOULDSTOP(p) || 1255 (p->p_flag & P_TOTAL_STOP) != 0) { 1256 thread_suspend_check(0); 1257 PROC_UNLOCK(p); 1258 mtx_lock(&grp->sg_lock); 1259 } else { 1260 PROC_UNLOCK(p); 1261 svc_exit(pool); 1262 mtx_lock(&grp->sg_lock); 1263 break; 1264 } 1265 } 1266 continue; 1267 } 1268 mtx_unlock(&grp->sg_lock); 1269 1270 /* 1271 * Drain the transport socket and queue up any RPCs. 1272 */ 1273 xprt->xp_lastactive = time_uptime; 1274 do { 1275 if (!svc_request_space_available(pool)) 1276 break; 1277 rqstp = NULL; 1278 stat = svc_getreq(xprt, &rqstp); 1279 if (rqstp) { 1280 svc_change_space_used(pool, rqstp->rq_size); 1281 /* 1282 * See if the application has a preference 1283 * for some other thread. 
1284 */ 1285 if (pool->sp_assign) { 1286 stpref = pool->sp_assign(st, rqstp); 1287 rqstp->rq_thread = stpref; 1288 STAILQ_INSERT_TAIL(&stpref->st_reqs, 1289 rqstp, rq_link); 1290 mtx_unlock(&stpref->st_lock); 1291 if (stpref != st) 1292 rqstp = NULL; 1293 } else { 1294 rqstp->rq_thread = st; 1295 STAILQ_INSERT_TAIL(&st->st_reqs, 1296 rqstp, rq_link); 1297 } 1298 } 1299 } while (rqstp == NULL && stat == XPRT_MOREREQS 1300 && grp->sg_state != SVCPOOL_CLOSING); 1301 1302 /* 1303 * Move this transport to the end of the active list to 1304 * ensure fairness when multiple transports are active. 1305 * If this was the last queued request, svc_getreq will end 1306 * up calling xprt_inactive to remove from the active list. 1307 */ 1308 mtx_lock(&grp->sg_lock); 1309 xprt->xp_thread = NULL; 1310 st->st_xprt = NULL; 1311 if (xprt->xp_active) { 1312 if (!svc_request_space_available(pool) || 1313 !xprt_assignthread(xprt)) 1314 TAILQ_INSERT_TAIL(&grp->sg_active, 1315 xprt, xp_alink); 1316 } 1317 mtx_unlock(&grp->sg_lock); 1318 SVC_RELEASE(xprt); 1319 1320 /* 1321 * Execute what we have queued. 
1322 */ 1323 mtx_lock(&st->st_lock); 1324 while ((rqstp = STAILQ_FIRST(&st->st_reqs)) != NULL) { 1325 STAILQ_REMOVE_HEAD(&st->st_reqs, rq_link); 1326 mtx_unlock(&st->st_lock); 1327 sz = (long)rqstp->rq_size; 1328 svc_executereq(rqstp); 1329 svc_change_space_used(pool, -sz); 1330 mtx_lock(&st->st_lock); 1331 } 1332 mtx_unlock(&st->st_lock); 1333 mtx_lock(&grp->sg_lock); 1334 } 1335 1336 if (st->st_xprt) { 1337 xprt = st->st_xprt; 1338 st->st_xprt = NULL; 1339 SVC_RELEASE(xprt); 1340 } 1341 KASSERT(STAILQ_EMPTY(&st->st_reqs), ("stray reqs on exit")); 1342 mtx_destroy(&st->st_lock); 1343 cv_destroy(&st->st_cond); 1344 mem_free(st, sizeof(*st)); 1345 1346 grp->sg_threadcount--; 1347 if (!ismaster) 1348 wakeup(grp); 1349 mtx_unlock(&grp->sg_lock); 1350 } 1351 1352 static void 1353 svc_thread_start(void *arg) 1354 { 1355 1356 svc_run_internal((SVCGROUP *) arg, FALSE); 1357 kthread_exit(); 1358 } 1359 1360 static void 1361 svc_new_thread(SVCGROUP *grp) 1362 { 1363 SVCPOOL *pool = grp->sg_pool; 1364 struct thread *td; 1365 1366 mtx_lock(&grp->sg_lock); 1367 grp->sg_threadcount++; 1368 mtx_unlock(&grp->sg_lock); 1369 kthread_add(svc_thread_start, grp, pool->sp_proc, &td, 0, 0, 1370 "%s: service", pool->sp_name); 1371 } 1372 1373 void 1374 svc_run(SVCPOOL *pool) 1375 { 1376 int g, i; 1377 struct proc *p; 1378 struct thread *td; 1379 SVCGROUP *grp; 1380 1381 p = curproc; 1382 td = curthread; 1383 snprintf(td->td_name, sizeof(td->td_name), 1384 "%s: master", pool->sp_name); 1385 pool->sp_state = SVCPOOL_ACTIVE; 1386 pool->sp_proc = p; 1387 1388 /* Choose group count based on number of threads and CPUs. 
*/ 1389 pool->sp_groupcount = max(1, min(SVC_MAXGROUPS, 1390 min(pool->sp_maxthreads / 2, mp_ncpus) / 6)); 1391 for (g = 0; g < pool->sp_groupcount; g++) { 1392 grp = &pool->sp_groups[g]; 1393 grp->sg_minthreads = max(1, 1394 pool->sp_minthreads / pool->sp_groupcount); 1395 grp->sg_maxthreads = max(1, 1396 pool->sp_maxthreads / pool->sp_groupcount); 1397 grp->sg_lastcreatetime = time_uptime; 1398 } 1399 1400 /* Starting threads */ 1401 pool->sp_groups[0].sg_threadcount++; 1402 for (g = 0; g < pool->sp_groupcount; g++) { 1403 grp = &pool->sp_groups[g]; 1404 for (i = ((g == 0) ? 1 : 0); i < grp->sg_minthreads; i++) 1405 svc_new_thread(grp); 1406 } 1407 svc_run_internal(&pool->sp_groups[0], TRUE); 1408 1409 /* Waiting for threads to stop. */ 1410 for (g = 0; g < pool->sp_groupcount; g++) { 1411 grp = &pool->sp_groups[g]; 1412 mtx_lock(&grp->sg_lock); 1413 while (grp->sg_threadcount > 0) 1414 msleep(grp, &grp->sg_lock, 0, "svcexit", 0); 1415 mtx_unlock(&grp->sg_lock); 1416 } 1417 } 1418 1419 void 1420 svc_exit(SVCPOOL *pool) 1421 { 1422 SVCGROUP *grp; 1423 SVCTHREAD *st; 1424 int g; 1425 1426 pool->sp_state = SVCPOOL_CLOSING; 1427 for (g = 0; g < pool->sp_groupcount; g++) { 1428 grp = &pool->sp_groups[g]; 1429 mtx_lock(&grp->sg_lock); 1430 if (grp->sg_state != SVCPOOL_CLOSING) { 1431 grp->sg_state = SVCPOOL_CLOSING; 1432 LIST_FOREACH(st, &grp->sg_idlethreads, st_ilink) 1433 cv_signal(&st->st_cond); 1434 } 1435 mtx_unlock(&grp->sg_lock); 1436 } 1437 } 1438 1439 bool_t 1440 svc_getargs(struct svc_req *rqstp, xdrproc_t xargs, void *args) 1441 { 1442 struct mbuf *m; 1443 XDR xdrs; 1444 bool_t stat; 1445 1446 m = rqstp->rq_args; 1447 rqstp->rq_args = NULL; 1448 1449 xdrmbuf_create(&xdrs, m, XDR_DECODE); 1450 stat = xargs(&xdrs, args); 1451 XDR_DESTROY(&xdrs); 1452 1453 return (stat); 1454 } 1455 1456 bool_t 1457 svc_freeargs(struct svc_req *rqstp, xdrproc_t xargs, void *args) 1458 { 1459 XDR xdrs; 1460 1461 if (rqstp->rq_addr) { 1462 free(rqstp->rq_addr, M_SONAME); 1463 
rqstp->rq_addr = NULL; 1464 } 1465 1466 xdrs.x_op = XDR_FREE; 1467 return (xargs(&xdrs, args)); 1468 } 1469 1470 void 1471 svc_freereq(struct svc_req *rqstp) 1472 { 1473 SVCTHREAD *st; 1474 SVCPOOL *pool; 1475 1476 st = rqstp->rq_thread; 1477 if (st) { 1478 pool = st->st_pool; 1479 if (pool->sp_done) 1480 pool->sp_done(st, rqstp); 1481 } 1482 1483 if (rqstp->rq_auth.svc_ah_ops) 1484 SVCAUTH_RELEASE(&rqstp->rq_auth); 1485 1486 if (rqstp->rq_xprt) { 1487 SVC_RELEASE(rqstp->rq_xprt); 1488 } 1489 1490 if (rqstp->rq_addr) 1491 free(rqstp->rq_addr, M_SONAME); 1492 1493 if (rqstp->rq_args) 1494 m_freem(rqstp->rq_args); 1495 1496 free(rqstp, M_RPC); 1497 } 1498