1 /* $NetBSD: svc.c,v 1.21 2000/07/06 03:10:35 christos Exp $ */ 2 3 /*- 4 * SPDX-License-Identifier: BSD-3-Clause 5 * 6 * Copyright (c) 2009, Sun Microsystems, Inc. 7 * All rights reserved. 8 * 9 * Redistribution and use in source and binary forms, with or without 10 * modification, are permitted provided that the following conditions are met: 11 * - Redistributions of source code must retain the above copyright notice, 12 * this list of conditions and the following disclaimer. 13 * - Redistributions in binary form must reproduce the above copyright notice, 14 * this list of conditions and the following disclaimer in the documentation 15 * and/or other materials provided with the distribution. 16 * - Neither the name of Sun Microsystems, Inc. nor the names of its 17 * contributors may be used to endorse or promote products derived 18 * from this software without specific prior written permission. 19 * 20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 21 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 22 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 23 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 24 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 25 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 26 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 27 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 28 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 29 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 30 * POSSIBILITY OF SUCH DAMAGE. 31 */ 32 33 #include <sys/cdefs.h> 34 /* 35 * svc.c, Server-side remote procedure call interface. 36 * 37 * There are two sets of procedures here. The xprt routines are 38 * for handling transport handles. The svc routines handle the 39 * list of service routines. 40 * 41 * Copyright (C) 1984, Sun Microsystems, Inc. 
42 */ 43 44 #include <sys/param.h> 45 #include <sys/jail.h> 46 #include <sys/lock.h> 47 #include <sys/kernel.h> 48 #include <sys/kthread.h> 49 #include <sys/malloc.h> 50 #include <sys/mbuf.h> 51 #include <sys/mutex.h> 52 #include <sys/proc.h> 53 #include <sys/protosw.h> 54 #include <sys/queue.h> 55 #include <sys/socketvar.h> 56 #include <sys/systm.h> 57 #include <sys/smp.h> 58 #include <sys/sx.h> 59 #include <sys/ucred.h> 60 61 #include <netinet/tcp.h> 62 63 #include <rpc/rpc.h> 64 #include <rpc/rpcb_clnt.h> 65 #include <rpc/replay.h> 66 67 #include <rpc/rpc_com.h> 68 69 #define SVC_VERSQUIET 0x0001 /* keep quiet about vers mismatch */ 70 #define version_keepquiet(xp) (SVC_EXT(xp)->xp_flags & SVC_VERSQUIET) 71 72 static struct svc_callout *svc_find(SVCPOOL *pool, rpcprog_t, rpcvers_t, 73 char *); 74 static void svc_new_thread(SVCGROUP *grp); 75 static void xprt_unregister_locked(SVCXPRT *xprt); 76 static void svc_change_space_used(SVCPOOL *pool, long delta); 77 static bool_t svc_request_space_available(SVCPOOL *pool); 78 static void svcpool_cleanup(SVCPOOL *pool); 79 80 /* *************** SVCXPRT related stuff **************** */ 81 82 static int svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS); 83 static int svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS); 84 static int svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS); 85 86 SVCPOOL* 87 svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base) 88 { 89 SVCPOOL *pool; 90 SVCGROUP *grp; 91 int g; 92 93 pool = malloc(sizeof(SVCPOOL), M_RPC, M_WAITOK|M_ZERO); 94 95 mtx_init(&pool->sp_lock, "sp_lock", NULL, MTX_DEF); 96 pool->sp_name = name; 97 pool->sp_state = SVCPOOL_INIT; 98 pool->sp_proc = NULL; 99 TAILQ_INIT(&pool->sp_callouts); 100 TAILQ_INIT(&pool->sp_lcallouts); 101 pool->sp_minthreads = 1; 102 pool->sp_maxthreads = 1; 103 pool->sp_groupcount = 1; 104 for (g = 0; g < SVC_MAXGROUPS; g++) { 105 grp = &pool->sp_groups[g]; 106 mtx_init(&grp->sg_lock, "sg_lock", NULL, MTX_DEF); 107 grp->sg_pool = pool; 108 grp->sg_state = SVCPOOL_ACTIVE; 109 TAILQ_INIT(&grp->sg_xlist); 110 TAILQ_INIT(&grp->sg_active); 111 LIST_INIT(&grp->sg_idlethreads); 112 grp->sg_minthreads = 1; 113 grp->sg_maxthreads = 1; 114 } 115 116 /* 117 * Don't use more than a quarter of mbuf clusters. Nota bene: 118 * nmbclusters is an int, but nmbclusters*MCLBYTES may overflow 119 * on LP64 architectures, so cast to u_long to avoid undefined 120 * behavior. (ILP32 architectures cannot have nmbclusters 121 * large enough to overflow for other reasons.) 
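	 * For example, with MCLBYTES = 2048 and nmbclusters = 524288 (1GB
	 * worth of clusters), sp_space_high works out to 256MB and
	 * sp_space_low to roughly 170MB.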
122 */ 123 pool->sp_space_high = (u_long)nmbclusters * MCLBYTES / 4; 124 pool->sp_space_low = (pool->sp_space_high / 3) * 2; 125 126 sysctl_ctx_init(&pool->sp_sysctl); 127 if (IS_DEFAULT_VNET(curvnet) && sysctl_base) { 128 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO, 129 "minthreads", CTLTYPE_INT | CTLFLAG_RW | CTLFLAG_MPSAFE, 130 pool, 0, svcpool_minthread_sysctl, "I", 131 "Minimal number of threads"); 132 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO, 133 "maxthreads", CTLTYPE_INT | CTLFLAG_RW | CTLFLAG_MPSAFE, 134 pool, 0, svcpool_maxthread_sysctl, "I", 135 "Maximal number of threads"); 136 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO, 137 "threads", CTLTYPE_INT | CTLFLAG_RD | CTLFLAG_MPSAFE, 138 pool, 0, svcpool_threads_sysctl, "I", 139 "Current number of threads"); 140 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO, 141 "groups", CTLFLAG_RD, &pool->sp_groupcount, 0, 142 "Number of thread groups"); 143 144 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO, 145 "request_space_used", CTLFLAG_RD, 146 &pool->sp_space_used, 147 "Space in parsed but not handled requests."); 148 149 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO, 150 "request_space_used_highest", CTLFLAG_RD, 151 &pool->sp_space_used_highest, 152 "Highest space used since reboot."); 153 154 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO, 155 "request_space_high", CTLFLAG_RW, 156 &pool->sp_space_high, 157 "Maximum space in parsed but not handled requests."); 158 159 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO, 160 "request_space_low", CTLFLAG_RW, 161 &pool->sp_space_low, 162 "Low water mark for request space."); 163 164 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO, 165 "request_space_throttled", CTLFLAG_RD, 166 &pool->sp_space_throttled, 0, 167 "Whether nfs requests are currently throttled"); 168 169 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO, 170 "request_space_throttle_count", CTLFLAG_RD, 171 &pool->sp_space_throttle_count, 0, 172 "Count of times throttling based on request space has occurred"); 173 } 174 175 return pool; 176 } 177 178 /* 179 * Code common to svcpool_destroy() and svcpool_close(), which cleans up 180 * the pool data structures. 
181 */ 182 static void 183 svcpool_cleanup(SVCPOOL *pool) 184 { 185 SVCGROUP *grp; 186 SVCXPRT *xprt, *nxprt; 187 struct svc_callout *s; 188 struct svc_loss_callout *sl; 189 struct svcxprt_list cleanup; 190 int g; 191 192 TAILQ_INIT(&cleanup); 193 194 for (g = 0; g < SVC_MAXGROUPS; g++) { 195 grp = &pool->sp_groups[g]; 196 mtx_lock(&grp->sg_lock); 197 while ((xprt = TAILQ_FIRST(&grp->sg_xlist)) != NULL) { 198 xprt_unregister_locked(xprt); 199 TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link); 200 } 201 mtx_unlock(&grp->sg_lock); 202 } 203 TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) { 204 if (xprt->xp_socket != NULL) 205 soshutdown(xprt->xp_socket, SHUT_WR); 206 SVC_RELEASE(xprt); 207 } 208 209 mtx_lock(&pool->sp_lock); 210 while ((s = TAILQ_FIRST(&pool->sp_callouts)) != NULL) { 211 mtx_unlock(&pool->sp_lock); 212 svc_unreg(pool, s->sc_prog, s->sc_vers); 213 mtx_lock(&pool->sp_lock); 214 } 215 while ((sl = TAILQ_FIRST(&pool->sp_lcallouts)) != NULL) { 216 mtx_unlock(&pool->sp_lock); 217 svc_loss_unreg(pool, sl->slc_dispatch); 218 mtx_lock(&pool->sp_lock); 219 } 220 mtx_unlock(&pool->sp_lock); 221 } 222 223 void 224 svcpool_destroy(SVCPOOL *pool) 225 { 226 SVCGROUP *grp; 227 int g; 228 229 svcpool_cleanup(pool); 230 231 for (g = 0; g < SVC_MAXGROUPS; g++) { 232 grp = &pool->sp_groups[g]; 233 mtx_destroy(&grp->sg_lock); 234 } 235 mtx_destroy(&pool->sp_lock); 236 237 if (pool->sp_rcache) 238 replay_freecache(pool->sp_rcache); 239 240 sysctl_ctx_free(&pool->sp_sysctl); 241 free(pool, M_RPC); 242 } 243 244 /* 245 * Similar to svcpool_destroy(), except that it does not destroy the actual 246 * data structures. As such, "pool" may be used again. 247 */ 248 void 249 svcpool_close(SVCPOOL *pool) 250 { 251 SVCGROUP *grp; 252 int g; 253 254 svcpool_cleanup(pool); 255 256 /* Now, initialize the pool's state for a fresh svc_run() call. 
*/ 257 mtx_lock(&pool->sp_lock); 258 pool->sp_state = SVCPOOL_INIT; 259 mtx_unlock(&pool->sp_lock); 260 for (g = 0; g < SVC_MAXGROUPS; g++) { 261 grp = &pool->sp_groups[g]; 262 mtx_lock(&grp->sg_lock); 263 grp->sg_state = SVCPOOL_ACTIVE; 264 mtx_unlock(&grp->sg_lock); 265 } 266 } 267 268 /* 269 * Sysctl handler to get the present thread count on a pool 270 */ 271 static int 272 svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS) 273 { 274 SVCPOOL *pool; 275 int threads, error, g; 276 277 pool = oidp->oid_arg1; 278 threads = 0; 279 mtx_lock(&pool->sp_lock); 280 for (g = 0; g < pool->sp_groupcount; g++) 281 threads += pool->sp_groups[g].sg_threadcount; 282 mtx_unlock(&pool->sp_lock); 283 error = sysctl_handle_int(oidp, &threads, 0, req); 284 return (error); 285 } 286 287 /* 288 * Sysctl handler to set the minimum thread count on a pool 289 */ 290 static int 291 svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS) 292 { 293 SVCPOOL *pool; 294 int newminthreads, error, g; 295 296 pool = oidp->oid_arg1; 297 newminthreads = pool->sp_minthreads; 298 error = sysctl_handle_int(oidp, &newminthreads, 0, req); 299 if (error == 0 && newminthreads != pool->sp_minthreads) { 300 if (newminthreads > pool->sp_maxthreads) 301 return (EINVAL); 302 mtx_lock(&pool->sp_lock); 303 pool->sp_minthreads = newminthreads; 304 for (g = 0; g < pool->sp_groupcount; g++) { 305 pool->sp_groups[g].sg_minthreads = max(1, 306 pool->sp_minthreads / pool->sp_groupcount); 307 } 308 mtx_unlock(&pool->sp_lock); 309 } 310 return (error); 311 } 312 313 /* 314 * Sysctl handler to set the maximum thread count on a pool 315 */ 316 static int 317 svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS) 318 { 319 SVCPOOL *pool; 320 int newmaxthreads, error, g; 321 322 pool = oidp->oid_arg1; 323 newmaxthreads = pool->sp_maxthreads; 324 error = sysctl_handle_int(oidp, &newmaxthreads, 0, req); 325 if (error == 0 && newmaxthreads != pool->sp_maxthreads) { 326 if (newmaxthreads < pool->sp_minthreads) 327 return (EINVAL); 328 mtx_lock(&pool->sp_lock); 329 pool->sp_maxthreads = newmaxthreads; 330 for (g = 0; g < pool->sp_groupcount; g++) { 331 pool->sp_groups[g].sg_maxthreads = max(1, 332 pool->sp_maxthreads / pool->sp_groupcount); 333 } 334 mtx_unlock(&pool->sp_lock); 335 } 336 return (error); 337 } 338 339 /* 340 * Activate a transport handle. 341 */ 342 void 343 xprt_register(SVCXPRT *xprt) 344 { 345 SVCPOOL *pool = xprt->xp_pool; 346 SVCGROUP *grp; 347 int g; 348 349 SVC_ACQUIRE(xprt); 350 g = atomic_fetchadd_int(&pool->sp_nextgroup, 1) % pool->sp_groupcount; 351 xprt->xp_group = grp = &pool->sp_groups[g]; 352 mtx_lock(&grp->sg_lock); 353 xprt->xp_registered = TRUE; 354 xprt->xp_active = FALSE; 355 TAILQ_INSERT_TAIL(&grp->sg_xlist, xprt, xp_link); 356 mtx_unlock(&grp->sg_lock); 357 } 358 359 /* 360 * De-activate a transport handle. Note: the locked version doesn't 361 * release the transport - caller must do that after dropping the pool 362 * lock. 
363 */ 364 static void 365 xprt_unregister_locked(SVCXPRT *xprt) 366 { 367 SVCGROUP *grp = xprt->xp_group; 368 369 mtx_assert(&grp->sg_lock, MA_OWNED); 370 KASSERT(xprt->xp_registered == TRUE, 371 ("xprt_unregister_locked: not registered")); 372 xprt_inactive_locked(xprt); 373 TAILQ_REMOVE(&grp->sg_xlist, xprt, xp_link); 374 xprt->xp_registered = FALSE; 375 } 376 377 void 378 xprt_unregister(SVCXPRT *xprt) 379 { 380 SVCGROUP *grp = xprt->xp_group; 381 382 mtx_lock(&grp->sg_lock); 383 if (xprt->xp_registered == FALSE) { 384 /* Already unregistered by another thread */ 385 mtx_unlock(&grp->sg_lock); 386 return; 387 } 388 xprt_unregister_locked(xprt); 389 mtx_unlock(&grp->sg_lock); 390 391 if (xprt->xp_socket != NULL) 392 soshutdown(xprt->xp_socket, SHUT_WR); 393 SVC_RELEASE(xprt); 394 } 395 396 /* 397 * Attempt to assign a service thread to this transport. 398 */ 399 static int 400 xprt_assignthread(SVCXPRT *xprt) 401 { 402 SVCGROUP *grp = xprt->xp_group; 403 SVCTHREAD *st; 404 405 mtx_assert(&grp->sg_lock, MA_OWNED); 406 st = LIST_FIRST(&grp->sg_idlethreads); 407 if (st) { 408 LIST_REMOVE(st, st_ilink); 409 SVC_ACQUIRE(xprt); 410 xprt->xp_thread = st; 411 st->st_xprt = xprt; 412 cv_signal(&st->st_cond); 413 return (TRUE); 414 } else { 415 /* 416 * See if we can create a new thread. The 417 * actual thread creation happens in 418 * svc_run_internal because our locking state 419 * is poorly defined (we are typically called 420 * from a socket upcall). Don't create more 421 * than one thread per second. 422 */ 423 if (grp->sg_state == SVCPOOL_ACTIVE 424 && grp->sg_lastcreatetime < time_uptime 425 && grp->sg_threadcount < grp->sg_maxthreads) { 426 grp->sg_state = SVCPOOL_THREADWANTED; 427 } 428 } 429 return (FALSE); 430 } 431 432 void 433 xprt_active(SVCXPRT *xprt) 434 { 435 SVCGROUP *grp = xprt->xp_group; 436 437 mtx_lock(&grp->sg_lock); 438 439 if (!xprt->xp_registered) { 440 /* 441 * Race with xprt_unregister - we lose. 442 */ 443 mtx_unlock(&grp->sg_lock); 444 return; 445 } 446 447 if (!xprt->xp_active) { 448 xprt->xp_active = TRUE; 449 if (xprt->xp_thread == NULL) { 450 if (!svc_request_space_available(xprt->xp_pool) || 451 !xprt_assignthread(xprt)) 452 TAILQ_INSERT_TAIL(&grp->sg_active, xprt, 453 xp_alink); 454 } 455 } 456 457 mtx_unlock(&grp->sg_lock); 458 } 459 460 void 461 xprt_inactive_locked(SVCXPRT *xprt) 462 { 463 SVCGROUP *grp = xprt->xp_group; 464 465 mtx_assert(&grp->sg_lock, MA_OWNED); 466 if (xprt->xp_active) { 467 if (xprt->xp_thread == NULL) 468 TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink); 469 xprt->xp_active = FALSE; 470 } 471 } 472 473 void 474 xprt_inactive(SVCXPRT *xprt) 475 { 476 SVCGROUP *grp = xprt->xp_group; 477 478 mtx_lock(&grp->sg_lock); 479 xprt_inactive_locked(xprt); 480 mtx_unlock(&grp->sg_lock); 481 } 482 483 /* 484 * Variant of xprt_inactive() for use only when sure that port is 485 * assigned to thread. For example, within receive handlers. 486 */ 487 void 488 xprt_inactive_self(SVCXPRT *xprt) 489 { 490 491 KASSERT(xprt->xp_thread != NULL, 492 ("xprt_inactive_self(%p) with NULL xp_thread", xprt)); 493 xprt->xp_active = FALSE; 494 } 495 496 /* 497 * Add a service program to the callout list. 498 * The dispatch routine will be called when a rpc request for this 499 * program number comes in. 
 */
bool_t
svc_reg(SVCXPRT *xprt, const rpcprog_t prog, const rpcvers_t vers,
    void (*dispatch)(struct svc_req *, SVCXPRT *),
    const struct netconfig *nconf)
{
	SVCPOOL *pool = xprt->xp_pool;
	struct svc_callout *s;
	char *netid = NULL;
	int flag = 0;

/* VARIABLES PROTECTED BY svc_lock: s, svc_head */

	if (xprt->xp_netid) {
		netid = strdup(xprt->xp_netid, M_RPC);
		flag = 1;
	} else if (nconf && nconf->nc_netid) {
		netid = strdup(nconf->nc_netid, M_RPC);
		flag = 1;
	} /* must have been created with svc_raw_create */
	if ((netid == NULL) && (flag == 1)) {
		return (FALSE);
	}

	mtx_lock(&pool->sp_lock);
	if ((s = svc_find(pool, prog, vers, netid)) != NULL) {
		if (netid)
			free(netid, M_RPC);
		if (s->sc_dispatch == dispatch)
			goto rpcb_it; /* the caller is registering another xprt */
		mtx_unlock(&pool->sp_lock);
		return (FALSE);
	}
	s = malloc(sizeof (struct svc_callout), M_RPC, M_NOWAIT);
	if (s == NULL) {
		if (netid)
			free(netid, M_RPC);
		mtx_unlock(&pool->sp_lock);
		return (FALSE);
	}

	s->sc_prog = prog;
	s->sc_vers = vers;
	s->sc_dispatch = dispatch;
	s->sc_netid = netid;
	TAILQ_INSERT_TAIL(&pool->sp_callouts, s, sc_link);

	if ((xprt->xp_netid == NULL) && (flag == 1) && netid)
		((SVCXPRT *) xprt)->xp_netid = strdup(netid, M_RPC);

rpcb_it:
	mtx_unlock(&pool->sp_lock);
	/* now register the information with the local binder service */
	if (nconf) {
		bool_t dummy;
		struct netconfig tnc;
		struct netbuf nb;
		tnc = *nconf;
		nb.buf = &xprt->xp_ltaddr;
		nb.len = xprt->xp_ltaddr.ss_len;
		dummy = rpcb_set(prog, vers, &tnc, &nb);
		return (dummy);
	}
	return (TRUE);
}

/*
 * Remove a service program from the callout list.
 */
void
svc_unreg(SVCPOOL *pool, const rpcprog_t prog, const rpcvers_t vers)
{
	struct svc_callout *s;

	/* unregister the information anyway */
	(void) rpcb_unset(prog, vers, NULL);
	mtx_lock(&pool->sp_lock);
	while ((s = svc_find(pool, prog, vers, NULL)) != NULL) {
		TAILQ_REMOVE(&pool->sp_callouts, s, sc_link);
		if (s->sc_netid)
			mem_free(s->sc_netid, sizeof (s->sc_netid) + 1);
		mem_free(s, sizeof (struct svc_callout));
	}
	mtx_unlock(&pool->sp_lock);
}

/*
 * Add a service connection loss program to the callout list.
 * The dispatch routine will be called when some transport in this pool dies.
 */
bool_t
svc_loss_reg(SVCXPRT *xprt, void (*dispatch)(SVCXPRT *))
{
	SVCPOOL *pool = xprt->xp_pool;
	struct svc_loss_callout *s;

	mtx_lock(&pool->sp_lock);
	TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
		if (s->slc_dispatch == dispatch)
			break;
	}
	if (s != NULL) {
		mtx_unlock(&pool->sp_lock);
		return (TRUE);
	}
	s = malloc(sizeof(struct svc_loss_callout), M_RPC, M_NOWAIT);
	if (s == NULL) {
		mtx_unlock(&pool->sp_lock);
		return (FALSE);
	}
	s->slc_dispatch = dispatch;
	TAILQ_INSERT_TAIL(&pool->sp_lcallouts, s, slc_link);
	mtx_unlock(&pool->sp_lock);
	return (TRUE);
}
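
/*
 * Illustrative sketch (added for this listing, not part of the original
 * source): how a kernel RPC service might hook itself up using svc_reg()
 * and svc_loss_reg() above.  MYPROG, MYVERS, myprog_dispatch() and
 * myprog_loss() are hypothetical names; the transport is assumed to come
 * from a transport-specific create routine.  Guarded by #if 0 so it is
 * never compiled.
 */
#if 0
#define	MYPROG	0x20000001		/* hypothetical program number */
#define	MYVERS	1			/* hypothetical version */

static void	myprog_dispatch(struct svc_req *, SVCXPRT *);
static void	myprog_loss(SVCXPRT *);

static bool_t
myprog_register(SVCXPRT *xprt)
{

	/* Route RPCs for (MYPROG, MYVERS) on this transport to our dispatcher. */
	if (!svc_reg(xprt, MYPROG, MYVERS, myprog_dispatch, NULL))
		return (FALSE);

	/* Ask to be told when any transport in the pool dies. */
	if (!svc_loss_reg(xprt, myprog_loss)) {
		svc_unreg(xprt->xp_pool, MYPROG, MYVERS);
		return (FALSE);
	}
	return (TRUE);
}
#endif

/*
 * Remove a service connection loss program from the callout list.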
618 */ 619 void 620 svc_loss_unreg(SVCPOOL *pool, void (*dispatch)(SVCXPRT *)) 621 { 622 struct svc_loss_callout *s; 623 624 mtx_lock(&pool->sp_lock); 625 TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) { 626 if (s->slc_dispatch == dispatch) { 627 TAILQ_REMOVE(&pool->sp_lcallouts, s, slc_link); 628 free(s, M_RPC); 629 break; 630 } 631 } 632 mtx_unlock(&pool->sp_lock); 633 } 634 635 /* ********************** CALLOUT list related stuff ************* */ 636 637 /* 638 * Search the callout list for a program number, return the callout 639 * struct. 640 */ 641 static struct svc_callout * 642 svc_find(SVCPOOL *pool, rpcprog_t prog, rpcvers_t vers, char *netid) 643 { 644 struct svc_callout *s; 645 646 mtx_assert(&pool->sp_lock, MA_OWNED); 647 TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) { 648 if (s->sc_prog == prog && s->sc_vers == vers 649 && (netid == NULL || s->sc_netid == NULL || 650 strcmp(netid, s->sc_netid) == 0)) 651 break; 652 } 653 654 return (s); 655 } 656 657 /* ******************* REPLY GENERATION ROUTINES ************ */ 658 659 static bool_t 660 svc_sendreply_common(struct svc_req *rqstp, struct rpc_msg *rply, 661 struct mbuf *body) 662 { 663 SVCXPRT *xprt = rqstp->rq_xprt; 664 bool_t ok; 665 666 if (rqstp->rq_args) { 667 m_freem(rqstp->rq_args); 668 rqstp->rq_args = NULL; 669 } 670 671 if (xprt->xp_pool->sp_rcache) 672 replay_setreply(xprt->xp_pool->sp_rcache, 673 rply, svc_getrpccaller(rqstp), body); 674 675 if (!SVCAUTH_WRAP(&rqstp->rq_auth, &body)) 676 return (FALSE); 677 678 ok = SVC_REPLY(xprt, rply, rqstp->rq_addr, body, &rqstp->rq_reply_seq); 679 if (rqstp->rq_addr) { 680 free(rqstp->rq_addr, M_SONAME); 681 rqstp->rq_addr = NULL; 682 } 683 684 return (ok); 685 } 686 687 /* 688 * Send a reply to an rpc request 689 */ 690 bool_t 691 svc_sendreply(struct svc_req *rqstp, xdrproc_t xdr_results, void * xdr_location) 692 { 693 struct rpc_msg rply; 694 struct mbuf *m; 695 XDR xdrs; 696 bool_t ok; 697 698 rply.rm_xid = rqstp->rq_xid; 699 rply.rm_direction = REPLY; 700 rply.rm_reply.rp_stat = MSG_ACCEPTED; 701 rply.acpted_rply.ar_verf = rqstp->rq_verf; 702 rply.acpted_rply.ar_stat = SUCCESS; 703 rply.acpted_rply.ar_results.where = NULL; 704 rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void; 705 706 m = m_getcl(M_WAITOK, MT_DATA, 0); 707 xdrmbuf_create(&xdrs, m, XDR_ENCODE); 708 ok = xdr_results(&xdrs, xdr_location); 709 XDR_DESTROY(&xdrs); 710 711 if (ok) { 712 return (svc_sendreply_common(rqstp, &rply, m)); 713 } else { 714 m_freem(m); 715 return (FALSE); 716 } 717 } 718 719 bool_t 720 svc_sendreply_mbuf(struct svc_req *rqstp, struct mbuf *m) 721 { 722 struct rpc_msg rply; 723 724 rply.rm_xid = rqstp->rq_xid; 725 rply.rm_direction = REPLY; 726 rply.rm_reply.rp_stat = MSG_ACCEPTED; 727 rply.acpted_rply.ar_verf = rqstp->rq_verf; 728 rply.acpted_rply.ar_stat = SUCCESS; 729 rply.acpted_rply.ar_results.where = NULL; 730 rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void; 731 732 return (svc_sendreply_common(rqstp, &rply, m)); 733 } 734 735 /* 736 * No procedure error reply 737 */ 738 void 739 svcerr_noproc(struct svc_req *rqstp) 740 { 741 SVCXPRT *xprt = rqstp->rq_xprt; 742 struct rpc_msg rply; 743 744 rply.rm_xid = rqstp->rq_xid; 745 rply.rm_direction = REPLY; 746 rply.rm_reply.rp_stat = MSG_ACCEPTED; 747 rply.acpted_rply.ar_verf = rqstp->rq_verf; 748 rply.acpted_rply.ar_stat = PROC_UNAVAIL; 749 750 if (xprt->xp_pool->sp_rcache) 751 replay_setreply(xprt->xp_pool->sp_rcache, 752 &rply, svc_getrpccaller(rqstp), NULL); 753 754 svc_sendreply_common(rqstp, &rply, NULL); 755 } 756 
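
/*
 * Illustrative sketch (added for this listing, not part of the original
 * source): the general shape of a dispatch routine as registered with
 * svc_reg().  Ownership of the request passes to the dispatcher, so every
 * path must end in svc_freereq().  MYPROC_DOIT, struct my_args/my_res,
 * xdr_my_args(), xdr_my_res() and my_service() are hypothetical.  Guarded
 * by #if 0 so it is never compiled.
 */
#if 0
static void
myprog_dispatch(struct svc_req *rqstp, SVCXPRT *xprt)
{
	struct my_args args;
	struct my_res res;

	switch (rqstp->rq_proc) {
	case NULLPROC:
		/* The conventional "ping" procedure carries no arguments. */
		svc_sendreply(rqstp, (xdrproc_t) xdr_void, NULL);
		break;

	case MYPROC_DOIT:
		memset(&args, 0, sizeof(args));
		if (!svc_getargs(rqstp, (xdrproc_t) xdr_my_args, &args)) {
			svcerr_decode(rqstp);
			break;
		}
		if (my_service(&args, &res) != 0)
			svcerr_systemerr(rqstp);
		else
			svc_sendreply(rqstp, (xdrproc_t) xdr_my_res, &res);
		svc_freeargs(rqstp, (xdrproc_t) xdr_my_args, &args);
		break;

	default:
		svcerr_noproc(rqstp);
		break;
	}
	svc_freereq(rqstp);
}
#endif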
757 /* 758 * Can't decode args error reply 759 */ 760 void 761 svcerr_decode(struct svc_req *rqstp) 762 { 763 SVCXPRT *xprt = rqstp->rq_xprt; 764 struct rpc_msg rply; 765 766 rply.rm_xid = rqstp->rq_xid; 767 rply.rm_direction = REPLY; 768 rply.rm_reply.rp_stat = MSG_ACCEPTED; 769 rply.acpted_rply.ar_verf = rqstp->rq_verf; 770 rply.acpted_rply.ar_stat = GARBAGE_ARGS; 771 772 if (xprt->xp_pool->sp_rcache) 773 replay_setreply(xprt->xp_pool->sp_rcache, 774 &rply, (struct sockaddr *) &xprt->xp_rtaddr, NULL); 775 776 svc_sendreply_common(rqstp, &rply, NULL); 777 } 778 779 /* 780 * Some system error 781 */ 782 void 783 svcerr_systemerr(struct svc_req *rqstp) 784 { 785 SVCXPRT *xprt = rqstp->rq_xprt; 786 struct rpc_msg rply; 787 788 rply.rm_xid = rqstp->rq_xid; 789 rply.rm_direction = REPLY; 790 rply.rm_reply.rp_stat = MSG_ACCEPTED; 791 rply.acpted_rply.ar_verf = rqstp->rq_verf; 792 rply.acpted_rply.ar_stat = SYSTEM_ERR; 793 794 if (xprt->xp_pool->sp_rcache) 795 replay_setreply(xprt->xp_pool->sp_rcache, 796 &rply, svc_getrpccaller(rqstp), NULL); 797 798 svc_sendreply_common(rqstp, &rply, NULL); 799 } 800 801 /* 802 * Authentication error reply 803 */ 804 void 805 svcerr_auth(struct svc_req *rqstp, enum auth_stat why) 806 { 807 SVCXPRT *xprt = rqstp->rq_xprt; 808 struct rpc_msg rply; 809 810 rply.rm_xid = rqstp->rq_xid; 811 rply.rm_direction = REPLY; 812 rply.rm_reply.rp_stat = MSG_DENIED; 813 rply.rjcted_rply.rj_stat = AUTH_ERROR; 814 rply.rjcted_rply.rj_why = why; 815 816 if (xprt->xp_pool->sp_rcache) 817 replay_setreply(xprt->xp_pool->sp_rcache, 818 &rply, svc_getrpccaller(rqstp), NULL); 819 820 svc_sendreply_common(rqstp, &rply, NULL); 821 } 822 823 /* 824 * Auth too weak error reply 825 */ 826 void 827 svcerr_weakauth(struct svc_req *rqstp) 828 { 829 830 svcerr_auth(rqstp, AUTH_TOOWEAK); 831 } 832 833 /* 834 * Program unavailable error reply 835 */ 836 void 837 svcerr_noprog(struct svc_req *rqstp) 838 { 839 SVCXPRT *xprt = rqstp->rq_xprt; 840 struct rpc_msg rply; 841 842 rply.rm_xid = rqstp->rq_xid; 843 rply.rm_direction = REPLY; 844 rply.rm_reply.rp_stat = MSG_ACCEPTED; 845 rply.acpted_rply.ar_verf = rqstp->rq_verf; 846 rply.acpted_rply.ar_stat = PROG_UNAVAIL; 847 848 if (xprt->xp_pool->sp_rcache) 849 replay_setreply(xprt->xp_pool->sp_rcache, 850 &rply, svc_getrpccaller(rqstp), NULL); 851 852 svc_sendreply_common(rqstp, &rply, NULL); 853 } 854 855 /* 856 * Program version mismatch error reply 857 */ 858 void 859 svcerr_progvers(struct svc_req *rqstp, rpcvers_t low_vers, rpcvers_t high_vers) 860 { 861 SVCXPRT *xprt = rqstp->rq_xprt; 862 struct rpc_msg rply; 863 864 rply.rm_xid = rqstp->rq_xid; 865 rply.rm_direction = REPLY; 866 rply.rm_reply.rp_stat = MSG_ACCEPTED; 867 rply.acpted_rply.ar_verf = rqstp->rq_verf; 868 rply.acpted_rply.ar_stat = PROG_MISMATCH; 869 rply.acpted_rply.ar_vers.low = (uint32_t)low_vers; 870 rply.acpted_rply.ar_vers.high = (uint32_t)high_vers; 871 872 if (xprt->xp_pool->sp_rcache) 873 replay_setreply(xprt->xp_pool->sp_rcache, 874 &rply, svc_getrpccaller(rqstp), NULL); 875 876 svc_sendreply_common(rqstp, &rply, NULL); 877 } 878 879 /* 880 * Allocate a new server transport structure. All fields are 881 * initialized to zero and xp_p3 is initialized to point at an 882 * extension structure to hold various flags and authentication 883 * parameters. 
 */
SVCXPRT *
svc_xprt_alloc(void)
{
	SVCXPRT *xprt;
	SVCXPRT_EXT *ext;

	xprt = mem_alloc(sizeof(SVCXPRT));
	ext = mem_alloc(sizeof(SVCXPRT_EXT));
	xprt->xp_p3 = ext;
	refcount_init(&xprt->xp_refs, 1);

	return (xprt);
}

/*
 * Free a server transport structure.
 */
void
svc_xprt_free(SVCXPRT *xprt)
{

	mem_free(xprt->xp_p3, sizeof(SVCXPRT_EXT));
	/* The size argument is ignored, so 0 is ok. */
	mem_free(xprt->xp_gidp, 0);
	mem_free(xprt, sizeof(SVCXPRT));
}

/* ******************* SERVER INPUT STUFF ******************* */

/*
 * Read RPC requests from a transport and queue them to be
 * executed. We handle authentication and replay cache replies here.
 * Actually dispatching the RPC is deferred until svc_executereq.
 */
static enum xprt_stat
svc_getreq(SVCXPRT *xprt, struct svc_req **rqstp_ret)
{
	SVCPOOL *pool = xprt->xp_pool;
	struct svc_req *r;
	struct rpc_msg msg;
	struct mbuf *args;
	struct svc_loss_callout *s;
	enum xprt_stat stat;

	/* now receive msgs from xprt (support batch calls) */
	r = malloc(sizeof(*r), M_RPC, M_WAITOK|M_ZERO);

	msg.rm_call.cb_cred.oa_base = r->rq_credarea;
	msg.rm_call.cb_verf.oa_base = &r->rq_credarea[MAX_AUTH_BYTES];
	r->rq_clntcred = &r->rq_credarea[2*MAX_AUTH_BYTES];
	if (SVC_RECV(xprt, &msg, &r->rq_addr, &args)) {
		enum auth_stat why;

		/*
		 * Handle replays and authenticate before queuing the
		 * request to be executed.
		 */
		SVC_ACQUIRE(xprt);
		r->rq_xprt = xprt;
		if (pool->sp_rcache) {
			struct rpc_msg repmsg;
			struct mbuf *repbody;
			enum replay_state rs;
			rs = replay_find(pool->sp_rcache, &msg,
			    svc_getrpccaller(r), &repmsg, &repbody);
			switch (rs) {
			case RS_NEW:
				break;
			case RS_DONE:
				SVC_REPLY(xprt, &repmsg, r->rq_addr,
				    repbody, &r->rq_reply_seq);
				if (r->rq_addr) {
					free(r->rq_addr, M_SONAME);
					r->rq_addr = NULL;
				}
				m_freem(args);
				goto call_done;

			default:
				m_freem(args);
				goto call_done;
			}
		}

		r->rq_xid = msg.rm_xid;
		r->rq_prog = msg.rm_call.cb_prog;
		r->rq_vers = msg.rm_call.cb_vers;
		r->rq_proc = msg.rm_call.cb_proc;
		r->rq_size = sizeof(*r) + m_length(args, NULL);
		r->rq_args = args;
		if ((why = _authenticate(r, &msg)) != AUTH_OK) {
			/*
			 * RPCSEC_GSS uses this return code
			 * for requests that form part of its
			 * context establishment protocol and
			 * should not be dispatched to the
			 * application.
			 */
			if (why != RPCSEC_GSS_NODISPATCH)
				svcerr_auth(r, why);
			goto call_done;
		}

		if (!SVCAUTH_UNWRAP(&r->rq_auth, &r->rq_args)) {
			svcerr_decode(r);
			goto call_done;
		}

		/*
		 * Defer enabling DDP until the first non-NULLPROC RPC
		 * is received to allow STARTTLS authentication to
		 * enable TLS offload first.
		 */
		if (xprt->xp_doneddp == 0 && r->rq_proc != NULLPROC &&
		    xprt->xp_socket != NULL &&
		    atomic_cmpset_int(&xprt->xp_doneddp, 0, 1)) {
			if (xprt->xp_socket->so_proto->pr_protocol ==
			    IPPROTO_TCP) {
				int optval = 1;

				(void)so_setsockopt(xprt->xp_socket,
				    IPPROTO_TCP, TCP_USE_DDP, &optval,
				    sizeof(optval));
			}
		}

		/*
		 * Everything checks out, return request to caller.
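		 * The reference on the transport taken above with
		 * SVC_ACQUIRE() now travels with the request (rq_xprt) and
		 * is dropped in svc_freereq().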
1013 */ 1014 *rqstp_ret = r; 1015 r = NULL; 1016 } 1017 call_done: 1018 if (r) { 1019 svc_freereq(r); 1020 r = NULL; 1021 } 1022 if ((stat = SVC_STAT(xprt)) == XPRT_DIED) { 1023 TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) 1024 (*s->slc_dispatch)(xprt); 1025 xprt_unregister(xprt); 1026 } 1027 1028 return (stat); 1029 } 1030 1031 static void 1032 svc_executereq(struct svc_req *rqstp) 1033 { 1034 SVCXPRT *xprt = rqstp->rq_xprt; 1035 SVCPOOL *pool = xprt->xp_pool; 1036 int prog_found; 1037 rpcvers_t low_vers; 1038 rpcvers_t high_vers; 1039 struct svc_callout *s; 1040 1041 /* now match message with a registered service*/ 1042 prog_found = FALSE; 1043 low_vers = (rpcvers_t) -1L; 1044 high_vers = (rpcvers_t) 0L; 1045 TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) { 1046 if (s->sc_prog == rqstp->rq_prog) { 1047 if (s->sc_vers == rqstp->rq_vers) { 1048 /* 1049 * We hand ownership of r to the 1050 * dispatch method - they must call 1051 * svc_freereq. 1052 */ 1053 (*s->sc_dispatch)(rqstp, xprt); 1054 return; 1055 } /* found correct version */ 1056 prog_found = TRUE; 1057 if (s->sc_vers < low_vers) 1058 low_vers = s->sc_vers; 1059 if (s->sc_vers > high_vers) 1060 high_vers = s->sc_vers; 1061 } /* found correct program */ 1062 } 1063 1064 /* 1065 * if we got here, the program or version 1066 * is not served ... 1067 */ 1068 if (prog_found) 1069 svcerr_progvers(rqstp, low_vers, high_vers); 1070 else 1071 svcerr_noprog(rqstp); 1072 1073 svc_freereq(rqstp); 1074 } 1075 1076 static void 1077 svc_checkidle(SVCGROUP *grp) 1078 { 1079 SVCXPRT *xprt, *nxprt; 1080 time_t timo; 1081 struct svcxprt_list cleanup; 1082 1083 TAILQ_INIT(&cleanup); 1084 TAILQ_FOREACH_SAFE(xprt, &grp->sg_xlist, xp_link, nxprt) { 1085 /* 1086 * Only some transports have idle timers. Don't time 1087 * something out which is just waking up. 
1088 */ 1089 if (!xprt->xp_idletimeout || xprt->xp_thread) 1090 continue; 1091 1092 timo = xprt->xp_lastactive + xprt->xp_idletimeout; 1093 if (time_uptime > timo) { 1094 xprt_unregister_locked(xprt); 1095 TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link); 1096 } 1097 } 1098 1099 mtx_unlock(&grp->sg_lock); 1100 TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) { 1101 soshutdown(xprt->xp_socket, SHUT_WR); 1102 SVC_RELEASE(xprt); 1103 } 1104 mtx_lock(&grp->sg_lock); 1105 } 1106 1107 static void 1108 svc_assign_waiting_sockets(SVCPOOL *pool) 1109 { 1110 SVCGROUP *grp; 1111 SVCXPRT *xprt; 1112 int g; 1113 1114 for (g = 0; g < pool->sp_groupcount; g++) { 1115 grp = &pool->sp_groups[g]; 1116 mtx_lock(&grp->sg_lock); 1117 while ((xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) { 1118 if (xprt_assignthread(xprt)) 1119 TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink); 1120 else 1121 break; 1122 } 1123 mtx_unlock(&grp->sg_lock); 1124 } 1125 } 1126 1127 static void 1128 svc_change_space_used(SVCPOOL *pool, long delta) 1129 { 1130 unsigned long value; 1131 1132 value = atomic_fetchadd_long(&pool->sp_space_used, delta) + delta; 1133 if (delta > 0) { 1134 if (value >= pool->sp_space_high && !pool->sp_space_throttled) { 1135 pool->sp_space_throttled = TRUE; 1136 pool->sp_space_throttle_count++; 1137 } 1138 if (value > pool->sp_space_used_highest) 1139 pool->sp_space_used_highest = value; 1140 } else { 1141 if (value < pool->sp_space_low && pool->sp_space_throttled) { 1142 pool->sp_space_throttled = FALSE; 1143 svc_assign_waiting_sockets(pool); 1144 } 1145 } 1146 } 1147 1148 static bool_t 1149 svc_request_space_available(SVCPOOL *pool) 1150 { 1151 1152 if (pool->sp_space_throttled) 1153 return (FALSE); 1154 return (TRUE); 1155 } 1156 1157 static void 1158 svc_run_internal(SVCGROUP *grp, bool_t ismaster) 1159 { 1160 SVCPOOL *pool = grp->sg_pool; 1161 SVCTHREAD *st, *stpref; 1162 SVCXPRT *xprt; 1163 enum xprt_stat stat; 1164 struct svc_req *rqstp; 1165 struct proc *p; 1166 long sz; 1167 int error; 1168 1169 st = mem_alloc(sizeof(*st)); 1170 mtx_init(&st->st_lock, "st_lock", NULL, MTX_DEF); 1171 st->st_pool = pool; 1172 st->st_xprt = NULL; 1173 STAILQ_INIT(&st->st_reqs); 1174 cv_init(&st->st_cond, "rpcsvc"); 1175 1176 mtx_lock(&grp->sg_lock); 1177 1178 /* 1179 * If we are a new thread which was spawned to cope with 1180 * increased load, set the state back to SVCPOOL_ACTIVE. 1181 */ 1182 if (grp->sg_state == SVCPOOL_THREADSTARTING) 1183 grp->sg_state = SVCPOOL_ACTIVE; 1184 1185 while (grp->sg_state != SVCPOOL_CLOSING) { 1186 /* 1187 * Create new thread if requested. 1188 */ 1189 if (grp->sg_state == SVCPOOL_THREADWANTED) { 1190 grp->sg_state = SVCPOOL_THREADSTARTING; 1191 grp->sg_lastcreatetime = time_uptime; 1192 mtx_unlock(&grp->sg_lock); 1193 svc_new_thread(grp); 1194 mtx_lock(&grp->sg_lock); 1195 continue; 1196 } 1197 1198 /* 1199 * Check for idle transports once per second. 1200 */ 1201 if (time_uptime > grp->sg_lastidlecheck) { 1202 grp->sg_lastidlecheck = time_uptime; 1203 svc_checkidle(grp); 1204 } 1205 1206 xprt = st->st_xprt; 1207 if (!xprt) { 1208 /* 1209 * Enforce maxthreads count. 1210 */ 1211 if (!ismaster && grp->sg_threadcount > 1212 grp->sg_maxthreads) 1213 break; 1214 1215 /* 1216 * Before sleeping, see if we can find an 1217 * active transport which isn't being serviced 1218 * by a thread. 
1219 */ 1220 if (svc_request_space_available(pool) && 1221 (xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) { 1222 TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink); 1223 SVC_ACQUIRE(xprt); 1224 xprt->xp_thread = st; 1225 st->st_xprt = xprt; 1226 continue; 1227 } 1228 1229 LIST_INSERT_HEAD(&grp->sg_idlethreads, st, st_ilink); 1230 if (ismaster || (!ismaster && 1231 grp->sg_threadcount > grp->sg_minthreads)) 1232 error = cv_timedwait_sig(&st->st_cond, 1233 &grp->sg_lock, 5 * hz); 1234 else 1235 error = cv_wait_sig(&st->st_cond, 1236 &grp->sg_lock); 1237 if (st->st_xprt == NULL) 1238 LIST_REMOVE(st, st_ilink); 1239 1240 /* 1241 * Reduce worker thread count when idle. 1242 */ 1243 if (error == EWOULDBLOCK) { 1244 if (!ismaster 1245 && (grp->sg_threadcount 1246 > grp->sg_minthreads) 1247 && !st->st_xprt) 1248 break; 1249 } else if (error != 0) { 1250 KASSERT(error == EINTR || error == ERESTART, 1251 ("non-signal error %d", error)); 1252 mtx_unlock(&grp->sg_lock); 1253 p = curproc; 1254 PROC_LOCK(p); 1255 if (P_SHOULDSTOP(p) || 1256 (p->p_flag & P_TOTAL_STOP) != 0) { 1257 thread_suspend_check(0); 1258 PROC_UNLOCK(p); 1259 mtx_lock(&grp->sg_lock); 1260 } else { 1261 PROC_UNLOCK(p); 1262 svc_exit(pool); 1263 mtx_lock(&grp->sg_lock); 1264 break; 1265 } 1266 } 1267 continue; 1268 } 1269 mtx_unlock(&grp->sg_lock); 1270 1271 /* 1272 * Drain the transport socket and queue up any RPCs. 1273 */ 1274 xprt->xp_lastactive = time_uptime; 1275 do { 1276 if (!svc_request_space_available(pool)) 1277 break; 1278 rqstp = NULL; 1279 stat = svc_getreq(xprt, &rqstp); 1280 if (rqstp) { 1281 svc_change_space_used(pool, rqstp->rq_size); 1282 /* 1283 * See if the application has a preference 1284 * for some other thread. 1285 */ 1286 if (pool->sp_assign) { 1287 stpref = pool->sp_assign(st, rqstp); 1288 rqstp->rq_thread = stpref; 1289 STAILQ_INSERT_TAIL(&stpref->st_reqs, 1290 rqstp, rq_link); 1291 mtx_unlock(&stpref->st_lock); 1292 if (stpref != st) 1293 rqstp = NULL; 1294 } else { 1295 rqstp->rq_thread = st; 1296 STAILQ_INSERT_TAIL(&st->st_reqs, 1297 rqstp, rq_link); 1298 } 1299 } 1300 } while (rqstp == NULL && stat == XPRT_MOREREQS 1301 && grp->sg_state != SVCPOOL_CLOSING); 1302 1303 /* 1304 * Move this transport to the end of the active list to 1305 * ensure fairness when multiple transports are active. 1306 * If this was the last queued request, svc_getreq will end 1307 * up calling xprt_inactive to remove from the active list. 1308 */ 1309 mtx_lock(&grp->sg_lock); 1310 xprt->xp_thread = NULL; 1311 st->st_xprt = NULL; 1312 if (xprt->xp_active) { 1313 if (!svc_request_space_available(pool) || 1314 !xprt_assignthread(xprt)) 1315 TAILQ_INSERT_TAIL(&grp->sg_active, 1316 xprt, xp_alink); 1317 } 1318 mtx_unlock(&grp->sg_lock); 1319 SVC_RELEASE(xprt); 1320 1321 /* 1322 * Execute what we have queued. 
1323 */ 1324 mtx_lock(&st->st_lock); 1325 while ((rqstp = STAILQ_FIRST(&st->st_reqs)) != NULL) { 1326 STAILQ_REMOVE_HEAD(&st->st_reqs, rq_link); 1327 mtx_unlock(&st->st_lock); 1328 sz = (long)rqstp->rq_size; 1329 svc_executereq(rqstp); 1330 svc_change_space_used(pool, -sz); 1331 mtx_lock(&st->st_lock); 1332 } 1333 mtx_unlock(&st->st_lock); 1334 mtx_lock(&grp->sg_lock); 1335 } 1336 1337 if (st->st_xprt) { 1338 xprt = st->st_xprt; 1339 st->st_xprt = NULL; 1340 SVC_RELEASE(xprt); 1341 } 1342 KASSERT(STAILQ_EMPTY(&st->st_reqs), ("stray reqs on exit")); 1343 mtx_destroy(&st->st_lock); 1344 cv_destroy(&st->st_cond); 1345 mem_free(st, sizeof(*st)); 1346 1347 grp->sg_threadcount--; 1348 if (!ismaster) 1349 wakeup(grp); 1350 mtx_unlock(&grp->sg_lock); 1351 } 1352 1353 static void 1354 svc_thread_start(void *arg) 1355 { 1356 1357 svc_run_internal((SVCGROUP *) arg, FALSE); 1358 kthread_exit(); 1359 } 1360 1361 static void 1362 svc_new_thread(SVCGROUP *grp) 1363 { 1364 SVCPOOL *pool = grp->sg_pool; 1365 struct thread *td; 1366 1367 mtx_lock(&grp->sg_lock); 1368 grp->sg_threadcount++; 1369 mtx_unlock(&grp->sg_lock); 1370 kthread_add(svc_thread_start, grp, pool->sp_proc, &td, 0, 0, 1371 "%s: service", pool->sp_name); 1372 } 1373 1374 void 1375 svc_run(SVCPOOL *pool) 1376 { 1377 int g, i; 1378 struct proc *p; 1379 struct thread *td; 1380 SVCGROUP *grp; 1381 1382 p = curproc; 1383 td = curthread; 1384 snprintf(td->td_name, sizeof(td->td_name), 1385 "%s: master", pool->sp_name); 1386 pool->sp_state = SVCPOOL_ACTIVE; 1387 pool->sp_proc = p; 1388 1389 /* Choose group count based on number of threads and CPUs. */ 1390 pool->sp_groupcount = max(1, min(SVC_MAXGROUPS, 1391 min(pool->sp_maxthreads / 2, mp_ncpus) / 6)); 1392 for (g = 0; g < pool->sp_groupcount; g++) { 1393 grp = &pool->sp_groups[g]; 1394 grp->sg_minthreads = max(1, 1395 pool->sp_minthreads / pool->sp_groupcount); 1396 grp->sg_maxthreads = max(1, 1397 pool->sp_maxthreads / pool->sp_groupcount); 1398 grp->sg_lastcreatetime = time_uptime; 1399 } 1400 1401 /* Starting threads */ 1402 pool->sp_groups[0].sg_threadcount++; 1403 for (g = 0; g < pool->sp_groupcount; g++) { 1404 grp = &pool->sp_groups[g]; 1405 for (i = ((g == 0) ? 1 : 0); i < grp->sg_minthreads; i++) 1406 svc_new_thread(grp); 1407 } 1408 svc_run_internal(&pool->sp_groups[0], TRUE); 1409 1410 /* Waiting for threads to stop. 
*/ 1411 for (g = 0; g < pool->sp_groupcount; g++) { 1412 grp = &pool->sp_groups[g]; 1413 mtx_lock(&grp->sg_lock); 1414 while (grp->sg_threadcount > 0) 1415 msleep(grp, &grp->sg_lock, 0, "svcexit", 0); 1416 mtx_unlock(&grp->sg_lock); 1417 } 1418 } 1419 1420 void 1421 svc_exit(SVCPOOL *pool) 1422 { 1423 SVCGROUP *grp; 1424 SVCTHREAD *st; 1425 int g; 1426 1427 pool->sp_state = SVCPOOL_CLOSING; 1428 for (g = 0; g < pool->sp_groupcount; g++) { 1429 grp = &pool->sp_groups[g]; 1430 mtx_lock(&grp->sg_lock); 1431 if (grp->sg_state != SVCPOOL_CLOSING) { 1432 grp->sg_state = SVCPOOL_CLOSING; 1433 LIST_FOREACH(st, &grp->sg_idlethreads, st_ilink) 1434 cv_signal(&st->st_cond); 1435 } 1436 mtx_unlock(&grp->sg_lock); 1437 } 1438 } 1439 1440 bool_t 1441 svc_getargs(struct svc_req *rqstp, xdrproc_t xargs, void *args) 1442 { 1443 struct mbuf *m; 1444 XDR xdrs; 1445 bool_t stat; 1446 1447 m = rqstp->rq_args; 1448 rqstp->rq_args = NULL; 1449 1450 xdrmbuf_create(&xdrs, m, XDR_DECODE); 1451 stat = xargs(&xdrs, args); 1452 XDR_DESTROY(&xdrs); 1453 1454 return (stat); 1455 } 1456 1457 bool_t 1458 svc_freeargs(struct svc_req *rqstp, xdrproc_t xargs, void *args) 1459 { 1460 XDR xdrs; 1461 1462 if (rqstp->rq_addr) { 1463 free(rqstp->rq_addr, M_SONAME); 1464 rqstp->rq_addr = NULL; 1465 } 1466 1467 xdrs.x_op = XDR_FREE; 1468 return (xargs(&xdrs, args)); 1469 } 1470 1471 void 1472 svc_freereq(struct svc_req *rqstp) 1473 { 1474 SVCTHREAD *st; 1475 SVCPOOL *pool; 1476 1477 st = rqstp->rq_thread; 1478 if (st) { 1479 pool = st->st_pool; 1480 if (pool->sp_done) 1481 pool->sp_done(st, rqstp); 1482 } 1483 1484 if (rqstp->rq_auth.svc_ah_ops) 1485 SVCAUTH_RELEASE(&rqstp->rq_auth); 1486 1487 if (rqstp->rq_xprt) { 1488 SVC_RELEASE(rqstp->rq_xprt); 1489 } 1490 1491 if (rqstp->rq_addr) 1492 free(rqstp->rq_addr, M_SONAME); 1493 1494 if (rqstp->rq_args) 1495 m_freem(rqstp->rq_args); 1496 1497 free(rqstp, M_RPC); 1498 } 1499
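
/*
 * Illustrative sketch (added for this listing, not part of the original
 * source): the life cycle of a pool as an in-kernel RPC daemon might use
 * it.  Transport creation is elided because it is transport specific;
 * myprog_register() is the hypothetical helper sketched near svc_reg()
 * above.  Guarded by #if 0 so it is never compiled.
 */
#if 0
static SVCPOOL *myprog_pool;

static void
myprog_daemon(void)
{

	/* Create the pool; passing NULL skips the sysctl tree. */
	myprog_pool = svcpool_create("myprog", NULL);
	myprog_pool->sp_minthreads = 4;
	myprog_pool->sp_maxthreads = 16;

	/*
	 * Transports would be created and registered here, one per
	 * listening socket, each followed by myprog_register(xprt).
	 */

	/*
	 * svc_run() turns the calling thread into the pool master: it
	 * sizes the thread groups, spawns the minimum number of worker
	 * threads and only returns once svc_exit() has been called and
	 * all workers have exited.
	 */
	svc_run(myprog_pool);

	svcpool_destroy(myprog_pool);
	myprog_pool = NULL;
}

static void
myprog_shutdown(void)
{

	/* Wake idle workers and ask the whole pool to wind down. */
	svc_exit(myprog_pool);
}
#endif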