1 /* $NetBSD: svc.c,v 1.21 2000/07/06 03:10:35 christos Exp $ */ 2 3 /*- 4 * SPDX-License-Identifier: BSD-3-Clause 5 * 6 * Copyright (c) 2009, Sun Microsystems, Inc. 7 * All rights reserved. 8 * 9 * Redistribution and use in source and binary forms, with or without 10 * modification, are permitted provided that the following conditions are met: 11 * - Redistributions of source code must retain the above copyright notice, 12 * this list of conditions and the following disclaimer. 13 * - Redistributions in binary form must reproduce the above copyright notice, 14 * this list of conditions and the following disclaimer in the documentation 15 * and/or other materials provided with the distribution. 16 * - Neither the name of Sun Microsystems, Inc. nor the names of its 17 * contributors may be used to endorse or promote products derived 18 * from this software without specific prior written permission. 19 * 20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 21 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 22 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 23 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 24 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 25 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 26 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 27 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 28 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 29 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 30 * POSSIBILITY OF SUCH DAMAGE. 31 */ 32 33 #if defined(LIBC_SCCS) && !defined(lint) 34 static char *sccsid2 = "@(#)svc.c 1.44 88/02/08 Copyr 1984 Sun Micro"; 35 static char *sccsid = "@(#)svc.c 2.4 88/08/11 4.0 RPCSRC"; 36 #endif 37 #include <sys/cdefs.h> 38 __FBSDID("$FreeBSD$"); 39 40 /* 41 * svc.c, Server-side remote procedure call interface. 42 * 43 * There are two sets of procedures here. The xprt routines are 44 * for handling transport handles. The svc routines handle the 45 * list of service routines. 46 * 47 * Copyright (C) 1984, Sun Microsystems, Inc. 48 */ 49 50 #include <sys/param.h> 51 #include <sys/jail.h> 52 #include <sys/lock.h> 53 #include <sys/kernel.h> 54 #include <sys/kthread.h> 55 #include <sys/malloc.h> 56 #include <sys/mbuf.h> 57 #include <sys/mutex.h> 58 #include <sys/proc.h> 59 #include <sys/queue.h> 60 #include <sys/socketvar.h> 61 #include <sys/systm.h> 62 #include <sys/smp.h> 63 #include <sys/sx.h> 64 #include <sys/ucred.h> 65 66 #include <rpc/rpc.h> 67 #include <rpc/rpcb_clnt.h> 68 #include <rpc/replay.h> 69 70 #include <rpc/rpc_com.h> 71 72 #define SVC_VERSQUIET 0x0001 /* keep quiet about vers mismatch */ 73 #define version_keepquiet(xp) (SVC_EXT(xp)->xp_flags & SVC_VERSQUIET) 74 75 static struct svc_callout *svc_find(SVCPOOL *pool, rpcprog_t, rpcvers_t, 76 char *); 77 static void svc_new_thread(SVCGROUP *grp); 78 static void xprt_unregister_locked(SVCXPRT *xprt); 79 static void svc_change_space_used(SVCPOOL *pool, long delta); 80 static bool_t svc_request_space_available(SVCPOOL *pool); 81 static void svcpool_cleanup(SVCPOOL *pool); 82 83 /* *************** SVCXPRT related stuff **************** */ 84 85 static int svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS); 86 static int svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS); 87 static int svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS); 88 89 SVCPOOL* 90 svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base) 91 { 92 SVCPOOL *pool; 93 SVCGROUP *grp; 94 int g; 95 96 pool = malloc(sizeof(SVCPOOL), M_RPC, M_WAITOK|M_ZERO); 97 98 mtx_init(&pool->sp_lock, "sp_lock", NULL, MTX_DEF); 99 pool->sp_name = name; 100 pool->sp_state = SVCPOOL_INIT; 101 pool->sp_proc = NULL; 102 TAILQ_INIT(&pool->sp_callouts); 103 TAILQ_INIT(&pool->sp_lcallouts); 104 pool->sp_minthreads = 1; 105 pool->sp_maxthreads = 1; 106 pool->sp_groupcount = 1; 107 for (g = 0; g < SVC_MAXGROUPS; g++) { 108 grp = &pool->sp_groups[g]; 109 mtx_init(&grp->sg_lock, "sg_lock", NULL, MTX_DEF); 110 grp->sg_pool = pool; 111 grp->sg_state = SVCPOOL_ACTIVE; 112 TAILQ_INIT(&grp->sg_xlist); 113 TAILQ_INIT(&grp->sg_active); 114 LIST_INIT(&grp->sg_idlethreads); 115 grp->sg_minthreads = 1; 116 grp->sg_maxthreads = 1; 117 } 118 119 /* 120 * Don't use more than a quarter of mbuf clusters. Nota bene: 121 * nmbclusters is an int, but nmbclusters*MCLBYTES may overflow 122 * on LP64 architectures, so cast to u_long to avoid undefined 123 * behavior. (ILP32 architectures cannot have nmbclusters 124 * large enough to overflow for other reasons.) 125 */ 126 pool->sp_space_high = (u_long)nmbclusters * MCLBYTES / 4; 127 pool->sp_space_low = (pool->sp_space_high / 3) * 2; 128 129 sysctl_ctx_init(&pool->sp_sysctl); 130 if (!jailed(curthread->td_ucred) && sysctl_base) { 131 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO, 132 "minthreads", CTLTYPE_INT | CTLFLAG_RW | CTLFLAG_MPSAFE, 133 pool, 0, svcpool_minthread_sysctl, "I", 134 "Minimal number of threads"); 135 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO, 136 "maxthreads", CTLTYPE_INT | CTLFLAG_RW | CTLFLAG_MPSAFE, 137 pool, 0, svcpool_maxthread_sysctl, "I", 138 "Maximal number of threads"); 139 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO, 140 "threads", CTLTYPE_INT | CTLFLAG_RD | CTLFLAG_MPSAFE, 141 pool, 0, svcpool_threads_sysctl, "I", 142 "Current number of threads"); 143 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO, 144 "groups", CTLFLAG_RD, &pool->sp_groupcount, 0, 145 "Number of thread groups"); 146 147 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO, 148 "request_space_used", CTLFLAG_RD, 149 &pool->sp_space_used, 150 "Space in parsed but not handled requests."); 151 152 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO, 153 "request_space_used_highest", CTLFLAG_RD, 154 &pool->sp_space_used_highest, 155 "Highest space used since reboot."); 156 157 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO, 158 "request_space_high", CTLFLAG_RW, 159 &pool->sp_space_high, 160 "Maximum space in parsed but not handled requests."); 161 162 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO, 163 "request_space_low", CTLFLAG_RW, 164 &pool->sp_space_low, 165 "Low water mark for request space."); 166 167 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO, 168 "request_space_throttled", CTLFLAG_RD, 169 &pool->sp_space_throttled, 0, 170 "Whether nfs requests are currently throttled"); 171 172 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO, 173 "request_space_throttle_count", CTLFLAG_RD, 174 &pool->sp_space_throttle_count, 0, 175 "Count of times throttling based on request space has occurred"); 176 } 177 178 return pool; 179 } 180 181 /* 182 * Code common to svcpool_destroy() and svcpool_close(), which cleans up 183 * the pool data structures. 184 */ 185 static void 186 svcpool_cleanup(SVCPOOL *pool) 187 { 188 SVCGROUP *grp; 189 SVCXPRT *xprt, *nxprt; 190 struct svc_callout *s; 191 struct svc_loss_callout *sl; 192 struct svcxprt_list cleanup; 193 int g; 194 195 TAILQ_INIT(&cleanup); 196 197 for (g = 0; g < SVC_MAXGROUPS; g++) { 198 grp = &pool->sp_groups[g]; 199 mtx_lock(&grp->sg_lock); 200 while ((xprt = TAILQ_FIRST(&grp->sg_xlist)) != NULL) { 201 xprt_unregister_locked(xprt); 202 TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link); 203 } 204 mtx_unlock(&grp->sg_lock); 205 } 206 TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) { 207 if (xprt->xp_socket != NULL) 208 soshutdown(xprt->xp_socket, SHUT_WR); 209 SVC_RELEASE(xprt); 210 } 211 212 mtx_lock(&pool->sp_lock); 213 while ((s = TAILQ_FIRST(&pool->sp_callouts)) != NULL) { 214 mtx_unlock(&pool->sp_lock); 215 svc_unreg(pool, s->sc_prog, s->sc_vers); 216 mtx_lock(&pool->sp_lock); 217 } 218 while ((sl = TAILQ_FIRST(&pool->sp_lcallouts)) != NULL) { 219 mtx_unlock(&pool->sp_lock); 220 svc_loss_unreg(pool, sl->slc_dispatch); 221 mtx_lock(&pool->sp_lock); 222 } 223 mtx_unlock(&pool->sp_lock); 224 } 225 226 void 227 svcpool_destroy(SVCPOOL *pool) 228 { 229 SVCGROUP *grp; 230 int g; 231 232 svcpool_cleanup(pool); 233 234 for (g = 0; g < SVC_MAXGROUPS; g++) { 235 grp = &pool->sp_groups[g]; 236 mtx_destroy(&grp->sg_lock); 237 } 238 mtx_destroy(&pool->sp_lock); 239 240 if (pool->sp_rcache) 241 replay_freecache(pool->sp_rcache); 242 243 sysctl_ctx_free(&pool->sp_sysctl); 244 free(pool, M_RPC); 245 } 246 247 /* 248 * Similar to svcpool_destroy(), except that it does not destroy the actual 249 * data structures. As such, "pool" may be used again. 250 */ 251 void 252 svcpool_close(SVCPOOL *pool) 253 { 254 SVCGROUP *grp; 255 int g; 256 257 svcpool_cleanup(pool); 258 259 /* Now, initialize the pool's state for a fresh svc_run() call. */ 260 mtx_lock(&pool->sp_lock); 261 pool->sp_state = SVCPOOL_INIT; 262 mtx_unlock(&pool->sp_lock); 263 for (g = 0; g < SVC_MAXGROUPS; g++) { 264 grp = &pool->sp_groups[g]; 265 mtx_lock(&grp->sg_lock); 266 grp->sg_state = SVCPOOL_ACTIVE; 267 mtx_unlock(&grp->sg_lock); 268 } 269 } 270 271 /* 272 * Sysctl handler to get the present thread count on a pool 273 */ 274 static int 275 svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS) 276 { 277 SVCPOOL *pool; 278 int threads, error, g; 279 280 pool = oidp->oid_arg1; 281 threads = 0; 282 mtx_lock(&pool->sp_lock); 283 for (g = 0; g < pool->sp_groupcount; g++) 284 threads += pool->sp_groups[g].sg_threadcount; 285 mtx_unlock(&pool->sp_lock); 286 error = sysctl_handle_int(oidp, &threads, 0, req); 287 return (error); 288 } 289 290 /* 291 * Sysctl handler to set the minimum thread count on a pool 292 */ 293 static int 294 svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS) 295 { 296 SVCPOOL *pool; 297 int newminthreads, error, g; 298 299 pool = oidp->oid_arg1; 300 newminthreads = pool->sp_minthreads; 301 error = sysctl_handle_int(oidp, &newminthreads, 0, req); 302 if (error == 0 && newminthreads != pool->sp_minthreads) { 303 if (newminthreads > pool->sp_maxthreads) 304 return (EINVAL); 305 mtx_lock(&pool->sp_lock); 306 pool->sp_minthreads = newminthreads; 307 for (g = 0; g < pool->sp_groupcount; g++) { 308 pool->sp_groups[g].sg_minthreads = max(1, 309 pool->sp_minthreads / pool->sp_groupcount); 310 } 311 mtx_unlock(&pool->sp_lock); 312 } 313 return (error); 314 } 315 316 /* 317 * Sysctl handler to set the maximum thread count on a pool 318 */ 319 static int 320 svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS) 321 { 322 SVCPOOL *pool; 323 int newmaxthreads, error, g; 324 325 pool = oidp->oid_arg1; 326 newmaxthreads = pool->sp_maxthreads; 327 error = sysctl_handle_int(oidp, &newmaxthreads, 0, req); 328 if (error == 0 && newmaxthreads != pool->sp_maxthreads) { 329 if (newmaxthreads < pool->sp_minthreads) 330 return (EINVAL); 331 mtx_lock(&pool->sp_lock); 332 pool->sp_maxthreads = newmaxthreads; 333 for (g = 0; g < pool->sp_groupcount; g++) { 334 pool->sp_groups[g].sg_maxthreads = max(1, 335 pool->sp_maxthreads / pool->sp_groupcount); 336 } 337 mtx_unlock(&pool->sp_lock); 338 } 339 return (error); 340 } 341 342 /* 343 * Activate a transport handle. 344 */ 345 void 346 xprt_register(SVCXPRT *xprt) 347 { 348 SVCPOOL *pool = xprt->xp_pool; 349 SVCGROUP *grp; 350 int g; 351 352 SVC_ACQUIRE(xprt); 353 g = atomic_fetchadd_int(&pool->sp_nextgroup, 1) % pool->sp_groupcount; 354 xprt->xp_group = grp = &pool->sp_groups[g]; 355 mtx_lock(&grp->sg_lock); 356 xprt->xp_registered = TRUE; 357 xprt->xp_active = FALSE; 358 TAILQ_INSERT_TAIL(&grp->sg_xlist, xprt, xp_link); 359 mtx_unlock(&grp->sg_lock); 360 } 361 362 /* 363 * De-activate a transport handle. Note: the locked version doesn't 364 * release the transport - caller must do that after dropping the pool 365 * lock. 366 */ 367 static void 368 xprt_unregister_locked(SVCXPRT *xprt) 369 { 370 SVCGROUP *grp = xprt->xp_group; 371 372 mtx_assert(&grp->sg_lock, MA_OWNED); 373 KASSERT(xprt->xp_registered == TRUE, 374 ("xprt_unregister_locked: not registered")); 375 xprt_inactive_locked(xprt); 376 TAILQ_REMOVE(&grp->sg_xlist, xprt, xp_link); 377 xprt->xp_registered = FALSE; 378 } 379 380 void 381 xprt_unregister(SVCXPRT *xprt) 382 { 383 SVCGROUP *grp = xprt->xp_group; 384 385 mtx_lock(&grp->sg_lock); 386 if (xprt->xp_registered == FALSE) { 387 /* Already unregistered by another thread */ 388 mtx_unlock(&grp->sg_lock); 389 return; 390 } 391 xprt_unregister_locked(xprt); 392 mtx_unlock(&grp->sg_lock); 393 394 if (xprt->xp_socket != NULL) 395 soshutdown(xprt->xp_socket, SHUT_WR); 396 SVC_RELEASE(xprt); 397 } 398 399 /* 400 * Attempt to assign a service thread to this transport. 401 */ 402 static int 403 xprt_assignthread(SVCXPRT *xprt) 404 { 405 SVCGROUP *grp = xprt->xp_group; 406 SVCTHREAD *st; 407 408 mtx_assert(&grp->sg_lock, MA_OWNED); 409 st = LIST_FIRST(&grp->sg_idlethreads); 410 if (st) { 411 LIST_REMOVE(st, st_ilink); 412 SVC_ACQUIRE(xprt); 413 xprt->xp_thread = st; 414 st->st_xprt = xprt; 415 cv_signal(&st->st_cond); 416 return (TRUE); 417 } else { 418 /* 419 * See if we can create a new thread. The 420 * actual thread creation happens in 421 * svc_run_internal because our locking state 422 * is poorly defined (we are typically called 423 * from a socket upcall). Don't create more 424 * than one thread per second. 425 */ 426 if (grp->sg_state == SVCPOOL_ACTIVE 427 && grp->sg_lastcreatetime < time_uptime 428 && grp->sg_threadcount < grp->sg_maxthreads) { 429 grp->sg_state = SVCPOOL_THREADWANTED; 430 } 431 } 432 return (FALSE); 433 } 434 435 void 436 xprt_active(SVCXPRT *xprt) 437 { 438 SVCGROUP *grp = xprt->xp_group; 439 440 mtx_lock(&grp->sg_lock); 441 442 if (!xprt->xp_registered) { 443 /* 444 * Race with xprt_unregister - we lose. 445 */ 446 mtx_unlock(&grp->sg_lock); 447 return; 448 } 449 450 if (!xprt->xp_active) { 451 xprt->xp_active = TRUE; 452 if (xprt->xp_thread == NULL) { 453 if (!svc_request_space_available(xprt->xp_pool) || 454 !xprt_assignthread(xprt)) 455 TAILQ_INSERT_TAIL(&grp->sg_active, xprt, 456 xp_alink); 457 } 458 } 459 460 mtx_unlock(&grp->sg_lock); 461 } 462 463 void 464 xprt_inactive_locked(SVCXPRT *xprt) 465 { 466 SVCGROUP *grp = xprt->xp_group; 467 468 mtx_assert(&grp->sg_lock, MA_OWNED); 469 if (xprt->xp_active) { 470 if (xprt->xp_thread == NULL) 471 TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink); 472 xprt->xp_active = FALSE; 473 } 474 } 475 476 void 477 xprt_inactive(SVCXPRT *xprt) 478 { 479 SVCGROUP *grp = xprt->xp_group; 480 481 mtx_lock(&grp->sg_lock); 482 xprt_inactive_locked(xprt); 483 mtx_unlock(&grp->sg_lock); 484 } 485 486 /* 487 * Variant of xprt_inactive() for use only when sure that port is 488 * assigned to thread. For example, within receive handlers. 489 */ 490 void 491 xprt_inactive_self(SVCXPRT *xprt) 492 { 493 494 KASSERT(xprt->xp_thread != NULL, 495 ("xprt_inactive_self(%p) with NULL xp_thread", xprt)); 496 xprt->xp_active = FALSE; 497 } 498 499 /* 500 * Add a service program to the callout list. 501 * The dispatch routine will be called when a rpc request for this 502 * program number comes in. 503 */ 504 bool_t 505 svc_reg(SVCXPRT *xprt, const rpcprog_t prog, const rpcvers_t vers, 506 void (*dispatch)(struct svc_req *, SVCXPRT *), 507 const struct netconfig *nconf) 508 { 509 SVCPOOL *pool = xprt->xp_pool; 510 struct svc_callout *s; 511 char *netid = NULL; 512 int flag = 0; 513 514 /* VARIABLES PROTECTED BY svc_lock: s, svc_head */ 515 516 if (xprt->xp_netid) { 517 netid = strdup(xprt->xp_netid, M_RPC); 518 flag = 1; 519 } else if (nconf && nconf->nc_netid) { 520 netid = strdup(nconf->nc_netid, M_RPC); 521 flag = 1; 522 } /* must have been created with svc_raw_create */ 523 if ((netid == NULL) && (flag == 1)) { 524 return (FALSE); 525 } 526 527 mtx_lock(&pool->sp_lock); 528 if ((s = svc_find(pool, prog, vers, netid)) != NULL) { 529 if (netid) 530 free(netid, M_RPC); 531 if (s->sc_dispatch == dispatch) 532 goto rpcb_it; /* he is registering another xptr */ 533 mtx_unlock(&pool->sp_lock); 534 return (FALSE); 535 } 536 s = malloc(sizeof (struct svc_callout), M_RPC, M_NOWAIT); 537 if (s == NULL) { 538 if (netid) 539 free(netid, M_RPC); 540 mtx_unlock(&pool->sp_lock); 541 return (FALSE); 542 } 543 544 s->sc_prog = prog; 545 s->sc_vers = vers; 546 s->sc_dispatch = dispatch; 547 s->sc_netid = netid; 548 TAILQ_INSERT_TAIL(&pool->sp_callouts, s, sc_link); 549 550 if ((xprt->xp_netid == NULL) && (flag == 1) && netid) 551 ((SVCXPRT *) xprt)->xp_netid = strdup(netid, M_RPC); 552 553 rpcb_it: 554 mtx_unlock(&pool->sp_lock); 555 /* now register the information with the local binder service */ 556 if (nconf) { 557 bool_t dummy; 558 struct netconfig tnc; 559 struct netbuf nb; 560 tnc = *nconf; 561 nb.buf = &xprt->xp_ltaddr; 562 nb.len = xprt->xp_ltaddr.ss_len; 563 dummy = rpcb_set(prog, vers, &tnc, &nb); 564 return (dummy); 565 } 566 return (TRUE); 567 } 568 569 /* 570 * Remove a service program from the callout list. 571 */ 572 void 573 svc_unreg(SVCPOOL *pool, const rpcprog_t prog, const rpcvers_t vers) 574 { 575 struct svc_callout *s; 576 577 /* unregister the information anyway */ 578 (void) rpcb_unset(prog, vers, NULL); 579 mtx_lock(&pool->sp_lock); 580 while ((s = svc_find(pool, prog, vers, NULL)) != NULL) { 581 TAILQ_REMOVE(&pool->sp_callouts, s, sc_link); 582 if (s->sc_netid) 583 mem_free(s->sc_netid, sizeof (s->sc_netid) + 1); 584 mem_free(s, sizeof (struct svc_callout)); 585 } 586 mtx_unlock(&pool->sp_lock); 587 } 588 589 /* 590 * Add a service connection loss program to the callout list. 591 * The dispatch routine will be called when some port in ths pool die. 592 */ 593 bool_t 594 svc_loss_reg(SVCXPRT *xprt, void (*dispatch)(SVCXPRT *)) 595 { 596 SVCPOOL *pool = xprt->xp_pool; 597 struct svc_loss_callout *s; 598 599 mtx_lock(&pool->sp_lock); 600 TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) { 601 if (s->slc_dispatch == dispatch) 602 break; 603 } 604 if (s != NULL) { 605 mtx_unlock(&pool->sp_lock); 606 return (TRUE); 607 } 608 s = malloc(sizeof(struct svc_loss_callout), M_RPC, M_NOWAIT); 609 if (s == NULL) { 610 mtx_unlock(&pool->sp_lock); 611 return (FALSE); 612 } 613 s->slc_dispatch = dispatch; 614 TAILQ_INSERT_TAIL(&pool->sp_lcallouts, s, slc_link); 615 mtx_unlock(&pool->sp_lock); 616 return (TRUE); 617 } 618 619 /* 620 * Remove a service connection loss program from the callout list. 621 */ 622 void 623 svc_loss_unreg(SVCPOOL *pool, void (*dispatch)(SVCXPRT *)) 624 { 625 struct svc_loss_callout *s; 626 627 mtx_lock(&pool->sp_lock); 628 TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) { 629 if (s->slc_dispatch == dispatch) { 630 TAILQ_REMOVE(&pool->sp_lcallouts, s, slc_link); 631 free(s, M_RPC); 632 break; 633 } 634 } 635 mtx_unlock(&pool->sp_lock); 636 } 637 638 /* ********************** CALLOUT list related stuff ************* */ 639 640 /* 641 * Search the callout list for a program number, return the callout 642 * struct. 643 */ 644 static struct svc_callout * 645 svc_find(SVCPOOL *pool, rpcprog_t prog, rpcvers_t vers, char *netid) 646 { 647 struct svc_callout *s; 648 649 mtx_assert(&pool->sp_lock, MA_OWNED); 650 TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) { 651 if (s->sc_prog == prog && s->sc_vers == vers 652 && (netid == NULL || s->sc_netid == NULL || 653 strcmp(netid, s->sc_netid) == 0)) 654 break; 655 } 656 657 return (s); 658 } 659 660 /* ******************* REPLY GENERATION ROUTINES ************ */ 661 662 static bool_t 663 svc_sendreply_common(struct svc_req *rqstp, struct rpc_msg *rply, 664 struct mbuf *body) 665 { 666 SVCXPRT *xprt = rqstp->rq_xprt; 667 bool_t ok; 668 669 if (rqstp->rq_args) { 670 m_freem(rqstp->rq_args); 671 rqstp->rq_args = NULL; 672 } 673 674 if (xprt->xp_pool->sp_rcache) 675 replay_setreply(xprt->xp_pool->sp_rcache, 676 rply, svc_getrpccaller(rqstp), body); 677 678 if (!SVCAUTH_WRAP(&rqstp->rq_auth, &body)) 679 return (FALSE); 680 681 ok = SVC_REPLY(xprt, rply, rqstp->rq_addr, body, &rqstp->rq_reply_seq); 682 if (rqstp->rq_addr) { 683 free(rqstp->rq_addr, M_SONAME); 684 rqstp->rq_addr = NULL; 685 } 686 687 return (ok); 688 } 689 690 /* 691 * Send a reply to an rpc request 692 */ 693 bool_t 694 svc_sendreply(struct svc_req *rqstp, xdrproc_t xdr_results, void * xdr_location) 695 { 696 struct rpc_msg rply; 697 struct mbuf *m; 698 XDR xdrs; 699 bool_t ok; 700 701 rply.rm_xid = rqstp->rq_xid; 702 rply.rm_direction = REPLY; 703 rply.rm_reply.rp_stat = MSG_ACCEPTED; 704 rply.acpted_rply.ar_verf = rqstp->rq_verf; 705 rply.acpted_rply.ar_stat = SUCCESS; 706 rply.acpted_rply.ar_results.where = NULL; 707 rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void; 708 709 m = m_getcl(M_WAITOK, MT_DATA, 0); 710 xdrmbuf_create(&xdrs, m, XDR_ENCODE); 711 ok = xdr_results(&xdrs, xdr_location); 712 XDR_DESTROY(&xdrs); 713 714 if (ok) { 715 return (svc_sendreply_common(rqstp, &rply, m)); 716 } else { 717 m_freem(m); 718 return (FALSE); 719 } 720 } 721 722 bool_t 723 svc_sendreply_mbuf(struct svc_req *rqstp, struct mbuf *m) 724 { 725 struct rpc_msg rply; 726 727 rply.rm_xid = rqstp->rq_xid; 728 rply.rm_direction = REPLY; 729 rply.rm_reply.rp_stat = MSG_ACCEPTED; 730 rply.acpted_rply.ar_verf = rqstp->rq_verf; 731 rply.acpted_rply.ar_stat = SUCCESS; 732 rply.acpted_rply.ar_results.where = NULL; 733 rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void; 734 735 return (svc_sendreply_common(rqstp, &rply, m)); 736 } 737 738 /* 739 * No procedure error reply 740 */ 741 void 742 svcerr_noproc(struct svc_req *rqstp) 743 { 744 SVCXPRT *xprt = rqstp->rq_xprt; 745 struct rpc_msg rply; 746 747 rply.rm_xid = rqstp->rq_xid; 748 rply.rm_direction = REPLY; 749 rply.rm_reply.rp_stat = MSG_ACCEPTED; 750 rply.acpted_rply.ar_verf = rqstp->rq_verf; 751 rply.acpted_rply.ar_stat = PROC_UNAVAIL; 752 753 if (xprt->xp_pool->sp_rcache) 754 replay_setreply(xprt->xp_pool->sp_rcache, 755 &rply, svc_getrpccaller(rqstp), NULL); 756 757 svc_sendreply_common(rqstp, &rply, NULL); 758 } 759 760 /* 761 * Can't decode args error reply 762 */ 763 void 764 svcerr_decode(struct svc_req *rqstp) 765 { 766 SVCXPRT *xprt = rqstp->rq_xprt; 767 struct rpc_msg rply; 768 769 rply.rm_xid = rqstp->rq_xid; 770 rply.rm_direction = REPLY; 771 rply.rm_reply.rp_stat = MSG_ACCEPTED; 772 rply.acpted_rply.ar_verf = rqstp->rq_verf; 773 rply.acpted_rply.ar_stat = GARBAGE_ARGS; 774 775 if (xprt->xp_pool->sp_rcache) 776 replay_setreply(xprt->xp_pool->sp_rcache, 777 &rply, (struct sockaddr *) &xprt->xp_rtaddr, NULL); 778 779 svc_sendreply_common(rqstp, &rply, NULL); 780 } 781 782 /* 783 * Some system error 784 */ 785 void 786 svcerr_systemerr(struct svc_req *rqstp) 787 { 788 SVCXPRT *xprt = rqstp->rq_xprt; 789 struct rpc_msg rply; 790 791 rply.rm_xid = rqstp->rq_xid; 792 rply.rm_direction = REPLY; 793 rply.rm_reply.rp_stat = MSG_ACCEPTED; 794 rply.acpted_rply.ar_verf = rqstp->rq_verf; 795 rply.acpted_rply.ar_stat = SYSTEM_ERR; 796 797 if (xprt->xp_pool->sp_rcache) 798 replay_setreply(xprt->xp_pool->sp_rcache, 799 &rply, svc_getrpccaller(rqstp), NULL); 800 801 svc_sendreply_common(rqstp, &rply, NULL); 802 } 803 804 /* 805 * Authentication error reply 806 */ 807 void 808 svcerr_auth(struct svc_req *rqstp, enum auth_stat why) 809 { 810 SVCXPRT *xprt = rqstp->rq_xprt; 811 struct rpc_msg rply; 812 813 rply.rm_xid = rqstp->rq_xid; 814 rply.rm_direction = REPLY; 815 rply.rm_reply.rp_stat = MSG_DENIED; 816 rply.rjcted_rply.rj_stat = AUTH_ERROR; 817 rply.rjcted_rply.rj_why = why; 818 819 if (xprt->xp_pool->sp_rcache) 820 replay_setreply(xprt->xp_pool->sp_rcache, 821 &rply, svc_getrpccaller(rqstp), NULL); 822 823 svc_sendreply_common(rqstp, &rply, NULL); 824 } 825 826 /* 827 * Auth too weak error reply 828 */ 829 void 830 svcerr_weakauth(struct svc_req *rqstp) 831 { 832 833 svcerr_auth(rqstp, AUTH_TOOWEAK); 834 } 835 836 /* 837 * Program unavailable error reply 838 */ 839 void 840 svcerr_noprog(struct svc_req *rqstp) 841 { 842 SVCXPRT *xprt = rqstp->rq_xprt; 843 struct rpc_msg rply; 844 845 rply.rm_xid = rqstp->rq_xid; 846 rply.rm_direction = REPLY; 847 rply.rm_reply.rp_stat = MSG_ACCEPTED; 848 rply.acpted_rply.ar_verf = rqstp->rq_verf; 849 rply.acpted_rply.ar_stat = PROG_UNAVAIL; 850 851 if (xprt->xp_pool->sp_rcache) 852 replay_setreply(xprt->xp_pool->sp_rcache, 853 &rply, svc_getrpccaller(rqstp), NULL); 854 855 svc_sendreply_common(rqstp, &rply, NULL); 856 } 857 858 /* 859 * Program version mismatch error reply 860 */ 861 void 862 svcerr_progvers(struct svc_req *rqstp, rpcvers_t low_vers, rpcvers_t high_vers) 863 { 864 SVCXPRT *xprt = rqstp->rq_xprt; 865 struct rpc_msg rply; 866 867 rply.rm_xid = rqstp->rq_xid; 868 rply.rm_direction = REPLY; 869 rply.rm_reply.rp_stat = MSG_ACCEPTED; 870 rply.acpted_rply.ar_verf = rqstp->rq_verf; 871 rply.acpted_rply.ar_stat = PROG_MISMATCH; 872 rply.acpted_rply.ar_vers.low = (uint32_t)low_vers; 873 rply.acpted_rply.ar_vers.high = (uint32_t)high_vers; 874 875 if (xprt->xp_pool->sp_rcache) 876 replay_setreply(xprt->xp_pool->sp_rcache, 877 &rply, svc_getrpccaller(rqstp), NULL); 878 879 svc_sendreply_common(rqstp, &rply, NULL); 880 } 881 882 /* 883 * Allocate a new server transport structure. All fields are 884 * initialized to zero and xp_p3 is initialized to point at an 885 * extension structure to hold various flags and authentication 886 * parameters. 887 */ 888 SVCXPRT * 889 svc_xprt_alloc(void) 890 { 891 SVCXPRT *xprt; 892 SVCXPRT_EXT *ext; 893 894 xprt = mem_alloc(sizeof(SVCXPRT)); 895 ext = mem_alloc(sizeof(SVCXPRT_EXT)); 896 xprt->xp_p3 = ext; 897 refcount_init(&xprt->xp_refs, 1); 898 899 return (xprt); 900 } 901 902 /* 903 * Free a server transport structure. 904 */ 905 void 906 svc_xprt_free(SVCXPRT *xprt) 907 { 908 909 mem_free(xprt->xp_p3, sizeof(SVCXPRT_EXT)); 910 /* The size argument is ignored, so 0 is ok. */ 911 mem_free(xprt->xp_gidp, 0); 912 mem_free(xprt, sizeof(SVCXPRT)); 913 } 914 915 /* ******************* SERVER INPUT STUFF ******************* */ 916 917 /* 918 * Read RPC requests from a transport and queue them to be 919 * executed. We handle authentication and replay cache replies here. 920 * Actually dispatching the RPC is deferred till svc_executereq. 921 */ 922 static enum xprt_stat 923 svc_getreq(SVCXPRT *xprt, struct svc_req **rqstp_ret) 924 { 925 SVCPOOL *pool = xprt->xp_pool; 926 struct svc_req *r; 927 struct rpc_msg msg; 928 struct mbuf *args; 929 struct svc_loss_callout *s; 930 enum xprt_stat stat; 931 932 /* now receive msgs from xprtprt (support batch calls) */ 933 r = malloc(sizeof(*r), M_RPC, M_WAITOK|M_ZERO); 934 935 msg.rm_call.cb_cred.oa_base = r->rq_credarea; 936 msg.rm_call.cb_verf.oa_base = &r->rq_credarea[MAX_AUTH_BYTES]; 937 r->rq_clntcred = &r->rq_credarea[2*MAX_AUTH_BYTES]; 938 if (SVC_RECV(xprt, &msg, &r->rq_addr, &args)) { 939 enum auth_stat why; 940 941 /* 942 * Handle replays and authenticate before queuing the 943 * request to be executed. 944 */ 945 SVC_ACQUIRE(xprt); 946 r->rq_xprt = xprt; 947 if (pool->sp_rcache) { 948 struct rpc_msg repmsg; 949 struct mbuf *repbody; 950 enum replay_state rs; 951 rs = replay_find(pool->sp_rcache, &msg, 952 svc_getrpccaller(r), &repmsg, &repbody); 953 switch (rs) { 954 case RS_NEW: 955 break; 956 case RS_DONE: 957 SVC_REPLY(xprt, &repmsg, r->rq_addr, 958 repbody, &r->rq_reply_seq); 959 if (r->rq_addr) { 960 free(r->rq_addr, M_SONAME); 961 r->rq_addr = NULL; 962 } 963 m_freem(args); 964 goto call_done; 965 966 default: 967 m_freem(args); 968 goto call_done; 969 } 970 } 971 972 r->rq_xid = msg.rm_xid; 973 r->rq_prog = msg.rm_call.cb_prog; 974 r->rq_vers = msg.rm_call.cb_vers; 975 r->rq_proc = msg.rm_call.cb_proc; 976 r->rq_size = sizeof(*r) + m_length(args, NULL); 977 r->rq_args = args; 978 if ((why = _authenticate(r, &msg)) != AUTH_OK) { 979 /* 980 * RPCSEC_GSS uses this return code 981 * for requests that form part of its 982 * context establishment protocol and 983 * should not be dispatched to the 984 * application. 985 */ 986 if (why != RPCSEC_GSS_NODISPATCH) 987 svcerr_auth(r, why); 988 goto call_done; 989 } 990 991 if (!SVCAUTH_UNWRAP(&r->rq_auth, &r->rq_args)) { 992 svcerr_decode(r); 993 goto call_done; 994 } 995 996 /* 997 * Everything checks out, return request to caller. 998 */ 999 *rqstp_ret = r; 1000 r = NULL; 1001 } 1002 call_done: 1003 if (r) { 1004 svc_freereq(r); 1005 r = NULL; 1006 } 1007 if ((stat = SVC_STAT(xprt)) == XPRT_DIED) { 1008 TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) 1009 (*s->slc_dispatch)(xprt); 1010 xprt_unregister(xprt); 1011 } 1012 1013 return (stat); 1014 } 1015 1016 static void 1017 svc_executereq(struct svc_req *rqstp) 1018 { 1019 SVCXPRT *xprt = rqstp->rq_xprt; 1020 SVCPOOL *pool = xprt->xp_pool; 1021 int prog_found; 1022 rpcvers_t low_vers; 1023 rpcvers_t high_vers; 1024 struct svc_callout *s; 1025 1026 /* now match message with a registered service*/ 1027 prog_found = FALSE; 1028 low_vers = (rpcvers_t) -1L; 1029 high_vers = (rpcvers_t) 0L; 1030 TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) { 1031 if (s->sc_prog == rqstp->rq_prog) { 1032 if (s->sc_vers == rqstp->rq_vers) { 1033 /* 1034 * We hand ownership of r to the 1035 * dispatch method - they must call 1036 * svc_freereq. 1037 */ 1038 (*s->sc_dispatch)(rqstp, xprt); 1039 return; 1040 } /* found correct version */ 1041 prog_found = TRUE; 1042 if (s->sc_vers < low_vers) 1043 low_vers = s->sc_vers; 1044 if (s->sc_vers > high_vers) 1045 high_vers = s->sc_vers; 1046 } /* found correct program */ 1047 } 1048 1049 /* 1050 * if we got here, the program or version 1051 * is not served ... 1052 */ 1053 if (prog_found) 1054 svcerr_progvers(rqstp, low_vers, high_vers); 1055 else 1056 svcerr_noprog(rqstp); 1057 1058 svc_freereq(rqstp); 1059 } 1060 1061 static void 1062 svc_checkidle(SVCGROUP *grp) 1063 { 1064 SVCXPRT *xprt, *nxprt; 1065 time_t timo; 1066 struct svcxprt_list cleanup; 1067 1068 TAILQ_INIT(&cleanup); 1069 TAILQ_FOREACH_SAFE(xprt, &grp->sg_xlist, xp_link, nxprt) { 1070 /* 1071 * Only some transports have idle timers. Don't time 1072 * something out which is just waking up. 1073 */ 1074 if (!xprt->xp_idletimeout || xprt->xp_thread) 1075 continue; 1076 1077 timo = xprt->xp_lastactive + xprt->xp_idletimeout; 1078 if (time_uptime > timo) { 1079 xprt_unregister_locked(xprt); 1080 TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link); 1081 } 1082 } 1083 1084 mtx_unlock(&grp->sg_lock); 1085 TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) { 1086 soshutdown(xprt->xp_socket, SHUT_WR); 1087 SVC_RELEASE(xprt); 1088 } 1089 mtx_lock(&grp->sg_lock); 1090 } 1091 1092 static void 1093 svc_assign_waiting_sockets(SVCPOOL *pool) 1094 { 1095 SVCGROUP *grp; 1096 SVCXPRT *xprt; 1097 int g; 1098 1099 for (g = 0; g < pool->sp_groupcount; g++) { 1100 grp = &pool->sp_groups[g]; 1101 mtx_lock(&grp->sg_lock); 1102 while ((xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) { 1103 if (xprt_assignthread(xprt)) 1104 TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink); 1105 else 1106 break; 1107 } 1108 mtx_unlock(&grp->sg_lock); 1109 } 1110 } 1111 1112 static void 1113 svc_change_space_used(SVCPOOL *pool, long delta) 1114 { 1115 unsigned long value; 1116 1117 value = atomic_fetchadd_long(&pool->sp_space_used, delta) + delta; 1118 if (delta > 0) { 1119 if (value >= pool->sp_space_high && !pool->sp_space_throttled) { 1120 pool->sp_space_throttled = TRUE; 1121 pool->sp_space_throttle_count++; 1122 } 1123 if (value > pool->sp_space_used_highest) 1124 pool->sp_space_used_highest = value; 1125 } else { 1126 if (value < pool->sp_space_low && pool->sp_space_throttled) { 1127 pool->sp_space_throttled = FALSE; 1128 svc_assign_waiting_sockets(pool); 1129 } 1130 } 1131 } 1132 1133 static bool_t 1134 svc_request_space_available(SVCPOOL *pool) 1135 { 1136 1137 if (pool->sp_space_throttled) 1138 return (FALSE); 1139 return (TRUE); 1140 } 1141 1142 static void 1143 svc_run_internal(SVCGROUP *grp, bool_t ismaster) 1144 { 1145 SVCPOOL *pool = grp->sg_pool; 1146 SVCTHREAD *st, *stpref; 1147 SVCXPRT *xprt; 1148 enum xprt_stat stat; 1149 struct svc_req *rqstp; 1150 struct proc *p; 1151 long sz; 1152 int error; 1153 1154 st = mem_alloc(sizeof(*st)); 1155 mtx_init(&st->st_lock, "st_lock", NULL, MTX_DEF); 1156 st->st_pool = pool; 1157 st->st_xprt = NULL; 1158 STAILQ_INIT(&st->st_reqs); 1159 cv_init(&st->st_cond, "rpcsvc"); 1160 1161 mtx_lock(&grp->sg_lock); 1162 1163 /* 1164 * If we are a new thread which was spawned to cope with 1165 * increased load, set the state back to SVCPOOL_ACTIVE. 1166 */ 1167 if (grp->sg_state == SVCPOOL_THREADSTARTING) 1168 grp->sg_state = SVCPOOL_ACTIVE; 1169 1170 while (grp->sg_state != SVCPOOL_CLOSING) { 1171 /* 1172 * Create new thread if requested. 1173 */ 1174 if (grp->sg_state == SVCPOOL_THREADWANTED) { 1175 grp->sg_state = SVCPOOL_THREADSTARTING; 1176 grp->sg_lastcreatetime = time_uptime; 1177 mtx_unlock(&grp->sg_lock); 1178 svc_new_thread(grp); 1179 mtx_lock(&grp->sg_lock); 1180 continue; 1181 } 1182 1183 /* 1184 * Check for idle transports once per second. 1185 */ 1186 if (time_uptime > grp->sg_lastidlecheck) { 1187 grp->sg_lastidlecheck = time_uptime; 1188 svc_checkidle(grp); 1189 } 1190 1191 xprt = st->st_xprt; 1192 if (!xprt) { 1193 /* 1194 * Enforce maxthreads count. 1195 */ 1196 if (!ismaster && grp->sg_threadcount > 1197 grp->sg_maxthreads) 1198 break; 1199 1200 /* 1201 * Before sleeping, see if we can find an 1202 * active transport which isn't being serviced 1203 * by a thread. 1204 */ 1205 if (svc_request_space_available(pool) && 1206 (xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) { 1207 TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink); 1208 SVC_ACQUIRE(xprt); 1209 xprt->xp_thread = st; 1210 st->st_xprt = xprt; 1211 continue; 1212 } 1213 1214 LIST_INSERT_HEAD(&grp->sg_idlethreads, st, st_ilink); 1215 if (ismaster || (!ismaster && 1216 grp->sg_threadcount > grp->sg_minthreads)) 1217 error = cv_timedwait_sig(&st->st_cond, 1218 &grp->sg_lock, 5 * hz); 1219 else 1220 error = cv_wait_sig(&st->st_cond, 1221 &grp->sg_lock); 1222 if (st->st_xprt == NULL) 1223 LIST_REMOVE(st, st_ilink); 1224 1225 /* 1226 * Reduce worker thread count when idle. 1227 */ 1228 if (error == EWOULDBLOCK) { 1229 if (!ismaster 1230 && (grp->sg_threadcount 1231 > grp->sg_minthreads) 1232 && !st->st_xprt) 1233 break; 1234 } else if (error != 0) { 1235 KASSERT(error == EINTR || error == ERESTART, 1236 ("non-signal error %d", error)); 1237 mtx_unlock(&grp->sg_lock); 1238 p = curproc; 1239 PROC_LOCK(p); 1240 if (P_SHOULDSTOP(p) || 1241 (p->p_flag & P_TOTAL_STOP) != 0) { 1242 thread_suspend_check(0); 1243 PROC_UNLOCK(p); 1244 mtx_lock(&grp->sg_lock); 1245 } else { 1246 PROC_UNLOCK(p); 1247 svc_exit(pool); 1248 mtx_lock(&grp->sg_lock); 1249 break; 1250 } 1251 } 1252 continue; 1253 } 1254 mtx_unlock(&grp->sg_lock); 1255 1256 /* 1257 * Drain the transport socket and queue up any RPCs. 1258 */ 1259 xprt->xp_lastactive = time_uptime; 1260 do { 1261 if (!svc_request_space_available(pool)) 1262 break; 1263 rqstp = NULL; 1264 stat = svc_getreq(xprt, &rqstp); 1265 if (rqstp) { 1266 svc_change_space_used(pool, rqstp->rq_size); 1267 /* 1268 * See if the application has a preference 1269 * for some other thread. 1270 */ 1271 if (pool->sp_assign) { 1272 stpref = pool->sp_assign(st, rqstp); 1273 rqstp->rq_thread = stpref; 1274 STAILQ_INSERT_TAIL(&stpref->st_reqs, 1275 rqstp, rq_link); 1276 mtx_unlock(&stpref->st_lock); 1277 if (stpref != st) 1278 rqstp = NULL; 1279 } else { 1280 rqstp->rq_thread = st; 1281 STAILQ_INSERT_TAIL(&st->st_reqs, 1282 rqstp, rq_link); 1283 } 1284 } 1285 } while (rqstp == NULL && stat == XPRT_MOREREQS 1286 && grp->sg_state != SVCPOOL_CLOSING); 1287 1288 /* 1289 * Move this transport to the end of the active list to 1290 * ensure fairness when multiple transports are active. 1291 * If this was the last queued request, svc_getreq will end 1292 * up calling xprt_inactive to remove from the active list. 1293 */ 1294 mtx_lock(&grp->sg_lock); 1295 xprt->xp_thread = NULL; 1296 st->st_xprt = NULL; 1297 if (xprt->xp_active) { 1298 if (!svc_request_space_available(pool) || 1299 !xprt_assignthread(xprt)) 1300 TAILQ_INSERT_TAIL(&grp->sg_active, 1301 xprt, xp_alink); 1302 } 1303 mtx_unlock(&grp->sg_lock); 1304 SVC_RELEASE(xprt); 1305 1306 /* 1307 * Execute what we have queued. 1308 */ 1309 mtx_lock(&st->st_lock); 1310 while ((rqstp = STAILQ_FIRST(&st->st_reqs)) != NULL) { 1311 STAILQ_REMOVE_HEAD(&st->st_reqs, rq_link); 1312 mtx_unlock(&st->st_lock); 1313 sz = (long)rqstp->rq_size; 1314 svc_executereq(rqstp); 1315 svc_change_space_used(pool, -sz); 1316 mtx_lock(&st->st_lock); 1317 } 1318 mtx_unlock(&st->st_lock); 1319 mtx_lock(&grp->sg_lock); 1320 } 1321 1322 if (st->st_xprt) { 1323 xprt = st->st_xprt; 1324 st->st_xprt = NULL; 1325 SVC_RELEASE(xprt); 1326 } 1327 KASSERT(STAILQ_EMPTY(&st->st_reqs), ("stray reqs on exit")); 1328 mtx_destroy(&st->st_lock); 1329 cv_destroy(&st->st_cond); 1330 mem_free(st, sizeof(*st)); 1331 1332 grp->sg_threadcount--; 1333 if (!ismaster) 1334 wakeup(grp); 1335 mtx_unlock(&grp->sg_lock); 1336 } 1337 1338 static void 1339 svc_thread_start(void *arg) 1340 { 1341 1342 svc_run_internal((SVCGROUP *) arg, FALSE); 1343 kthread_exit(); 1344 } 1345 1346 static void 1347 svc_new_thread(SVCGROUP *grp) 1348 { 1349 SVCPOOL *pool = grp->sg_pool; 1350 struct thread *td; 1351 1352 mtx_lock(&grp->sg_lock); 1353 grp->sg_threadcount++; 1354 mtx_unlock(&grp->sg_lock); 1355 kthread_add(svc_thread_start, grp, pool->sp_proc, &td, 0, 0, 1356 "%s: service", pool->sp_name); 1357 } 1358 1359 void 1360 svc_run(SVCPOOL *pool) 1361 { 1362 int g, i; 1363 struct proc *p; 1364 struct thread *td; 1365 SVCGROUP *grp; 1366 1367 p = curproc; 1368 td = curthread; 1369 snprintf(td->td_name, sizeof(td->td_name), 1370 "%s: master", pool->sp_name); 1371 pool->sp_state = SVCPOOL_ACTIVE; 1372 pool->sp_proc = p; 1373 1374 /* Choose group count based on number of threads and CPUs. */ 1375 pool->sp_groupcount = max(1, min(SVC_MAXGROUPS, 1376 min(pool->sp_maxthreads / 2, mp_ncpus) / 6)); 1377 for (g = 0; g < pool->sp_groupcount; g++) { 1378 grp = &pool->sp_groups[g]; 1379 grp->sg_minthreads = max(1, 1380 pool->sp_minthreads / pool->sp_groupcount); 1381 grp->sg_maxthreads = max(1, 1382 pool->sp_maxthreads / pool->sp_groupcount); 1383 grp->sg_lastcreatetime = time_uptime; 1384 } 1385 1386 /* Starting threads */ 1387 pool->sp_groups[0].sg_threadcount++; 1388 for (g = 0; g < pool->sp_groupcount; g++) { 1389 grp = &pool->sp_groups[g]; 1390 for (i = ((g == 0) ? 1 : 0); i < grp->sg_minthreads; i++) 1391 svc_new_thread(grp); 1392 } 1393 svc_run_internal(&pool->sp_groups[0], TRUE); 1394 1395 /* Waiting for threads to stop. */ 1396 for (g = 0; g < pool->sp_groupcount; g++) { 1397 grp = &pool->sp_groups[g]; 1398 mtx_lock(&grp->sg_lock); 1399 while (grp->sg_threadcount > 0) 1400 msleep(grp, &grp->sg_lock, 0, "svcexit", 0); 1401 mtx_unlock(&grp->sg_lock); 1402 } 1403 } 1404 1405 void 1406 svc_exit(SVCPOOL *pool) 1407 { 1408 SVCGROUP *grp; 1409 SVCTHREAD *st; 1410 int g; 1411 1412 pool->sp_state = SVCPOOL_CLOSING; 1413 for (g = 0; g < pool->sp_groupcount; g++) { 1414 grp = &pool->sp_groups[g]; 1415 mtx_lock(&grp->sg_lock); 1416 if (grp->sg_state != SVCPOOL_CLOSING) { 1417 grp->sg_state = SVCPOOL_CLOSING; 1418 LIST_FOREACH(st, &grp->sg_idlethreads, st_ilink) 1419 cv_signal(&st->st_cond); 1420 } 1421 mtx_unlock(&grp->sg_lock); 1422 } 1423 } 1424 1425 bool_t 1426 svc_getargs(struct svc_req *rqstp, xdrproc_t xargs, void *args) 1427 { 1428 struct mbuf *m; 1429 XDR xdrs; 1430 bool_t stat; 1431 1432 m = rqstp->rq_args; 1433 rqstp->rq_args = NULL; 1434 1435 xdrmbuf_create(&xdrs, m, XDR_DECODE); 1436 stat = xargs(&xdrs, args); 1437 XDR_DESTROY(&xdrs); 1438 1439 return (stat); 1440 } 1441 1442 bool_t 1443 svc_freeargs(struct svc_req *rqstp, xdrproc_t xargs, void *args) 1444 { 1445 XDR xdrs; 1446 1447 if (rqstp->rq_addr) { 1448 free(rqstp->rq_addr, M_SONAME); 1449 rqstp->rq_addr = NULL; 1450 } 1451 1452 xdrs.x_op = XDR_FREE; 1453 return (xargs(&xdrs, args)); 1454 } 1455 1456 void 1457 svc_freereq(struct svc_req *rqstp) 1458 { 1459 SVCTHREAD *st; 1460 SVCPOOL *pool; 1461 1462 st = rqstp->rq_thread; 1463 if (st) { 1464 pool = st->st_pool; 1465 if (pool->sp_done) 1466 pool->sp_done(st, rqstp); 1467 } 1468 1469 if (rqstp->rq_auth.svc_ah_ops) 1470 SVCAUTH_RELEASE(&rqstp->rq_auth); 1471 1472 if (rqstp->rq_xprt) { 1473 SVC_RELEASE(rqstp->rq_xprt); 1474 } 1475 1476 if (rqstp->rq_addr) 1477 free(rqstp->rq_addr, M_SONAME); 1478 1479 if (rqstp->rq_args) 1480 m_freem(rqstp->rq_args); 1481 1482 free(rqstp, M_RPC); 1483 } 1484