xref: /freebsd/sys/rpc/svc.c (revision 243e928310d073338c5ec089f0dce238a80b9866)
1 /*	$NetBSD: svc.c,v 1.21 2000/07/06 03:10:35 christos Exp $	*/
2 
3 /*-
4  * Copyright (c) 2009, Sun Microsystems, Inc.
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  * - Redistributions of source code must retain the above copyright notice,
10  *   this list of conditions and the following disclaimer.
11  * - Redistributions in binary form must reproduce the above copyright notice,
12  *   this list of conditions and the following disclaimer in the documentation
13  *   and/or other materials provided with the distribution.
14  * - Neither the name of Sun Microsystems, Inc. nor the names of its
15  *   contributors may be used to endorse or promote products derived
16  *   from this software without specific prior written permission.
17  *
18  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
22  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  */
30 
31 #if defined(LIBC_SCCS) && !defined(lint)
32 static char *sccsid2 = "@(#)svc.c 1.44 88/02/08 Copyr 1984 Sun Micro";
33 static char *sccsid = "@(#)svc.c	2.4 88/08/11 4.0 RPCSRC";
34 #endif
35 #include <sys/cdefs.h>
36 __FBSDID("$FreeBSD$");
37 
38 /*
39  * svc.c, Server-side remote procedure call interface.
40  *
41  * There are two sets of procedures here.  The xprt routines are
42  * for handling transport handles.  The svc routines handle the
43  * list of service routines.
44  *
45  * Copyright (C) 1984, Sun Microsystems, Inc.
46  */
47 
48 #include <sys/param.h>
49 #include <sys/lock.h>
50 #include <sys/kernel.h>
51 #include <sys/kthread.h>
52 #include <sys/malloc.h>
53 #include <sys/mbuf.h>
54 #include <sys/mutex.h>
55 #include <sys/proc.h>
56 #include <sys/queue.h>
57 #include <sys/socketvar.h>
58 #include <sys/systm.h>
59 #include <sys/smp.h>
60 #include <sys/sx.h>
61 #include <sys/ucred.h>
62 
63 #include <rpc/rpc.h>
64 #include <rpc/rpcb_clnt.h>
65 #include <rpc/replay.h>
66 
67 #include <rpc/rpc_com.h>
68 
69 #define SVC_VERSQUIET 0x0001		/* keep quiet about vers mismatch */
70 #define version_keepquiet(xp) (SVC_EXT(xp)->xp_flags & SVC_VERSQUIET)
71 
72 static struct svc_callout *svc_find(SVCPOOL *pool, rpcprog_t, rpcvers_t,
73     char *);
74 static void svc_new_thread(SVCGROUP *grp);
75 static void xprt_unregister_locked(SVCXPRT *xprt);
76 static void svc_change_space_used(SVCPOOL *pool, long delta);
77 static bool_t svc_request_space_available(SVCPOOL *pool);
78 
79 /* ***************  SVCXPRT related stuff **************** */
80 
81 static int svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS);
82 static int svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS);
83 static int svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS);
84 
85 SVCPOOL*
86 svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base)
87 {
88 	SVCPOOL *pool;
89 	SVCGROUP *grp;
90 	int g;
91 
92 	pool = malloc(sizeof(SVCPOOL), M_RPC, M_WAITOK|M_ZERO);
93 
94 	mtx_init(&pool->sp_lock, "sp_lock", NULL, MTX_DEF);
95 	pool->sp_name = name;
96 	pool->sp_state = SVCPOOL_INIT;
97 	pool->sp_proc = NULL;
98 	TAILQ_INIT(&pool->sp_callouts);
99 	TAILQ_INIT(&pool->sp_lcallouts);
100 	pool->sp_minthreads = 1;
101 	pool->sp_maxthreads = 1;
102 	pool->sp_groupcount = 1;
103 	for (g = 0; g < SVC_MAXGROUPS; g++) {
104 		grp = &pool->sp_groups[g];
105 		mtx_init(&grp->sg_lock, "sg_lock", NULL, MTX_DEF);
106 		grp->sg_pool = pool;
107 		grp->sg_state = SVCPOOL_ACTIVE;
108 		TAILQ_INIT(&grp->sg_xlist);
109 		TAILQ_INIT(&grp->sg_active);
110 		LIST_INIT(&grp->sg_idlethreads);
111 		grp->sg_minthreads = 1;
112 		grp->sg_maxthreads = 1;
113 	}
114 
115 	/*
116 	 * Don't use more than a quarter of mbuf clusters.  Nota bene:
117 	 * nmbclusters is an int, but nmbclusters*MCLBYTES may overflow
118 	 * on LP64 architectures, so cast to u_long to avoid undefined
119 	 * behavior.  (ILP32 architectures cannot have nmbclusters
120 	 * large enough to overflow for other reasons.)
121 	 */
122 	pool->sp_space_high = (u_long)nmbclusters * MCLBYTES / 4;
123 	pool->sp_space_low = (pool->sp_space_high / 3) * 2;
124 
125 	sysctl_ctx_init(&pool->sp_sysctl);
126 	if (sysctl_base) {
127 		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
128 		    "minthreads", CTLTYPE_INT | CTLFLAG_RW,
129 		    pool, 0, svcpool_minthread_sysctl, "I",
130 		    "Minimal number of threads");
131 		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
132 		    "maxthreads", CTLTYPE_INT | CTLFLAG_RW,
133 		    pool, 0, svcpool_maxthread_sysctl, "I",
134 		    "Maximal number of threads");
135 		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
136 		    "threads", CTLTYPE_INT | CTLFLAG_RD,
137 		    pool, 0, svcpool_threads_sysctl, "I",
138 		    "Current number of threads");
139 		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
140 		    "groups", CTLFLAG_RD, &pool->sp_groupcount, 0,
141 		    "Number of thread groups");
142 
143 		SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
144 		    "request_space_used", CTLFLAG_RD,
145 		    &pool->sp_space_used,
146 		    "Space in parsed but not handled requests.");
147 
148 		SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
149 		    "request_space_used_highest", CTLFLAG_RD,
150 		    &pool->sp_space_used_highest,
151 		    "Highest space used since reboot.");
152 
153 		SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
154 		    "request_space_high", CTLFLAG_RW,
155 		    &pool->sp_space_high,
156 		    "Maximum space in parsed but not handled requests.");
157 
158 		SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
159 		    "request_space_low", CTLFLAG_RW,
160 		    &pool->sp_space_low,
161 		    "Low water mark for request space.");
162 
163 		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
164 		    "request_space_throttled", CTLFLAG_RD,
165 		    &pool->sp_space_throttled, 0,
166 		    "Whether nfs requests are currently throttled");
167 
168 		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
169 		    "request_space_throttle_count", CTLFLAG_RD,
170 		    &pool->sp_space_throttle_count, 0,
171 		    "Count of times throttling based on request space has occurred");
172 	}
173 
174 	return pool;
175 }
176 
177 void
178 svcpool_destroy(SVCPOOL *pool)
179 {
180 	SVCGROUP *grp;
181 	SVCXPRT *xprt, *nxprt;
182 	struct svc_callout *s;
183 	struct svc_loss_callout *sl;
184 	struct svcxprt_list cleanup;
185 	int g;
186 
187 	TAILQ_INIT(&cleanup);
188 
189 	for (g = 0; g < SVC_MAXGROUPS; g++) {
190 		grp = &pool->sp_groups[g];
191 		mtx_lock(&grp->sg_lock);
192 		while ((xprt = TAILQ_FIRST(&grp->sg_xlist)) != NULL) {
193 			xprt_unregister_locked(xprt);
194 			TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
195 		}
196 		mtx_unlock(&grp->sg_lock);
197 	}
198 	TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
199 		SVC_RELEASE(xprt);
200 	}
201 
202 	mtx_lock(&pool->sp_lock);
203 	while ((s = TAILQ_FIRST(&pool->sp_callouts)) != NULL) {
204 		mtx_unlock(&pool->sp_lock);
205 		svc_unreg(pool, s->sc_prog, s->sc_vers);
206 		mtx_lock(&pool->sp_lock);
207 	}
208 	while ((sl = TAILQ_FIRST(&pool->sp_lcallouts)) != NULL) {
209 		mtx_unlock(&pool->sp_lock);
210 		svc_loss_unreg(pool, sl->slc_dispatch);
211 		mtx_lock(&pool->sp_lock);
212 	}
213 	mtx_unlock(&pool->sp_lock);
214 
215 	for (g = 0; g < SVC_MAXGROUPS; g++) {
216 		grp = &pool->sp_groups[g];
217 		mtx_destroy(&grp->sg_lock);
218 	}
219 	mtx_destroy(&pool->sp_lock);
220 
221 	if (pool->sp_rcache)
222 		replay_freecache(pool->sp_rcache);
223 
224 	sysctl_ctx_free(&pool->sp_sysctl);
225 	free(pool, M_RPC);
226 }
227 
228 /*
229  * Sysctl handler to get the present thread count on a pool
230  */
231 static int
232 svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS)
233 {
234 	SVCPOOL *pool;
235 	int threads, error, g;
236 
237 	pool = oidp->oid_arg1;
238 	threads = 0;
239 	mtx_lock(&pool->sp_lock);
240 	for (g = 0; g < pool->sp_groupcount; g++)
241 		threads += pool->sp_groups[g].sg_threadcount;
242 	mtx_unlock(&pool->sp_lock);
243 	error = sysctl_handle_int(oidp, &threads, 0, req);
244 	return (error);
245 }
246 
247 /*
248  * Sysctl handler to set the minimum thread count on a pool
249  */
250 static int
251 svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS)
252 {
253 	SVCPOOL *pool;
254 	int newminthreads, error, g;
255 
256 	pool = oidp->oid_arg1;
257 	newminthreads = pool->sp_minthreads;
258 	error = sysctl_handle_int(oidp, &newminthreads, 0, req);
259 	if (error == 0 && newminthreads != pool->sp_minthreads) {
260 		if (newminthreads > pool->sp_maxthreads)
261 			return (EINVAL);
262 		mtx_lock(&pool->sp_lock);
263 		pool->sp_minthreads = newminthreads;
264 		for (g = 0; g < pool->sp_groupcount; g++) {
265 			pool->sp_groups[g].sg_minthreads = max(1,
266 			    pool->sp_minthreads / pool->sp_groupcount);
267 		}
268 		mtx_unlock(&pool->sp_lock);
269 	}
270 	return (error);
271 }
272 
273 /*
274  * Sysctl handler to set the maximum thread count on a pool
275  */
276 static int
277 svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS)
278 {
279 	SVCPOOL *pool;
280 	int newmaxthreads, error, g;
281 
282 	pool = oidp->oid_arg1;
283 	newmaxthreads = pool->sp_maxthreads;
284 	error = sysctl_handle_int(oidp, &newmaxthreads, 0, req);
285 	if (error == 0 && newmaxthreads != pool->sp_maxthreads) {
286 		if (newmaxthreads < pool->sp_minthreads)
287 			return (EINVAL);
288 		mtx_lock(&pool->sp_lock);
289 		pool->sp_maxthreads = newmaxthreads;
290 		for (g = 0; g < pool->sp_groupcount; g++) {
291 			pool->sp_groups[g].sg_maxthreads = max(1,
292 			    pool->sp_maxthreads / pool->sp_groupcount);
293 		}
294 		mtx_unlock(&pool->sp_lock);
295 	}
296 	return (error);
297 }
298 
299 /*
300  * Activate a transport handle.
301  */
302 void
303 xprt_register(SVCXPRT *xprt)
304 {
305 	SVCPOOL *pool = xprt->xp_pool;
306 	SVCGROUP *grp;
307 	int g;
308 
309 	SVC_ACQUIRE(xprt);
310 	g = atomic_fetchadd_int(&pool->sp_nextgroup, 1) % pool->sp_groupcount;
311 	xprt->xp_group = grp = &pool->sp_groups[g];
312 	mtx_lock(&grp->sg_lock);
313 	xprt->xp_registered = TRUE;
314 	xprt->xp_active = FALSE;
315 	TAILQ_INSERT_TAIL(&grp->sg_xlist, xprt, xp_link);
316 	mtx_unlock(&grp->sg_lock);
317 }
318 
319 /*
320  * De-activate a transport handle. Note: the locked version doesn't
321  * release the transport - caller must do that after dropping the pool
322  * lock.
323  */
324 static void
325 xprt_unregister_locked(SVCXPRT *xprt)
326 {
327 	SVCGROUP *grp = xprt->xp_group;
328 
329 	mtx_assert(&grp->sg_lock, MA_OWNED);
330 	KASSERT(xprt->xp_registered == TRUE,
331 	    ("xprt_unregister_locked: not registered"));
332 	xprt_inactive_locked(xprt);
333 	TAILQ_REMOVE(&grp->sg_xlist, xprt, xp_link);
334 	xprt->xp_registered = FALSE;
335 }
336 
337 void
338 xprt_unregister(SVCXPRT *xprt)
339 {
340 	SVCGROUP *grp = xprt->xp_group;
341 
342 	mtx_lock(&grp->sg_lock);
343 	if (xprt->xp_registered == FALSE) {
344 		/* Already unregistered by another thread */
345 		mtx_unlock(&grp->sg_lock);
346 		return;
347 	}
348 	xprt_unregister_locked(xprt);
349 	mtx_unlock(&grp->sg_lock);
350 
351 	SVC_RELEASE(xprt);
352 }
353 
354 /*
355  * Attempt to assign a service thread to this transport.
356  */
357 static int
358 xprt_assignthread(SVCXPRT *xprt)
359 {
360 	SVCGROUP *grp = xprt->xp_group;
361 	SVCTHREAD *st;
362 
363 	mtx_assert(&grp->sg_lock, MA_OWNED);
364 	st = LIST_FIRST(&grp->sg_idlethreads);
365 	if (st) {
366 		LIST_REMOVE(st, st_ilink);
367 		SVC_ACQUIRE(xprt);
368 		xprt->xp_thread = st;
369 		st->st_xprt = xprt;
370 		cv_signal(&st->st_cond);
371 		return (TRUE);
372 	} else {
373 		/*
374 		 * See if we can create a new thread. The
375 		 * actual thread creation happens in
376 		 * svc_run_internal because our locking state
377 		 * is poorly defined (we are typically called
378 		 * from a socket upcall). Don't create more
379 		 * than one thread per second.
380 		 */
381 		if (grp->sg_state == SVCPOOL_ACTIVE
382 		    && grp->sg_lastcreatetime < time_uptime
383 		    && grp->sg_threadcount < grp->sg_maxthreads) {
384 			grp->sg_state = SVCPOOL_THREADWANTED;
385 		}
386 	}
387 	return (FALSE);
388 }
389 
390 void
391 xprt_active(SVCXPRT *xprt)
392 {
393 	SVCGROUP *grp = xprt->xp_group;
394 
395 	mtx_lock(&grp->sg_lock);
396 
397 	if (!xprt->xp_registered) {
398 		/*
399 		 * Race with xprt_unregister - we lose.
400 		 */
401 		mtx_unlock(&grp->sg_lock);
402 		return;
403 	}
404 
405 	if (!xprt->xp_active) {
406 		xprt->xp_active = TRUE;
407 		if (xprt->xp_thread == NULL) {
408 			if (!svc_request_space_available(xprt->xp_pool) ||
409 			    !xprt_assignthread(xprt))
410 				TAILQ_INSERT_TAIL(&grp->sg_active, xprt,
411 				    xp_alink);
412 		}
413 	}
414 
415 	mtx_unlock(&grp->sg_lock);
416 }
417 
418 void
419 xprt_inactive_locked(SVCXPRT *xprt)
420 {
421 	SVCGROUP *grp = xprt->xp_group;
422 
423 	mtx_assert(&grp->sg_lock, MA_OWNED);
424 	if (xprt->xp_active) {
425 		if (xprt->xp_thread == NULL)
426 			TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
427 		xprt->xp_active = FALSE;
428 	}
429 }
430 
431 void
432 xprt_inactive(SVCXPRT *xprt)
433 {
434 	SVCGROUP *grp = xprt->xp_group;
435 
436 	mtx_lock(&grp->sg_lock);
437 	xprt_inactive_locked(xprt);
438 	mtx_unlock(&grp->sg_lock);
439 }
440 
441 /*
442  * Variant of xprt_inactive() for use only when sure that port is
443  * assigned to thread. For example, within receive handlers.
444  */
445 void
446 xprt_inactive_self(SVCXPRT *xprt)
447 {
448 
449 	KASSERT(xprt->xp_thread != NULL,
450 	    ("xprt_inactive_self(%p) with NULL xp_thread", xprt));
451 	xprt->xp_active = FALSE;
452 }
453 
454 /*
455  * Add a service program to the callout list.
456  * The dispatch routine will be called when a rpc request for this
457  * program number comes in.
458  */
459 bool_t
460 svc_reg(SVCXPRT *xprt, const rpcprog_t prog, const rpcvers_t vers,
461     void (*dispatch)(struct svc_req *, SVCXPRT *),
462     const struct netconfig *nconf)
463 {
464 	SVCPOOL *pool = xprt->xp_pool;
465 	struct svc_callout *s;
466 	char *netid = NULL;
467 	int flag = 0;
468 
469 /* VARIABLES PROTECTED BY svc_lock: s, svc_head */
470 
471 	if (xprt->xp_netid) {
472 		netid = strdup(xprt->xp_netid, M_RPC);
473 		flag = 1;
474 	} else if (nconf && nconf->nc_netid) {
475 		netid = strdup(nconf->nc_netid, M_RPC);
476 		flag = 1;
477 	} /* must have been created with svc_raw_create */
478 	if ((netid == NULL) && (flag == 1)) {
479 		return (FALSE);
480 	}
481 
482 	mtx_lock(&pool->sp_lock);
483 	if ((s = svc_find(pool, prog, vers, netid)) != NULL) {
484 		if (netid)
485 			free(netid, M_RPC);
486 		if (s->sc_dispatch == dispatch)
487 			goto rpcb_it; /* he is registering another xptr */
488 		mtx_unlock(&pool->sp_lock);
489 		return (FALSE);
490 	}
491 	s = malloc(sizeof (struct svc_callout), M_RPC, M_NOWAIT);
492 	if (s == NULL) {
493 		if (netid)
494 			free(netid, M_RPC);
495 		mtx_unlock(&pool->sp_lock);
496 		return (FALSE);
497 	}
498 
499 	s->sc_prog = prog;
500 	s->sc_vers = vers;
501 	s->sc_dispatch = dispatch;
502 	s->sc_netid = netid;
503 	TAILQ_INSERT_TAIL(&pool->sp_callouts, s, sc_link);
504 
505 	if ((xprt->xp_netid == NULL) && (flag == 1) && netid)
506 		((SVCXPRT *) xprt)->xp_netid = strdup(netid, M_RPC);
507 
508 rpcb_it:
509 	mtx_unlock(&pool->sp_lock);
510 	/* now register the information with the local binder service */
511 	if (nconf) {
512 		bool_t dummy;
513 		struct netconfig tnc;
514 		struct netbuf nb;
515 		tnc = *nconf;
516 		nb.buf = &xprt->xp_ltaddr;
517 		nb.len = xprt->xp_ltaddr.ss_len;
518 		dummy = rpcb_set(prog, vers, &tnc, &nb);
519 		return (dummy);
520 	}
521 	return (TRUE);
522 }
523 
524 /*
525  * Remove a service program from the callout list.
526  */
527 void
528 svc_unreg(SVCPOOL *pool, const rpcprog_t prog, const rpcvers_t vers)
529 {
530 	struct svc_callout *s;
531 
532 	/* unregister the information anyway */
533 	(void) rpcb_unset(prog, vers, NULL);
534 	mtx_lock(&pool->sp_lock);
535 	while ((s = svc_find(pool, prog, vers, NULL)) != NULL) {
536 		TAILQ_REMOVE(&pool->sp_callouts, s, sc_link);
537 		if (s->sc_netid)
538 			mem_free(s->sc_netid, sizeof (s->sc_netid) + 1);
539 		mem_free(s, sizeof (struct svc_callout));
540 	}
541 	mtx_unlock(&pool->sp_lock);
542 }
543 
544 /*
545  * Add a service connection loss program to the callout list.
546  * The dispatch routine will be called when some port in ths pool die.
547  */
548 bool_t
549 svc_loss_reg(SVCXPRT *xprt, void (*dispatch)(SVCXPRT *))
550 {
551 	SVCPOOL *pool = xprt->xp_pool;
552 	struct svc_loss_callout *s;
553 
554 	mtx_lock(&pool->sp_lock);
555 	TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
556 		if (s->slc_dispatch == dispatch)
557 			break;
558 	}
559 	if (s != NULL) {
560 		mtx_unlock(&pool->sp_lock);
561 		return (TRUE);
562 	}
563 	s = malloc(sizeof(struct svc_loss_callout), M_RPC, M_NOWAIT);
564 	if (s == NULL) {
565 		mtx_unlock(&pool->sp_lock);
566 		return (FALSE);
567 	}
568 	s->slc_dispatch = dispatch;
569 	TAILQ_INSERT_TAIL(&pool->sp_lcallouts, s, slc_link);
570 	mtx_unlock(&pool->sp_lock);
571 	return (TRUE);
572 }
573 
574 /*
575  * Remove a service connection loss program from the callout list.
576  */
577 void
578 svc_loss_unreg(SVCPOOL *pool, void (*dispatch)(SVCXPRT *))
579 {
580 	struct svc_loss_callout *s;
581 
582 	mtx_lock(&pool->sp_lock);
583 	TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
584 		if (s->slc_dispatch == dispatch) {
585 			TAILQ_REMOVE(&pool->sp_lcallouts, s, slc_link);
586 			free(s, M_RPC);
587 			break;
588 		}
589 	}
590 	mtx_unlock(&pool->sp_lock);
591 }
592 
593 /* ********************** CALLOUT list related stuff ************* */
594 
595 /*
596  * Search the callout list for a program number, return the callout
597  * struct.
598  */
599 static struct svc_callout *
600 svc_find(SVCPOOL *pool, rpcprog_t prog, rpcvers_t vers, char *netid)
601 {
602 	struct svc_callout *s;
603 
604 	mtx_assert(&pool->sp_lock, MA_OWNED);
605 	TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
606 		if (s->sc_prog == prog && s->sc_vers == vers
607 		    && (netid == NULL || s->sc_netid == NULL ||
608 			strcmp(netid, s->sc_netid) == 0))
609 			break;
610 	}
611 
612 	return (s);
613 }
614 
615 /* ******************* REPLY GENERATION ROUTINES  ************ */
616 
617 static bool_t
618 svc_sendreply_common(struct svc_req *rqstp, struct rpc_msg *rply,
619     struct mbuf *body)
620 {
621 	SVCXPRT *xprt = rqstp->rq_xprt;
622 	bool_t ok;
623 
624 	if (rqstp->rq_args) {
625 		m_freem(rqstp->rq_args);
626 		rqstp->rq_args = NULL;
627 	}
628 
629 	if (xprt->xp_pool->sp_rcache)
630 		replay_setreply(xprt->xp_pool->sp_rcache,
631 		    rply, svc_getrpccaller(rqstp), body);
632 
633 	if (!SVCAUTH_WRAP(&rqstp->rq_auth, &body))
634 		return (FALSE);
635 
636 	ok = SVC_REPLY(xprt, rply, rqstp->rq_addr, body, &rqstp->rq_reply_seq);
637 	if (rqstp->rq_addr) {
638 		free(rqstp->rq_addr, M_SONAME);
639 		rqstp->rq_addr = NULL;
640 	}
641 
642 	return (ok);
643 }
644 
645 /*
646  * Send a reply to an rpc request
647  */
648 bool_t
649 svc_sendreply(struct svc_req *rqstp, xdrproc_t xdr_results, void * xdr_location)
650 {
651 	struct rpc_msg rply;
652 	struct mbuf *m;
653 	XDR xdrs;
654 	bool_t ok;
655 
656 	rply.rm_xid = rqstp->rq_xid;
657 	rply.rm_direction = REPLY;
658 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
659 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
660 	rply.acpted_rply.ar_stat = SUCCESS;
661 	rply.acpted_rply.ar_results.where = NULL;
662 	rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;
663 
664 	m = m_getcl(M_WAITOK, MT_DATA, 0);
665 	xdrmbuf_create(&xdrs, m, XDR_ENCODE);
666 	ok = xdr_results(&xdrs, xdr_location);
667 	XDR_DESTROY(&xdrs);
668 
669 	if (ok) {
670 		return (svc_sendreply_common(rqstp, &rply, m));
671 	} else {
672 		m_freem(m);
673 		return (FALSE);
674 	}
675 }
676 
677 bool_t
678 svc_sendreply_mbuf(struct svc_req *rqstp, struct mbuf *m)
679 {
680 	struct rpc_msg rply;
681 
682 	rply.rm_xid = rqstp->rq_xid;
683 	rply.rm_direction = REPLY;
684 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
685 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
686 	rply.acpted_rply.ar_stat = SUCCESS;
687 	rply.acpted_rply.ar_results.where = NULL;
688 	rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;
689 
690 	return (svc_sendreply_common(rqstp, &rply, m));
691 }
692 
693 /*
694  * No procedure error reply
695  */
696 void
697 svcerr_noproc(struct svc_req *rqstp)
698 {
699 	SVCXPRT *xprt = rqstp->rq_xprt;
700 	struct rpc_msg rply;
701 
702 	rply.rm_xid = rqstp->rq_xid;
703 	rply.rm_direction = REPLY;
704 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
705 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
706 	rply.acpted_rply.ar_stat = PROC_UNAVAIL;
707 
708 	if (xprt->xp_pool->sp_rcache)
709 		replay_setreply(xprt->xp_pool->sp_rcache,
710 		    &rply, svc_getrpccaller(rqstp), NULL);
711 
712 	svc_sendreply_common(rqstp, &rply, NULL);
713 }
714 
715 /*
716  * Can't decode args error reply
717  */
718 void
719 svcerr_decode(struct svc_req *rqstp)
720 {
721 	SVCXPRT *xprt = rqstp->rq_xprt;
722 	struct rpc_msg rply;
723 
724 	rply.rm_xid = rqstp->rq_xid;
725 	rply.rm_direction = REPLY;
726 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
727 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
728 	rply.acpted_rply.ar_stat = GARBAGE_ARGS;
729 
730 	if (xprt->xp_pool->sp_rcache)
731 		replay_setreply(xprt->xp_pool->sp_rcache,
732 		    &rply, (struct sockaddr *) &xprt->xp_rtaddr, NULL);
733 
734 	svc_sendreply_common(rqstp, &rply, NULL);
735 }
736 
737 /*
738  * Some system error
739  */
740 void
741 svcerr_systemerr(struct svc_req *rqstp)
742 {
743 	SVCXPRT *xprt = rqstp->rq_xprt;
744 	struct rpc_msg rply;
745 
746 	rply.rm_xid = rqstp->rq_xid;
747 	rply.rm_direction = REPLY;
748 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
749 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
750 	rply.acpted_rply.ar_stat = SYSTEM_ERR;
751 
752 	if (xprt->xp_pool->sp_rcache)
753 		replay_setreply(xprt->xp_pool->sp_rcache,
754 		    &rply, svc_getrpccaller(rqstp), NULL);
755 
756 	svc_sendreply_common(rqstp, &rply, NULL);
757 }
758 
759 /*
760  * Authentication error reply
761  */
762 void
763 svcerr_auth(struct svc_req *rqstp, enum auth_stat why)
764 {
765 	SVCXPRT *xprt = rqstp->rq_xprt;
766 	struct rpc_msg rply;
767 
768 	rply.rm_xid = rqstp->rq_xid;
769 	rply.rm_direction = REPLY;
770 	rply.rm_reply.rp_stat = MSG_DENIED;
771 	rply.rjcted_rply.rj_stat = AUTH_ERROR;
772 	rply.rjcted_rply.rj_why = why;
773 
774 	if (xprt->xp_pool->sp_rcache)
775 		replay_setreply(xprt->xp_pool->sp_rcache,
776 		    &rply, svc_getrpccaller(rqstp), NULL);
777 
778 	svc_sendreply_common(rqstp, &rply, NULL);
779 }
780 
781 /*
782  * Auth too weak error reply
783  */
784 void
785 svcerr_weakauth(struct svc_req *rqstp)
786 {
787 
788 	svcerr_auth(rqstp, AUTH_TOOWEAK);
789 }
790 
791 /*
792  * Program unavailable error reply
793  */
794 void
795 svcerr_noprog(struct svc_req *rqstp)
796 {
797 	SVCXPRT *xprt = rqstp->rq_xprt;
798 	struct rpc_msg rply;
799 
800 	rply.rm_xid = rqstp->rq_xid;
801 	rply.rm_direction = REPLY;
802 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
803 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
804 	rply.acpted_rply.ar_stat = PROG_UNAVAIL;
805 
806 	if (xprt->xp_pool->sp_rcache)
807 		replay_setreply(xprt->xp_pool->sp_rcache,
808 		    &rply, svc_getrpccaller(rqstp), NULL);
809 
810 	svc_sendreply_common(rqstp, &rply, NULL);
811 }
812 
813 /*
814  * Program version mismatch error reply
815  */
816 void
817 svcerr_progvers(struct svc_req *rqstp, rpcvers_t low_vers, rpcvers_t high_vers)
818 {
819 	SVCXPRT *xprt = rqstp->rq_xprt;
820 	struct rpc_msg rply;
821 
822 	rply.rm_xid = rqstp->rq_xid;
823 	rply.rm_direction = REPLY;
824 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
825 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
826 	rply.acpted_rply.ar_stat = PROG_MISMATCH;
827 	rply.acpted_rply.ar_vers.low = (uint32_t)low_vers;
828 	rply.acpted_rply.ar_vers.high = (uint32_t)high_vers;
829 
830 	if (xprt->xp_pool->sp_rcache)
831 		replay_setreply(xprt->xp_pool->sp_rcache,
832 		    &rply, svc_getrpccaller(rqstp), NULL);
833 
834 	svc_sendreply_common(rqstp, &rply, NULL);
835 }
836 
837 /*
838  * Allocate a new server transport structure. All fields are
839  * initialized to zero and xp_p3 is initialized to point at an
840  * extension structure to hold various flags and authentication
841  * parameters.
842  */
843 SVCXPRT *
844 svc_xprt_alloc()
845 {
846 	SVCXPRT *xprt;
847 	SVCXPRT_EXT *ext;
848 
849 	xprt = mem_alloc(sizeof(SVCXPRT));
850 	ext = mem_alloc(sizeof(SVCXPRT_EXT));
851 	xprt->xp_p3 = ext;
852 	refcount_init(&xprt->xp_refs, 1);
853 
854 	return (xprt);
855 }
856 
857 /*
858  * Free a server transport structure.
859  */
860 void
861 svc_xprt_free(xprt)
862 	SVCXPRT *xprt;
863 {
864 
865 	mem_free(xprt->xp_p3, sizeof(SVCXPRT_EXT));
866 	mem_free(xprt, sizeof(SVCXPRT));
867 }
868 
869 /* ******************* SERVER INPUT STUFF ******************* */
870 
871 /*
872  * Read RPC requests from a transport and queue them to be
873  * executed. We handle authentication and replay cache replies here.
874  * Actually dispatching the RPC is deferred till svc_executereq.
875  */
876 static enum xprt_stat
877 svc_getreq(SVCXPRT *xprt, struct svc_req **rqstp_ret)
878 {
879 	SVCPOOL *pool = xprt->xp_pool;
880 	struct svc_req *r;
881 	struct rpc_msg msg;
882 	struct mbuf *args;
883 	struct svc_loss_callout *s;
884 	enum xprt_stat stat;
885 
886 	/* now receive msgs from xprtprt (support batch calls) */
887 	r = malloc(sizeof(*r), M_RPC, M_WAITOK|M_ZERO);
888 
889 	msg.rm_call.cb_cred.oa_base = r->rq_credarea;
890 	msg.rm_call.cb_verf.oa_base = &r->rq_credarea[MAX_AUTH_BYTES];
891 	r->rq_clntcred = &r->rq_credarea[2*MAX_AUTH_BYTES];
892 	if (SVC_RECV(xprt, &msg, &r->rq_addr, &args)) {
893 		enum auth_stat why;
894 
895 		/*
896 		 * Handle replays and authenticate before queuing the
897 		 * request to be executed.
898 		 */
899 		SVC_ACQUIRE(xprt);
900 		r->rq_xprt = xprt;
901 		if (pool->sp_rcache) {
902 			struct rpc_msg repmsg;
903 			struct mbuf *repbody;
904 			enum replay_state rs;
905 			rs = replay_find(pool->sp_rcache, &msg,
906 			    svc_getrpccaller(r), &repmsg, &repbody);
907 			switch (rs) {
908 			case RS_NEW:
909 				break;
910 			case RS_DONE:
911 				SVC_REPLY(xprt, &repmsg, r->rq_addr,
912 				    repbody, &r->rq_reply_seq);
913 				if (r->rq_addr) {
914 					free(r->rq_addr, M_SONAME);
915 					r->rq_addr = NULL;
916 				}
917 				m_freem(args);
918 				goto call_done;
919 
920 			default:
921 				m_freem(args);
922 				goto call_done;
923 			}
924 		}
925 
926 		r->rq_xid = msg.rm_xid;
927 		r->rq_prog = msg.rm_call.cb_prog;
928 		r->rq_vers = msg.rm_call.cb_vers;
929 		r->rq_proc = msg.rm_call.cb_proc;
930 		r->rq_size = sizeof(*r) + m_length(args, NULL);
931 		r->rq_args = args;
932 		if ((why = _authenticate(r, &msg)) != AUTH_OK) {
933 			/*
934 			 * RPCSEC_GSS uses this return code
935 			 * for requests that form part of its
936 			 * context establishment protocol and
937 			 * should not be dispatched to the
938 			 * application.
939 			 */
940 			if (why != RPCSEC_GSS_NODISPATCH)
941 				svcerr_auth(r, why);
942 			goto call_done;
943 		}
944 
945 		if (!SVCAUTH_UNWRAP(&r->rq_auth, &r->rq_args)) {
946 			svcerr_decode(r);
947 			goto call_done;
948 		}
949 
950 		/*
951 		 * Everything checks out, return request to caller.
952 		 */
953 		*rqstp_ret = r;
954 		r = NULL;
955 	}
956 call_done:
957 	if (r) {
958 		svc_freereq(r);
959 		r = NULL;
960 	}
961 	if ((stat = SVC_STAT(xprt)) == XPRT_DIED) {
962 		TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link)
963 			(*s->slc_dispatch)(xprt);
964 		xprt_unregister(xprt);
965 	}
966 
967 	return (stat);
968 }
969 
970 static void
971 svc_executereq(struct svc_req *rqstp)
972 {
973 	SVCXPRT *xprt = rqstp->rq_xprt;
974 	SVCPOOL *pool = xprt->xp_pool;
975 	int prog_found;
976 	rpcvers_t low_vers;
977 	rpcvers_t high_vers;
978 	struct svc_callout *s;
979 
980 	/* now match message with a registered service*/
981 	prog_found = FALSE;
982 	low_vers = (rpcvers_t) -1L;
983 	high_vers = (rpcvers_t) 0L;
984 	TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
985 		if (s->sc_prog == rqstp->rq_prog) {
986 			if (s->sc_vers == rqstp->rq_vers) {
987 				/*
988 				 * We hand ownership of r to the
989 				 * dispatch method - they must call
990 				 * svc_freereq.
991 				 */
992 				(*s->sc_dispatch)(rqstp, xprt);
993 				return;
994 			}  /* found correct version */
995 			prog_found = TRUE;
996 			if (s->sc_vers < low_vers)
997 				low_vers = s->sc_vers;
998 			if (s->sc_vers > high_vers)
999 				high_vers = s->sc_vers;
1000 		}   /* found correct program */
1001 	}
1002 
1003 	/*
1004 	 * if we got here, the program or version
1005 	 * is not served ...
1006 	 */
1007 	if (prog_found)
1008 		svcerr_progvers(rqstp, low_vers, high_vers);
1009 	else
1010 		svcerr_noprog(rqstp);
1011 
1012 	svc_freereq(rqstp);
1013 }
1014 
1015 static void
1016 svc_checkidle(SVCGROUP *grp)
1017 {
1018 	SVCXPRT *xprt, *nxprt;
1019 	time_t timo;
1020 	struct svcxprt_list cleanup;
1021 
1022 	TAILQ_INIT(&cleanup);
1023 	TAILQ_FOREACH_SAFE(xprt, &grp->sg_xlist, xp_link, nxprt) {
1024 		/*
1025 		 * Only some transports have idle timers. Don't time
1026 		 * something out which is just waking up.
1027 		 */
1028 		if (!xprt->xp_idletimeout || xprt->xp_thread)
1029 			continue;
1030 
1031 		timo = xprt->xp_lastactive + xprt->xp_idletimeout;
1032 		if (time_uptime > timo) {
1033 			xprt_unregister_locked(xprt);
1034 			TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
1035 		}
1036 	}
1037 
1038 	mtx_unlock(&grp->sg_lock);
1039 	TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
1040 		SVC_RELEASE(xprt);
1041 	}
1042 	mtx_lock(&grp->sg_lock);
1043 }
1044 
1045 static void
1046 svc_assign_waiting_sockets(SVCPOOL *pool)
1047 {
1048 	SVCGROUP *grp;
1049 	SVCXPRT *xprt;
1050 	int g;
1051 
1052 	for (g = 0; g < pool->sp_groupcount; g++) {
1053 		grp = &pool->sp_groups[g];
1054 		mtx_lock(&grp->sg_lock);
1055 		while ((xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) {
1056 			if (xprt_assignthread(xprt))
1057 				TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
1058 			else
1059 				break;
1060 		}
1061 		mtx_unlock(&grp->sg_lock);
1062 	}
1063 }
1064 
1065 static void
1066 svc_change_space_used(SVCPOOL *pool, long delta)
1067 {
1068 	unsigned long value;
1069 
1070 	value = atomic_fetchadd_long(&pool->sp_space_used, delta) + delta;
1071 	if (delta > 0) {
1072 		if (value >= pool->sp_space_high && !pool->sp_space_throttled) {
1073 			pool->sp_space_throttled = TRUE;
1074 			pool->sp_space_throttle_count++;
1075 		}
1076 		if (value > pool->sp_space_used_highest)
1077 			pool->sp_space_used_highest = value;
1078 	} else {
1079 		if (value < pool->sp_space_low && pool->sp_space_throttled) {
1080 			pool->sp_space_throttled = FALSE;
1081 			svc_assign_waiting_sockets(pool);
1082 		}
1083 	}
1084 }
1085 
1086 static bool_t
1087 svc_request_space_available(SVCPOOL *pool)
1088 {
1089 
1090 	if (pool->sp_space_throttled)
1091 		return (FALSE);
1092 	return (TRUE);
1093 }
1094 
1095 static void
1096 svc_run_internal(SVCGROUP *grp, bool_t ismaster)
1097 {
1098 	SVCPOOL *pool = grp->sg_pool;
1099 	SVCTHREAD *st, *stpref;
1100 	SVCXPRT *xprt;
1101 	enum xprt_stat stat;
1102 	struct svc_req *rqstp;
1103 	struct proc *p;
1104 	long sz;
1105 	int error;
1106 
1107 	st = mem_alloc(sizeof(*st));
1108 	mtx_init(&st->st_lock, "st_lock", NULL, MTX_DEF);
1109 	st->st_pool = pool;
1110 	st->st_xprt = NULL;
1111 	STAILQ_INIT(&st->st_reqs);
1112 	cv_init(&st->st_cond, "rpcsvc");
1113 
1114 	mtx_lock(&grp->sg_lock);
1115 
1116 	/*
1117 	 * If we are a new thread which was spawned to cope with
1118 	 * increased load, set the state back to SVCPOOL_ACTIVE.
1119 	 */
1120 	if (grp->sg_state == SVCPOOL_THREADSTARTING)
1121 		grp->sg_state = SVCPOOL_ACTIVE;
1122 
1123 	while (grp->sg_state != SVCPOOL_CLOSING) {
1124 		/*
1125 		 * Create new thread if requested.
1126 		 */
1127 		if (grp->sg_state == SVCPOOL_THREADWANTED) {
1128 			grp->sg_state = SVCPOOL_THREADSTARTING;
1129 			grp->sg_lastcreatetime = time_uptime;
1130 			mtx_unlock(&grp->sg_lock);
1131 			svc_new_thread(grp);
1132 			mtx_lock(&grp->sg_lock);
1133 			continue;
1134 		}
1135 
1136 		/*
1137 		 * Check for idle transports once per second.
1138 		 */
1139 		if (time_uptime > grp->sg_lastidlecheck) {
1140 			grp->sg_lastidlecheck = time_uptime;
1141 			svc_checkidle(grp);
1142 		}
1143 
1144 		xprt = st->st_xprt;
1145 		if (!xprt) {
1146 			/*
1147 			 * Enforce maxthreads count.
1148 			 */
1149 			if (grp->sg_threadcount > grp->sg_maxthreads)
1150 				break;
1151 
1152 			/*
1153 			 * Before sleeping, see if we can find an
1154 			 * active transport which isn't being serviced
1155 			 * by a thread.
1156 			 */
1157 			if (svc_request_space_available(pool) &&
1158 			    (xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) {
1159 				TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
1160 				SVC_ACQUIRE(xprt);
1161 				xprt->xp_thread = st;
1162 				st->st_xprt = xprt;
1163 				continue;
1164 			}
1165 
1166 			LIST_INSERT_HEAD(&grp->sg_idlethreads, st, st_ilink);
1167 			if (ismaster || (!ismaster &&
1168 			    grp->sg_threadcount > grp->sg_minthreads))
1169 				error = cv_timedwait_sig(&st->st_cond,
1170 				    &grp->sg_lock, 5 * hz);
1171 			else
1172 				error = cv_wait_sig(&st->st_cond,
1173 				    &grp->sg_lock);
1174 			if (st->st_xprt == NULL)
1175 				LIST_REMOVE(st, st_ilink);
1176 
1177 			/*
1178 			 * Reduce worker thread count when idle.
1179 			 */
1180 			if (error == EWOULDBLOCK) {
1181 				if (!ismaster
1182 				    && (grp->sg_threadcount
1183 					> grp->sg_minthreads)
1184 					&& !st->st_xprt)
1185 					break;
1186 			} else if (error != 0) {
1187 				KASSERT(error == EINTR || error == ERESTART,
1188 				    ("non-signal error %d", error));
1189 				mtx_unlock(&grp->sg_lock);
1190 				p = curproc;
1191 				PROC_LOCK(p);
1192 				if (P_SHOULDSTOP(p) ||
1193 				    (p->p_flag & P_TOTAL_STOP) != 0) {
1194 					thread_suspend_check(0);
1195 					PROC_UNLOCK(p);
1196 					mtx_lock(&grp->sg_lock);
1197 				} else {
1198 					PROC_UNLOCK(p);
1199 					svc_exit(pool);
1200 					mtx_lock(&grp->sg_lock);
1201 					break;
1202 				}
1203 			}
1204 			continue;
1205 		}
1206 		mtx_unlock(&grp->sg_lock);
1207 
1208 		/*
1209 		 * Drain the transport socket and queue up any RPCs.
1210 		 */
1211 		xprt->xp_lastactive = time_uptime;
1212 		do {
1213 			if (!svc_request_space_available(pool))
1214 				break;
1215 			rqstp = NULL;
1216 			stat = svc_getreq(xprt, &rqstp);
1217 			if (rqstp) {
1218 				svc_change_space_used(pool, rqstp->rq_size);
1219 				/*
1220 				 * See if the application has a preference
1221 				 * for some other thread.
1222 				 */
1223 				if (pool->sp_assign) {
1224 					stpref = pool->sp_assign(st, rqstp);
1225 					rqstp->rq_thread = stpref;
1226 					STAILQ_INSERT_TAIL(&stpref->st_reqs,
1227 					    rqstp, rq_link);
1228 					mtx_unlock(&stpref->st_lock);
1229 					if (stpref != st)
1230 						rqstp = NULL;
1231 				} else {
1232 					rqstp->rq_thread = st;
1233 					STAILQ_INSERT_TAIL(&st->st_reqs,
1234 					    rqstp, rq_link);
1235 				}
1236 			}
1237 		} while (rqstp == NULL && stat == XPRT_MOREREQS
1238 		    && grp->sg_state != SVCPOOL_CLOSING);
1239 
1240 		/*
1241 		 * Move this transport to the end of the active list to
1242 		 * ensure fairness when multiple transports are active.
1243 		 * If this was the last queued request, svc_getreq will end
1244 		 * up calling xprt_inactive to remove from the active list.
1245 		 */
1246 		mtx_lock(&grp->sg_lock);
1247 		xprt->xp_thread = NULL;
1248 		st->st_xprt = NULL;
1249 		if (xprt->xp_active) {
1250 			if (!svc_request_space_available(pool) ||
1251 			    !xprt_assignthread(xprt))
1252 				TAILQ_INSERT_TAIL(&grp->sg_active,
1253 				    xprt, xp_alink);
1254 		}
1255 		mtx_unlock(&grp->sg_lock);
1256 		SVC_RELEASE(xprt);
1257 
1258 		/*
1259 		 * Execute what we have queued.
1260 		 */
1261 		mtx_lock(&st->st_lock);
1262 		while ((rqstp = STAILQ_FIRST(&st->st_reqs)) != NULL) {
1263 			STAILQ_REMOVE_HEAD(&st->st_reqs, rq_link);
1264 			mtx_unlock(&st->st_lock);
1265 			sz = (long)rqstp->rq_size;
1266 			svc_executereq(rqstp);
1267 			svc_change_space_used(pool, -sz);
1268 			mtx_lock(&st->st_lock);
1269 		}
1270 		mtx_unlock(&st->st_lock);
1271 		mtx_lock(&grp->sg_lock);
1272 	}
1273 
1274 	if (st->st_xprt) {
1275 		xprt = st->st_xprt;
1276 		st->st_xprt = NULL;
1277 		SVC_RELEASE(xprt);
1278 	}
1279 	KASSERT(STAILQ_EMPTY(&st->st_reqs), ("stray reqs on exit"));
1280 	mtx_destroy(&st->st_lock);
1281 	cv_destroy(&st->st_cond);
1282 	mem_free(st, sizeof(*st));
1283 
1284 	grp->sg_threadcount--;
1285 	if (!ismaster)
1286 		wakeup(grp);
1287 	mtx_unlock(&grp->sg_lock);
1288 }
1289 
1290 static void
1291 svc_thread_start(void *arg)
1292 {
1293 
1294 	svc_run_internal((SVCGROUP *) arg, FALSE);
1295 	kthread_exit();
1296 }
1297 
1298 static void
1299 svc_new_thread(SVCGROUP *grp)
1300 {
1301 	SVCPOOL *pool = grp->sg_pool;
1302 	struct thread *td;
1303 
1304 	mtx_lock(&grp->sg_lock);
1305 	grp->sg_threadcount++;
1306 	mtx_unlock(&grp->sg_lock);
1307 	kthread_add(svc_thread_start, grp, pool->sp_proc, &td, 0, 0,
1308 	    "%s: service", pool->sp_name);
1309 }
1310 
1311 void
1312 svc_run(SVCPOOL *pool)
1313 {
1314 	int g, i;
1315 	struct proc *p;
1316 	struct thread *td;
1317 	SVCGROUP *grp;
1318 
1319 	p = curproc;
1320 	td = curthread;
1321 	snprintf(td->td_name, sizeof(td->td_name),
1322 	    "%s: master", pool->sp_name);
1323 	pool->sp_state = SVCPOOL_ACTIVE;
1324 	pool->sp_proc = p;
1325 
1326 	/* Choose group count based on number of threads and CPUs. */
1327 	pool->sp_groupcount = max(1, min(SVC_MAXGROUPS,
1328 	    min(pool->sp_maxthreads / 2, mp_ncpus) / 6));
1329 	for (g = 0; g < pool->sp_groupcount; g++) {
1330 		grp = &pool->sp_groups[g];
1331 		grp->sg_minthreads = max(1,
1332 		    pool->sp_minthreads / pool->sp_groupcount);
1333 		grp->sg_maxthreads = max(1,
1334 		    pool->sp_maxthreads / pool->sp_groupcount);
1335 		grp->sg_lastcreatetime = time_uptime;
1336 	}
1337 
1338 	/* Starting threads */
1339 	pool->sp_groups[0].sg_threadcount++;
1340 	for (g = 0; g < pool->sp_groupcount; g++) {
1341 		grp = &pool->sp_groups[g];
1342 		for (i = ((g == 0) ? 1 : 0); i < grp->sg_minthreads; i++)
1343 			svc_new_thread(grp);
1344 	}
1345 	svc_run_internal(&pool->sp_groups[0], TRUE);
1346 
1347 	/* Waiting for threads to stop. */
1348 	for (g = 0; g < pool->sp_groupcount; g++) {
1349 		grp = &pool->sp_groups[g];
1350 		mtx_lock(&grp->sg_lock);
1351 		while (grp->sg_threadcount > 0)
1352 			msleep(grp, &grp->sg_lock, 0, "svcexit", 0);
1353 		mtx_unlock(&grp->sg_lock);
1354 	}
1355 }
1356 
1357 void
1358 svc_exit(SVCPOOL *pool)
1359 {
1360 	SVCGROUP *grp;
1361 	SVCTHREAD *st;
1362 	int g;
1363 
1364 	pool->sp_state = SVCPOOL_CLOSING;
1365 	for (g = 0; g < pool->sp_groupcount; g++) {
1366 		grp = &pool->sp_groups[g];
1367 		mtx_lock(&grp->sg_lock);
1368 		if (grp->sg_state != SVCPOOL_CLOSING) {
1369 			grp->sg_state = SVCPOOL_CLOSING;
1370 			LIST_FOREACH(st, &grp->sg_idlethreads, st_ilink)
1371 				cv_signal(&st->st_cond);
1372 		}
1373 		mtx_unlock(&grp->sg_lock);
1374 	}
1375 }
1376 
1377 bool_t
1378 svc_getargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
1379 {
1380 	struct mbuf *m;
1381 	XDR xdrs;
1382 	bool_t stat;
1383 
1384 	m = rqstp->rq_args;
1385 	rqstp->rq_args = NULL;
1386 
1387 	xdrmbuf_create(&xdrs, m, XDR_DECODE);
1388 	stat = xargs(&xdrs, args);
1389 	XDR_DESTROY(&xdrs);
1390 
1391 	return (stat);
1392 }
1393 
1394 bool_t
1395 svc_freeargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
1396 {
1397 	XDR xdrs;
1398 
1399 	if (rqstp->rq_addr) {
1400 		free(rqstp->rq_addr, M_SONAME);
1401 		rqstp->rq_addr = NULL;
1402 	}
1403 
1404 	xdrs.x_op = XDR_FREE;
1405 	return (xargs(&xdrs, args));
1406 }
1407 
1408 void
1409 svc_freereq(struct svc_req *rqstp)
1410 {
1411 	SVCTHREAD *st;
1412 	SVCPOOL *pool;
1413 
1414 	st = rqstp->rq_thread;
1415 	if (st) {
1416 		pool = st->st_pool;
1417 		if (pool->sp_done)
1418 			pool->sp_done(st, rqstp);
1419 	}
1420 
1421 	if (rqstp->rq_auth.svc_ah_ops)
1422 		SVCAUTH_RELEASE(&rqstp->rq_auth);
1423 
1424 	if (rqstp->rq_xprt) {
1425 		SVC_RELEASE(rqstp->rq_xprt);
1426 	}
1427 
1428 	if (rqstp->rq_addr)
1429 		free(rqstp->rq_addr, M_SONAME);
1430 
1431 	if (rqstp->rq_args)
1432 		m_freem(rqstp->rq_args);
1433 
1434 	free(rqstp, M_RPC);
1435 }
1436