1 /* $NetBSD: svc.c,v 1.21 2000/07/06 03:10:35 christos Exp $ */ 2 3 /*- 4 * SPDX-License-Identifier: BSD-3-Clause 5 * 6 * Copyright (c) 2009, Sun Microsystems, Inc. 7 * All rights reserved. 8 * 9 * Redistribution and use in source and binary forms, with or without 10 * modification, are permitted provided that the following conditions are met: 11 * - Redistributions of source code must retain the above copyright notice, 12 * this list of conditions and the following disclaimer. 13 * - Redistributions in binary form must reproduce the above copyright notice, 14 * this list of conditions and the following disclaimer in the documentation 15 * and/or other materials provided with the distribution. 16 * - Neither the name of Sun Microsystems, Inc. nor the names of its 17 * contributors may be used to endorse or promote products derived 18 * from this software without specific prior written permission. 19 * 20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 21 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 22 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 23 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 24 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 25 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 26 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 27 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 28 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 29 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 30 * POSSIBILITY OF SUCH DAMAGE. 31 */ 32 33 #if defined(LIBC_SCCS) && !defined(lint) 34 static char *sccsid2 = "@(#)svc.c 1.44 88/02/08 Copyr 1984 Sun Micro"; 35 static char *sccsid = "@(#)svc.c 2.4 88/08/11 4.0 RPCSRC"; 36 #endif 37 #include <sys/cdefs.h> 38 __FBSDID("$FreeBSD$"); 39 40 /* 41 * svc.c, Server-side remote procedure call interface. 42 * 43 * There are two sets of procedures here. The xprt routines are 44 * for handling transport handles. The svc routines handle the 45 * list of service routines. 46 * 47 * Copyright (C) 1984, Sun Microsystems, Inc. 48 */ 49 50 #include <sys/param.h> 51 #include <sys/lock.h> 52 #include <sys/kernel.h> 53 #include <sys/kthread.h> 54 #include <sys/malloc.h> 55 #include <sys/mbuf.h> 56 #include <sys/mutex.h> 57 #include <sys/proc.h> 58 #include <sys/queue.h> 59 #include <sys/socketvar.h> 60 #include <sys/systm.h> 61 #include <sys/smp.h> 62 #include <sys/sx.h> 63 #include <sys/ucred.h> 64 65 #include <rpc/rpc.h> 66 #include <rpc/rpcb_clnt.h> 67 #include <rpc/replay.h> 68 69 #include <rpc/rpc_com.h> 70 71 #define SVC_VERSQUIET 0x0001 /* keep quiet about vers mismatch */ 72 #define version_keepquiet(xp) (SVC_EXT(xp)->xp_flags & SVC_VERSQUIET) 73 74 static struct svc_callout *svc_find(SVCPOOL *pool, rpcprog_t, rpcvers_t, 75 char *); 76 static void svc_new_thread(SVCGROUP *grp); 77 static void xprt_unregister_locked(SVCXPRT *xprt); 78 static void svc_change_space_used(SVCPOOL *pool, long delta); 79 static bool_t svc_request_space_available(SVCPOOL *pool); 80 static void svcpool_cleanup(SVCPOOL *pool); 81 82 /* *************** SVCXPRT related stuff **************** */ 83 84 static int svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS); 85 static int svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS); 86 static int svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS); 87 88 SVCPOOL* 89 svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base) 90 { 91 SVCPOOL *pool; 92 SVCGROUP *grp; 93 int g; 94 95 pool = malloc(sizeof(SVCPOOL), M_RPC, M_WAITOK|M_ZERO); 96 97 mtx_init(&pool->sp_lock, "sp_lock", NULL, MTX_DEF); 98 pool->sp_name = name; 99 pool->sp_state = SVCPOOL_INIT; 100 pool->sp_proc = NULL; 101 TAILQ_INIT(&pool->sp_callouts); 102 TAILQ_INIT(&pool->sp_lcallouts); 103 pool->sp_minthreads = 1; 104 pool->sp_maxthreads = 1; 105 pool->sp_groupcount = 1; 106 for (g = 0; g < SVC_MAXGROUPS; g++) { 107 grp = &pool->sp_groups[g]; 108 mtx_init(&grp->sg_lock, "sg_lock", NULL, MTX_DEF); 109 grp->sg_pool = pool; 110 grp->sg_state = SVCPOOL_ACTIVE; 111 TAILQ_INIT(&grp->sg_xlist); 112 TAILQ_INIT(&grp->sg_active); 113 LIST_INIT(&grp->sg_idlethreads); 114 grp->sg_minthreads = 1; 115 grp->sg_maxthreads = 1; 116 } 117 118 /* 119 * Don't use more than a quarter of mbuf clusters. Nota bene: 120 * nmbclusters is an int, but nmbclusters*MCLBYTES may overflow 121 * on LP64 architectures, so cast to u_long to avoid undefined 122 * behavior. (ILP32 architectures cannot have nmbclusters 123 * large enough to overflow for other reasons.) 124 */ 125 pool->sp_space_high = (u_long)nmbclusters * MCLBYTES / 4; 126 pool->sp_space_low = (pool->sp_space_high / 3) * 2; 127 128 sysctl_ctx_init(&pool->sp_sysctl); 129 if (sysctl_base) { 130 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO, 131 "minthreads", CTLTYPE_INT | CTLFLAG_RW | CTLFLAG_MPSAFE, 132 pool, 0, svcpool_minthread_sysctl, "I", 133 "Minimal number of threads"); 134 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO, 135 "maxthreads", CTLTYPE_INT | CTLFLAG_RW | CTLFLAG_MPSAFE, 136 pool, 0, svcpool_maxthread_sysctl, "I", 137 "Maximal number of threads"); 138 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO, 139 "threads", CTLTYPE_INT | CTLFLAG_RD | CTLFLAG_MPSAFE, 140 pool, 0, svcpool_threads_sysctl, "I", 141 "Current number of threads"); 142 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO, 143 "groups", CTLFLAG_RD, &pool->sp_groupcount, 0, 144 "Number of thread groups"); 145 146 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO, 147 "request_space_used", CTLFLAG_RD, 148 &pool->sp_space_used, 149 "Space in parsed but not handled requests."); 150 151 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO, 152 "request_space_used_highest", CTLFLAG_RD, 153 &pool->sp_space_used_highest, 154 "Highest space used since reboot."); 155 156 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO, 157 "request_space_high", CTLFLAG_RW, 158 &pool->sp_space_high, 159 "Maximum space in parsed but not handled requests."); 160 161 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO, 162 "request_space_low", CTLFLAG_RW, 163 &pool->sp_space_low, 164 "Low water mark for request space."); 165 166 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO, 167 "request_space_throttled", CTLFLAG_RD, 168 &pool->sp_space_throttled, 0, 169 "Whether nfs requests are currently throttled"); 170 171 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO, 172 "request_space_throttle_count", CTLFLAG_RD, 173 &pool->sp_space_throttle_count, 0, 174 "Count of times throttling based on request space has occurred"); 175 } 176 177 return pool; 178 } 179 180 /* 181 * Code common to svcpool_destroy() and svcpool_close(), which cleans up 182 * the pool data structures. 183 */ 184 static void 185 svcpool_cleanup(SVCPOOL *pool) 186 { 187 SVCGROUP *grp; 188 SVCXPRT *xprt, *nxprt; 189 struct svc_callout *s; 190 struct svc_loss_callout *sl; 191 struct svcxprt_list cleanup; 192 int g; 193 194 TAILQ_INIT(&cleanup); 195 196 for (g = 0; g < SVC_MAXGROUPS; g++) { 197 grp = &pool->sp_groups[g]; 198 mtx_lock(&grp->sg_lock); 199 while ((xprt = TAILQ_FIRST(&grp->sg_xlist)) != NULL) { 200 xprt_unregister_locked(xprt); 201 TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link); 202 } 203 mtx_unlock(&grp->sg_lock); 204 } 205 TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) { 206 if (xprt->xp_socket != NULL) 207 soshutdown(xprt->xp_socket, SHUT_WR); 208 SVC_RELEASE(xprt); 209 } 210 211 mtx_lock(&pool->sp_lock); 212 while ((s = TAILQ_FIRST(&pool->sp_callouts)) != NULL) { 213 mtx_unlock(&pool->sp_lock); 214 svc_unreg(pool, s->sc_prog, s->sc_vers); 215 mtx_lock(&pool->sp_lock); 216 } 217 while ((sl = TAILQ_FIRST(&pool->sp_lcallouts)) != NULL) { 218 mtx_unlock(&pool->sp_lock); 219 svc_loss_unreg(pool, sl->slc_dispatch); 220 mtx_lock(&pool->sp_lock); 221 } 222 mtx_unlock(&pool->sp_lock); 223 } 224 225 void 226 svcpool_destroy(SVCPOOL *pool) 227 { 228 SVCGROUP *grp; 229 int g; 230 231 svcpool_cleanup(pool); 232 233 for (g = 0; g < SVC_MAXGROUPS; g++) { 234 grp = &pool->sp_groups[g]; 235 mtx_destroy(&grp->sg_lock); 236 } 237 mtx_destroy(&pool->sp_lock); 238 239 if (pool->sp_rcache) 240 replay_freecache(pool->sp_rcache); 241 242 sysctl_ctx_free(&pool->sp_sysctl); 243 free(pool, M_RPC); 244 } 245 246 /* 247 * Similar to svcpool_destroy(), except that it does not destroy the actual 248 * data structures. As such, "pool" may be used again. 249 */ 250 void 251 svcpool_close(SVCPOOL *pool) 252 { 253 SVCGROUP *grp; 254 int g; 255 256 svcpool_cleanup(pool); 257 258 /* Now, initialize the pool's state for a fresh svc_run() call. */ 259 mtx_lock(&pool->sp_lock); 260 pool->sp_state = SVCPOOL_INIT; 261 mtx_unlock(&pool->sp_lock); 262 for (g = 0; g < SVC_MAXGROUPS; g++) { 263 grp = &pool->sp_groups[g]; 264 mtx_lock(&grp->sg_lock); 265 grp->sg_state = SVCPOOL_ACTIVE; 266 mtx_unlock(&grp->sg_lock); 267 } 268 } 269 270 /* 271 * Sysctl handler to get the present thread count on a pool 272 */ 273 static int 274 svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS) 275 { 276 SVCPOOL *pool; 277 int threads, error, g; 278 279 pool = oidp->oid_arg1; 280 threads = 0; 281 mtx_lock(&pool->sp_lock); 282 for (g = 0; g < pool->sp_groupcount; g++) 283 threads += pool->sp_groups[g].sg_threadcount; 284 mtx_unlock(&pool->sp_lock); 285 error = sysctl_handle_int(oidp, &threads, 0, req); 286 return (error); 287 } 288 289 /* 290 * Sysctl handler to set the minimum thread count on a pool 291 */ 292 static int 293 svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS) 294 { 295 SVCPOOL *pool; 296 int newminthreads, error, g; 297 298 pool = oidp->oid_arg1; 299 newminthreads = pool->sp_minthreads; 300 error = sysctl_handle_int(oidp, &newminthreads, 0, req); 301 if (error == 0 && newminthreads != pool->sp_minthreads) { 302 if (newminthreads > pool->sp_maxthreads) 303 return (EINVAL); 304 mtx_lock(&pool->sp_lock); 305 pool->sp_minthreads = newminthreads; 306 for (g = 0; g < pool->sp_groupcount; g++) { 307 pool->sp_groups[g].sg_minthreads = max(1, 308 pool->sp_minthreads / pool->sp_groupcount); 309 } 310 mtx_unlock(&pool->sp_lock); 311 } 312 return (error); 313 } 314 315 /* 316 * Sysctl handler to set the maximum thread count on a pool 317 */ 318 static int 319 svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS) 320 { 321 SVCPOOL *pool; 322 int newmaxthreads, error, g; 323 324 pool = oidp->oid_arg1; 325 newmaxthreads = pool->sp_maxthreads; 326 error = sysctl_handle_int(oidp, &newmaxthreads, 0, req); 327 if (error == 0 && newmaxthreads != pool->sp_maxthreads) { 328 if (newmaxthreads < pool->sp_minthreads) 329 return (EINVAL); 330 mtx_lock(&pool->sp_lock); 331 pool->sp_maxthreads = newmaxthreads; 332 for (g = 0; g < pool->sp_groupcount; g++) { 333 pool->sp_groups[g].sg_maxthreads = max(1, 334 pool->sp_maxthreads / pool->sp_groupcount); 335 } 336 mtx_unlock(&pool->sp_lock); 337 } 338 return (error); 339 } 340 341 /* 342 * Activate a transport handle. 343 */ 344 void 345 xprt_register(SVCXPRT *xprt) 346 { 347 SVCPOOL *pool = xprt->xp_pool; 348 SVCGROUP *grp; 349 int g; 350 351 SVC_ACQUIRE(xprt); 352 g = atomic_fetchadd_int(&pool->sp_nextgroup, 1) % pool->sp_groupcount; 353 xprt->xp_group = grp = &pool->sp_groups[g]; 354 mtx_lock(&grp->sg_lock); 355 xprt->xp_registered = TRUE; 356 xprt->xp_active = FALSE; 357 TAILQ_INSERT_TAIL(&grp->sg_xlist, xprt, xp_link); 358 mtx_unlock(&grp->sg_lock); 359 } 360 361 /* 362 * De-activate a transport handle. Note: the locked version doesn't 363 * release the transport - caller must do that after dropping the pool 364 * lock. 365 */ 366 static void 367 xprt_unregister_locked(SVCXPRT *xprt) 368 { 369 SVCGROUP *grp = xprt->xp_group; 370 371 mtx_assert(&grp->sg_lock, MA_OWNED); 372 KASSERT(xprt->xp_registered == TRUE, 373 ("xprt_unregister_locked: not registered")); 374 xprt_inactive_locked(xprt); 375 TAILQ_REMOVE(&grp->sg_xlist, xprt, xp_link); 376 xprt->xp_registered = FALSE; 377 } 378 379 void 380 xprt_unregister(SVCXPRT *xprt) 381 { 382 SVCGROUP *grp = xprt->xp_group; 383 384 mtx_lock(&grp->sg_lock); 385 if (xprt->xp_registered == FALSE) { 386 /* Already unregistered by another thread */ 387 mtx_unlock(&grp->sg_lock); 388 return; 389 } 390 xprt_unregister_locked(xprt); 391 mtx_unlock(&grp->sg_lock); 392 393 if (xprt->xp_socket != NULL) 394 soshutdown(xprt->xp_socket, SHUT_WR); 395 SVC_RELEASE(xprt); 396 } 397 398 /* 399 * Attempt to assign a service thread to this transport. 400 */ 401 static int 402 xprt_assignthread(SVCXPRT *xprt) 403 { 404 SVCGROUP *grp = xprt->xp_group; 405 SVCTHREAD *st; 406 407 mtx_assert(&grp->sg_lock, MA_OWNED); 408 st = LIST_FIRST(&grp->sg_idlethreads); 409 if (st) { 410 LIST_REMOVE(st, st_ilink); 411 SVC_ACQUIRE(xprt); 412 xprt->xp_thread = st; 413 st->st_xprt = xprt; 414 cv_signal(&st->st_cond); 415 return (TRUE); 416 } else { 417 /* 418 * See if we can create a new thread. The 419 * actual thread creation happens in 420 * svc_run_internal because our locking state 421 * is poorly defined (we are typically called 422 * from a socket upcall). Don't create more 423 * than one thread per second. 424 */ 425 if (grp->sg_state == SVCPOOL_ACTIVE 426 && grp->sg_lastcreatetime < time_uptime 427 && grp->sg_threadcount < grp->sg_maxthreads) { 428 grp->sg_state = SVCPOOL_THREADWANTED; 429 } 430 } 431 return (FALSE); 432 } 433 434 void 435 xprt_active(SVCXPRT *xprt) 436 { 437 SVCGROUP *grp = xprt->xp_group; 438 439 mtx_lock(&grp->sg_lock); 440 441 if (!xprt->xp_registered) { 442 /* 443 * Race with xprt_unregister - we lose. 444 */ 445 mtx_unlock(&grp->sg_lock); 446 return; 447 } 448 449 if (!xprt->xp_active) { 450 xprt->xp_active = TRUE; 451 if (xprt->xp_thread == NULL) { 452 if (!svc_request_space_available(xprt->xp_pool) || 453 !xprt_assignthread(xprt)) 454 TAILQ_INSERT_TAIL(&grp->sg_active, xprt, 455 xp_alink); 456 } 457 } 458 459 mtx_unlock(&grp->sg_lock); 460 } 461 462 void 463 xprt_inactive_locked(SVCXPRT *xprt) 464 { 465 SVCGROUP *grp = xprt->xp_group; 466 467 mtx_assert(&grp->sg_lock, MA_OWNED); 468 if (xprt->xp_active) { 469 if (xprt->xp_thread == NULL) 470 TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink); 471 xprt->xp_active = FALSE; 472 } 473 } 474 475 void 476 xprt_inactive(SVCXPRT *xprt) 477 { 478 SVCGROUP *grp = xprt->xp_group; 479 480 mtx_lock(&grp->sg_lock); 481 xprt_inactive_locked(xprt); 482 mtx_unlock(&grp->sg_lock); 483 } 484 485 /* 486 * Variant of xprt_inactive() for use only when sure that port is 487 * assigned to thread. For example, within receive handlers. 488 */ 489 void 490 xprt_inactive_self(SVCXPRT *xprt) 491 { 492 493 KASSERT(xprt->xp_thread != NULL, 494 ("xprt_inactive_self(%p) with NULL xp_thread", xprt)); 495 xprt->xp_active = FALSE; 496 } 497 498 /* 499 * Add a service program to the callout list. 500 * The dispatch routine will be called when a rpc request for this 501 * program number comes in. 502 */ 503 bool_t 504 svc_reg(SVCXPRT *xprt, const rpcprog_t prog, const rpcvers_t vers, 505 void (*dispatch)(struct svc_req *, SVCXPRT *), 506 const struct netconfig *nconf) 507 { 508 SVCPOOL *pool = xprt->xp_pool; 509 struct svc_callout *s; 510 char *netid = NULL; 511 int flag = 0; 512 513 /* VARIABLES PROTECTED BY svc_lock: s, svc_head */ 514 515 if (xprt->xp_netid) { 516 netid = strdup(xprt->xp_netid, M_RPC); 517 flag = 1; 518 } else if (nconf && nconf->nc_netid) { 519 netid = strdup(nconf->nc_netid, M_RPC); 520 flag = 1; 521 } /* must have been created with svc_raw_create */ 522 if ((netid == NULL) && (flag == 1)) { 523 return (FALSE); 524 } 525 526 mtx_lock(&pool->sp_lock); 527 if ((s = svc_find(pool, prog, vers, netid)) != NULL) { 528 if (netid) 529 free(netid, M_RPC); 530 if (s->sc_dispatch == dispatch) 531 goto rpcb_it; /* he is registering another xptr */ 532 mtx_unlock(&pool->sp_lock); 533 return (FALSE); 534 } 535 s = malloc(sizeof (struct svc_callout), M_RPC, M_NOWAIT); 536 if (s == NULL) { 537 if (netid) 538 free(netid, M_RPC); 539 mtx_unlock(&pool->sp_lock); 540 return (FALSE); 541 } 542 543 s->sc_prog = prog; 544 s->sc_vers = vers; 545 s->sc_dispatch = dispatch; 546 s->sc_netid = netid; 547 TAILQ_INSERT_TAIL(&pool->sp_callouts, s, sc_link); 548 549 if ((xprt->xp_netid == NULL) && (flag == 1) && netid) 550 ((SVCXPRT *) xprt)->xp_netid = strdup(netid, M_RPC); 551 552 rpcb_it: 553 mtx_unlock(&pool->sp_lock); 554 /* now register the information with the local binder service */ 555 if (nconf) { 556 bool_t dummy; 557 struct netconfig tnc; 558 struct netbuf nb; 559 tnc = *nconf; 560 nb.buf = &xprt->xp_ltaddr; 561 nb.len = xprt->xp_ltaddr.ss_len; 562 dummy = rpcb_set(prog, vers, &tnc, &nb); 563 return (dummy); 564 } 565 return (TRUE); 566 } 567 568 /* 569 * Remove a service program from the callout list. 570 */ 571 void 572 svc_unreg(SVCPOOL *pool, const rpcprog_t prog, const rpcvers_t vers) 573 { 574 struct svc_callout *s; 575 576 /* unregister the information anyway */ 577 (void) rpcb_unset(prog, vers, NULL); 578 mtx_lock(&pool->sp_lock); 579 while ((s = svc_find(pool, prog, vers, NULL)) != NULL) { 580 TAILQ_REMOVE(&pool->sp_callouts, s, sc_link); 581 if (s->sc_netid) 582 mem_free(s->sc_netid, sizeof (s->sc_netid) + 1); 583 mem_free(s, sizeof (struct svc_callout)); 584 } 585 mtx_unlock(&pool->sp_lock); 586 } 587 588 /* 589 * Add a service connection loss program to the callout list. 590 * The dispatch routine will be called when some port in ths pool die. 591 */ 592 bool_t 593 svc_loss_reg(SVCXPRT *xprt, void (*dispatch)(SVCXPRT *)) 594 { 595 SVCPOOL *pool = xprt->xp_pool; 596 struct svc_loss_callout *s; 597 598 mtx_lock(&pool->sp_lock); 599 TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) { 600 if (s->slc_dispatch == dispatch) 601 break; 602 } 603 if (s != NULL) { 604 mtx_unlock(&pool->sp_lock); 605 return (TRUE); 606 } 607 s = malloc(sizeof(struct svc_loss_callout), M_RPC, M_NOWAIT); 608 if (s == NULL) { 609 mtx_unlock(&pool->sp_lock); 610 return (FALSE); 611 } 612 s->slc_dispatch = dispatch; 613 TAILQ_INSERT_TAIL(&pool->sp_lcallouts, s, slc_link); 614 mtx_unlock(&pool->sp_lock); 615 return (TRUE); 616 } 617 618 /* 619 * Remove a service connection loss program from the callout list. 620 */ 621 void 622 svc_loss_unreg(SVCPOOL *pool, void (*dispatch)(SVCXPRT *)) 623 { 624 struct svc_loss_callout *s; 625 626 mtx_lock(&pool->sp_lock); 627 TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) { 628 if (s->slc_dispatch == dispatch) { 629 TAILQ_REMOVE(&pool->sp_lcallouts, s, slc_link); 630 free(s, M_RPC); 631 break; 632 } 633 } 634 mtx_unlock(&pool->sp_lock); 635 } 636 637 /* ********************** CALLOUT list related stuff ************* */ 638 639 /* 640 * Search the callout list for a program number, return the callout 641 * struct. 642 */ 643 static struct svc_callout * 644 svc_find(SVCPOOL *pool, rpcprog_t prog, rpcvers_t vers, char *netid) 645 { 646 struct svc_callout *s; 647 648 mtx_assert(&pool->sp_lock, MA_OWNED); 649 TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) { 650 if (s->sc_prog == prog && s->sc_vers == vers 651 && (netid == NULL || s->sc_netid == NULL || 652 strcmp(netid, s->sc_netid) == 0)) 653 break; 654 } 655 656 return (s); 657 } 658 659 /* ******************* REPLY GENERATION ROUTINES ************ */ 660 661 static bool_t 662 svc_sendreply_common(struct svc_req *rqstp, struct rpc_msg *rply, 663 struct mbuf *body) 664 { 665 SVCXPRT *xprt = rqstp->rq_xprt; 666 bool_t ok; 667 668 if (rqstp->rq_args) { 669 m_freem(rqstp->rq_args); 670 rqstp->rq_args = NULL; 671 } 672 673 if (xprt->xp_pool->sp_rcache) 674 replay_setreply(xprt->xp_pool->sp_rcache, 675 rply, svc_getrpccaller(rqstp), body); 676 677 if (!SVCAUTH_WRAP(&rqstp->rq_auth, &body)) 678 return (FALSE); 679 680 ok = SVC_REPLY(xprt, rply, rqstp->rq_addr, body, &rqstp->rq_reply_seq); 681 if (rqstp->rq_addr) { 682 free(rqstp->rq_addr, M_SONAME); 683 rqstp->rq_addr = NULL; 684 } 685 686 return (ok); 687 } 688 689 /* 690 * Send a reply to an rpc request 691 */ 692 bool_t 693 svc_sendreply(struct svc_req *rqstp, xdrproc_t xdr_results, void * xdr_location) 694 { 695 struct rpc_msg rply; 696 struct mbuf *m; 697 XDR xdrs; 698 bool_t ok; 699 700 rply.rm_xid = rqstp->rq_xid; 701 rply.rm_direction = REPLY; 702 rply.rm_reply.rp_stat = MSG_ACCEPTED; 703 rply.acpted_rply.ar_verf = rqstp->rq_verf; 704 rply.acpted_rply.ar_stat = SUCCESS; 705 rply.acpted_rply.ar_results.where = NULL; 706 rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void; 707 708 m = m_getcl(M_WAITOK, MT_DATA, 0); 709 xdrmbuf_create(&xdrs, m, XDR_ENCODE); 710 ok = xdr_results(&xdrs, xdr_location); 711 XDR_DESTROY(&xdrs); 712 713 if (ok) { 714 return (svc_sendreply_common(rqstp, &rply, m)); 715 } else { 716 m_freem(m); 717 return (FALSE); 718 } 719 } 720 721 bool_t 722 svc_sendreply_mbuf(struct svc_req *rqstp, struct mbuf *m) 723 { 724 struct rpc_msg rply; 725 726 rply.rm_xid = rqstp->rq_xid; 727 rply.rm_direction = REPLY; 728 rply.rm_reply.rp_stat = MSG_ACCEPTED; 729 rply.acpted_rply.ar_verf = rqstp->rq_verf; 730 rply.acpted_rply.ar_stat = SUCCESS; 731 rply.acpted_rply.ar_results.where = NULL; 732 rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void; 733 734 return (svc_sendreply_common(rqstp, &rply, m)); 735 } 736 737 /* 738 * No procedure error reply 739 */ 740 void 741 svcerr_noproc(struct svc_req *rqstp) 742 { 743 SVCXPRT *xprt = rqstp->rq_xprt; 744 struct rpc_msg rply; 745 746 rply.rm_xid = rqstp->rq_xid; 747 rply.rm_direction = REPLY; 748 rply.rm_reply.rp_stat = MSG_ACCEPTED; 749 rply.acpted_rply.ar_verf = rqstp->rq_verf; 750 rply.acpted_rply.ar_stat = PROC_UNAVAIL; 751 752 if (xprt->xp_pool->sp_rcache) 753 replay_setreply(xprt->xp_pool->sp_rcache, 754 &rply, svc_getrpccaller(rqstp), NULL); 755 756 svc_sendreply_common(rqstp, &rply, NULL); 757 } 758 759 /* 760 * Can't decode args error reply 761 */ 762 void 763 svcerr_decode(struct svc_req *rqstp) 764 { 765 SVCXPRT *xprt = rqstp->rq_xprt; 766 struct rpc_msg rply; 767 768 rply.rm_xid = rqstp->rq_xid; 769 rply.rm_direction = REPLY; 770 rply.rm_reply.rp_stat = MSG_ACCEPTED; 771 rply.acpted_rply.ar_verf = rqstp->rq_verf; 772 rply.acpted_rply.ar_stat = GARBAGE_ARGS; 773 774 if (xprt->xp_pool->sp_rcache) 775 replay_setreply(xprt->xp_pool->sp_rcache, 776 &rply, (struct sockaddr *) &xprt->xp_rtaddr, NULL); 777 778 svc_sendreply_common(rqstp, &rply, NULL); 779 } 780 781 /* 782 * Some system error 783 */ 784 void 785 svcerr_systemerr(struct svc_req *rqstp) 786 { 787 SVCXPRT *xprt = rqstp->rq_xprt; 788 struct rpc_msg rply; 789 790 rply.rm_xid = rqstp->rq_xid; 791 rply.rm_direction = REPLY; 792 rply.rm_reply.rp_stat = MSG_ACCEPTED; 793 rply.acpted_rply.ar_verf = rqstp->rq_verf; 794 rply.acpted_rply.ar_stat = SYSTEM_ERR; 795 796 if (xprt->xp_pool->sp_rcache) 797 replay_setreply(xprt->xp_pool->sp_rcache, 798 &rply, svc_getrpccaller(rqstp), NULL); 799 800 svc_sendreply_common(rqstp, &rply, NULL); 801 } 802 803 /* 804 * Authentication error reply 805 */ 806 void 807 svcerr_auth(struct svc_req *rqstp, enum auth_stat why) 808 { 809 SVCXPRT *xprt = rqstp->rq_xprt; 810 struct rpc_msg rply; 811 812 rply.rm_xid = rqstp->rq_xid; 813 rply.rm_direction = REPLY; 814 rply.rm_reply.rp_stat = MSG_DENIED; 815 rply.rjcted_rply.rj_stat = AUTH_ERROR; 816 rply.rjcted_rply.rj_why = why; 817 818 if (xprt->xp_pool->sp_rcache) 819 replay_setreply(xprt->xp_pool->sp_rcache, 820 &rply, svc_getrpccaller(rqstp), NULL); 821 822 svc_sendreply_common(rqstp, &rply, NULL); 823 } 824 825 /* 826 * Auth too weak error reply 827 */ 828 void 829 svcerr_weakauth(struct svc_req *rqstp) 830 { 831 832 svcerr_auth(rqstp, AUTH_TOOWEAK); 833 } 834 835 /* 836 * Program unavailable error reply 837 */ 838 void 839 svcerr_noprog(struct svc_req *rqstp) 840 { 841 SVCXPRT *xprt = rqstp->rq_xprt; 842 struct rpc_msg rply; 843 844 rply.rm_xid = rqstp->rq_xid; 845 rply.rm_direction = REPLY; 846 rply.rm_reply.rp_stat = MSG_ACCEPTED; 847 rply.acpted_rply.ar_verf = rqstp->rq_verf; 848 rply.acpted_rply.ar_stat = PROG_UNAVAIL; 849 850 if (xprt->xp_pool->sp_rcache) 851 replay_setreply(xprt->xp_pool->sp_rcache, 852 &rply, svc_getrpccaller(rqstp), NULL); 853 854 svc_sendreply_common(rqstp, &rply, NULL); 855 } 856 857 /* 858 * Program version mismatch error reply 859 */ 860 void 861 svcerr_progvers(struct svc_req *rqstp, rpcvers_t low_vers, rpcvers_t high_vers) 862 { 863 SVCXPRT *xprt = rqstp->rq_xprt; 864 struct rpc_msg rply; 865 866 rply.rm_xid = rqstp->rq_xid; 867 rply.rm_direction = REPLY; 868 rply.rm_reply.rp_stat = MSG_ACCEPTED; 869 rply.acpted_rply.ar_verf = rqstp->rq_verf; 870 rply.acpted_rply.ar_stat = PROG_MISMATCH; 871 rply.acpted_rply.ar_vers.low = (uint32_t)low_vers; 872 rply.acpted_rply.ar_vers.high = (uint32_t)high_vers; 873 874 if (xprt->xp_pool->sp_rcache) 875 replay_setreply(xprt->xp_pool->sp_rcache, 876 &rply, svc_getrpccaller(rqstp), NULL); 877 878 svc_sendreply_common(rqstp, &rply, NULL); 879 } 880 881 /* 882 * Allocate a new server transport structure. All fields are 883 * initialized to zero and xp_p3 is initialized to point at an 884 * extension structure to hold various flags and authentication 885 * parameters. 886 */ 887 SVCXPRT * 888 svc_xprt_alloc(void) 889 { 890 SVCXPRT *xprt; 891 SVCXPRT_EXT *ext; 892 893 xprt = mem_alloc(sizeof(SVCXPRT)); 894 ext = mem_alloc(sizeof(SVCXPRT_EXT)); 895 xprt->xp_p3 = ext; 896 refcount_init(&xprt->xp_refs, 1); 897 898 return (xprt); 899 } 900 901 /* 902 * Free a server transport structure. 903 */ 904 void 905 svc_xprt_free(SVCXPRT *xprt) 906 { 907 908 mem_free(xprt->xp_p3, sizeof(SVCXPRT_EXT)); 909 /* The size argument is ignored, so 0 is ok. */ 910 mem_free(xprt->xp_gidp, 0); 911 mem_free(xprt, sizeof(SVCXPRT)); 912 } 913 914 /* ******************* SERVER INPUT STUFF ******************* */ 915 916 /* 917 * Read RPC requests from a transport and queue them to be 918 * executed. We handle authentication and replay cache replies here. 919 * Actually dispatching the RPC is deferred till svc_executereq. 920 */ 921 static enum xprt_stat 922 svc_getreq(SVCXPRT *xprt, struct svc_req **rqstp_ret) 923 { 924 SVCPOOL *pool = xprt->xp_pool; 925 struct svc_req *r; 926 struct rpc_msg msg; 927 struct mbuf *args; 928 struct svc_loss_callout *s; 929 enum xprt_stat stat; 930 931 /* now receive msgs from xprtprt (support batch calls) */ 932 r = malloc(sizeof(*r), M_RPC, M_WAITOK|M_ZERO); 933 934 msg.rm_call.cb_cred.oa_base = r->rq_credarea; 935 msg.rm_call.cb_verf.oa_base = &r->rq_credarea[MAX_AUTH_BYTES]; 936 r->rq_clntcred = &r->rq_credarea[2*MAX_AUTH_BYTES]; 937 if (SVC_RECV(xprt, &msg, &r->rq_addr, &args)) { 938 enum auth_stat why; 939 940 /* 941 * Handle replays and authenticate before queuing the 942 * request to be executed. 943 */ 944 SVC_ACQUIRE(xprt); 945 r->rq_xprt = xprt; 946 if (pool->sp_rcache) { 947 struct rpc_msg repmsg; 948 struct mbuf *repbody; 949 enum replay_state rs; 950 rs = replay_find(pool->sp_rcache, &msg, 951 svc_getrpccaller(r), &repmsg, &repbody); 952 switch (rs) { 953 case RS_NEW: 954 break; 955 case RS_DONE: 956 SVC_REPLY(xprt, &repmsg, r->rq_addr, 957 repbody, &r->rq_reply_seq); 958 if (r->rq_addr) { 959 free(r->rq_addr, M_SONAME); 960 r->rq_addr = NULL; 961 } 962 m_freem(args); 963 goto call_done; 964 965 default: 966 m_freem(args); 967 goto call_done; 968 } 969 } 970 971 r->rq_xid = msg.rm_xid; 972 r->rq_prog = msg.rm_call.cb_prog; 973 r->rq_vers = msg.rm_call.cb_vers; 974 r->rq_proc = msg.rm_call.cb_proc; 975 r->rq_size = sizeof(*r) + m_length(args, NULL); 976 r->rq_args = args; 977 if ((why = _authenticate(r, &msg)) != AUTH_OK) { 978 /* 979 * RPCSEC_GSS uses this return code 980 * for requests that form part of its 981 * context establishment protocol and 982 * should not be dispatched to the 983 * application. 984 */ 985 if (why != RPCSEC_GSS_NODISPATCH) 986 svcerr_auth(r, why); 987 goto call_done; 988 } 989 990 if (!SVCAUTH_UNWRAP(&r->rq_auth, &r->rq_args)) { 991 svcerr_decode(r); 992 goto call_done; 993 } 994 995 /* 996 * Everything checks out, return request to caller. 997 */ 998 *rqstp_ret = r; 999 r = NULL; 1000 } 1001 call_done: 1002 if (r) { 1003 svc_freereq(r); 1004 r = NULL; 1005 } 1006 if ((stat = SVC_STAT(xprt)) == XPRT_DIED) { 1007 TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) 1008 (*s->slc_dispatch)(xprt); 1009 xprt_unregister(xprt); 1010 } 1011 1012 return (stat); 1013 } 1014 1015 static void 1016 svc_executereq(struct svc_req *rqstp) 1017 { 1018 SVCXPRT *xprt = rqstp->rq_xprt; 1019 SVCPOOL *pool = xprt->xp_pool; 1020 int prog_found; 1021 rpcvers_t low_vers; 1022 rpcvers_t high_vers; 1023 struct svc_callout *s; 1024 1025 /* now match message with a registered service*/ 1026 prog_found = FALSE; 1027 low_vers = (rpcvers_t) -1L; 1028 high_vers = (rpcvers_t) 0L; 1029 TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) { 1030 if (s->sc_prog == rqstp->rq_prog) { 1031 if (s->sc_vers == rqstp->rq_vers) { 1032 /* 1033 * We hand ownership of r to the 1034 * dispatch method - they must call 1035 * svc_freereq. 1036 */ 1037 (*s->sc_dispatch)(rqstp, xprt); 1038 return; 1039 } /* found correct version */ 1040 prog_found = TRUE; 1041 if (s->sc_vers < low_vers) 1042 low_vers = s->sc_vers; 1043 if (s->sc_vers > high_vers) 1044 high_vers = s->sc_vers; 1045 } /* found correct program */ 1046 } 1047 1048 /* 1049 * if we got here, the program or version 1050 * is not served ... 1051 */ 1052 if (prog_found) 1053 svcerr_progvers(rqstp, low_vers, high_vers); 1054 else 1055 svcerr_noprog(rqstp); 1056 1057 svc_freereq(rqstp); 1058 } 1059 1060 static void 1061 svc_checkidle(SVCGROUP *grp) 1062 { 1063 SVCXPRT *xprt, *nxprt; 1064 time_t timo; 1065 struct svcxprt_list cleanup; 1066 1067 TAILQ_INIT(&cleanup); 1068 TAILQ_FOREACH_SAFE(xprt, &grp->sg_xlist, xp_link, nxprt) { 1069 /* 1070 * Only some transports have idle timers. Don't time 1071 * something out which is just waking up. 1072 */ 1073 if (!xprt->xp_idletimeout || xprt->xp_thread) 1074 continue; 1075 1076 timo = xprt->xp_lastactive + xprt->xp_idletimeout; 1077 if (time_uptime > timo) { 1078 xprt_unregister_locked(xprt); 1079 TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link); 1080 } 1081 } 1082 1083 mtx_unlock(&grp->sg_lock); 1084 TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) { 1085 soshutdown(xprt->xp_socket, SHUT_WR); 1086 SVC_RELEASE(xprt); 1087 } 1088 mtx_lock(&grp->sg_lock); 1089 } 1090 1091 static void 1092 svc_assign_waiting_sockets(SVCPOOL *pool) 1093 { 1094 SVCGROUP *grp; 1095 SVCXPRT *xprt; 1096 int g; 1097 1098 for (g = 0; g < pool->sp_groupcount; g++) { 1099 grp = &pool->sp_groups[g]; 1100 mtx_lock(&grp->sg_lock); 1101 while ((xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) { 1102 if (xprt_assignthread(xprt)) 1103 TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink); 1104 else 1105 break; 1106 } 1107 mtx_unlock(&grp->sg_lock); 1108 } 1109 } 1110 1111 static void 1112 svc_change_space_used(SVCPOOL *pool, long delta) 1113 { 1114 unsigned long value; 1115 1116 value = atomic_fetchadd_long(&pool->sp_space_used, delta) + delta; 1117 if (delta > 0) { 1118 if (value >= pool->sp_space_high && !pool->sp_space_throttled) { 1119 pool->sp_space_throttled = TRUE; 1120 pool->sp_space_throttle_count++; 1121 } 1122 if (value > pool->sp_space_used_highest) 1123 pool->sp_space_used_highest = value; 1124 } else { 1125 if (value < pool->sp_space_low && pool->sp_space_throttled) { 1126 pool->sp_space_throttled = FALSE; 1127 svc_assign_waiting_sockets(pool); 1128 } 1129 } 1130 } 1131 1132 static bool_t 1133 svc_request_space_available(SVCPOOL *pool) 1134 { 1135 1136 if (pool->sp_space_throttled) 1137 return (FALSE); 1138 return (TRUE); 1139 } 1140 1141 static void 1142 svc_run_internal(SVCGROUP *grp, bool_t ismaster) 1143 { 1144 SVCPOOL *pool = grp->sg_pool; 1145 SVCTHREAD *st, *stpref; 1146 SVCXPRT *xprt; 1147 enum xprt_stat stat; 1148 struct svc_req *rqstp; 1149 struct proc *p; 1150 long sz; 1151 int error; 1152 1153 st = mem_alloc(sizeof(*st)); 1154 mtx_init(&st->st_lock, "st_lock", NULL, MTX_DEF); 1155 st->st_pool = pool; 1156 st->st_xprt = NULL; 1157 STAILQ_INIT(&st->st_reqs); 1158 cv_init(&st->st_cond, "rpcsvc"); 1159 1160 mtx_lock(&grp->sg_lock); 1161 1162 /* 1163 * If we are a new thread which was spawned to cope with 1164 * increased load, set the state back to SVCPOOL_ACTIVE. 1165 */ 1166 if (grp->sg_state == SVCPOOL_THREADSTARTING) 1167 grp->sg_state = SVCPOOL_ACTIVE; 1168 1169 while (grp->sg_state != SVCPOOL_CLOSING) { 1170 /* 1171 * Create new thread if requested. 1172 */ 1173 if (grp->sg_state == SVCPOOL_THREADWANTED) { 1174 grp->sg_state = SVCPOOL_THREADSTARTING; 1175 grp->sg_lastcreatetime = time_uptime; 1176 mtx_unlock(&grp->sg_lock); 1177 svc_new_thread(grp); 1178 mtx_lock(&grp->sg_lock); 1179 continue; 1180 } 1181 1182 /* 1183 * Check for idle transports once per second. 1184 */ 1185 if (time_uptime > grp->sg_lastidlecheck) { 1186 grp->sg_lastidlecheck = time_uptime; 1187 svc_checkidle(grp); 1188 } 1189 1190 xprt = st->st_xprt; 1191 if (!xprt) { 1192 /* 1193 * Enforce maxthreads count. 1194 */ 1195 if (!ismaster && grp->sg_threadcount > 1196 grp->sg_maxthreads) 1197 break; 1198 1199 /* 1200 * Before sleeping, see if we can find an 1201 * active transport which isn't being serviced 1202 * by a thread. 1203 */ 1204 if (svc_request_space_available(pool) && 1205 (xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) { 1206 TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink); 1207 SVC_ACQUIRE(xprt); 1208 xprt->xp_thread = st; 1209 st->st_xprt = xprt; 1210 continue; 1211 } 1212 1213 LIST_INSERT_HEAD(&grp->sg_idlethreads, st, st_ilink); 1214 if (ismaster || (!ismaster && 1215 grp->sg_threadcount > grp->sg_minthreads)) 1216 error = cv_timedwait_sig(&st->st_cond, 1217 &grp->sg_lock, 5 * hz); 1218 else 1219 error = cv_wait_sig(&st->st_cond, 1220 &grp->sg_lock); 1221 if (st->st_xprt == NULL) 1222 LIST_REMOVE(st, st_ilink); 1223 1224 /* 1225 * Reduce worker thread count when idle. 1226 */ 1227 if (error == EWOULDBLOCK) { 1228 if (!ismaster 1229 && (grp->sg_threadcount 1230 > grp->sg_minthreads) 1231 && !st->st_xprt) 1232 break; 1233 } else if (error != 0) { 1234 KASSERT(error == EINTR || error == ERESTART, 1235 ("non-signal error %d", error)); 1236 mtx_unlock(&grp->sg_lock); 1237 p = curproc; 1238 PROC_LOCK(p); 1239 if (P_SHOULDSTOP(p) || 1240 (p->p_flag & P_TOTAL_STOP) != 0) { 1241 thread_suspend_check(0); 1242 PROC_UNLOCK(p); 1243 mtx_lock(&grp->sg_lock); 1244 } else { 1245 PROC_UNLOCK(p); 1246 svc_exit(pool); 1247 mtx_lock(&grp->sg_lock); 1248 break; 1249 } 1250 } 1251 continue; 1252 } 1253 mtx_unlock(&grp->sg_lock); 1254 1255 /* 1256 * Drain the transport socket and queue up any RPCs. 1257 */ 1258 xprt->xp_lastactive = time_uptime; 1259 do { 1260 if (!svc_request_space_available(pool)) 1261 break; 1262 rqstp = NULL; 1263 stat = svc_getreq(xprt, &rqstp); 1264 if (rqstp) { 1265 svc_change_space_used(pool, rqstp->rq_size); 1266 /* 1267 * See if the application has a preference 1268 * for some other thread. 1269 */ 1270 if (pool->sp_assign) { 1271 stpref = pool->sp_assign(st, rqstp); 1272 rqstp->rq_thread = stpref; 1273 STAILQ_INSERT_TAIL(&stpref->st_reqs, 1274 rqstp, rq_link); 1275 mtx_unlock(&stpref->st_lock); 1276 if (stpref != st) 1277 rqstp = NULL; 1278 } else { 1279 rqstp->rq_thread = st; 1280 STAILQ_INSERT_TAIL(&st->st_reqs, 1281 rqstp, rq_link); 1282 } 1283 } 1284 } while (rqstp == NULL && stat == XPRT_MOREREQS 1285 && grp->sg_state != SVCPOOL_CLOSING); 1286 1287 /* 1288 * Move this transport to the end of the active list to 1289 * ensure fairness when multiple transports are active. 1290 * If this was the last queued request, svc_getreq will end 1291 * up calling xprt_inactive to remove from the active list. 1292 */ 1293 mtx_lock(&grp->sg_lock); 1294 xprt->xp_thread = NULL; 1295 st->st_xprt = NULL; 1296 if (xprt->xp_active) { 1297 if (!svc_request_space_available(pool) || 1298 !xprt_assignthread(xprt)) 1299 TAILQ_INSERT_TAIL(&grp->sg_active, 1300 xprt, xp_alink); 1301 } 1302 mtx_unlock(&grp->sg_lock); 1303 SVC_RELEASE(xprt); 1304 1305 /* 1306 * Execute what we have queued. 1307 */ 1308 mtx_lock(&st->st_lock); 1309 while ((rqstp = STAILQ_FIRST(&st->st_reqs)) != NULL) { 1310 STAILQ_REMOVE_HEAD(&st->st_reqs, rq_link); 1311 mtx_unlock(&st->st_lock); 1312 sz = (long)rqstp->rq_size; 1313 svc_executereq(rqstp); 1314 svc_change_space_used(pool, -sz); 1315 mtx_lock(&st->st_lock); 1316 } 1317 mtx_unlock(&st->st_lock); 1318 mtx_lock(&grp->sg_lock); 1319 } 1320 1321 if (st->st_xprt) { 1322 xprt = st->st_xprt; 1323 st->st_xprt = NULL; 1324 SVC_RELEASE(xprt); 1325 } 1326 KASSERT(STAILQ_EMPTY(&st->st_reqs), ("stray reqs on exit")); 1327 mtx_destroy(&st->st_lock); 1328 cv_destroy(&st->st_cond); 1329 mem_free(st, sizeof(*st)); 1330 1331 grp->sg_threadcount--; 1332 if (!ismaster) 1333 wakeup(grp); 1334 mtx_unlock(&grp->sg_lock); 1335 } 1336 1337 static void 1338 svc_thread_start(void *arg) 1339 { 1340 1341 svc_run_internal((SVCGROUP *) arg, FALSE); 1342 kthread_exit(); 1343 } 1344 1345 static void 1346 svc_new_thread(SVCGROUP *grp) 1347 { 1348 SVCPOOL *pool = grp->sg_pool; 1349 struct thread *td; 1350 1351 mtx_lock(&grp->sg_lock); 1352 grp->sg_threadcount++; 1353 mtx_unlock(&grp->sg_lock); 1354 kthread_add(svc_thread_start, grp, pool->sp_proc, &td, 0, 0, 1355 "%s: service", pool->sp_name); 1356 } 1357 1358 void 1359 svc_run(SVCPOOL *pool) 1360 { 1361 int g, i; 1362 struct proc *p; 1363 struct thread *td; 1364 SVCGROUP *grp; 1365 1366 p = curproc; 1367 td = curthread; 1368 snprintf(td->td_name, sizeof(td->td_name), 1369 "%s: master", pool->sp_name); 1370 pool->sp_state = SVCPOOL_ACTIVE; 1371 pool->sp_proc = p; 1372 1373 /* Choose group count based on number of threads and CPUs. */ 1374 pool->sp_groupcount = max(1, min(SVC_MAXGROUPS, 1375 min(pool->sp_maxthreads / 2, mp_ncpus) / 6)); 1376 for (g = 0; g < pool->sp_groupcount; g++) { 1377 grp = &pool->sp_groups[g]; 1378 grp->sg_minthreads = max(1, 1379 pool->sp_minthreads / pool->sp_groupcount); 1380 grp->sg_maxthreads = max(1, 1381 pool->sp_maxthreads / pool->sp_groupcount); 1382 grp->sg_lastcreatetime = time_uptime; 1383 } 1384 1385 /* Starting threads */ 1386 pool->sp_groups[0].sg_threadcount++; 1387 for (g = 0; g < pool->sp_groupcount; g++) { 1388 grp = &pool->sp_groups[g]; 1389 for (i = ((g == 0) ? 1 : 0); i < grp->sg_minthreads; i++) 1390 svc_new_thread(grp); 1391 } 1392 svc_run_internal(&pool->sp_groups[0], TRUE); 1393 1394 /* Waiting for threads to stop. */ 1395 for (g = 0; g < pool->sp_groupcount; g++) { 1396 grp = &pool->sp_groups[g]; 1397 mtx_lock(&grp->sg_lock); 1398 while (grp->sg_threadcount > 0) 1399 msleep(grp, &grp->sg_lock, 0, "svcexit", 0); 1400 mtx_unlock(&grp->sg_lock); 1401 } 1402 } 1403 1404 void 1405 svc_exit(SVCPOOL *pool) 1406 { 1407 SVCGROUP *grp; 1408 SVCTHREAD *st; 1409 int g; 1410 1411 pool->sp_state = SVCPOOL_CLOSING; 1412 for (g = 0; g < pool->sp_groupcount; g++) { 1413 grp = &pool->sp_groups[g]; 1414 mtx_lock(&grp->sg_lock); 1415 if (grp->sg_state != SVCPOOL_CLOSING) { 1416 grp->sg_state = SVCPOOL_CLOSING; 1417 LIST_FOREACH(st, &grp->sg_idlethreads, st_ilink) 1418 cv_signal(&st->st_cond); 1419 } 1420 mtx_unlock(&grp->sg_lock); 1421 } 1422 } 1423 1424 bool_t 1425 svc_getargs(struct svc_req *rqstp, xdrproc_t xargs, void *args) 1426 { 1427 struct mbuf *m; 1428 XDR xdrs; 1429 bool_t stat; 1430 1431 m = rqstp->rq_args; 1432 rqstp->rq_args = NULL; 1433 1434 xdrmbuf_create(&xdrs, m, XDR_DECODE); 1435 stat = xargs(&xdrs, args); 1436 XDR_DESTROY(&xdrs); 1437 1438 return (stat); 1439 } 1440 1441 bool_t 1442 svc_freeargs(struct svc_req *rqstp, xdrproc_t xargs, void *args) 1443 { 1444 XDR xdrs; 1445 1446 if (rqstp->rq_addr) { 1447 free(rqstp->rq_addr, M_SONAME); 1448 rqstp->rq_addr = NULL; 1449 } 1450 1451 xdrs.x_op = XDR_FREE; 1452 return (xargs(&xdrs, args)); 1453 } 1454 1455 void 1456 svc_freereq(struct svc_req *rqstp) 1457 { 1458 SVCTHREAD *st; 1459 SVCPOOL *pool; 1460 1461 st = rqstp->rq_thread; 1462 if (st) { 1463 pool = st->st_pool; 1464 if (pool->sp_done) 1465 pool->sp_done(st, rqstp); 1466 } 1467 1468 if (rqstp->rq_auth.svc_ah_ops) 1469 SVCAUTH_RELEASE(&rqstp->rq_auth); 1470 1471 if (rqstp->rq_xprt) { 1472 SVC_RELEASE(rqstp->rq_xprt); 1473 } 1474 1475 if (rqstp->rq_addr) 1476 free(rqstp->rq_addr, M_SONAME); 1477 1478 if (rqstp->rq_args) 1479 m_freem(rqstp->rq_args); 1480 1481 free(rqstp, M_RPC); 1482 } 1483