xref: /freebsd/sys/rpc/svc.c (revision 59c8e88e72633afbc47a4ace0d2170d00d51f7dc)
1 /*	$NetBSD: svc.c,v 1.21 2000/07/06 03:10:35 christos Exp $	*/
2 
3 /*-
4  * SPDX-License-Identifier: BSD-3-Clause
5  *
6  * Copyright (c) 2009, Sun Microsystems, Inc.
7  * All rights reserved.
8  *
9  * Redistribution and use in source and binary forms, with or without
10  * modification, are permitted provided that the following conditions are met:
11  * - Redistributions of source code must retain the above copyright notice,
12  *   this list of conditions and the following disclaimer.
13  * - Redistributions in binary form must reproduce the above copyright notice,
14  *   this list of conditions and the following disclaimer in the documentation
15  *   and/or other materials provided with the distribution.
16  * - Neither the name of Sun Microsystems, Inc. nor the names of its
17  *   contributors may be used to endorse or promote products derived
18  *   from this software without specific prior written permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
22  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
23  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
24  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
25  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
26  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
27  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
28  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
29  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30  * POSSIBILITY OF SUCH DAMAGE.
31  */
32 
33 #include <sys/cdefs.h>
34 /*
35  * svc.c, Server-side remote procedure call interface.
36  *
37  * There are two sets of procedures here.  The xprt routines are
38  * for handling transport handles.  The svc routines handle the
39  * list of service routines.
40  *
41  * Copyright (C) 1984, Sun Microsystems, Inc.
42  */
43 
44 #include <sys/param.h>
45 #include <sys/jail.h>
46 #include <sys/lock.h>
47 #include <sys/kernel.h>
48 #include <sys/kthread.h>
49 #include <sys/malloc.h>
50 #include <sys/mbuf.h>
51 #include <sys/mutex.h>
52 #include <sys/proc.h>
53 #include <sys/queue.h>
54 #include <sys/socketvar.h>
55 #include <sys/systm.h>
56 #include <sys/smp.h>
57 #include <sys/sx.h>
58 #include <sys/ucred.h>
59 
60 #include <rpc/rpc.h>
61 #include <rpc/rpcb_clnt.h>
62 #include <rpc/replay.h>
63 
64 #include <rpc/rpc_com.h>
65 
66 #define SVC_VERSQUIET 0x0001		/* keep quiet about vers mismatch */
67 #define version_keepquiet(xp) (SVC_EXT(xp)->xp_flags & SVC_VERSQUIET)
68 
69 static struct svc_callout *svc_find(SVCPOOL *pool, rpcprog_t, rpcvers_t,
70     char *);
71 static void svc_new_thread(SVCGROUP *grp);
72 static void xprt_unregister_locked(SVCXPRT *xprt);
73 static void svc_change_space_used(SVCPOOL *pool, long delta);
74 static bool_t svc_request_space_available(SVCPOOL *pool);
75 static void svcpool_cleanup(SVCPOOL *pool);
76 
77 /* ***************  SVCXPRT related stuff **************** */
78 
79 static int svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS);
80 static int svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS);
81 static int svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS);
82 
83 SVCPOOL*
84 svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base)
85 {
86 	SVCPOOL *pool;
87 	SVCGROUP *grp;
88 	int g;
89 
90 	pool = malloc(sizeof(SVCPOOL), M_RPC, M_WAITOK|M_ZERO);
91 
92 	mtx_init(&pool->sp_lock, "sp_lock", NULL, MTX_DEF);
93 	pool->sp_name = name;
94 	pool->sp_state = SVCPOOL_INIT;
95 	pool->sp_proc = NULL;
96 	TAILQ_INIT(&pool->sp_callouts);
97 	TAILQ_INIT(&pool->sp_lcallouts);
98 	pool->sp_minthreads = 1;
99 	pool->sp_maxthreads = 1;
100 	pool->sp_groupcount = 1;
101 	for (g = 0; g < SVC_MAXGROUPS; g++) {
102 		grp = &pool->sp_groups[g];
103 		mtx_init(&grp->sg_lock, "sg_lock", NULL, MTX_DEF);
104 		grp->sg_pool = pool;
105 		grp->sg_state = SVCPOOL_ACTIVE;
106 		TAILQ_INIT(&grp->sg_xlist);
107 		TAILQ_INIT(&grp->sg_active);
108 		LIST_INIT(&grp->sg_idlethreads);
109 		grp->sg_minthreads = 1;
110 		grp->sg_maxthreads = 1;
111 	}
112 
113 	/*
114 	 * Don't use more than a quarter of mbuf clusters.  Nota bene:
115 	 * nmbclusters is an int, but nmbclusters*MCLBYTES may overflow
116 	 * on LP64 architectures, so cast to u_long to avoid undefined
117 	 * behavior.  (ILP32 architectures cannot have nmbclusters
118 	 * large enough to overflow for other reasons.)
119 	 */
120 	pool->sp_space_high = (u_long)nmbclusters * MCLBYTES / 4;
121 	pool->sp_space_low = (pool->sp_space_high / 3) * 2;
122 
123 	sysctl_ctx_init(&pool->sp_sysctl);
124 	if (IS_DEFAULT_VNET(curvnet) && sysctl_base) {
125 		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
126 		    "minthreads", CTLTYPE_INT | CTLFLAG_RW | CTLFLAG_MPSAFE,
127 		    pool, 0, svcpool_minthread_sysctl, "I",
128 		    "Minimal number of threads");
129 		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
130 		    "maxthreads", CTLTYPE_INT | CTLFLAG_RW | CTLFLAG_MPSAFE,
131 		    pool, 0, svcpool_maxthread_sysctl, "I",
132 		    "Maximal number of threads");
133 		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
134 		    "threads", CTLTYPE_INT | CTLFLAG_RD | CTLFLAG_MPSAFE,
135 		    pool, 0, svcpool_threads_sysctl, "I",
136 		    "Current number of threads");
137 		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
138 		    "groups", CTLFLAG_RD, &pool->sp_groupcount, 0,
139 		    "Number of thread groups");
140 
141 		SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
142 		    "request_space_used", CTLFLAG_RD,
143 		    &pool->sp_space_used,
144 		    "Space in parsed but not handled requests.");
145 
146 		SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
147 		    "request_space_used_highest", CTLFLAG_RD,
148 		    &pool->sp_space_used_highest,
149 		    "Highest space used since reboot.");
150 
151 		SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
152 		    "request_space_high", CTLFLAG_RW,
153 		    &pool->sp_space_high,
154 		    "Maximum space in parsed but not handled requests.");
155 
156 		SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
157 		    "request_space_low", CTLFLAG_RW,
158 		    &pool->sp_space_low,
159 		    "Low water mark for request space.");
160 
161 		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
162 		    "request_space_throttled", CTLFLAG_RD,
163 		    &pool->sp_space_throttled, 0,
164 		    "Whether nfs requests are currently throttled");
165 
166 		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
167 		    "request_space_throttle_count", CTLFLAG_RD,
168 		    &pool->sp_space_throttle_count, 0,
169 		    "Count of times throttling based on request space has occurred");
170 	}
171 
172 	return pool;
173 }
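
/*
 * A typical consumer (sketch, not code from this file): create a pool
 * with svcpool_create(), attach transports to it (for example with
 * svc_vc_create() or svc_dg_create()), register dispatch routines with
 * svc_reg() and then call svc_run() from the daemon thread that drives
 * the service.
 */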
174 
175 /*
176  * Code common to svcpool_destroy() and svcpool_close(), which cleans up
177  * the pool data structures.
178  */
179 static void
180 svcpool_cleanup(SVCPOOL *pool)
181 {
182 	SVCGROUP *grp;
183 	SVCXPRT *xprt, *nxprt;
184 	struct svc_callout *s;
185 	struct svc_loss_callout *sl;
186 	struct svcxprt_list cleanup;
187 	int g;
188 
189 	TAILQ_INIT(&cleanup);
190 
191 	for (g = 0; g < SVC_MAXGROUPS; g++) {
192 		grp = &pool->sp_groups[g];
193 		mtx_lock(&grp->sg_lock);
194 		while ((xprt = TAILQ_FIRST(&grp->sg_xlist)) != NULL) {
195 			xprt_unregister_locked(xprt);
196 			TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
197 		}
198 		mtx_unlock(&grp->sg_lock);
199 	}
200 	TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
201 		if (xprt->xp_socket != NULL)
202 			soshutdown(xprt->xp_socket, SHUT_WR);
203 		SVC_RELEASE(xprt);
204 	}
205 
206 	mtx_lock(&pool->sp_lock);
207 	while ((s = TAILQ_FIRST(&pool->sp_callouts)) != NULL) {
208 		mtx_unlock(&pool->sp_lock);
209 		svc_unreg(pool, s->sc_prog, s->sc_vers);
210 		mtx_lock(&pool->sp_lock);
211 	}
212 	while ((sl = TAILQ_FIRST(&pool->sp_lcallouts)) != NULL) {
213 		mtx_unlock(&pool->sp_lock);
214 		svc_loss_unreg(pool, sl->slc_dispatch);
215 		mtx_lock(&pool->sp_lock);
216 	}
217 	mtx_unlock(&pool->sp_lock);
218 }
219 
220 void
221 svcpool_destroy(SVCPOOL *pool)
222 {
223 	SVCGROUP *grp;
224 	int g;
225 
226 	svcpool_cleanup(pool);
227 
228 	for (g = 0; g < SVC_MAXGROUPS; g++) {
229 		grp = &pool->sp_groups[g];
230 		mtx_destroy(&grp->sg_lock);
231 	}
232 	mtx_destroy(&pool->sp_lock);
233 
234 	if (pool->sp_rcache)
235 		replay_freecache(pool->sp_rcache);
236 
237 	sysctl_ctx_free(&pool->sp_sysctl);
238 	free(pool, M_RPC);
239 }
240 
241 /*
242  * Similar to svcpool_destroy(), except that it does not destroy the actual
243  * data structures.  As such, "pool" may be used again.
244  */
245 void
246 svcpool_close(SVCPOOL *pool)
247 {
248 	SVCGROUP *grp;
249 	int g;
250 
251 	svcpool_cleanup(pool);
252 
253 	/* Now, initialize the pool's state for a fresh svc_run() call. */
254 	mtx_lock(&pool->sp_lock);
255 	pool->sp_state = SVCPOOL_INIT;
256 	mtx_unlock(&pool->sp_lock);
257 	for (g = 0; g < SVC_MAXGROUPS; g++) {
258 		grp = &pool->sp_groups[g];
259 		mtx_lock(&grp->sg_lock);
260 		grp->sg_state = SVCPOOL_ACTIVE;
261 		mtx_unlock(&grp->sg_lock);
262 	}
263 }
264 
265 /*
266  * Sysctl handler to get the present thread count on a pool
267  */
268 static int
269 svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS)
270 {
271 	SVCPOOL *pool;
272 	int threads, error, g;
273 
274 	pool = oidp->oid_arg1;
275 	threads = 0;
276 	mtx_lock(&pool->sp_lock);
277 	for (g = 0; g < pool->sp_groupcount; g++)
278 		threads += pool->sp_groups[g].sg_threadcount;
279 	mtx_unlock(&pool->sp_lock);
280 	error = sysctl_handle_int(oidp, &threads, 0, req);
281 	return (error);
282 }
283 
284 /*
285  * Sysctl handler to set the minimum thread count on a pool
286  */
287 static int
288 svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS)
289 {
290 	SVCPOOL *pool;
291 	int newminthreads, error, g;
292 
293 	pool = oidp->oid_arg1;
294 	newminthreads = pool->sp_minthreads;
295 	error = sysctl_handle_int(oidp, &newminthreads, 0, req);
296 	if (error == 0 && newminthreads != pool->sp_minthreads) {
297 		if (newminthreads > pool->sp_maxthreads)
298 			return (EINVAL);
299 		mtx_lock(&pool->sp_lock);
300 		pool->sp_minthreads = newminthreads;
301 		for (g = 0; g < pool->sp_groupcount; g++) {
302 			pool->sp_groups[g].sg_minthreads = max(1,
303 			    pool->sp_minthreads / pool->sp_groupcount);
304 		}
305 		mtx_unlock(&pool->sp_lock);
306 	}
307 	return (error);
308 }
309 
310 /*
311  * Sysctl handler to set the maximum thread count on a pool
312  */
313 static int
314 svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS)
315 {
316 	SVCPOOL *pool;
317 	int newmaxthreads, error, g;
318 
319 	pool = oidp->oid_arg1;
320 	newmaxthreads = pool->sp_maxthreads;
321 	error = sysctl_handle_int(oidp, &newmaxthreads, 0, req);
322 	if (error == 0 && newmaxthreads != pool->sp_maxthreads) {
323 		if (newmaxthreads < pool->sp_minthreads)
324 			return (EINVAL);
325 		mtx_lock(&pool->sp_lock);
326 		pool->sp_maxthreads = newmaxthreads;
327 		for (g = 0; g < pool->sp_groupcount; g++) {
328 			pool->sp_groups[g].sg_maxthreads = max(1,
329 			    pool->sp_maxthreads / pool->sp_groupcount);
330 		}
331 		mtx_unlock(&pool->sp_lock);
332 	}
333 	return (error);
334 }
335 
336 /*
337  * Activate a transport handle.
338  */
339 void
340 xprt_register(SVCXPRT *xprt)
341 {
342 	SVCPOOL *pool = xprt->xp_pool;
343 	SVCGROUP *grp;
344 	int g;
345 
346 	SVC_ACQUIRE(xprt);
347 	g = atomic_fetchadd_int(&pool->sp_nextgroup, 1) % pool->sp_groupcount;
348 	xprt->xp_group = grp = &pool->sp_groups[g];
349 	mtx_lock(&grp->sg_lock);
350 	xprt->xp_registered = TRUE;
351 	xprt->xp_active = FALSE;
352 	TAILQ_INSERT_TAIL(&grp->sg_xlist, xprt, xp_link);
353 	mtx_unlock(&grp->sg_lock);
354 }
355 
356 /*
357  * De-activate a transport handle. Note: the locked version doesn't
358  * release the transport - caller must do that after dropping the pool
359  * lock.
360  */
361 static void
362 xprt_unregister_locked(SVCXPRT *xprt)
363 {
364 	SVCGROUP *grp = xprt->xp_group;
365 
366 	mtx_assert(&grp->sg_lock, MA_OWNED);
367 	KASSERT(xprt->xp_registered == TRUE,
368 	    ("xprt_unregister_locked: not registered"));
369 	xprt_inactive_locked(xprt);
370 	TAILQ_REMOVE(&grp->sg_xlist, xprt, xp_link);
371 	xprt->xp_registered = FALSE;
372 }
373 
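/*
 * Unlocked wrapper for xprt_unregister_locked().  Also shuts the socket
 * down for writing and drops the registration reference.
 */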
374 void
375 xprt_unregister(SVCXPRT *xprt)
376 {
377 	SVCGROUP *grp = xprt->xp_group;
378 
379 	mtx_lock(&grp->sg_lock);
380 	if (xprt->xp_registered == FALSE) {
381 		/* Already unregistered by another thread */
382 		mtx_unlock(&grp->sg_lock);
383 		return;
384 	}
385 	xprt_unregister_locked(xprt);
386 	mtx_unlock(&grp->sg_lock);
387 
388 	if (xprt->xp_socket != NULL)
389 		soshutdown(xprt->xp_socket, SHUT_WR);
390 	SVC_RELEASE(xprt);
391 }
392 
393 /*
394  * Attempt to assign a service thread to this transport.
395  */
396 static int
397 xprt_assignthread(SVCXPRT *xprt)
398 {
399 	SVCGROUP *grp = xprt->xp_group;
400 	SVCTHREAD *st;
401 
402 	mtx_assert(&grp->sg_lock, MA_OWNED);
403 	st = LIST_FIRST(&grp->sg_idlethreads);
404 	if (st) {
405 		LIST_REMOVE(st, st_ilink);
406 		SVC_ACQUIRE(xprt);
407 		xprt->xp_thread = st;
408 		st->st_xprt = xprt;
409 		cv_signal(&st->st_cond);
410 		return (TRUE);
411 	} else {
412 		/*
413 		 * See if we can create a new thread. The
414 		 * actual thread creation happens in
415 		 * svc_run_internal because our locking state
416 		 * is poorly defined (we are typically called
417 		 * from a socket upcall). Don't create more
418 		 * than one thread per second.
419 		 */
420 		if (grp->sg_state == SVCPOOL_ACTIVE
421 		    && grp->sg_lastcreatetime < time_uptime
422 		    && grp->sg_threadcount < grp->sg_maxthreads) {
423 			grp->sg_state = SVCPOOL_THREADWANTED;
424 		}
425 	}
426 	return (FALSE);
427 }
428 
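/*
 * Mark a transport as having pending work.  If no thread is currently
 * assigned, either hand the transport to an idle thread or queue it on
 * the group's active list.
 */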
429 void
430 xprt_active(SVCXPRT *xprt)
431 {
432 	SVCGROUP *grp = xprt->xp_group;
433 
434 	mtx_lock(&grp->sg_lock);
435 
436 	if (!xprt->xp_registered) {
437 		/*
438 		 * Race with xprt_unregister - we lose.
439 		 */
440 		mtx_unlock(&grp->sg_lock);
441 		return;
442 	}
443 
444 	if (!xprt->xp_active) {
445 		xprt->xp_active = TRUE;
446 		if (xprt->xp_thread == NULL) {
447 			if (!svc_request_space_available(xprt->xp_pool) ||
448 			    !xprt_assignthread(xprt))
449 				TAILQ_INSERT_TAIL(&grp->sg_active, xprt,
450 				    xp_alink);
451 		}
452 	}
453 
454 	mtx_unlock(&grp->sg_lock);
455 }
456 
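/*
 * Mark a transport as having no pending work, removing it from the
 * group's active list if no thread owns it.  Called with the group
 * lock held.
 */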
457 void
458 xprt_inactive_locked(SVCXPRT *xprt)
459 {
460 	SVCGROUP *grp = xprt->xp_group;
461 
462 	mtx_assert(&grp->sg_lock, MA_OWNED);
463 	if (xprt->xp_active) {
464 		if (xprt->xp_thread == NULL)
465 			TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
466 		xprt->xp_active = FALSE;
467 	}
468 }
469 
470 void
471 xprt_inactive(SVCXPRT *xprt)
472 {
473 	SVCGROUP *grp = xprt->xp_group;
474 
475 	mtx_lock(&grp->sg_lock);
476 	xprt_inactive_locked(xprt);
477 	mtx_unlock(&grp->sg_lock);
478 }
479 
480 /*
481  * Variant of xprt_inactive() for use only when the transport is known
482  * to be assigned to a thread, for example within receive handlers.
483  */
484 void
485 xprt_inactive_self(SVCXPRT *xprt)
486 {
487 
488 	KASSERT(xprt->xp_thread != NULL,
489 	    ("xprt_inactive_self(%p) with NULL xp_thread", xprt));
490 	xprt->xp_active = FALSE;
491 }
492 
493 /*
494  * Add a service program to the callout list.
495  * The dispatch routine will be called when a rpc request for this
496  * The dispatch routine will be called when an RPC request for this
497  */
498 bool_t
499 svc_reg(SVCXPRT *xprt, const rpcprog_t prog, const rpcvers_t vers,
500     void (*dispatch)(struct svc_req *, SVCXPRT *),
501     const struct netconfig *nconf)
502 {
503 	SVCPOOL *pool = xprt->xp_pool;
504 	struct svc_callout *s;
505 	char *netid = NULL;
506 	int flag = 0;
507 
508 /* VARIABLES PROTECTED BY svc_lock: s, svc_head */
509 
510 	if (xprt->xp_netid) {
511 		netid = strdup(xprt->xp_netid, M_RPC);
512 		flag = 1;
513 	} else if (nconf && nconf->nc_netid) {
514 		netid = strdup(nconf->nc_netid, M_RPC);
515 		flag = 1;
516 	} /* must have been created with svc_raw_create */
517 	if ((netid == NULL) && (flag == 1)) {
518 		return (FALSE);
519 	}
520 
521 	mtx_lock(&pool->sp_lock);
522 	if ((s = svc_find(pool, prog, vers, netid)) != NULL) {
523 		if (netid)
524 			free(netid, M_RPC);
525 		if (s->sc_dispatch == dispatch)
526 			goto rpcb_it; /* the caller is registering another xprt */
527 		mtx_unlock(&pool->sp_lock);
528 		return (FALSE);
529 	}
530 	s = malloc(sizeof (struct svc_callout), M_RPC, M_NOWAIT);
531 	if (s == NULL) {
532 		if (netid)
533 			free(netid, M_RPC);
534 		mtx_unlock(&pool->sp_lock);
535 		return (FALSE);
536 	}
537 
538 	s->sc_prog = prog;
539 	s->sc_vers = vers;
540 	s->sc_dispatch = dispatch;
541 	s->sc_netid = netid;
542 	TAILQ_INSERT_TAIL(&pool->sp_callouts, s, sc_link);
543 
544 	if ((xprt->xp_netid == NULL) && (flag == 1) && netid)
545 		((SVCXPRT *) xprt)->xp_netid = strdup(netid, M_RPC);
546 
547 rpcb_it:
548 	mtx_unlock(&pool->sp_lock);
549 	/* now register the information with the local binder service */
550 	if (nconf) {
551 		bool_t dummy;
552 		struct netconfig tnc;
553 		struct netbuf nb;
554 		tnc = *nconf;
555 		nb.buf = &xprt->xp_ltaddr;
556 		nb.len = xprt->xp_ltaddr.ss_len;
557 		dummy = rpcb_set(prog, vers, &tnc, &nb);
558 		return (dummy);
559 	}
560 	return (TRUE);
561 }
562 
563 /*
564  * Remove a service program from the callout list.
565  */
566 void
567 svc_unreg(SVCPOOL *pool, const rpcprog_t prog, const rpcvers_t vers)
568 {
569 	struct svc_callout *s;
570 
571 	/* unregister the information anyway */
572 	(void) rpcb_unset(prog, vers, NULL);
573 	mtx_lock(&pool->sp_lock);
574 	while ((s = svc_find(pool, prog, vers, NULL)) != NULL) {
575 		TAILQ_REMOVE(&pool->sp_callouts, s, sc_link);
576 		if (s->sc_netid)
577 			mem_free(s->sc_netid, strlen(s->sc_netid) + 1);
578 		mem_free(s, sizeof (struct svc_callout));
579 	}
580 	mtx_unlock(&pool->sp_lock);
581 }
582 
583 /*
584  * Add a service connection loss program to the callout list.
585  * The dispatch routine will be called when a transport in this pool dies.
586  */
587 bool_t
588 svc_loss_reg(SVCXPRT *xprt, void (*dispatch)(SVCXPRT *))
589 {
590 	SVCPOOL *pool = xprt->xp_pool;
591 	struct svc_loss_callout *s;
592 
593 	mtx_lock(&pool->sp_lock);
594 	TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
595 		if (s->slc_dispatch == dispatch)
596 			break;
597 	}
598 	if (s != NULL) {
599 		mtx_unlock(&pool->sp_lock);
600 		return (TRUE);
601 	}
602 	s = malloc(sizeof(struct svc_loss_callout), M_RPC, M_NOWAIT);
603 	if (s == NULL) {
604 		mtx_unlock(&pool->sp_lock);
605 		return (FALSE);
606 	}
607 	s->slc_dispatch = dispatch;
608 	TAILQ_INSERT_TAIL(&pool->sp_lcallouts, s, slc_link);
609 	mtx_unlock(&pool->sp_lock);
610 	return (TRUE);
611 }
612 
613 /*
614  * Remove a service connection loss program from the callout list.
615  */
616 void
617 svc_loss_unreg(SVCPOOL *pool, void (*dispatch)(SVCXPRT *))
618 {
619 	struct svc_loss_callout *s;
620 
621 	mtx_lock(&pool->sp_lock);
622 	TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
623 		if (s->slc_dispatch == dispatch) {
624 			TAILQ_REMOVE(&pool->sp_lcallouts, s, slc_link);
625 			free(s, M_RPC);
626 			break;
627 		}
628 	}
629 	mtx_unlock(&pool->sp_lock);
630 }
631 
632 /* ********************** CALLOUT list related stuff ************* */
633 
634 /*
635  * Search the callout list for a program number, return the callout
636  * struct.
637  */
638 static struct svc_callout *
639 svc_find(SVCPOOL *pool, rpcprog_t prog, rpcvers_t vers, char *netid)
640 {
641 	struct svc_callout *s;
642 
643 	mtx_assert(&pool->sp_lock, MA_OWNED);
644 	TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
645 		if (s->sc_prog == prog && s->sc_vers == vers
646 		    && (netid == NULL || s->sc_netid == NULL ||
647 			strcmp(netid, s->sc_netid) == 0))
648 			break;
649 	}
650 
651 	return (s);
652 }
653 
654 /* ******************* REPLY GENERATION ROUTINES  ************ */
655 
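/*
 * Common reply path: free any remaining request arguments, record the
 * reply in the replay cache (if configured), wrap the body through the
 * authentication flavor and hand it to the transport's reply method.
 */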
656 static bool_t
657 svc_sendreply_common(struct svc_req *rqstp, struct rpc_msg *rply,
658     struct mbuf *body)
659 {
660 	SVCXPRT *xprt = rqstp->rq_xprt;
661 	bool_t ok;
662 
663 	if (rqstp->rq_args) {
664 		m_freem(rqstp->rq_args);
665 		rqstp->rq_args = NULL;
666 	}
667 
668 	if (xprt->xp_pool->sp_rcache)
669 		replay_setreply(xprt->xp_pool->sp_rcache,
670 		    rply, svc_getrpccaller(rqstp), body);
671 
672 	if (!SVCAUTH_WRAP(&rqstp->rq_auth, &body))
673 		return (FALSE);
674 
675 	ok = SVC_REPLY(xprt, rply, rqstp->rq_addr, body, &rqstp->rq_reply_seq);
676 	if (rqstp->rq_addr) {
677 		free(rqstp->rq_addr, M_SONAME);
678 		rqstp->rq_addr = NULL;
679 	}
680 
681 	return (ok);
682 }
683 
684 /*
685  * Send a reply to an rpc request
686  */
687 bool_t
688 svc_sendreply(struct svc_req *rqstp, xdrproc_t xdr_results, void * xdr_location)
689 {
690 	struct rpc_msg rply;
691 	struct mbuf *m;
692 	XDR xdrs;
693 	bool_t ok;
694 
695 	rply.rm_xid = rqstp->rq_xid;
696 	rply.rm_direction = REPLY;
697 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
698 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
699 	rply.acpted_rply.ar_stat = SUCCESS;
700 	rply.acpted_rply.ar_results.where = NULL;
701 	rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;
702 
703 	m = m_getcl(M_WAITOK, MT_DATA, 0);
704 	xdrmbuf_create(&xdrs, m, XDR_ENCODE);
705 	ok = xdr_results(&xdrs, xdr_location);
706 	XDR_DESTROY(&xdrs);
707 
708 	if (ok) {
709 		return (svc_sendreply_common(rqstp, &rply, m));
710 	} else {
711 		m_freem(m);
712 		return (FALSE);
713 	}
714 }
715 
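/*
 * Send a successful reply whose results have already been XDR-encoded
 * into an mbuf chain.
 */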
716 bool_t
717 svc_sendreply_mbuf(struct svc_req *rqstp, struct mbuf *m)
718 {
719 	struct rpc_msg rply;
720 
721 	rply.rm_xid = rqstp->rq_xid;
722 	rply.rm_direction = REPLY;
723 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
724 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
725 	rply.acpted_rply.ar_stat = SUCCESS;
726 	rply.acpted_rply.ar_results.where = NULL;
727 	rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;
728 
729 	return (svc_sendreply_common(rqstp, &rply, m));
730 }
731 
732 /*
733  * No procedure error reply
734  */
735 void
736 svcerr_noproc(struct svc_req *rqstp)
737 {
738 	SVCXPRT *xprt = rqstp->rq_xprt;
739 	struct rpc_msg rply;
740 
741 	rply.rm_xid = rqstp->rq_xid;
742 	rply.rm_direction = REPLY;
743 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
744 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
745 	rply.acpted_rply.ar_stat = PROC_UNAVAIL;
746 
747 	if (xprt->xp_pool->sp_rcache)
748 		replay_setreply(xprt->xp_pool->sp_rcache,
749 		    &rply, svc_getrpccaller(rqstp), NULL);
750 
751 	svc_sendreply_common(rqstp, &rply, NULL);
752 }
753 
754 /*
755  * Can't decode args error reply
756  */
757 void
758 svcerr_decode(struct svc_req *rqstp)
759 {
760 	SVCXPRT *xprt = rqstp->rq_xprt;
761 	struct rpc_msg rply;
762 
763 	rply.rm_xid = rqstp->rq_xid;
764 	rply.rm_direction = REPLY;
765 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
766 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
767 	rply.acpted_rply.ar_stat = GARBAGE_ARGS;
768 
769 	if (xprt->xp_pool->sp_rcache)
770 		replay_setreply(xprt->xp_pool->sp_rcache,
771 		    &rply, (struct sockaddr *) &xprt->xp_rtaddr, NULL);
772 
773 	svc_sendreply_common(rqstp, &rply, NULL);
774 }
775 
776 /*
777  * Some system error
778  */
779 void
780 svcerr_systemerr(struct svc_req *rqstp)
781 {
782 	SVCXPRT *xprt = rqstp->rq_xprt;
783 	struct rpc_msg rply;
784 
785 	rply.rm_xid = rqstp->rq_xid;
786 	rply.rm_direction = REPLY;
787 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
788 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
789 	rply.acpted_rply.ar_stat = SYSTEM_ERR;
790 
791 	if (xprt->xp_pool->sp_rcache)
792 		replay_setreply(xprt->xp_pool->sp_rcache,
793 		    &rply, svc_getrpccaller(rqstp), NULL);
794 
795 	svc_sendreply_common(rqstp, &rply, NULL);
796 }
797 
798 /*
799  * Authentication error reply
800  */
801 void
802 svcerr_auth(struct svc_req *rqstp, enum auth_stat why)
803 {
804 	SVCXPRT *xprt = rqstp->rq_xprt;
805 	struct rpc_msg rply;
806 
807 	rply.rm_xid = rqstp->rq_xid;
808 	rply.rm_direction = REPLY;
809 	rply.rm_reply.rp_stat = MSG_DENIED;
810 	rply.rjcted_rply.rj_stat = AUTH_ERROR;
811 	rply.rjcted_rply.rj_why = why;
812 
813 	if (xprt->xp_pool->sp_rcache)
814 		replay_setreply(xprt->xp_pool->sp_rcache,
815 		    &rply, svc_getrpccaller(rqstp), NULL);
816 
817 	svc_sendreply_common(rqstp, &rply, NULL);
818 }
819 
820 /*
821  * Auth too weak error reply
822  */
823 void
824 svcerr_weakauth(struct svc_req *rqstp)
825 {
826 
827 	svcerr_auth(rqstp, AUTH_TOOWEAK);
828 }
829 
830 /*
831  * Program unavailable error reply
832  */
833 void
834 svcerr_noprog(struct svc_req *rqstp)
835 {
836 	SVCXPRT *xprt = rqstp->rq_xprt;
837 	struct rpc_msg rply;
838 
839 	rply.rm_xid = rqstp->rq_xid;
840 	rply.rm_direction = REPLY;
841 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
842 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
843 	rply.acpted_rply.ar_stat = PROG_UNAVAIL;
844 
845 	if (xprt->xp_pool->sp_rcache)
846 		replay_setreply(xprt->xp_pool->sp_rcache,
847 		    &rply, svc_getrpccaller(rqstp), NULL);
848 
849 	svc_sendreply_common(rqstp, &rply, NULL);
850 }
851 
852 /*
853  * Program version mismatch error reply
854  */
855 void
856 svcerr_progvers(struct svc_req *rqstp, rpcvers_t low_vers, rpcvers_t high_vers)
857 {
858 	SVCXPRT *xprt = rqstp->rq_xprt;
859 	struct rpc_msg rply;
860 
861 	rply.rm_xid = rqstp->rq_xid;
862 	rply.rm_direction = REPLY;
863 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
864 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
865 	rply.acpted_rply.ar_stat = PROG_MISMATCH;
866 	rply.acpted_rply.ar_vers.low = (uint32_t)low_vers;
867 	rply.acpted_rply.ar_vers.high = (uint32_t)high_vers;
868 
869 	if (xprt->xp_pool->sp_rcache)
870 		replay_setreply(xprt->xp_pool->sp_rcache,
871 		    &rply, svc_getrpccaller(rqstp), NULL);
872 
873 	svc_sendreply_common(rqstp, &rply, NULL);
874 }
875 
876 /*
877  * Allocate a new server transport structure. All fields are
878  * initialized to zero and xp_p3 is initialized to point at an
879  * extension structure to hold various flags and authentication
880  * parameters.
881  */
882 SVCXPRT *
883 svc_xprt_alloc(void)
884 {
885 	SVCXPRT *xprt;
886 	SVCXPRT_EXT *ext;
887 
888 	xprt = mem_alloc(sizeof(SVCXPRT));
889 	ext = mem_alloc(sizeof(SVCXPRT_EXT));
890 	xprt->xp_p3 = ext;
891 	refcount_init(&xprt->xp_refs, 1);
892 
893 	return (xprt);
894 }
895 
896 /*
897  * Free a server transport structure.
898  */
899 void
900 svc_xprt_free(SVCXPRT *xprt)
901 {
902 
903 	mem_free(xprt->xp_p3, sizeof(SVCXPRT_EXT));
904 	/* The size argument is ignored, so 0 is ok. */
905 	mem_free(xprt->xp_gidp, 0);
906 	mem_free(xprt, sizeof(SVCXPRT));
907 }
908 
909 /* ******************* SERVER INPUT STUFF ******************* */
910 
911 /*
912  * Read RPC requests from a transport and queue them to be
913  * executed. We handle authentication and replay cache replies here.
914  * Actually dispatching the RPC is deferred till svc_executereq.
915  */
916 static enum xprt_stat
917 svc_getreq(SVCXPRT *xprt, struct svc_req **rqstp_ret)
918 {
919 	SVCPOOL *pool = xprt->xp_pool;
920 	struct svc_req *r;
921 	struct rpc_msg msg;
922 	struct mbuf *args;
923 	struct svc_loss_callout *s;
924 	enum xprt_stat stat;
925 
926 	/* now receive msgs from xprt (support batch calls) */
927 	r = malloc(sizeof(*r), M_RPC, M_WAITOK|M_ZERO);
928 
929 	msg.rm_call.cb_cred.oa_base = r->rq_credarea;
930 	msg.rm_call.cb_verf.oa_base = &r->rq_credarea[MAX_AUTH_BYTES];
931 	r->rq_clntcred = &r->rq_credarea[2*MAX_AUTH_BYTES];
932 	if (SVC_RECV(xprt, &msg, &r->rq_addr, &args)) {
933 		enum auth_stat why;
934 
935 		/*
936 		 * Handle replays and authenticate before queuing the
937 		 * request to be executed.
938 		 */
939 		SVC_ACQUIRE(xprt);
940 		r->rq_xprt = xprt;
941 		if (pool->sp_rcache) {
942 			struct rpc_msg repmsg;
943 			struct mbuf *repbody;
944 			enum replay_state rs;
945 			rs = replay_find(pool->sp_rcache, &msg,
946 			    svc_getrpccaller(r), &repmsg, &repbody);
947 			switch (rs) {
948 			case RS_NEW:
949 				break;
950 			case RS_DONE:
951 				SVC_REPLY(xprt, &repmsg, r->rq_addr,
952 				    repbody, &r->rq_reply_seq);
953 				if (r->rq_addr) {
954 					free(r->rq_addr, M_SONAME);
955 					r->rq_addr = NULL;
956 				}
957 				m_freem(args);
958 				goto call_done;
959 
960 			default:
961 				m_freem(args);
962 				goto call_done;
963 			}
964 		}
965 
966 		r->rq_xid = msg.rm_xid;
967 		r->rq_prog = msg.rm_call.cb_prog;
968 		r->rq_vers = msg.rm_call.cb_vers;
969 		r->rq_proc = msg.rm_call.cb_proc;
970 		r->rq_size = sizeof(*r) + m_length(args, NULL);
971 		r->rq_args = args;
972 		if ((why = _authenticate(r, &msg)) != AUTH_OK) {
973 			/*
974 			 * RPCSEC_GSS uses this return code
975 			 * for requests that form part of its
976 			 * context establishment protocol and
977 			 * should not be dispatched to the
978 			 * application.
979 			 */
980 			if (why != RPCSEC_GSS_NODISPATCH)
981 				svcerr_auth(r, why);
982 			goto call_done;
983 		}
984 
985 		if (!SVCAUTH_UNWRAP(&r->rq_auth, &r->rq_args)) {
986 			svcerr_decode(r);
987 			goto call_done;
988 		}
989 
990 		/*
991 		 * Everything checks out, return request to caller.
992 		 */
993 		*rqstp_ret = r;
994 		r = NULL;
995 	}
996 call_done:
997 	if (r) {
998 		svc_freereq(r);
999 		r = NULL;
1000 	}
1001 	if ((stat = SVC_STAT(xprt)) == XPRT_DIED) {
1002 		TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link)
1003 			(*s->slc_dispatch)(xprt);
1004 		xprt_unregister(xprt);
1005 	}
1006 
1007 	return (stat);
1008 }
1009 
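/*
 * Dispatch a queued request to the matching registered program and
 * version, or generate the appropriate error reply if none matches.
 */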
1010 static void
1011 svc_executereq(struct svc_req *rqstp)
1012 {
1013 	SVCXPRT *xprt = rqstp->rq_xprt;
1014 	SVCPOOL *pool = xprt->xp_pool;
1015 	int prog_found;
1016 	rpcvers_t low_vers;
1017 	rpcvers_t high_vers;
1018 	struct svc_callout *s;
1019 
1020 	/* now match message with a registered service */
1021 	prog_found = FALSE;
1022 	low_vers = (rpcvers_t) -1L;
1023 	high_vers = (rpcvers_t) 0L;
1024 	TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
1025 		if (s->sc_prog == rqstp->rq_prog) {
1026 			if (s->sc_vers == rqstp->rq_vers) {
1027 				/*
1028 				 * We hand ownership of r to the
1029 				 * dispatch method - they must call
1030 				 * svc_freereq.
1031 				 */
1032 				(*s->sc_dispatch)(rqstp, xprt);
1033 				return;
1034 			}  /* found correct version */
1035 			prog_found = TRUE;
1036 			if (s->sc_vers < low_vers)
1037 				low_vers = s->sc_vers;
1038 			if (s->sc_vers > high_vers)
1039 				high_vers = s->sc_vers;
1040 		}   /* found correct program */
1041 	}
1042 
1043 	/*
1044 	 * if we got here, the program or version
1045 	 * is not served ...
1046 	 */
1047 	if (prog_found)
1048 		svcerr_progvers(rqstp, low_vers, high_vers);
1049 	else
1050 		svcerr_noprog(rqstp);
1051 
1052 	svc_freereq(rqstp);
1053 }
1054 
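/*
 * Unregister and release transports whose idle timeout has expired.
 * Called with the group lock held; the lock is dropped while the
 * expired sockets are shut down.
 */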
1055 static void
1056 svc_checkidle(SVCGROUP *grp)
1057 {
1058 	SVCXPRT *xprt, *nxprt;
1059 	time_t timo;
1060 	struct svcxprt_list cleanup;
1061 
1062 	TAILQ_INIT(&cleanup);
1063 	TAILQ_FOREACH_SAFE(xprt, &grp->sg_xlist, xp_link, nxprt) {
1064 		/*
1065 		 * Only some transports have idle timers. Don't time
1066 		 * something out which is just waking up.
1067 		 */
1068 		if (!xprt->xp_idletimeout || xprt->xp_thread)
1069 			continue;
1070 
1071 		timo = xprt->xp_lastactive + xprt->xp_idletimeout;
1072 		if (time_uptime > timo) {
1073 			xprt_unregister_locked(xprt);
1074 			TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
1075 		}
1076 	}
1077 
1078 	mtx_unlock(&grp->sg_lock);
1079 	TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
1080 		soshutdown(xprt->xp_socket, SHUT_WR);
1081 		SVC_RELEASE(xprt);
1082 	}
1083 	mtx_lock(&grp->sg_lock);
1084 }
1085 
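/*
 * Hand queued active transports to idle threads.  Called when
 * request-space throttling is released.
 */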
1086 static void
1087 svc_assign_waiting_sockets(SVCPOOL *pool)
1088 {
1089 	SVCGROUP *grp;
1090 	SVCXPRT *xprt;
1091 	int g;
1092 
1093 	for (g = 0; g < pool->sp_groupcount; g++) {
1094 		grp = &pool->sp_groups[g];
1095 		mtx_lock(&grp->sg_lock);
1096 		while ((xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) {
1097 			if (xprt_assignthread(xprt))
1098 				TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
1099 			else
1100 				break;
1101 		}
1102 		mtx_unlock(&grp->sg_lock);
1103 	}
1104 }
1105 
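/*
 * Account for request memory in flight.  Throttling engages when usage
 * crosses the high watermark and is released, waking waiting sockets,
 * once it drops below the low watermark.
 */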
1106 static void
1107 svc_change_space_used(SVCPOOL *pool, long delta)
1108 {
1109 	unsigned long value;
1110 
1111 	value = atomic_fetchadd_long(&pool->sp_space_used, delta) + delta;
1112 	if (delta > 0) {
1113 		if (value >= pool->sp_space_high && !pool->sp_space_throttled) {
1114 			pool->sp_space_throttled = TRUE;
1115 			pool->sp_space_throttle_count++;
1116 		}
1117 		if (value > pool->sp_space_used_highest)
1118 			pool->sp_space_used_highest = value;
1119 	} else {
1120 		if (value < pool->sp_space_low && pool->sp_space_throttled) {
1121 			pool->sp_space_throttled = FALSE;
1122 			svc_assign_waiting_sockets(pool);
1123 		}
1124 	}
1125 }
1126 
1127 static bool_t
1128 svc_request_space_available(SVCPOOL *pool)
1129 {
1130 
1131 	if (pool->sp_space_throttled)
1132 		return (FALSE);
1133 	return (TRUE);
1134 }
1135 
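/*
 * Main loop for a service thread: pick up (or wait for) an active
 * transport, drain its socket into a queue of parsed requests, then
 * execute the queued requests.  New worker threads are spawned on
 * demand and surplus idle workers exit after a timeout.
 */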
1136 static void
1137 svc_run_internal(SVCGROUP *grp, bool_t ismaster)
1138 {
1139 	SVCPOOL *pool = grp->sg_pool;
1140 	SVCTHREAD *st, *stpref;
1141 	SVCXPRT *xprt;
1142 	enum xprt_stat stat;
1143 	struct svc_req *rqstp;
1144 	struct proc *p;
1145 	long sz;
1146 	int error;
1147 
1148 	st = mem_alloc(sizeof(*st));
1149 	mtx_init(&st->st_lock, "st_lock", NULL, MTX_DEF);
1150 	st->st_pool = pool;
1151 	st->st_xprt = NULL;
1152 	STAILQ_INIT(&st->st_reqs);
1153 	cv_init(&st->st_cond, "rpcsvc");
1154 
1155 	mtx_lock(&grp->sg_lock);
1156 
1157 	/*
1158 	 * If we are a new thread which was spawned to cope with
1159 	 * increased load, set the state back to SVCPOOL_ACTIVE.
1160 	 */
1161 	if (grp->sg_state == SVCPOOL_THREADSTARTING)
1162 		grp->sg_state = SVCPOOL_ACTIVE;
1163 
1164 	while (grp->sg_state != SVCPOOL_CLOSING) {
1165 		/*
1166 		 * Create new thread if requested.
1167 		 */
1168 		if (grp->sg_state == SVCPOOL_THREADWANTED) {
1169 			grp->sg_state = SVCPOOL_THREADSTARTING;
1170 			grp->sg_lastcreatetime = time_uptime;
1171 			mtx_unlock(&grp->sg_lock);
1172 			svc_new_thread(grp);
1173 			mtx_lock(&grp->sg_lock);
1174 			continue;
1175 		}
1176 
1177 		/*
1178 		 * Check for idle transports once per second.
1179 		 */
1180 		if (time_uptime > grp->sg_lastidlecheck) {
1181 			grp->sg_lastidlecheck = time_uptime;
1182 			svc_checkidle(grp);
1183 		}
1184 
1185 		xprt = st->st_xprt;
1186 		if (!xprt) {
1187 			/*
1188 			 * Enforce maxthreads count.
1189 			 */
1190 			if (!ismaster && grp->sg_threadcount >
1191 			    grp->sg_maxthreads)
1192 				break;
1193 
1194 			/*
1195 			 * Before sleeping, see if we can find an
1196 			 * active transport which isn't being serviced
1197 			 * by a thread.
1198 			 */
1199 			if (svc_request_space_available(pool) &&
1200 			    (xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) {
1201 				TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
1202 				SVC_ACQUIRE(xprt);
1203 				xprt->xp_thread = st;
1204 				st->st_xprt = xprt;
1205 				continue;
1206 			}
1207 
1208 			LIST_INSERT_HEAD(&grp->sg_idlethreads, st, st_ilink);
1209 			if (ismaster || (!ismaster &&
1210 			    grp->sg_threadcount > grp->sg_minthreads))
1211 				error = cv_timedwait_sig(&st->st_cond,
1212 				    &grp->sg_lock, 5 * hz);
1213 			else
1214 				error = cv_wait_sig(&st->st_cond,
1215 				    &grp->sg_lock);
1216 			if (st->st_xprt == NULL)
1217 				LIST_REMOVE(st, st_ilink);
1218 
1219 			/*
1220 			 * Reduce worker thread count when idle.
1221 			 */
1222 			if (error == EWOULDBLOCK) {
1223 				if (!ismaster
1224 				    && (grp->sg_threadcount
1225 					> grp->sg_minthreads)
1226 					&& !st->st_xprt)
1227 					break;
1228 			} else if (error != 0) {
1229 				KASSERT(error == EINTR || error == ERESTART,
1230 				    ("non-signal error %d", error));
1231 				mtx_unlock(&grp->sg_lock);
1232 				p = curproc;
1233 				PROC_LOCK(p);
1234 				if (P_SHOULDSTOP(p) ||
1235 				    (p->p_flag & P_TOTAL_STOP) != 0) {
1236 					thread_suspend_check(0);
1237 					PROC_UNLOCK(p);
1238 					mtx_lock(&grp->sg_lock);
1239 				} else {
1240 					PROC_UNLOCK(p);
1241 					svc_exit(pool);
1242 					mtx_lock(&grp->sg_lock);
1243 					break;
1244 				}
1245 			}
1246 			continue;
1247 		}
1248 		mtx_unlock(&grp->sg_lock);
1249 
1250 		/*
1251 		 * Drain the transport socket and queue up any RPCs.
1252 		 */
1253 		xprt->xp_lastactive = time_uptime;
1254 		do {
1255 			if (!svc_request_space_available(pool))
1256 				break;
1257 			rqstp = NULL;
1258 			stat = svc_getreq(xprt, &rqstp);
1259 			if (rqstp) {
1260 				svc_change_space_used(pool, rqstp->rq_size);
1261 				/*
1262 				 * See if the application has a preference
1263 				 * for some other thread.
1264 				 */
1265 				if (pool->sp_assign) {
1266 					stpref = pool->sp_assign(st, rqstp);
1267 					rqstp->rq_thread = stpref;
1268 					STAILQ_INSERT_TAIL(&stpref->st_reqs,
1269 					    rqstp, rq_link);
1270 					mtx_unlock(&stpref->st_lock);
1271 					if (stpref != st)
1272 						rqstp = NULL;
1273 				} else {
1274 					rqstp->rq_thread = st;
1275 					STAILQ_INSERT_TAIL(&st->st_reqs,
1276 					    rqstp, rq_link);
1277 				}
1278 			}
1279 		} while (rqstp == NULL && stat == XPRT_MOREREQS
1280 		    && grp->sg_state != SVCPOOL_CLOSING);
1281 
1282 		/*
1283 		 * Move this transport to the end of the active list to
1284 		 * ensure fairness when multiple transports are active.
1285 		 * If this was the last queued request, svc_getreq will end
1286 		 * up calling xprt_inactive to remove from the active list.
1287 		 */
1288 		mtx_lock(&grp->sg_lock);
1289 		xprt->xp_thread = NULL;
1290 		st->st_xprt = NULL;
1291 		if (xprt->xp_active) {
1292 			if (!svc_request_space_available(pool) ||
1293 			    !xprt_assignthread(xprt))
1294 				TAILQ_INSERT_TAIL(&grp->sg_active,
1295 				    xprt, xp_alink);
1296 		}
1297 		mtx_unlock(&grp->sg_lock);
1298 		SVC_RELEASE(xprt);
1299 
1300 		/*
1301 		 * Execute what we have queued.
1302 		 */
1303 		mtx_lock(&st->st_lock);
1304 		while ((rqstp = STAILQ_FIRST(&st->st_reqs)) != NULL) {
1305 			STAILQ_REMOVE_HEAD(&st->st_reqs, rq_link);
1306 			mtx_unlock(&st->st_lock);
1307 			sz = (long)rqstp->rq_size;
1308 			svc_executereq(rqstp);
1309 			svc_change_space_used(pool, -sz);
1310 			mtx_lock(&st->st_lock);
1311 		}
1312 		mtx_unlock(&st->st_lock);
1313 		mtx_lock(&grp->sg_lock);
1314 	}
1315 
1316 	if (st->st_xprt) {
1317 		xprt = st->st_xprt;
1318 		st->st_xprt = NULL;
1319 		SVC_RELEASE(xprt);
1320 	}
1321 	KASSERT(STAILQ_EMPTY(&st->st_reqs), ("stray reqs on exit"));
1322 	mtx_destroy(&st->st_lock);
1323 	cv_destroy(&st->st_cond);
1324 	mem_free(st, sizeof(*st));
1325 
1326 	grp->sg_threadcount--;
1327 	if (!ismaster)
1328 		wakeup(grp);
1329 	mtx_unlock(&grp->sg_lock);
1330 }
1331 
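/*
 * Entry point for worker threads created by svc_new_thread().
 */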
1332 static void
1333 svc_thread_start(void *arg)
1334 {
1335 
1336 	svc_run_internal((SVCGROUP *) arg, FALSE);
1337 	kthread_exit();
1338 }
1339 
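/*
 * Account for and create an additional worker thread for the group.
 */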
1340 static void
1341 svc_new_thread(SVCGROUP *grp)
1342 {
1343 	SVCPOOL *pool = grp->sg_pool;
1344 	struct thread *td;
1345 
1346 	mtx_lock(&grp->sg_lock);
1347 	grp->sg_threadcount++;
1348 	mtx_unlock(&grp->sg_lock);
1349 	kthread_add(svc_thread_start, grp, pool->sp_proc, &td, 0, 0,
1350 	    "%s: service", pool->sp_name);
1351 }
1352 
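/*
 * Run the pool: size the thread groups, start the minimum number of
 * worker threads, service requests as the master thread and then wait
 * for all workers to exit.
 */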
1353 void
1354 svc_run(SVCPOOL *pool)
1355 {
1356 	int g, i;
1357 	struct proc *p;
1358 	struct thread *td;
1359 	SVCGROUP *grp;
1360 
1361 	p = curproc;
1362 	td = curthread;
1363 	snprintf(td->td_name, sizeof(td->td_name),
1364 	    "%s: master", pool->sp_name);
1365 	pool->sp_state = SVCPOOL_ACTIVE;
1366 	pool->sp_proc = p;
1367 
1368 	/* Choose group count based on number of threads and CPUs. */
1369 	pool->sp_groupcount = max(1, min(SVC_MAXGROUPS,
1370 	    min(pool->sp_maxthreads / 2, mp_ncpus) / 6));
1371 	for (g = 0; g < pool->sp_groupcount; g++) {
1372 		grp = &pool->sp_groups[g];
1373 		grp->sg_minthreads = max(1,
1374 		    pool->sp_minthreads / pool->sp_groupcount);
1375 		grp->sg_maxthreads = max(1,
1376 		    pool->sp_maxthreads / pool->sp_groupcount);
1377 		grp->sg_lastcreatetime = time_uptime;
1378 	}
1379 
1380 	/* Starting threads */
1381 	pool->sp_groups[0].sg_threadcount++;
1382 	for (g = 0; g < pool->sp_groupcount; g++) {
1383 		grp = &pool->sp_groups[g];
1384 		for (i = ((g == 0) ? 1 : 0); i < grp->sg_minthreads; i++)
1385 			svc_new_thread(grp);
1386 	}
1387 	svc_run_internal(&pool->sp_groups[0], TRUE);
1388 
1389 	/* Waiting for threads to stop. */
1390 	for (g = 0; g < pool->sp_groupcount; g++) {
1391 		grp = &pool->sp_groups[g];
1392 		mtx_lock(&grp->sg_lock);
1393 		while (grp->sg_threadcount > 0)
1394 			msleep(grp, &grp->sg_lock, 0, "svcexit", 0);
1395 		mtx_unlock(&grp->sg_lock);
1396 	}
1397 }
1398 
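/*
 * Request that all service threads in the pool shut down.
 */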
1399 void
1400 svc_exit(SVCPOOL *pool)
1401 {
1402 	SVCGROUP *grp;
1403 	SVCTHREAD *st;
1404 	int g;
1405 
1406 	pool->sp_state = SVCPOOL_CLOSING;
1407 	for (g = 0; g < pool->sp_groupcount; g++) {
1408 		grp = &pool->sp_groups[g];
1409 		mtx_lock(&grp->sg_lock);
1410 		if (grp->sg_state != SVCPOOL_CLOSING) {
1411 			grp->sg_state = SVCPOOL_CLOSING;
1412 			LIST_FOREACH(st, &grp->sg_idlethreads, st_ilink)
1413 				cv_signal(&st->st_cond);
1414 		}
1415 		mtx_unlock(&grp->sg_lock);
1416 	}
1417 }
1418 
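/*
 * Decode the request arguments from the saved mbuf chain.  The chain
 * is consumed regardless of the result.
 */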
1419 bool_t
1420 svc_getargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
1421 {
1422 	struct mbuf *m;
1423 	XDR xdrs;
1424 	bool_t stat;
1425 
1426 	m = rqstp->rq_args;
1427 	rqstp->rq_args = NULL;
1428 
1429 	xdrmbuf_create(&xdrs, m, XDR_DECODE);
1430 	stat = xargs(&xdrs, args);
1431 	XDR_DESTROY(&xdrs);
1432 
1433 	return (stat);
1434 }
1435 
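/*
 * Free memory allocated by a previous svc_getargs() decode and release
 * the caller's address, if any.
 */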
1436 bool_t
1437 svc_freeargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
1438 {
1439 	XDR xdrs;
1440 
1441 	if (rqstp->rq_addr) {
1442 		free(rqstp->rq_addr, M_SONAME);
1443 		rqstp->rq_addr = NULL;
1444 	}
1445 
1446 	xdrs.x_op = XDR_FREE;
1447 	return (xargs(&xdrs, args));
1448 }
1449 
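/*
 * Release all resources associated with a request: notify the pool's
 * "done" callback, drop the authentication and transport references,
 * and free the address, argument mbufs and the request itself.
 */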
1450 void
1451 svc_freereq(struct svc_req *rqstp)
1452 {
1453 	SVCTHREAD *st;
1454 	SVCPOOL *pool;
1455 
1456 	st = rqstp->rq_thread;
1457 	if (st) {
1458 		pool = st->st_pool;
1459 		if (pool->sp_done)
1460 			pool->sp_done(st, rqstp);
1461 	}
1462 
1463 	if (rqstp->rq_auth.svc_ah_ops)
1464 		SVCAUTH_RELEASE(&rqstp->rq_auth);
1465 
1466 	if (rqstp->rq_xprt) {
1467 		SVC_RELEASE(rqstp->rq_xprt);
1468 	}
1469 
1470 	if (rqstp->rq_addr)
1471 		free(rqstp->rq_addr, M_SONAME);
1472 
1473 	if (rqstp->rq_args)
1474 		m_freem(rqstp->rq_args);
1475 
1476 	free(rqstp, M_RPC);
1477 }
1478