xref: /freebsd/sys/rpc/svc.c (revision 907b59d76938e654f0d040a888e8dfca3de1e222)
1 /*	$NetBSD: svc.c,v 1.21 2000/07/06 03:10:35 christos Exp $	*/
2 
3 /*-
4  * Copyright (c) 2009, Sun Microsystems, Inc.
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  * - Redistributions of source code must retain the above copyright notice,
10  *   this list of conditions and the following disclaimer.
11  * - Redistributions in binary form must reproduce the above copyright notice,
12  *   this list of conditions and the following disclaimer in the documentation
13  *   and/or other materials provided with the distribution.
14  * - Neither the name of Sun Microsystems, Inc. nor the names of its
15  *   contributors may be used to endorse or promote products derived
16  *   from this software without specific prior written permission.
17  *
18  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
22  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  */
30 
31 #if defined(LIBC_SCCS) && !defined(lint)
32 static char *sccsid2 = "@(#)svc.c 1.44 88/02/08 Copyr 1984 Sun Micro";
33 static char *sccsid = "@(#)svc.c	2.4 88/08/11 4.0 RPCSRC";
34 #endif
35 #include <sys/cdefs.h>
36 __FBSDID("$FreeBSD$");
37 
38 /*
39  * svc.c, Server-side remote procedure call interface.
40  *
41  * There are two sets of procedures here.  The xprt routines are
42  * for handling transport handles.  The svc routines handle the
43  * list of service routines.
44  *
45  * Copyright (C) 1984, Sun Microsystems, Inc.
46  */
47 
48 #include <sys/param.h>
49 #include <sys/lock.h>
50 #include <sys/kernel.h>
51 #include <sys/kthread.h>
52 #include <sys/malloc.h>
53 #include <sys/mbuf.h>
54 #include <sys/mutex.h>
55 #include <sys/proc.h>
56 #include <sys/queue.h>
57 #include <sys/socketvar.h>
58 #include <sys/systm.h>
59 #include <sys/smp.h>
60 #include <sys/sx.h>
61 #include <sys/ucred.h>
62 
63 #include <rpc/rpc.h>
64 #include <rpc/rpcb_clnt.h>
65 #include <rpc/replay.h>
66 
67 #include <rpc/rpc_com.h>
68 
69 #define SVC_VERSQUIET 0x0001		/* keep quiet about vers mismatch */
70 #define version_keepquiet(xp) (SVC_EXT(xp)->xp_flags & SVC_VERSQUIET)
71 
72 static struct svc_callout *svc_find(SVCPOOL *pool, rpcprog_t, rpcvers_t,
73     char *);
74 static void svc_new_thread(SVCGROUP *grp);
75 static void xprt_unregister_locked(SVCXPRT *xprt);
76 static void svc_change_space_used(SVCPOOL *pool, long delta);
77 static bool_t svc_request_space_available(SVCPOOL *pool);
78 
79 /* ***************  SVCXPRT related stuff **************** */
80 
81 static int svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS);
82 static int svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS);
83 static int svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS);
84 
85 SVCPOOL*
86 svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base)
87 {
88 	SVCPOOL *pool;
89 	SVCGROUP *grp;
90 	int g;
91 
92 	pool = malloc(sizeof(SVCPOOL), M_RPC, M_WAITOK|M_ZERO);
93 
94 	mtx_init(&pool->sp_lock, "sp_lock", NULL, MTX_DEF);
95 	pool->sp_name = name;
96 	pool->sp_state = SVCPOOL_INIT;
97 	pool->sp_proc = NULL;
98 	TAILQ_INIT(&pool->sp_callouts);
99 	TAILQ_INIT(&pool->sp_lcallouts);
100 	pool->sp_minthreads = 1;
101 	pool->sp_maxthreads = 1;
102 	pool->sp_groupcount = 1;
103 	for (g = 0; g < SVC_MAXGROUPS; g++) {
104 		grp = &pool->sp_groups[g];
105 		mtx_init(&grp->sg_lock, "sg_lock", NULL, MTX_DEF);
106 		grp->sg_pool = pool;
107 		grp->sg_state = SVCPOOL_ACTIVE;
108 		TAILQ_INIT(&grp->sg_xlist);
109 		TAILQ_INIT(&grp->sg_active);
110 		LIST_INIT(&grp->sg_idlethreads);
111 		grp->sg_minthreads = 1;
112 		grp->sg_maxthreads = 1;
113 	}
114 
115 	/*
116 	 * Don't use more than a quarter of mbuf clusters.  Nota bene:
117 	 * nmbclusters is an int, but nmbclusters*MCLBYTES may overflow
118 	 * on LP64 architectures, so cast to u_long to avoid undefined
119 	 * behavior.  (ILP32 architectures cannot have nmbclusters
120 	 * large enough to overflow for other reasons.)
121 	 */
122 	pool->sp_space_high = (u_long)nmbclusters * MCLBYTES / 4;
123 	pool->sp_space_low = (pool->sp_space_high / 3) * 2;
124 
125 	sysctl_ctx_init(&pool->sp_sysctl);
126 	if (sysctl_base) {
127 		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
128 		    "minthreads", CTLTYPE_INT | CTLFLAG_RW,
129 		    pool, 0, svcpool_minthread_sysctl, "I",
130 		    "Minimal number of threads");
131 		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
132 		    "maxthreads", CTLTYPE_INT | CTLFLAG_RW,
133 		    pool, 0, svcpool_maxthread_sysctl, "I",
134 		    "Maximal number of threads");
135 		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
136 		    "threads", CTLTYPE_INT | CTLFLAG_RD,
137 		    pool, 0, svcpool_threads_sysctl, "I",
138 		    "Current number of threads");
139 		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
140 		    "groups", CTLFLAG_RD, &pool->sp_groupcount, 0,
141 		    "Number of thread groups");
142 
143 		SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
144 		    "request_space_used", CTLFLAG_RD,
145 		    &pool->sp_space_used,
146 		    "Space in parsed but not handled requests.");
147 
148 		SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
149 		    "request_space_used_highest", CTLFLAG_RD,
150 		    &pool->sp_space_used_highest,
151 		    "Highest space used since reboot.");
152 
153 		SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
154 		    "request_space_high", CTLFLAG_RW,
155 		    &pool->sp_space_high,
156 		    "Maximum space in parsed but not handled requests.");
157 
158 		SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
159 		    "request_space_low", CTLFLAG_RW,
160 		    &pool->sp_space_low,
161 		    "Low water mark for request space.");
162 
163 		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
164 		    "request_space_throttled", CTLFLAG_RD,
165 		    &pool->sp_space_throttled, 0,
166 		    "Whether nfs requests are currently throttled");
167 
168 		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
169 		    "request_space_throttle_count", CTLFLAG_RD,
170 		    &pool->sp_space_throttle_count, 0,
171 		    "Count of times throttling based on request space has occurred");
172 	}
173 
174 	return pool;
175 }
176 
177 void
178 svcpool_destroy(SVCPOOL *pool)
179 {
180 	SVCGROUP *grp;
181 	SVCXPRT *xprt, *nxprt;
182 	struct svc_callout *s;
183 	struct svc_loss_callout *sl;
184 	struct svcxprt_list cleanup;
185 	int g;
186 
187 	TAILQ_INIT(&cleanup);
188 
189 	for (g = 0; g < SVC_MAXGROUPS; g++) {
190 		grp = &pool->sp_groups[g];
191 		mtx_lock(&grp->sg_lock);
192 		while ((xprt = TAILQ_FIRST(&grp->sg_xlist)) != NULL) {
193 			xprt_unregister_locked(xprt);
194 			TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
195 		}
196 		mtx_unlock(&grp->sg_lock);
197 	}
198 	TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
199 		SVC_RELEASE(xprt);
200 	}
201 
202 	mtx_lock(&pool->sp_lock);
203 	while ((s = TAILQ_FIRST(&pool->sp_callouts)) != NULL) {
204 		mtx_unlock(&pool->sp_lock);
205 		svc_unreg(pool, s->sc_prog, s->sc_vers);
206 		mtx_lock(&pool->sp_lock);
207 	}
208 	while ((sl = TAILQ_FIRST(&pool->sp_lcallouts)) != NULL) {
209 		mtx_unlock(&pool->sp_lock);
210 		svc_loss_unreg(pool, sl->slc_dispatch);
211 		mtx_lock(&pool->sp_lock);
212 	}
213 	mtx_unlock(&pool->sp_lock);
214 
215 	for (g = 0; g < SVC_MAXGROUPS; g++) {
216 		grp = &pool->sp_groups[g];
217 		mtx_destroy(&grp->sg_lock);
218 	}
219 	mtx_destroy(&pool->sp_lock);
220 
221 	if (pool->sp_rcache)
222 		replay_freecache(pool->sp_rcache);
223 
224 	sysctl_ctx_free(&pool->sp_sysctl);
225 	free(pool, M_RPC);
226 }
227 
228 /*
229  * Sysctl handler to get the present thread count on a pool
230  */
231 static int
232 svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS)
233 {
234 	SVCPOOL *pool;
235 	int threads, error, g;
236 
237 	pool = oidp->oid_arg1;
238 	threads = 0;
239 	mtx_lock(&pool->sp_lock);
240 	for (g = 0; g < pool->sp_groupcount; g++)
241 		threads += pool->sp_groups[g].sg_threadcount;
242 	mtx_unlock(&pool->sp_lock);
243 	error = sysctl_handle_int(oidp, &threads, 0, req);
244 	return (error);
245 }
246 
247 /*
248  * Sysctl handler to set the minimum thread count on a pool
249  */
250 static int
251 svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS)
252 {
253 	SVCPOOL *pool;
254 	int newminthreads, error, g;
255 
256 	pool = oidp->oid_arg1;
257 	newminthreads = pool->sp_minthreads;
258 	error = sysctl_handle_int(oidp, &newminthreads, 0, req);
259 	if (error == 0 && newminthreads != pool->sp_minthreads) {
260 		if (newminthreads > pool->sp_maxthreads)
261 			return (EINVAL);
262 		mtx_lock(&pool->sp_lock);
263 		pool->sp_minthreads = newminthreads;
264 		for (g = 0; g < pool->sp_groupcount; g++) {
265 			pool->sp_groups[g].sg_minthreads = max(1,
266 			    pool->sp_minthreads / pool->sp_groupcount);
267 		}
268 		mtx_unlock(&pool->sp_lock);
269 	}
270 	return (error);
271 }
272 
273 /*
274  * Sysctl handler to set the maximum thread count on a pool
275  */
276 static int
277 svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS)
278 {
279 	SVCPOOL *pool;
280 	int newmaxthreads, error, g;
281 
282 	pool = oidp->oid_arg1;
283 	newmaxthreads = pool->sp_maxthreads;
284 	error = sysctl_handle_int(oidp, &newmaxthreads, 0, req);
285 	if (error == 0 && newmaxthreads != pool->sp_maxthreads) {
286 		if (newmaxthreads < pool->sp_minthreads)
287 			return (EINVAL);
288 		mtx_lock(&pool->sp_lock);
289 		pool->sp_maxthreads = newmaxthreads;
290 		for (g = 0; g < pool->sp_groupcount; g++) {
291 			pool->sp_groups[g].sg_maxthreads = max(1,
292 			    pool->sp_maxthreads / pool->sp_groupcount);
293 		}
294 		mtx_unlock(&pool->sp_lock);
295 	}
296 	return (error);
297 }
298 
299 /*
300  * Activate a transport handle.
301  */
302 void
303 xprt_register(SVCXPRT *xprt)
304 {
305 	SVCPOOL *pool = xprt->xp_pool;
306 	SVCGROUP *grp;
307 	int g;
308 
309 	SVC_ACQUIRE(xprt);
310 	g = atomic_fetchadd_int(&pool->sp_nextgroup, 1) % pool->sp_groupcount;
311 	xprt->xp_group = grp = &pool->sp_groups[g];
312 	mtx_lock(&grp->sg_lock);
313 	xprt->xp_registered = TRUE;
314 	xprt->xp_active = FALSE;
315 	TAILQ_INSERT_TAIL(&grp->sg_xlist, xprt, xp_link);
316 	mtx_unlock(&grp->sg_lock);
317 }
318 
319 /*
320  * De-activate a transport handle. Note: the locked version doesn't
321  * release the transport - caller must do that after dropping the pool
322  * lock.
323  */
324 static void
325 xprt_unregister_locked(SVCXPRT *xprt)
326 {
327 	SVCGROUP *grp = xprt->xp_group;
328 
329 	mtx_assert(&grp->sg_lock, MA_OWNED);
330 	KASSERT(xprt->xp_registered == TRUE,
331 	    ("xprt_unregister_locked: not registered"));
332 	xprt_inactive_locked(xprt);
333 	TAILQ_REMOVE(&grp->sg_xlist, xprt, xp_link);
334 	xprt->xp_registered = FALSE;
335 }
336 
337 void
338 xprt_unregister(SVCXPRT *xprt)
339 {
340 	SVCGROUP *grp = xprt->xp_group;
341 
342 	mtx_lock(&grp->sg_lock);
343 	if (xprt->xp_registered == FALSE) {
344 		/* Already unregistered by another thread */
345 		mtx_unlock(&grp->sg_lock);
346 		return;
347 	}
348 	xprt_unregister_locked(xprt);
349 	mtx_unlock(&grp->sg_lock);
350 
351 	SVC_RELEASE(xprt);
352 }
353 
354 /*
355  * Attempt to assign a service thread to this transport.
356  */
357 static int
358 xprt_assignthread(SVCXPRT *xprt)
359 {
360 	SVCGROUP *grp = xprt->xp_group;
361 	SVCTHREAD *st;
362 
363 	mtx_assert(&grp->sg_lock, MA_OWNED);
364 	st = LIST_FIRST(&grp->sg_idlethreads);
365 	if (st) {
366 		LIST_REMOVE(st, st_ilink);
367 		SVC_ACQUIRE(xprt);
368 		xprt->xp_thread = st;
369 		st->st_xprt = xprt;
370 		cv_signal(&st->st_cond);
371 		return (TRUE);
372 	} else {
373 		/*
374 		 * See if we can create a new thread. The
375 		 * actual thread creation happens in
376 		 * svc_run_internal because our locking state
377 		 * is poorly defined (we are typically called
378 		 * from a socket upcall). Don't create more
379 		 * than one thread per second.
380 		 */
381 		if (grp->sg_state == SVCPOOL_ACTIVE
382 		    && grp->sg_lastcreatetime < time_uptime
383 		    && grp->sg_threadcount < grp->sg_maxthreads) {
384 			grp->sg_state = SVCPOOL_THREADWANTED;
385 		}
386 	}
387 	return (FALSE);
388 }
389 
390 void
391 xprt_active(SVCXPRT *xprt)
392 {
393 	SVCGROUP *grp = xprt->xp_group;
394 
395 	mtx_lock(&grp->sg_lock);
396 
397 	if (!xprt->xp_registered) {
398 		/*
399 		 * Race with xprt_unregister - we lose.
400 		 */
401 		mtx_unlock(&grp->sg_lock);
402 		return;
403 	}
404 
405 	if (!xprt->xp_active) {
406 		xprt->xp_active = TRUE;
407 		if (xprt->xp_thread == NULL) {
408 			if (!svc_request_space_available(xprt->xp_pool) ||
409 			    !xprt_assignthread(xprt))
410 				TAILQ_INSERT_TAIL(&grp->sg_active, xprt,
411 				    xp_alink);
412 		}
413 	}
414 
415 	mtx_unlock(&grp->sg_lock);
416 }
417 
418 void
419 xprt_inactive_locked(SVCXPRT *xprt)
420 {
421 	SVCGROUP *grp = xprt->xp_group;
422 
423 	mtx_assert(&grp->sg_lock, MA_OWNED);
424 	if (xprt->xp_active) {
425 		if (xprt->xp_thread == NULL)
426 			TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
427 		xprt->xp_active = FALSE;
428 	}
429 }
430 
431 void
432 xprt_inactive(SVCXPRT *xprt)
433 {
434 	SVCGROUP *grp = xprt->xp_group;
435 
436 	mtx_lock(&grp->sg_lock);
437 	xprt_inactive_locked(xprt);
438 	mtx_unlock(&grp->sg_lock);
439 }
440 
441 /*
442  * Variant of xprt_inactive() for use only when sure that port is
443  * assigned to thread. For example, within receive handlers.
444  */
445 void
446 xprt_inactive_self(SVCXPRT *xprt)
447 {
448 
449 	KASSERT(xprt->xp_thread != NULL,
450 	    ("xprt_inactive_self(%p) with NULL xp_thread", xprt));
451 	xprt->xp_active = FALSE;
452 }
453 
454 /*
455  * Add a service program to the callout list.
456  * The dispatch routine will be called when a rpc request for this
457  * program number comes in.
458  */
459 bool_t
460 svc_reg(SVCXPRT *xprt, const rpcprog_t prog, const rpcvers_t vers,
461     void (*dispatch)(struct svc_req *, SVCXPRT *),
462     const struct netconfig *nconf)
463 {
464 	SVCPOOL *pool = xprt->xp_pool;
465 	struct svc_callout *s;
466 	char *netid = NULL;
467 	int flag = 0;
468 
469 /* VARIABLES PROTECTED BY svc_lock: s, svc_head */
470 
471 	if (xprt->xp_netid) {
472 		netid = strdup(xprt->xp_netid, M_RPC);
473 		flag = 1;
474 	} else if (nconf && nconf->nc_netid) {
475 		netid = strdup(nconf->nc_netid, M_RPC);
476 		flag = 1;
477 	} /* must have been created with svc_raw_create */
478 	if ((netid == NULL) && (flag == 1)) {
479 		return (FALSE);
480 	}
481 
482 	mtx_lock(&pool->sp_lock);
483 	if ((s = svc_find(pool, prog, vers, netid)) != NULL) {
484 		if (netid)
485 			free(netid, M_RPC);
486 		if (s->sc_dispatch == dispatch)
487 			goto rpcb_it; /* he is registering another xptr */
488 		mtx_unlock(&pool->sp_lock);
489 		return (FALSE);
490 	}
491 	s = malloc(sizeof (struct svc_callout), M_RPC, M_NOWAIT);
492 	if (s == NULL) {
493 		if (netid)
494 			free(netid, M_RPC);
495 		mtx_unlock(&pool->sp_lock);
496 		return (FALSE);
497 	}
498 
499 	s->sc_prog = prog;
500 	s->sc_vers = vers;
501 	s->sc_dispatch = dispatch;
502 	s->sc_netid = netid;
503 	TAILQ_INSERT_TAIL(&pool->sp_callouts, s, sc_link);
504 
505 	if ((xprt->xp_netid == NULL) && (flag == 1) && netid)
506 		((SVCXPRT *) xprt)->xp_netid = strdup(netid, M_RPC);
507 
508 rpcb_it:
509 	mtx_unlock(&pool->sp_lock);
510 	/* now register the information with the local binder service */
511 	if (nconf) {
512 		bool_t dummy;
513 		struct netconfig tnc;
514 		struct netbuf nb;
515 		tnc = *nconf;
516 		nb.buf = &xprt->xp_ltaddr;
517 		nb.len = xprt->xp_ltaddr.ss_len;
518 		dummy = rpcb_set(prog, vers, &tnc, &nb);
519 		return (dummy);
520 	}
521 	return (TRUE);
522 }
523 
524 /*
525  * Remove a service program from the callout list.
526  */
527 void
528 svc_unreg(SVCPOOL *pool, const rpcprog_t prog, const rpcvers_t vers)
529 {
530 	struct svc_callout *s;
531 
532 	/* unregister the information anyway */
533 	(void) rpcb_unset(prog, vers, NULL);
534 	mtx_lock(&pool->sp_lock);
535 	while ((s = svc_find(pool, prog, vers, NULL)) != NULL) {
536 		TAILQ_REMOVE(&pool->sp_callouts, s, sc_link);
537 		if (s->sc_netid)
538 			mem_free(s->sc_netid, sizeof (s->sc_netid) + 1);
539 		mem_free(s, sizeof (struct svc_callout));
540 	}
541 	mtx_unlock(&pool->sp_lock);
542 }
543 
544 /*
545  * Add a service connection loss program to the callout list.
546  * The dispatch routine will be called when some port in ths pool die.
547  */
548 bool_t
549 svc_loss_reg(SVCXPRT *xprt, void (*dispatch)(SVCXPRT *))
550 {
551 	SVCPOOL *pool = xprt->xp_pool;
552 	struct svc_loss_callout *s;
553 
554 	mtx_lock(&pool->sp_lock);
555 	TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
556 		if (s->slc_dispatch == dispatch)
557 			break;
558 	}
559 	if (s != NULL) {
560 		mtx_unlock(&pool->sp_lock);
561 		return (TRUE);
562 	}
563 	s = malloc(sizeof(struct svc_loss_callout), M_RPC, M_NOWAIT);
564 	if (s == NULL) {
565 		mtx_unlock(&pool->sp_lock);
566 		return (FALSE);
567 	}
568 	s->slc_dispatch = dispatch;
569 	TAILQ_INSERT_TAIL(&pool->sp_lcallouts, s, slc_link);
570 	mtx_unlock(&pool->sp_lock);
571 	return (TRUE);
572 }
573 
574 /*
575  * Remove a service connection loss program from the callout list.
576  */
577 void
578 svc_loss_unreg(SVCPOOL *pool, void (*dispatch)(SVCXPRT *))
579 {
580 	struct svc_loss_callout *s;
581 
582 	mtx_lock(&pool->sp_lock);
583 	TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
584 		if (s->slc_dispatch == dispatch) {
585 			TAILQ_REMOVE(&pool->sp_lcallouts, s, slc_link);
586 			free(s, M_RPC);
587 			break;
588 		}
589 	}
590 	mtx_unlock(&pool->sp_lock);
591 }
592 
593 /* ********************** CALLOUT list related stuff ************* */
594 
595 /*
596  * Search the callout list for a program number, return the callout
597  * struct.
598  */
599 static struct svc_callout *
600 svc_find(SVCPOOL *pool, rpcprog_t prog, rpcvers_t vers, char *netid)
601 {
602 	struct svc_callout *s;
603 
604 	mtx_assert(&pool->sp_lock, MA_OWNED);
605 	TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
606 		if (s->sc_prog == prog && s->sc_vers == vers
607 		    && (netid == NULL || s->sc_netid == NULL ||
608 			strcmp(netid, s->sc_netid) == 0))
609 			break;
610 	}
611 
612 	return (s);
613 }
614 
615 /* ******************* REPLY GENERATION ROUTINES  ************ */
616 
617 static bool_t
618 svc_sendreply_common(struct svc_req *rqstp, struct rpc_msg *rply,
619     struct mbuf *body)
620 {
621 	SVCXPRT *xprt = rqstp->rq_xprt;
622 	bool_t ok;
623 
624 	if (rqstp->rq_args) {
625 		m_freem(rqstp->rq_args);
626 		rqstp->rq_args = NULL;
627 	}
628 
629 	if (xprt->xp_pool->sp_rcache)
630 		replay_setreply(xprt->xp_pool->sp_rcache,
631 		    rply, svc_getrpccaller(rqstp), body);
632 
633 	if (!SVCAUTH_WRAP(&rqstp->rq_auth, &body))
634 		return (FALSE);
635 
636 	ok = SVC_REPLY(xprt, rply, rqstp->rq_addr, body, &rqstp->rq_reply_seq);
637 	if (rqstp->rq_addr) {
638 		free(rqstp->rq_addr, M_SONAME);
639 		rqstp->rq_addr = NULL;
640 	}
641 
642 	return (ok);
643 }
644 
645 /*
646  * Send a reply to an rpc request
647  */
648 bool_t
649 svc_sendreply(struct svc_req *rqstp, xdrproc_t xdr_results, void * xdr_location)
650 {
651 	struct rpc_msg rply;
652 	struct mbuf *m;
653 	XDR xdrs;
654 	bool_t ok;
655 
656 	rply.rm_xid = rqstp->rq_xid;
657 	rply.rm_direction = REPLY;
658 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
659 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
660 	rply.acpted_rply.ar_stat = SUCCESS;
661 	rply.acpted_rply.ar_results.where = NULL;
662 	rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;
663 
664 	m = m_getcl(M_WAITOK, MT_DATA, 0);
665 	xdrmbuf_create(&xdrs, m, XDR_ENCODE);
666 	ok = xdr_results(&xdrs, xdr_location);
667 	XDR_DESTROY(&xdrs);
668 
669 	if (ok) {
670 		return (svc_sendreply_common(rqstp, &rply, m));
671 	} else {
672 		m_freem(m);
673 		return (FALSE);
674 	}
675 }
676 
677 bool_t
678 svc_sendreply_mbuf(struct svc_req *rqstp, struct mbuf *m)
679 {
680 	struct rpc_msg rply;
681 
682 	rply.rm_xid = rqstp->rq_xid;
683 	rply.rm_direction = REPLY;
684 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
685 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
686 	rply.acpted_rply.ar_stat = SUCCESS;
687 	rply.acpted_rply.ar_results.where = NULL;
688 	rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;
689 
690 	return (svc_sendreply_common(rqstp, &rply, m));
691 }
692 
693 /*
694  * No procedure error reply
695  */
696 void
697 svcerr_noproc(struct svc_req *rqstp)
698 {
699 	SVCXPRT *xprt = rqstp->rq_xprt;
700 	struct rpc_msg rply;
701 
702 	rply.rm_xid = rqstp->rq_xid;
703 	rply.rm_direction = REPLY;
704 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
705 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
706 	rply.acpted_rply.ar_stat = PROC_UNAVAIL;
707 
708 	if (xprt->xp_pool->sp_rcache)
709 		replay_setreply(xprt->xp_pool->sp_rcache,
710 		    &rply, svc_getrpccaller(rqstp), NULL);
711 
712 	svc_sendreply_common(rqstp, &rply, NULL);
713 }
714 
715 /*
716  * Can't decode args error reply
717  */
718 void
719 svcerr_decode(struct svc_req *rqstp)
720 {
721 	SVCXPRT *xprt = rqstp->rq_xprt;
722 	struct rpc_msg rply;
723 
724 	rply.rm_xid = rqstp->rq_xid;
725 	rply.rm_direction = REPLY;
726 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
727 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
728 	rply.acpted_rply.ar_stat = GARBAGE_ARGS;
729 
730 	if (xprt->xp_pool->sp_rcache)
731 		replay_setreply(xprt->xp_pool->sp_rcache,
732 		    &rply, (struct sockaddr *) &xprt->xp_rtaddr, NULL);
733 
734 	svc_sendreply_common(rqstp, &rply, NULL);
735 }
736 
737 /*
738  * Some system error
739  */
740 void
741 svcerr_systemerr(struct svc_req *rqstp)
742 {
743 	SVCXPRT *xprt = rqstp->rq_xprt;
744 	struct rpc_msg rply;
745 
746 	rply.rm_xid = rqstp->rq_xid;
747 	rply.rm_direction = REPLY;
748 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
749 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
750 	rply.acpted_rply.ar_stat = SYSTEM_ERR;
751 
752 	if (xprt->xp_pool->sp_rcache)
753 		replay_setreply(xprt->xp_pool->sp_rcache,
754 		    &rply, svc_getrpccaller(rqstp), NULL);
755 
756 	svc_sendreply_common(rqstp, &rply, NULL);
757 }
758 
759 /*
760  * Authentication error reply
761  */
762 void
763 svcerr_auth(struct svc_req *rqstp, enum auth_stat why)
764 {
765 	SVCXPRT *xprt = rqstp->rq_xprt;
766 	struct rpc_msg rply;
767 
768 	rply.rm_xid = rqstp->rq_xid;
769 	rply.rm_direction = REPLY;
770 	rply.rm_reply.rp_stat = MSG_DENIED;
771 	rply.rjcted_rply.rj_stat = AUTH_ERROR;
772 	rply.rjcted_rply.rj_why = why;
773 
774 	if (xprt->xp_pool->sp_rcache)
775 		replay_setreply(xprt->xp_pool->sp_rcache,
776 		    &rply, svc_getrpccaller(rqstp), NULL);
777 
778 	svc_sendreply_common(rqstp, &rply, NULL);
779 }
780 
781 /*
782  * Auth too weak error reply
783  */
784 void
785 svcerr_weakauth(struct svc_req *rqstp)
786 {
787 
788 	svcerr_auth(rqstp, AUTH_TOOWEAK);
789 }
790 
791 /*
792  * Program unavailable error reply
793  */
794 void
795 svcerr_noprog(struct svc_req *rqstp)
796 {
797 	SVCXPRT *xprt = rqstp->rq_xprt;
798 	struct rpc_msg rply;
799 
800 	rply.rm_xid = rqstp->rq_xid;
801 	rply.rm_direction = REPLY;
802 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
803 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
804 	rply.acpted_rply.ar_stat = PROG_UNAVAIL;
805 
806 	if (xprt->xp_pool->sp_rcache)
807 		replay_setreply(xprt->xp_pool->sp_rcache,
808 		    &rply, svc_getrpccaller(rqstp), NULL);
809 
810 	svc_sendreply_common(rqstp, &rply, NULL);
811 }
812 
813 /*
814  * Program version mismatch error reply
815  */
816 void
817 svcerr_progvers(struct svc_req *rqstp, rpcvers_t low_vers, rpcvers_t high_vers)
818 {
819 	SVCXPRT *xprt = rqstp->rq_xprt;
820 	struct rpc_msg rply;
821 
822 	rply.rm_xid = rqstp->rq_xid;
823 	rply.rm_direction = REPLY;
824 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
825 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
826 	rply.acpted_rply.ar_stat = PROG_MISMATCH;
827 	rply.acpted_rply.ar_vers.low = (uint32_t)low_vers;
828 	rply.acpted_rply.ar_vers.high = (uint32_t)high_vers;
829 
830 	if (xprt->xp_pool->sp_rcache)
831 		replay_setreply(xprt->xp_pool->sp_rcache,
832 		    &rply, svc_getrpccaller(rqstp), NULL);
833 
834 	svc_sendreply_common(rqstp, &rply, NULL);
835 }
836 
837 /*
838  * Allocate a new server transport structure. All fields are
839  * initialized to zero and xp_p3 is initialized to point at an
840  * extension structure to hold various flags and authentication
841  * parameters.
842  */
843 SVCXPRT *
844 svc_xprt_alloc(void)
845 {
846 	SVCXPRT *xprt;
847 	SVCXPRT_EXT *ext;
848 
849 	xprt = mem_alloc(sizeof(SVCXPRT));
850 	ext = mem_alloc(sizeof(SVCXPRT_EXT));
851 	xprt->xp_p3 = ext;
852 	refcount_init(&xprt->xp_refs, 1);
853 
854 	return (xprt);
855 }
856 
857 /*
858  * Free a server transport structure.
859  */
860 void
861 svc_xprt_free(SVCXPRT *xprt)
862 {
863 
864 	mem_free(xprt->xp_p3, sizeof(SVCXPRT_EXT));
865 	mem_free(xprt, sizeof(SVCXPRT));
866 }
867 
868 /* ******************* SERVER INPUT STUFF ******************* */
869 
870 /*
871  * Read RPC requests from a transport and queue them to be
872  * executed. We handle authentication and replay cache replies here.
873  * Actually dispatching the RPC is deferred till svc_executereq.
874  */
875 static enum xprt_stat
876 svc_getreq(SVCXPRT *xprt, struct svc_req **rqstp_ret)
877 {
878 	SVCPOOL *pool = xprt->xp_pool;
879 	struct svc_req *r;
880 	struct rpc_msg msg;
881 	struct mbuf *args;
882 	struct svc_loss_callout *s;
883 	enum xprt_stat stat;
884 
885 	/* now receive msgs from xprtprt (support batch calls) */
886 	r = malloc(sizeof(*r), M_RPC, M_WAITOK|M_ZERO);
887 
888 	msg.rm_call.cb_cred.oa_base = r->rq_credarea;
889 	msg.rm_call.cb_verf.oa_base = &r->rq_credarea[MAX_AUTH_BYTES];
890 	r->rq_clntcred = &r->rq_credarea[2*MAX_AUTH_BYTES];
891 	if (SVC_RECV(xprt, &msg, &r->rq_addr, &args)) {
892 		enum auth_stat why;
893 
894 		/*
895 		 * Handle replays and authenticate before queuing the
896 		 * request to be executed.
897 		 */
898 		SVC_ACQUIRE(xprt);
899 		r->rq_xprt = xprt;
900 		if (pool->sp_rcache) {
901 			struct rpc_msg repmsg;
902 			struct mbuf *repbody;
903 			enum replay_state rs;
904 			rs = replay_find(pool->sp_rcache, &msg,
905 			    svc_getrpccaller(r), &repmsg, &repbody);
906 			switch (rs) {
907 			case RS_NEW:
908 				break;
909 			case RS_DONE:
910 				SVC_REPLY(xprt, &repmsg, r->rq_addr,
911 				    repbody, &r->rq_reply_seq);
912 				if (r->rq_addr) {
913 					free(r->rq_addr, M_SONAME);
914 					r->rq_addr = NULL;
915 				}
916 				m_freem(args);
917 				goto call_done;
918 
919 			default:
920 				m_freem(args);
921 				goto call_done;
922 			}
923 		}
924 
925 		r->rq_xid = msg.rm_xid;
926 		r->rq_prog = msg.rm_call.cb_prog;
927 		r->rq_vers = msg.rm_call.cb_vers;
928 		r->rq_proc = msg.rm_call.cb_proc;
929 		r->rq_size = sizeof(*r) + m_length(args, NULL);
930 		r->rq_args = args;
931 		if ((why = _authenticate(r, &msg)) != AUTH_OK) {
932 			/*
933 			 * RPCSEC_GSS uses this return code
934 			 * for requests that form part of its
935 			 * context establishment protocol and
936 			 * should not be dispatched to the
937 			 * application.
938 			 */
939 			if (why != RPCSEC_GSS_NODISPATCH)
940 				svcerr_auth(r, why);
941 			goto call_done;
942 		}
943 
944 		if (!SVCAUTH_UNWRAP(&r->rq_auth, &r->rq_args)) {
945 			svcerr_decode(r);
946 			goto call_done;
947 		}
948 
949 		/*
950 		 * Everything checks out, return request to caller.
951 		 */
952 		*rqstp_ret = r;
953 		r = NULL;
954 	}
955 call_done:
956 	if (r) {
957 		svc_freereq(r);
958 		r = NULL;
959 	}
960 	if ((stat = SVC_STAT(xprt)) == XPRT_DIED) {
961 		TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link)
962 			(*s->slc_dispatch)(xprt);
963 		xprt_unregister(xprt);
964 	}
965 
966 	return (stat);
967 }
968 
969 static void
970 svc_executereq(struct svc_req *rqstp)
971 {
972 	SVCXPRT *xprt = rqstp->rq_xprt;
973 	SVCPOOL *pool = xprt->xp_pool;
974 	int prog_found;
975 	rpcvers_t low_vers;
976 	rpcvers_t high_vers;
977 	struct svc_callout *s;
978 
979 	/* now match message with a registered service*/
980 	prog_found = FALSE;
981 	low_vers = (rpcvers_t) -1L;
982 	high_vers = (rpcvers_t) 0L;
983 	TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
984 		if (s->sc_prog == rqstp->rq_prog) {
985 			if (s->sc_vers == rqstp->rq_vers) {
986 				/*
987 				 * We hand ownership of r to the
988 				 * dispatch method - they must call
989 				 * svc_freereq.
990 				 */
991 				(*s->sc_dispatch)(rqstp, xprt);
992 				return;
993 			}  /* found correct version */
994 			prog_found = TRUE;
995 			if (s->sc_vers < low_vers)
996 				low_vers = s->sc_vers;
997 			if (s->sc_vers > high_vers)
998 				high_vers = s->sc_vers;
999 		}   /* found correct program */
1000 	}
1001 
1002 	/*
1003 	 * if we got here, the program or version
1004 	 * is not served ...
1005 	 */
1006 	if (prog_found)
1007 		svcerr_progvers(rqstp, low_vers, high_vers);
1008 	else
1009 		svcerr_noprog(rqstp);
1010 
1011 	svc_freereq(rqstp);
1012 }
1013 
1014 static void
1015 svc_checkidle(SVCGROUP *grp)
1016 {
1017 	SVCXPRT *xprt, *nxprt;
1018 	time_t timo;
1019 	struct svcxprt_list cleanup;
1020 
1021 	TAILQ_INIT(&cleanup);
1022 	TAILQ_FOREACH_SAFE(xprt, &grp->sg_xlist, xp_link, nxprt) {
1023 		/*
1024 		 * Only some transports have idle timers. Don't time
1025 		 * something out which is just waking up.
1026 		 */
1027 		if (!xprt->xp_idletimeout || xprt->xp_thread)
1028 			continue;
1029 
1030 		timo = xprt->xp_lastactive + xprt->xp_idletimeout;
1031 		if (time_uptime > timo) {
1032 			xprt_unregister_locked(xprt);
1033 			TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
1034 		}
1035 	}
1036 
1037 	mtx_unlock(&grp->sg_lock);
1038 	TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
1039 		SVC_RELEASE(xprt);
1040 	}
1041 	mtx_lock(&grp->sg_lock);
1042 }
1043 
1044 static void
1045 svc_assign_waiting_sockets(SVCPOOL *pool)
1046 {
1047 	SVCGROUP *grp;
1048 	SVCXPRT *xprt;
1049 	int g;
1050 
1051 	for (g = 0; g < pool->sp_groupcount; g++) {
1052 		grp = &pool->sp_groups[g];
1053 		mtx_lock(&grp->sg_lock);
1054 		while ((xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) {
1055 			if (xprt_assignthread(xprt))
1056 				TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
1057 			else
1058 				break;
1059 		}
1060 		mtx_unlock(&grp->sg_lock);
1061 	}
1062 }
1063 
1064 static void
1065 svc_change_space_used(SVCPOOL *pool, long delta)
1066 {
1067 	unsigned long value;
1068 
1069 	value = atomic_fetchadd_long(&pool->sp_space_used, delta) + delta;
1070 	if (delta > 0) {
1071 		if (value >= pool->sp_space_high && !pool->sp_space_throttled) {
1072 			pool->sp_space_throttled = TRUE;
1073 			pool->sp_space_throttle_count++;
1074 		}
1075 		if (value > pool->sp_space_used_highest)
1076 			pool->sp_space_used_highest = value;
1077 	} else {
1078 		if (value < pool->sp_space_low && pool->sp_space_throttled) {
1079 			pool->sp_space_throttled = FALSE;
1080 			svc_assign_waiting_sockets(pool);
1081 		}
1082 	}
1083 }
1084 
1085 static bool_t
1086 svc_request_space_available(SVCPOOL *pool)
1087 {
1088 
1089 	if (pool->sp_space_throttled)
1090 		return (FALSE);
1091 	return (TRUE);
1092 }
1093 
1094 static void
1095 svc_run_internal(SVCGROUP *grp, bool_t ismaster)
1096 {
1097 	SVCPOOL *pool = grp->sg_pool;
1098 	SVCTHREAD *st, *stpref;
1099 	SVCXPRT *xprt;
1100 	enum xprt_stat stat;
1101 	struct svc_req *rqstp;
1102 	struct proc *p;
1103 	long sz;
1104 	int error;
1105 
1106 	st = mem_alloc(sizeof(*st));
1107 	mtx_init(&st->st_lock, "st_lock", NULL, MTX_DEF);
1108 	st->st_pool = pool;
1109 	st->st_xprt = NULL;
1110 	STAILQ_INIT(&st->st_reqs);
1111 	cv_init(&st->st_cond, "rpcsvc");
1112 
1113 	mtx_lock(&grp->sg_lock);
1114 
1115 	/*
1116 	 * If we are a new thread which was spawned to cope with
1117 	 * increased load, set the state back to SVCPOOL_ACTIVE.
1118 	 */
1119 	if (grp->sg_state == SVCPOOL_THREADSTARTING)
1120 		grp->sg_state = SVCPOOL_ACTIVE;
1121 
1122 	while (grp->sg_state != SVCPOOL_CLOSING) {
1123 		/*
1124 		 * Create new thread if requested.
1125 		 */
1126 		if (grp->sg_state == SVCPOOL_THREADWANTED) {
1127 			grp->sg_state = SVCPOOL_THREADSTARTING;
1128 			grp->sg_lastcreatetime = time_uptime;
1129 			mtx_unlock(&grp->sg_lock);
1130 			svc_new_thread(grp);
1131 			mtx_lock(&grp->sg_lock);
1132 			continue;
1133 		}
1134 
1135 		/*
1136 		 * Check for idle transports once per second.
1137 		 */
1138 		if (time_uptime > grp->sg_lastidlecheck) {
1139 			grp->sg_lastidlecheck = time_uptime;
1140 			svc_checkidle(grp);
1141 		}
1142 
1143 		xprt = st->st_xprt;
1144 		if (!xprt) {
1145 			/*
1146 			 * Enforce maxthreads count.
1147 			 */
1148 			if (grp->sg_threadcount > grp->sg_maxthreads)
1149 				break;
1150 
1151 			/*
1152 			 * Before sleeping, see if we can find an
1153 			 * active transport which isn't being serviced
1154 			 * by a thread.
1155 			 */
1156 			if (svc_request_space_available(pool) &&
1157 			    (xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) {
1158 				TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
1159 				SVC_ACQUIRE(xprt);
1160 				xprt->xp_thread = st;
1161 				st->st_xprt = xprt;
1162 				continue;
1163 			}
1164 
1165 			LIST_INSERT_HEAD(&grp->sg_idlethreads, st, st_ilink);
1166 			if (ismaster || (!ismaster &&
1167 			    grp->sg_threadcount > grp->sg_minthreads))
1168 				error = cv_timedwait_sig(&st->st_cond,
1169 				    &grp->sg_lock, 5 * hz);
1170 			else
1171 				error = cv_wait_sig(&st->st_cond,
1172 				    &grp->sg_lock);
1173 			if (st->st_xprt == NULL)
1174 				LIST_REMOVE(st, st_ilink);
1175 
1176 			/*
1177 			 * Reduce worker thread count when idle.
1178 			 */
1179 			if (error == EWOULDBLOCK) {
1180 				if (!ismaster
1181 				    && (grp->sg_threadcount
1182 					> grp->sg_minthreads)
1183 					&& !st->st_xprt)
1184 					break;
1185 			} else if (error != 0) {
1186 				KASSERT(error == EINTR || error == ERESTART,
1187 				    ("non-signal error %d", error));
1188 				mtx_unlock(&grp->sg_lock);
1189 				p = curproc;
1190 				PROC_LOCK(p);
1191 				if (P_SHOULDSTOP(p) ||
1192 				    (p->p_flag & P_TOTAL_STOP) != 0) {
1193 					thread_suspend_check(0);
1194 					PROC_UNLOCK(p);
1195 					mtx_lock(&grp->sg_lock);
1196 				} else {
1197 					PROC_UNLOCK(p);
1198 					svc_exit(pool);
1199 					mtx_lock(&grp->sg_lock);
1200 					break;
1201 				}
1202 			}
1203 			continue;
1204 		}
1205 		mtx_unlock(&grp->sg_lock);
1206 
1207 		/*
1208 		 * Drain the transport socket and queue up any RPCs.
1209 		 */
1210 		xprt->xp_lastactive = time_uptime;
1211 		do {
1212 			if (!svc_request_space_available(pool))
1213 				break;
1214 			rqstp = NULL;
1215 			stat = svc_getreq(xprt, &rqstp);
1216 			if (rqstp) {
1217 				svc_change_space_used(pool, rqstp->rq_size);
1218 				/*
1219 				 * See if the application has a preference
1220 				 * for some other thread.
1221 				 */
1222 				if (pool->sp_assign) {
1223 					stpref = pool->sp_assign(st, rqstp);
1224 					rqstp->rq_thread = stpref;
1225 					STAILQ_INSERT_TAIL(&stpref->st_reqs,
1226 					    rqstp, rq_link);
1227 					mtx_unlock(&stpref->st_lock);
1228 					if (stpref != st)
1229 						rqstp = NULL;
1230 				} else {
1231 					rqstp->rq_thread = st;
1232 					STAILQ_INSERT_TAIL(&st->st_reqs,
1233 					    rqstp, rq_link);
1234 				}
1235 			}
1236 		} while (rqstp == NULL && stat == XPRT_MOREREQS
1237 		    && grp->sg_state != SVCPOOL_CLOSING);
1238 
1239 		/*
1240 		 * Move this transport to the end of the active list to
1241 		 * ensure fairness when multiple transports are active.
1242 		 * If this was the last queued request, svc_getreq will end
1243 		 * up calling xprt_inactive to remove from the active list.
1244 		 */
1245 		mtx_lock(&grp->sg_lock);
1246 		xprt->xp_thread = NULL;
1247 		st->st_xprt = NULL;
1248 		if (xprt->xp_active) {
1249 			if (!svc_request_space_available(pool) ||
1250 			    !xprt_assignthread(xprt))
1251 				TAILQ_INSERT_TAIL(&grp->sg_active,
1252 				    xprt, xp_alink);
1253 		}
1254 		mtx_unlock(&grp->sg_lock);
1255 		SVC_RELEASE(xprt);
1256 
1257 		/*
1258 		 * Execute what we have queued.
1259 		 */
1260 		mtx_lock(&st->st_lock);
1261 		while ((rqstp = STAILQ_FIRST(&st->st_reqs)) != NULL) {
1262 			STAILQ_REMOVE_HEAD(&st->st_reqs, rq_link);
1263 			mtx_unlock(&st->st_lock);
1264 			sz = (long)rqstp->rq_size;
1265 			svc_executereq(rqstp);
1266 			svc_change_space_used(pool, -sz);
1267 			mtx_lock(&st->st_lock);
1268 		}
1269 		mtx_unlock(&st->st_lock);
1270 		mtx_lock(&grp->sg_lock);
1271 	}
1272 
1273 	if (st->st_xprt) {
1274 		xprt = st->st_xprt;
1275 		st->st_xprt = NULL;
1276 		SVC_RELEASE(xprt);
1277 	}
1278 	KASSERT(STAILQ_EMPTY(&st->st_reqs), ("stray reqs on exit"));
1279 	mtx_destroy(&st->st_lock);
1280 	cv_destroy(&st->st_cond);
1281 	mem_free(st, sizeof(*st));
1282 
1283 	grp->sg_threadcount--;
1284 	if (!ismaster)
1285 		wakeup(grp);
1286 	mtx_unlock(&grp->sg_lock);
1287 }
1288 
1289 static void
1290 svc_thread_start(void *arg)
1291 {
1292 
1293 	svc_run_internal((SVCGROUP *) arg, FALSE);
1294 	kthread_exit();
1295 }
1296 
1297 static void
1298 svc_new_thread(SVCGROUP *grp)
1299 {
1300 	SVCPOOL *pool = grp->sg_pool;
1301 	struct thread *td;
1302 
1303 	mtx_lock(&grp->sg_lock);
1304 	grp->sg_threadcount++;
1305 	mtx_unlock(&grp->sg_lock);
1306 	kthread_add(svc_thread_start, grp, pool->sp_proc, &td, 0, 0,
1307 	    "%s: service", pool->sp_name);
1308 }
1309 
1310 void
1311 svc_run(SVCPOOL *pool)
1312 {
1313 	int g, i;
1314 	struct proc *p;
1315 	struct thread *td;
1316 	SVCGROUP *grp;
1317 
1318 	p = curproc;
1319 	td = curthread;
1320 	snprintf(td->td_name, sizeof(td->td_name),
1321 	    "%s: master", pool->sp_name);
1322 	pool->sp_state = SVCPOOL_ACTIVE;
1323 	pool->sp_proc = p;
1324 
1325 	/* Choose group count based on number of threads and CPUs. */
1326 	pool->sp_groupcount = max(1, min(SVC_MAXGROUPS,
1327 	    min(pool->sp_maxthreads / 2, mp_ncpus) / 6));
1328 	for (g = 0; g < pool->sp_groupcount; g++) {
1329 		grp = &pool->sp_groups[g];
1330 		grp->sg_minthreads = max(1,
1331 		    pool->sp_minthreads / pool->sp_groupcount);
1332 		grp->sg_maxthreads = max(1,
1333 		    pool->sp_maxthreads / pool->sp_groupcount);
1334 		grp->sg_lastcreatetime = time_uptime;
1335 	}
1336 
1337 	/* Starting threads */
1338 	pool->sp_groups[0].sg_threadcount++;
1339 	for (g = 0; g < pool->sp_groupcount; g++) {
1340 		grp = &pool->sp_groups[g];
1341 		for (i = ((g == 0) ? 1 : 0); i < grp->sg_minthreads; i++)
1342 			svc_new_thread(grp);
1343 	}
1344 	svc_run_internal(&pool->sp_groups[0], TRUE);
1345 
1346 	/* Waiting for threads to stop. */
1347 	for (g = 0; g < pool->sp_groupcount; g++) {
1348 		grp = &pool->sp_groups[g];
1349 		mtx_lock(&grp->sg_lock);
1350 		while (grp->sg_threadcount > 0)
1351 			msleep(grp, &grp->sg_lock, 0, "svcexit", 0);
1352 		mtx_unlock(&grp->sg_lock);
1353 	}
1354 }
1355 
1356 void
1357 svc_exit(SVCPOOL *pool)
1358 {
1359 	SVCGROUP *grp;
1360 	SVCTHREAD *st;
1361 	int g;
1362 
1363 	pool->sp_state = SVCPOOL_CLOSING;
1364 	for (g = 0; g < pool->sp_groupcount; g++) {
1365 		grp = &pool->sp_groups[g];
1366 		mtx_lock(&grp->sg_lock);
1367 		if (grp->sg_state != SVCPOOL_CLOSING) {
1368 			grp->sg_state = SVCPOOL_CLOSING;
1369 			LIST_FOREACH(st, &grp->sg_idlethreads, st_ilink)
1370 				cv_signal(&st->st_cond);
1371 		}
1372 		mtx_unlock(&grp->sg_lock);
1373 	}
1374 }
1375 
1376 bool_t
1377 svc_getargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
1378 {
1379 	struct mbuf *m;
1380 	XDR xdrs;
1381 	bool_t stat;
1382 
1383 	m = rqstp->rq_args;
1384 	rqstp->rq_args = NULL;
1385 
1386 	xdrmbuf_create(&xdrs, m, XDR_DECODE);
1387 	stat = xargs(&xdrs, args);
1388 	XDR_DESTROY(&xdrs);
1389 
1390 	return (stat);
1391 }
1392 
1393 bool_t
1394 svc_freeargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
1395 {
1396 	XDR xdrs;
1397 
1398 	if (rqstp->rq_addr) {
1399 		free(rqstp->rq_addr, M_SONAME);
1400 		rqstp->rq_addr = NULL;
1401 	}
1402 
1403 	xdrs.x_op = XDR_FREE;
1404 	return (xargs(&xdrs, args));
1405 }
1406 
1407 void
1408 svc_freereq(struct svc_req *rqstp)
1409 {
1410 	SVCTHREAD *st;
1411 	SVCPOOL *pool;
1412 
1413 	st = rqstp->rq_thread;
1414 	if (st) {
1415 		pool = st->st_pool;
1416 		if (pool->sp_done)
1417 			pool->sp_done(st, rqstp);
1418 	}
1419 
1420 	if (rqstp->rq_auth.svc_ah_ops)
1421 		SVCAUTH_RELEASE(&rqstp->rq_auth);
1422 
1423 	if (rqstp->rq_xprt) {
1424 		SVC_RELEASE(rqstp->rq_xprt);
1425 	}
1426 
1427 	if (rqstp->rq_addr)
1428 		free(rqstp->rq_addr, M_SONAME);
1429 
1430 	if (rqstp->rq_args)
1431 		m_freem(rqstp->rq_args);
1432 
1433 	free(rqstp, M_RPC);
1434 }
1435