xref: /freebsd/sys/rpc/svc.c (revision aa24f48b361effe51163877d84f1b70d32b77e04)
1 /*	$NetBSD: svc.c,v 1.21 2000/07/06 03:10:35 christos Exp $	*/
2 
3 /*-
4  * Copyright (c) 2009, Sun Microsystems, Inc.
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  * - Redistributions of source code must retain the above copyright notice,
10  *   this list of conditions and the following disclaimer.
11  * - Redistributions in binary form must reproduce the above copyright notice,
12  *   this list of conditions and the following disclaimer in the documentation
13  *   and/or other materials provided with the distribution.
14  * - Neither the name of Sun Microsystems, Inc. nor the names of its
15  *   contributors may be used to endorse or promote products derived
16  *   from this software without specific prior written permission.
17  *
18  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
22  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  */
30 
31 #if defined(LIBC_SCCS) && !defined(lint)
32 static char *sccsid2 = "@(#)svc.c 1.44 88/02/08 Copyr 1984 Sun Micro";
33 static char *sccsid = "@(#)svc.c	2.4 88/08/11 4.0 RPCSRC";
34 #endif
35 #include <sys/cdefs.h>
36 __FBSDID("$FreeBSD$");
37 
38 /*
39  * svc.c, Server-side remote procedure call interface.
40  *
41  * There are two sets of procedures here.  The xprt routines are
42  * for handling transport handles.  The svc routines handle the
43  * list of service routines.
44  *
45  * Copyright (C) 1984, Sun Microsystems, Inc.
46  */
47 
48 #include <sys/param.h>
49 #include <sys/lock.h>
50 #include <sys/kernel.h>
51 #include <sys/kthread.h>
52 #include <sys/malloc.h>
53 #include <sys/mbuf.h>
54 #include <sys/mutex.h>
55 #include <sys/proc.h>
56 #include <sys/queue.h>
57 #include <sys/socketvar.h>
58 #include <sys/systm.h>
59 #include <sys/smp.h>
60 #include <sys/sx.h>
61 #include <sys/ucred.h>
62 
63 #include <rpc/rpc.h>
64 #include <rpc/rpcb_clnt.h>
65 #include <rpc/replay.h>
66 
67 #include <rpc/rpc_com.h>
68 
69 #define SVC_VERSQUIET 0x0001		/* keep quiet about vers mismatch */
70 #define version_keepquiet(xp) (SVC_EXT(xp)->xp_flags & SVC_VERSQUIET)
71 
72 static struct svc_callout *svc_find(SVCPOOL *pool, rpcprog_t, rpcvers_t,
73     char *);
74 static void svc_new_thread(SVCGROUP *grp);
75 static void xprt_unregister_locked(SVCXPRT *xprt);
76 static void svc_change_space_used(SVCPOOL *pool, long delta);
77 static bool_t svc_request_space_available(SVCPOOL *pool);
78 static void svcpool_cleanup(SVCPOOL *pool);
79 
80 /* ***************  SVCXPRT related stuff **************** */
81 
82 static int svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS);
83 static int svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS);
84 static int svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS);
85 
86 SVCPOOL*
87 svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base)
88 {
89 	SVCPOOL *pool;
90 	SVCGROUP *grp;
91 	int g;
92 
93 	pool = malloc(sizeof(SVCPOOL), M_RPC, M_WAITOK|M_ZERO);
94 
95 	mtx_init(&pool->sp_lock, "sp_lock", NULL, MTX_DEF);
96 	pool->sp_name = name;
97 	pool->sp_state = SVCPOOL_INIT;
98 	pool->sp_proc = NULL;
99 	TAILQ_INIT(&pool->sp_callouts);
100 	TAILQ_INIT(&pool->sp_lcallouts);
101 	pool->sp_minthreads = 1;
102 	pool->sp_maxthreads = 1;
103 	pool->sp_groupcount = 1;
104 	for (g = 0; g < SVC_MAXGROUPS; g++) {
105 		grp = &pool->sp_groups[g];
106 		mtx_init(&grp->sg_lock, "sg_lock", NULL, MTX_DEF);
107 		grp->sg_pool = pool;
108 		grp->sg_state = SVCPOOL_ACTIVE;
109 		TAILQ_INIT(&grp->sg_xlist);
110 		TAILQ_INIT(&grp->sg_active);
111 		LIST_INIT(&grp->sg_idlethreads);
112 		grp->sg_minthreads = 1;
113 		grp->sg_maxthreads = 1;
114 	}
115 
116 	/*
117 	 * Don't use more than a quarter of mbuf clusters.  Nota bene:
118 	 * nmbclusters is an int, but nmbclusters*MCLBYTES may overflow
119 	 * on LP64 architectures, so cast to u_long to avoid undefined
120 	 * behavior.  (ILP32 architectures cannot have nmbclusters
121 	 * large enough to overflow for other reasons.)
122 	 */
123 	pool->sp_space_high = (u_long)nmbclusters * MCLBYTES / 4;
124 	pool->sp_space_low = (pool->sp_space_high / 3) * 2;
125 
126 	sysctl_ctx_init(&pool->sp_sysctl);
127 	if (sysctl_base) {
128 		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
129 		    "minthreads", CTLTYPE_INT | CTLFLAG_RW,
130 		    pool, 0, svcpool_minthread_sysctl, "I",
131 		    "Minimal number of threads");
132 		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
133 		    "maxthreads", CTLTYPE_INT | CTLFLAG_RW,
134 		    pool, 0, svcpool_maxthread_sysctl, "I",
135 		    "Maximal number of threads");
136 		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
137 		    "threads", CTLTYPE_INT | CTLFLAG_RD,
138 		    pool, 0, svcpool_threads_sysctl, "I",
139 		    "Current number of threads");
140 		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
141 		    "groups", CTLFLAG_RD, &pool->sp_groupcount, 0,
142 		    "Number of thread groups");
143 
144 		SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
145 		    "request_space_used", CTLFLAG_RD,
146 		    &pool->sp_space_used,
147 		    "Space in parsed but not handled requests.");
148 
149 		SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
150 		    "request_space_used_highest", CTLFLAG_RD,
151 		    &pool->sp_space_used_highest,
152 		    "Highest space used since reboot.");
153 
154 		SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
155 		    "request_space_high", CTLFLAG_RW,
156 		    &pool->sp_space_high,
157 		    "Maximum space in parsed but not handled requests.");
158 
159 		SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
160 		    "request_space_low", CTLFLAG_RW,
161 		    &pool->sp_space_low,
162 		    "Low water mark for request space.");
163 
164 		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
165 		    "request_space_throttled", CTLFLAG_RD,
166 		    &pool->sp_space_throttled, 0,
167 		    "Whether nfs requests are currently throttled");
168 
169 		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
170 		    "request_space_throttle_count", CTLFLAG_RD,
171 		    &pool->sp_space_throttle_count, 0,
172 		    "Count of times throttling based on request space has occurred");
173 	}
174 
175 	return pool;
176 }
177 
178 /*
179  * Code common to svcpool_destroy() and svcpool_close(), which cleans up
180  * the pool data structures.
181  */
182 static void
183 svcpool_cleanup(SVCPOOL *pool)
184 {
185 	SVCGROUP *grp;
186 	SVCXPRT *xprt, *nxprt;
187 	struct svc_callout *s;
188 	struct svc_loss_callout *sl;
189 	struct svcxprt_list cleanup;
190 	int g;
191 
192 	TAILQ_INIT(&cleanup);
193 
194 	for (g = 0; g < SVC_MAXGROUPS; g++) {
195 		grp = &pool->sp_groups[g];
196 		mtx_lock(&grp->sg_lock);
197 		while ((xprt = TAILQ_FIRST(&grp->sg_xlist)) != NULL) {
198 			xprt_unregister_locked(xprt);
199 			TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
200 		}
201 		mtx_unlock(&grp->sg_lock);
202 	}
203 	TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
204 		SVC_RELEASE(xprt);
205 	}
206 
207 	mtx_lock(&pool->sp_lock);
208 	while ((s = TAILQ_FIRST(&pool->sp_callouts)) != NULL) {
209 		mtx_unlock(&pool->sp_lock);
210 		svc_unreg(pool, s->sc_prog, s->sc_vers);
211 		mtx_lock(&pool->sp_lock);
212 	}
213 	while ((sl = TAILQ_FIRST(&pool->sp_lcallouts)) != NULL) {
214 		mtx_unlock(&pool->sp_lock);
215 		svc_loss_unreg(pool, sl->slc_dispatch);
216 		mtx_lock(&pool->sp_lock);
217 	}
218 	mtx_unlock(&pool->sp_lock);
219 }
220 
221 void
222 svcpool_destroy(SVCPOOL *pool)
223 {
224 	SVCGROUP *grp;
225 	int g;
226 
227 	svcpool_cleanup(pool);
228 
229 	for (g = 0; g < SVC_MAXGROUPS; g++) {
230 		grp = &pool->sp_groups[g];
231 		mtx_destroy(&grp->sg_lock);
232 	}
233 	mtx_destroy(&pool->sp_lock);
234 
235 	if (pool->sp_rcache)
236 		replay_freecache(pool->sp_rcache);
237 
238 	sysctl_ctx_free(&pool->sp_sysctl);
239 	free(pool, M_RPC);
240 }
241 
242 /*
243  * Similar to svcpool_destroy(), except that it does not destroy the actual
244  * data structures.  As such, "pool" may be used again.
245  */
246 void
247 svcpool_close(SVCPOOL *pool)
248 {
249 	SVCGROUP *grp;
250 	int g;
251 
252 	svcpool_cleanup(pool);
253 
254 	/* Now, initialize the pool's state for a fresh svc_run() call. */
255 	mtx_lock(&pool->sp_lock);
256 	pool->sp_state = SVCPOOL_INIT;
257 	mtx_unlock(&pool->sp_lock);
258 	for (g = 0; g < SVC_MAXGROUPS; g++) {
259 		grp = &pool->sp_groups[g];
260 		mtx_lock(&grp->sg_lock);
261 		grp->sg_state = SVCPOOL_ACTIVE;
262 		mtx_unlock(&grp->sg_lock);
263 	}
264 }
265 
266 /*
267  * Sysctl handler to get the present thread count on a pool
268  */
269 static int
270 svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS)
271 {
272 	SVCPOOL *pool;
273 	int threads, error, g;
274 
275 	pool = oidp->oid_arg1;
276 	threads = 0;
277 	mtx_lock(&pool->sp_lock);
278 	for (g = 0; g < pool->sp_groupcount; g++)
279 		threads += pool->sp_groups[g].sg_threadcount;
280 	mtx_unlock(&pool->sp_lock);
281 	error = sysctl_handle_int(oidp, &threads, 0, req);
282 	return (error);
283 }
284 
285 /*
286  * Sysctl handler to set the minimum thread count on a pool
287  */
288 static int
289 svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS)
290 {
291 	SVCPOOL *pool;
292 	int newminthreads, error, g;
293 
294 	pool = oidp->oid_arg1;
295 	newminthreads = pool->sp_minthreads;
296 	error = sysctl_handle_int(oidp, &newminthreads, 0, req);
297 	if (error == 0 && newminthreads != pool->sp_minthreads) {
298 		if (newminthreads > pool->sp_maxthreads)
299 			return (EINVAL);
300 		mtx_lock(&pool->sp_lock);
301 		pool->sp_minthreads = newminthreads;
302 		for (g = 0; g < pool->sp_groupcount; g++) {
303 			pool->sp_groups[g].sg_minthreads = max(1,
304 			    pool->sp_minthreads / pool->sp_groupcount);
305 		}
306 		mtx_unlock(&pool->sp_lock);
307 	}
308 	return (error);
309 }
310 
311 /*
312  * Sysctl handler to set the maximum thread count on a pool
313  */
314 static int
315 svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS)
316 {
317 	SVCPOOL *pool;
318 	int newmaxthreads, error, g;
319 
320 	pool = oidp->oid_arg1;
321 	newmaxthreads = pool->sp_maxthreads;
322 	error = sysctl_handle_int(oidp, &newmaxthreads, 0, req);
323 	if (error == 0 && newmaxthreads != pool->sp_maxthreads) {
324 		if (newmaxthreads < pool->sp_minthreads)
325 			return (EINVAL);
326 		mtx_lock(&pool->sp_lock);
327 		pool->sp_maxthreads = newmaxthreads;
328 		for (g = 0; g < pool->sp_groupcount; g++) {
329 			pool->sp_groups[g].sg_maxthreads = max(1,
330 			    pool->sp_maxthreads / pool->sp_groupcount);
331 		}
332 		mtx_unlock(&pool->sp_lock);
333 	}
334 	return (error);
335 }
336 
337 /*
338  * Activate a transport handle.
339  */
340 void
341 xprt_register(SVCXPRT *xprt)
342 {
343 	SVCPOOL *pool = xprt->xp_pool;
344 	SVCGROUP *grp;
345 	int g;
346 
347 	SVC_ACQUIRE(xprt);
348 	g = atomic_fetchadd_int(&pool->sp_nextgroup, 1) % pool->sp_groupcount;
349 	xprt->xp_group = grp = &pool->sp_groups[g];
350 	mtx_lock(&grp->sg_lock);
351 	xprt->xp_registered = TRUE;
352 	xprt->xp_active = FALSE;
353 	TAILQ_INSERT_TAIL(&grp->sg_xlist, xprt, xp_link);
354 	mtx_unlock(&grp->sg_lock);
355 }
356 
357 /*
358  * De-activate a transport handle. Note: the locked version doesn't
359  * release the transport - caller must do that after dropping the group
360  * lock (sg_lock).
361  */
362 static void
363 xprt_unregister_locked(SVCXPRT *xprt)
364 {
365 	SVCGROUP *grp = xprt->xp_group;
366 
367 	mtx_assert(&grp->sg_lock, MA_OWNED);
368 	KASSERT(xprt->xp_registered == TRUE,
369 	    ("xprt_unregister_locked: not registered"));
370 	xprt_inactive_locked(xprt);
371 	TAILQ_REMOVE(&grp->sg_xlist, xprt, xp_link);
372 	xprt->xp_registered = FALSE;
373 }
374 
375 void
376 xprt_unregister(SVCXPRT *xprt)
377 {
378 	SVCGROUP *grp = xprt->xp_group;
379 
380 	mtx_lock(&grp->sg_lock);
381 	if (xprt->xp_registered == FALSE) {
382 		/* Already unregistered by another thread */
383 		mtx_unlock(&grp->sg_lock);
384 		return;
385 	}
386 	xprt_unregister_locked(xprt);
387 	mtx_unlock(&grp->sg_lock);
388 
389 	SVC_RELEASE(xprt);
390 }
391 
392 /*
393  * Attempt to assign a service thread to this transport.
394  */
395 static int
396 xprt_assignthread(SVCXPRT *xprt)
397 {
398 	SVCGROUP *grp = xprt->xp_group;
399 	SVCTHREAD *st;
400 
401 	mtx_assert(&grp->sg_lock, MA_OWNED);
402 	st = LIST_FIRST(&grp->sg_idlethreads);
403 	if (st) {
404 		LIST_REMOVE(st, st_ilink);
405 		SVC_ACQUIRE(xprt);
406 		xprt->xp_thread = st;
407 		st->st_xprt = xprt;
408 		cv_signal(&st->st_cond);
409 		return (TRUE);
410 	} else {
411 		/*
412 		 * See if we can create a new thread. The
413 		 * actual thread creation happens in
414 		 * svc_run_internal because our locking state
415 		 * is poorly defined (we are typically called
416 		 * from a socket upcall). Don't create more
417 		 * than one thread per second.
418 		 */
419 		if (grp->sg_state == SVCPOOL_ACTIVE
420 		    && grp->sg_lastcreatetime < time_uptime
421 		    && grp->sg_threadcount < grp->sg_maxthreads) {
422 			grp->sg_state = SVCPOOL_THREADWANTED;
423 		}
424 	}
425 	return (FALSE);
426 }
427 
428 void
429 xprt_active(SVCXPRT *xprt)
430 {
431 	SVCGROUP *grp = xprt->xp_group;
432 
433 	mtx_lock(&grp->sg_lock);
434 
435 	if (!xprt->xp_registered) {
436 		/*
437 		 * Race with xprt_unregister - we lose.
438 		 */
439 		mtx_unlock(&grp->sg_lock);
440 		return;
441 	}
442 
443 	if (!xprt->xp_active) {
444 		xprt->xp_active = TRUE;
445 		if (xprt->xp_thread == NULL) {
446 			if (!svc_request_space_available(xprt->xp_pool) ||
447 			    !xprt_assignthread(xprt))
448 				TAILQ_INSERT_TAIL(&grp->sg_active, xprt,
449 				    xp_alink);
450 		}
451 	}
452 
453 	mtx_unlock(&grp->sg_lock);
454 }
455 
456 void
457 xprt_inactive_locked(SVCXPRT *xprt)
458 {
459 	SVCGROUP *grp = xprt->xp_group;
460 
461 	mtx_assert(&grp->sg_lock, MA_OWNED);
462 	if (xprt->xp_active) {
463 		if (xprt->xp_thread == NULL)
464 			TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
465 		xprt->xp_active = FALSE;
466 	}
467 }
468 
469 void
470 xprt_inactive(SVCXPRT *xprt)
471 {
472 	SVCGROUP *grp = xprt->xp_group;
473 
474 	mtx_lock(&grp->sg_lock);
475 	xprt_inactive_locked(xprt);
476 	mtx_unlock(&grp->sg_lock);
477 }
478 
479 /*
480  * Variant of xprt_inactive() for use only when sure that port is
481  * assigned to thread. For example, within receive handlers.
482  */
483 void
484 xprt_inactive_self(SVCXPRT *xprt)
485 {
486 
487 	KASSERT(xprt->xp_thread != NULL,
488 	    ("xprt_inactive_self(%p) with NULL xp_thread", xprt));
489 	xprt->xp_active = FALSE;
490 }
491 
492 /*
493  * Add a service program to the callout list.
494  * The dispatch routine will be called when a rpc request for this
495  * program number comes in.
496  */
497 bool_t
498 svc_reg(SVCXPRT *xprt, const rpcprog_t prog, const rpcvers_t vers,
499     void (*dispatch)(struct svc_req *, SVCXPRT *),
500     const struct netconfig *nconf)
501 {
502 	SVCPOOL *pool = xprt->xp_pool;
503 	struct svc_callout *s;
504 	char *netid = NULL;
505 	int flag = 0;
506 
507 /* VARIABLES PROTECTED BY svc_lock: s, svc_head */
508 
509 	if (xprt->xp_netid) {
510 		netid = strdup(xprt->xp_netid, M_RPC);
511 		flag = 1;
512 	} else if (nconf && nconf->nc_netid) {
513 		netid = strdup(nconf->nc_netid, M_RPC);
514 		flag = 1;
515 	} /* must have been created with svc_raw_create */
516 	if ((netid == NULL) && (flag == 1)) {
517 		return (FALSE);
518 	}
519 
520 	mtx_lock(&pool->sp_lock);
521 	if ((s = svc_find(pool, prog, vers, netid)) != NULL) {
522 		if (netid)
523 			free(netid, M_RPC);
524 		if (s->sc_dispatch == dispatch)
525 			goto rpcb_it; /* he is registering another xptr */
526 		mtx_unlock(&pool->sp_lock);
527 		return (FALSE);
528 	}
529 	s = malloc(sizeof (struct svc_callout), M_RPC, M_NOWAIT);
530 	if (s == NULL) {
531 		if (netid)
532 			free(netid, M_RPC);
533 		mtx_unlock(&pool->sp_lock);
534 		return (FALSE);
535 	}
536 
537 	s->sc_prog = prog;
538 	s->sc_vers = vers;
539 	s->sc_dispatch = dispatch;
540 	s->sc_netid = netid;
541 	TAILQ_INSERT_TAIL(&pool->sp_callouts, s, sc_link);
542 
543 	if ((xprt->xp_netid == NULL) && (flag == 1) && netid)
544 		((SVCXPRT *) xprt)->xp_netid = strdup(netid, M_RPC);
545 
546 rpcb_it:
547 	mtx_unlock(&pool->sp_lock);
548 	/* now register the information with the local binder service */
549 	if (nconf) {
550 		bool_t dummy;
551 		struct netconfig tnc;
552 		struct netbuf nb;
553 		tnc = *nconf;
554 		nb.buf = &xprt->xp_ltaddr;
555 		nb.len = xprt->xp_ltaddr.ss_len;
556 		dummy = rpcb_set(prog, vers, &tnc, &nb);
557 		return (dummy);
558 	}
559 	return (TRUE);
560 }
561 
562 /*
563  * Remove a service program from the callout list.
564  */
565 void
566 svc_unreg(SVCPOOL *pool, const rpcprog_t prog, const rpcvers_t vers)
567 {
568 	struct svc_callout *s;
569 
570 	/* unregister the information anyway */
571 	(void) rpcb_unset(prog, vers, NULL);
572 	mtx_lock(&pool->sp_lock);
573 	while ((s = svc_find(pool, prog, vers, NULL)) != NULL) {
574 		TAILQ_REMOVE(&pool->sp_callouts, s, sc_link);
575 		if (s->sc_netid)
576 			mem_free(s->sc_netid, sizeof (s->sc_netid) + 1);
577 		mem_free(s, sizeof (struct svc_callout));
578 	}
579 	mtx_unlock(&pool->sp_lock);
580 }
581 
582 /*
583  * Add a service connection loss program to the callout list.
584  * The dispatch routine will be called when some port in this pool dies.
585  */
586 bool_t
587 svc_loss_reg(SVCXPRT *xprt, void (*dispatch)(SVCXPRT *))
588 {
589 	SVCPOOL *pool = xprt->xp_pool;
590 	struct svc_loss_callout *s;
591 
592 	mtx_lock(&pool->sp_lock);
593 	TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
594 		if (s->slc_dispatch == dispatch)
595 			break;
596 	}
597 	if (s != NULL) {
598 		mtx_unlock(&pool->sp_lock);
599 		return (TRUE);
600 	}
601 	s = malloc(sizeof(struct svc_loss_callout), M_RPC, M_NOWAIT);
602 	if (s == NULL) {
603 		mtx_unlock(&pool->sp_lock);
604 		return (FALSE);
605 	}
606 	s->slc_dispatch = dispatch;
607 	TAILQ_INSERT_TAIL(&pool->sp_lcallouts, s, slc_link);
608 	mtx_unlock(&pool->sp_lock);
609 	return (TRUE);
610 }
611 
612 /*
613  * Remove a service connection loss program from the callout list.
614  */
615 void
616 svc_loss_unreg(SVCPOOL *pool, void (*dispatch)(SVCXPRT *))
617 {
618 	struct svc_loss_callout *s;
619 
620 	mtx_lock(&pool->sp_lock);
621 	TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
622 		if (s->slc_dispatch == dispatch) {
623 			TAILQ_REMOVE(&pool->sp_lcallouts, s, slc_link);
624 			free(s, M_RPC);
625 			break;
626 		}
627 	}
628 	mtx_unlock(&pool->sp_lock);
629 }
630 
631 /* ********************** CALLOUT list related stuff ************* */
632 
633 /*
634  * Search the callout list for a program number, return the callout
635  * struct.
636  */
637 static struct svc_callout *
638 svc_find(SVCPOOL *pool, rpcprog_t prog, rpcvers_t vers, char *netid)
639 {
640 	struct svc_callout *s;
641 
642 	mtx_assert(&pool->sp_lock, MA_OWNED);
643 	TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
644 		if (s->sc_prog == prog && s->sc_vers == vers
645 		    && (netid == NULL || s->sc_netid == NULL ||
646 			strcmp(netid, s->sc_netid) == 0))
647 			break;
648 	}
649 
650 	return (s);
651 }
652 
653 /* ******************* REPLY GENERATION ROUTINES  ************ */
654 
655 static bool_t
656 svc_sendreply_common(struct svc_req *rqstp, struct rpc_msg *rply,
657     struct mbuf *body)
658 {
659 	SVCXPRT *xprt = rqstp->rq_xprt;
660 	bool_t ok;
661 
662 	if (rqstp->rq_args) {
663 		m_freem(rqstp->rq_args);
664 		rqstp->rq_args = NULL;
665 	}
666 
667 	if (xprt->xp_pool->sp_rcache)
668 		replay_setreply(xprt->xp_pool->sp_rcache,
669 		    rply, svc_getrpccaller(rqstp), body);
670 
671 	if (!SVCAUTH_WRAP(&rqstp->rq_auth, &body))
672 		return (FALSE);
673 
674 	ok = SVC_REPLY(xprt, rply, rqstp->rq_addr, body, &rqstp->rq_reply_seq);
675 	if (rqstp->rq_addr) {
676 		free(rqstp->rq_addr, M_SONAME);
677 		rqstp->rq_addr = NULL;
678 	}
679 
680 	return (ok);
681 }
682 
683 /*
684  * Send a reply to an rpc request
685  */
686 bool_t
687 svc_sendreply(struct svc_req *rqstp, xdrproc_t xdr_results, void * xdr_location)
688 {
689 	struct rpc_msg rply;
690 	struct mbuf *m;
691 	XDR xdrs;
692 	bool_t ok;
693 
694 	rply.rm_xid = rqstp->rq_xid;
695 	rply.rm_direction = REPLY;
696 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
697 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
698 	rply.acpted_rply.ar_stat = SUCCESS;
699 	rply.acpted_rply.ar_results.where = NULL;
700 	rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;
701 
702 	m = m_getcl(M_WAITOK, MT_DATA, 0);
703 	xdrmbuf_create(&xdrs, m, XDR_ENCODE);
704 	ok = xdr_results(&xdrs, xdr_location);
705 	XDR_DESTROY(&xdrs);
706 
707 	if (ok) {
708 		return (svc_sendreply_common(rqstp, &rply, m));
709 	} else {
710 		m_freem(m);
711 		return (FALSE);
712 	}
713 }
714 
715 bool_t
716 svc_sendreply_mbuf(struct svc_req *rqstp, struct mbuf *m)
717 {
718 	struct rpc_msg rply;
719 
720 	rply.rm_xid = rqstp->rq_xid;
721 	rply.rm_direction = REPLY;
722 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
723 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
724 	rply.acpted_rply.ar_stat = SUCCESS;
725 	rply.acpted_rply.ar_results.where = NULL;
726 	rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;
727 
728 	return (svc_sendreply_common(rqstp, &rply, m));
729 }
730 
731 /*
732  * No procedure error reply
733  */
734 void
735 svcerr_noproc(struct svc_req *rqstp)
736 {
737 	SVCXPRT *xprt = rqstp->rq_xprt;
738 	struct rpc_msg rply;
739 
740 	rply.rm_xid = rqstp->rq_xid;
741 	rply.rm_direction = REPLY;
742 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
743 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
744 	rply.acpted_rply.ar_stat = PROC_UNAVAIL;
745 
746 	if (xprt->xp_pool->sp_rcache)
747 		replay_setreply(xprt->xp_pool->sp_rcache,
748 		    &rply, svc_getrpccaller(rqstp), NULL);
749 
750 	svc_sendreply_common(rqstp, &rply, NULL);
751 }
752 
753 /*
754  * Can't decode args error reply
755  */
756 void
757 svcerr_decode(struct svc_req *rqstp)
758 {
759 	SVCXPRT *xprt = rqstp->rq_xprt;
760 	struct rpc_msg rply;
761 
762 	rply.rm_xid = rqstp->rq_xid;
763 	rply.rm_direction = REPLY;
764 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
765 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
766 	rply.acpted_rply.ar_stat = GARBAGE_ARGS;
767 
768 	if (xprt->xp_pool->sp_rcache)
769 		replay_setreply(xprt->xp_pool->sp_rcache,
770 		    &rply, (struct sockaddr *) &xprt->xp_rtaddr, NULL);
771 
772 	svc_sendreply_common(rqstp, &rply, NULL);
773 }
774 
775 /*
776  * Some system error
777  */
778 void
779 svcerr_systemerr(struct svc_req *rqstp)
780 {
781 	SVCXPRT *xprt = rqstp->rq_xprt;
782 	struct rpc_msg rply;
783 
784 	rply.rm_xid = rqstp->rq_xid;
785 	rply.rm_direction = REPLY;
786 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
787 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
788 	rply.acpted_rply.ar_stat = SYSTEM_ERR;
789 
790 	if (xprt->xp_pool->sp_rcache)
791 		replay_setreply(xprt->xp_pool->sp_rcache,
792 		    &rply, svc_getrpccaller(rqstp), NULL);
793 
794 	svc_sendreply_common(rqstp, &rply, NULL);
795 }
796 
797 /*
798  * Authentication error reply
799  */
800 void
801 svcerr_auth(struct svc_req *rqstp, enum auth_stat why)
802 {
803 	SVCXPRT *xprt = rqstp->rq_xprt;
804 	struct rpc_msg rply;
805 
806 	rply.rm_xid = rqstp->rq_xid;
807 	rply.rm_direction = REPLY;
808 	rply.rm_reply.rp_stat = MSG_DENIED;
809 	rply.rjcted_rply.rj_stat = AUTH_ERROR;
810 	rply.rjcted_rply.rj_why = why;
811 
812 	if (xprt->xp_pool->sp_rcache)
813 		replay_setreply(xprt->xp_pool->sp_rcache,
814 		    &rply, svc_getrpccaller(rqstp), NULL);
815 
816 	svc_sendreply_common(rqstp, &rply, NULL);
817 }
818 
819 /*
820  * Auth too weak error reply
821  */
822 void
823 svcerr_weakauth(struct svc_req *rqstp)
824 {
825 
826 	svcerr_auth(rqstp, AUTH_TOOWEAK);
827 }
828 
829 /*
830  * Program unavailable error reply
831  */
832 void
833 svcerr_noprog(struct svc_req *rqstp)
834 {
835 	SVCXPRT *xprt = rqstp->rq_xprt;
836 	struct rpc_msg rply;
837 
838 	rply.rm_xid = rqstp->rq_xid;
839 	rply.rm_direction = REPLY;
840 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
841 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
842 	rply.acpted_rply.ar_stat = PROG_UNAVAIL;
843 
844 	if (xprt->xp_pool->sp_rcache)
845 		replay_setreply(xprt->xp_pool->sp_rcache,
846 		    &rply, svc_getrpccaller(rqstp), NULL);
847 
848 	svc_sendreply_common(rqstp, &rply, NULL);
849 }
850 
851 /*
852  * Program version mismatch error reply
853  */
854 void
855 svcerr_progvers(struct svc_req *rqstp, rpcvers_t low_vers, rpcvers_t high_vers)
856 {
857 	SVCXPRT *xprt = rqstp->rq_xprt;
858 	struct rpc_msg rply;
859 
860 	rply.rm_xid = rqstp->rq_xid;
861 	rply.rm_direction = REPLY;
862 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
863 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
864 	rply.acpted_rply.ar_stat = PROG_MISMATCH;
865 	rply.acpted_rply.ar_vers.low = (uint32_t)low_vers;
866 	rply.acpted_rply.ar_vers.high = (uint32_t)high_vers;
867 
868 	if (xprt->xp_pool->sp_rcache)
869 		replay_setreply(xprt->xp_pool->sp_rcache,
870 		    &rply, svc_getrpccaller(rqstp), NULL);
871 
872 	svc_sendreply_common(rqstp, &rply, NULL);
873 }
874 
875 /*
876  * Allocate a new server transport structure. All fields are
877  * initialized to zero and xp_p3 is initialized to point at an
878  * extension structure to hold various flags and authentication
879  * parameters.
880  */
881 SVCXPRT *
882 svc_xprt_alloc(void)
883 {
884 	SVCXPRT *xprt;
885 	SVCXPRT_EXT *ext;
886 
887 	xprt = mem_alloc(sizeof(SVCXPRT));
888 	ext = mem_alloc(sizeof(SVCXPRT_EXT));
889 	xprt->xp_p3 = ext;
890 	refcount_init(&xprt->xp_refs, 1);
891 
892 	return (xprt);
893 }
894 
895 /*
896  * Free a server transport structure.
897  */
898 void
899 svc_xprt_free(SVCXPRT *xprt)
900 {
901 
902 	mem_free(xprt->xp_p3, sizeof(SVCXPRT_EXT));
903 	mem_free(xprt, sizeof(SVCXPRT));
904 }
905 
906 /* ******************* SERVER INPUT STUFF ******************* */
907 
908 /*
909  * Read RPC requests from a transport and queue them to be
910  * executed. We handle authentication and replay cache replies here.
911  * Actually dispatching the RPC is deferred till svc_executereq.
912  */
913 static enum xprt_stat
914 svc_getreq(SVCXPRT *xprt, struct svc_req **rqstp_ret)
915 {
916 	SVCPOOL *pool = xprt->xp_pool;
917 	struct svc_req *r;
918 	struct rpc_msg msg;
919 	struct mbuf *args;
920 	struct svc_loss_callout *s;
921 	enum xprt_stat stat;
922 
923 	/* now receive msgs from xprtprt (support batch calls) */
924 	r = malloc(sizeof(*r), M_RPC, M_WAITOK|M_ZERO);
925 
926 	msg.rm_call.cb_cred.oa_base = r->rq_credarea;
927 	msg.rm_call.cb_verf.oa_base = &r->rq_credarea[MAX_AUTH_BYTES];
928 	r->rq_clntcred = &r->rq_credarea[2*MAX_AUTH_BYTES];
929 	if (SVC_RECV(xprt, &msg, &r->rq_addr, &args)) {
930 		enum auth_stat why;
931 
932 		/*
933 		 * Handle replays and authenticate before queuing the
934 		 * request to be executed.
935 		 */
936 		SVC_ACQUIRE(xprt);
937 		r->rq_xprt = xprt;
938 		if (pool->sp_rcache) {
939 			struct rpc_msg repmsg;
940 			struct mbuf *repbody;
941 			enum replay_state rs;
942 			rs = replay_find(pool->sp_rcache, &msg,
943 			    svc_getrpccaller(r), &repmsg, &repbody);
944 			switch (rs) {
945 			case RS_NEW:
946 				break;
947 			case RS_DONE:
948 				SVC_REPLY(xprt, &repmsg, r->rq_addr,
949 				    repbody, &r->rq_reply_seq);
950 				if (r->rq_addr) {
951 					free(r->rq_addr, M_SONAME);
952 					r->rq_addr = NULL;
953 				}
954 				m_freem(args);
955 				goto call_done;
956 
957 			default:
958 				m_freem(args);
959 				goto call_done;
960 			}
961 		}
962 
963 		r->rq_xid = msg.rm_xid;
964 		r->rq_prog = msg.rm_call.cb_prog;
965 		r->rq_vers = msg.rm_call.cb_vers;
966 		r->rq_proc = msg.rm_call.cb_proc;
967 		r->rq_size = sizeof(*r) + m_length(args, NULL);
968 		r->rq_args = args;
969 		if ((why = _authenticate(r, &msg)) != AUTH_OK) {
970 			/*
971 			 * RPCSEC_GSS uses this return code
972 			 * for requests that form part of its
973 			 * context establishment protocol and
974 			 * should not be dispatched to the
975 			 * application.
976 			 */
977 			if (why != RPCSEC_GSS_NODISPATCH)
978 				svcerr_auth(r, why);
979 			goto call_done;
980 		}
981 
982 		if (!SVCAUTH_UNWRAP(&r->rq_auth, &r->rq_args)) {
983 			svcerr_decode(r);
984 			goto call_done;
985 		}
986 
987 		/*
988 		 * Everything checks out, return request to caller.
989 		 */
990 		*rqstp_ret = r;
991 		r = NULL;
992 	}
993 call_done:
994 	if (r) {
995 		svc_freereq(r);
996 		r = NULL;
997 	}
998 	if ((stat = SVC_STAT(xprt)) == XPRT_DIED) {
999 		TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link)
1000 			(*s->slc_dispatch)(xprt);
1001 		xprt_unregister(xprt);
1002 	}
1003 
1004 	return (stat);
1005 }
1006 
1007 static void
1008 svc_executereq(struct svc_req *rqstp)
1009 {
1010 	SVCXPRT *xprt = rqstp->rq_xprt;
1011 	SVCPOOL *pool = xprt->xp_pool;
1012 	int prog_found;
1013 	rpcvers_t low_vers;
1014 	rpcvers_t high_vers;
1015 	struct svc_callout *s;
1016 
1017 	/* now match message with a registered service*/
1018 	prog_found = FALSE;
1019 	low_vers = (rpcvers_t) -1L;
1020 	high_vers = (rpcvers_t) 0L;
1021 	TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
1022 		if (s->sc_prog == rqstp->rq_prog) {
1023 			if (s->sc_vers == rqstp->rq_vers) {
1024 				/*
1025 				 * We hand ownership of r to the
1026 				 * dispatch method - they must call
1027 				 * svc_freereq.
1028 				 */
1029 				(*s->sc_dispatch)(rqstp, xprt);
1030 				return;
1031 			}  /* found correct version */
1032 			prog_found = TRUE;
1033 			if (s->sc_vers < low_vers)
1034 				low_vers = s->sc_vers;
1035 			if (s->sc_vers > high_vers)
1036 				high_vers = s->sc_vers;
1037 		}   /* found correct program */
1038 	}
1039 
1040 	/*
1041 	 * if we got here, the program or version
1042 	 * is not served ...
1043 	 */
1044 	if (prog_found)
1045 		svcerr_progvers(rqstp, low_vers, high_vers);
1046 	else
1047 		svcerr_noprog(rqstp);
1048 
1049 	svc_freereq(rqstp);
1050 }
1051 
1052 static void
1053 svc_checkidle(SVCGROUP *grp)
1054 {
1055 	SVCXPRT *xprt, *nxprt;
1056 	time_t timo;
1057 	struct svcxprt_list cleanup;
1058 
1059 	TAILQ_INIT(&cleanup);
1060 	TAILQ_FOREACH_SAFE(xprt, &grp->sg_xlist, xp_link, nxprt) {
1061 		/*
1062 		 * Only some transports have idle timers. Don't time
1063 		 * something out which is just waking up.
1064 		 */
1065 		if (!xprt->xp_idletimeout || xprt->xp_thread)
1066 			continue;
1067 
1068 		timo = xprt->xp_lastactive + xprt->xp_idletimeout;
1069 		if (time_uptime > timo) {
1070 			xprt_unregister_locked(xprt);
1071 			TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
1072 		}
1073 	}
1074 
1075 	mtx_unlock(&grp->sg_lock);
1076 	TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
1077 		SVC_RELEASE(xprt);
1078 	}
1079 	mtx_lock(&grp->sg_lock);
1080 }
1081 
1082 static void
1083 svc_assign_waiting_sockets(SVCPOOL *pool)
1084 {
1085 	SVCGROUP *grp;
1086 	SVCXPRT *xprt;
1087 	int g;
1088 
1089 	for (g = 0; g < pool->sp_groupcount; g++) {
1090 		grp = &pool->sp_groups[g];
1091 		mtx_lock(&grp->sg_lock);
1092 		while ((xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) {
1093 			if (xprt_assignthread(xprt))
1094 				TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
1095 			else
1096 				break;
1097 		}
1098 		mtx_unlock(&grp->sg_lock);
1099 	}
1100 }
1101 
1102 static void
1103 svc_change_space_used(SVCPOOL *pool, long delta)
1104 {
1105 	unsigned long value;
1106 
1107 	value = atomic_fetchadd_long(&pool->sp_space_used, delta) + delta;
1108 	if (delta > 0) {
1109 		if (value >= pool->sp_space_high && !pool->sp_space_throttled) {
1110 			pool->sp_space_throttled = TRUE;
1111 			pool->sp_space_throttle_count++;
1112 		}
1113 		if (value > pool->sp_space_used_highest)
1114 			pool->sp_space_used_highest = value;
1115 	} else {
1116 		if (value < pool->sp_space_low && pool->sp_space_throttled) {
1117 			pool->sp_space_throttled = FALSE;
1118 			svc_assign_waiting_sockets(pool);
1119 		}
1120 	}
1121 }
1122 
1123 static bool_t
1124 svc_request_space_available(SVCPOOL *pool)
1125 {
1126 
1127 	if (pool->sp_space_throttled)
1128 		return (FALSE);
1129 	return (TRUE);
1130 }
1131 
1132 static void
1133 svc_run_internal(SVCGROUP *grp, bool_t ismaster)
1134 {
1135 	SVCPOOL *pool = grp->sg_pool;
1136 	SVCTHREAD *st, *stpref;
1137 	SVCXPRT *xprt;
1138 	enum xprt_stat stat;
1139 	struct svc_req *rqstp;
1140 	struct proc *p;
1141 	long sz;
1142 	int error;
1143 
1144 	st = mem_alloc(sizeof(*st));
1145 	mtx_init(&st->st_lock, "st_lock", NULL, MTX_DEF);
1146 	st->st_pool = pool;
1147 	st->st_xprt = NULL;
1148 	STAILQ_INIT(&st->st_reqs);
1149 	cv_init(&st->st_cond, "rpcsvc");
1150 
1151 	mtx_lock(&grp->sg_lock);
1152 
1153 	/*
1154 	 * If we are a new thread which was spawned to cope with
1155 	 * increased load, set the state back to SVCPOOL_ACTIVE.
1156 	 */
1157 	if (grp->sg_state == SVCPOOL_THREADSTARTING)
1158 		grp->sg_state = SVCPOOL_ACTIVE;
1159 
1160 	while (grp->sg_state != SVCPOOL_CLOSING) {
1161 		/*
1162 		 * Create new thread if requested.
1163 		 */
1164 		if (grp->sg_state == SVCPOOL_THREADWANTED) {
1165 			grp->sg_state = SVCPOOL_THREADSTARTING;
1166 			grp->sg_lastcreatetime = time_uptime;
1167 			mtx_unlock(&grp->sg_lock);
1168 			svc_new_thread(grp);
1169 			mtx_lock(&grp->sg_lock);
1170 			continue;
1171 		}
1172 
1173 		/*
1174 		 * Check for idle transports once per second.
1175 		 */
1176 		if (time_uptime > grp->sg_lastidlecheck) {
1177 			grp->sg_lastidlecheck = time_uptime;
1178 			svc_checkidle(grp);
1179 		}
1180 
1181 		xprt = st->st_xprt;
1182 		if (!xprt) {
1183 			/*
1184 			 * Enforce maxthreads count.
1185 			 */
1186 			if (grp->sg_threadcount > grp->sg_maxthreads)
1187 				break;
1188 
1189 			/*
1190 			 * Before sleeping, see if we can find an
1191 			 * active transport which isn't being serviced
1192 			 * by a thread.
1193 			 */
1194 			if (svc_request_space_available(pool) &&
1195 			    (xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) {
1196 				TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
1197 				SVC_ACQUIRE(xprt);
1198 				xprt->xp_thread = st;
1199 				st->st_xprt = xprt;
1200 				continue;
1201 			}
1202 
1203 			LIST_INSERT_HEAD(&grp->sg_idlethreads, st, st_ilink);
1204 			if (ismaster || (!ismaster &&
1205 			    grp->sg_threadcount > grp->sg_minthreads))
1206 				error = cv_timedwait_sig(&st->st_cond,
1207 				    &grp->sg_lock, 5 * hz);
1208 			else
1209 				error = cv_wait_sig(&st->st_cond,
1210 				    &grp->sg_lock);
1211 			if (st->st_xprt == NULL)
1212 				LIST_REMOVE(st, st_ilink);
1213 
1214 			/*
1215 			 * Reduce worker thread count when idle.
1216 			 */
1217 			if (error == EWOULDBLOCK) {
1218 				if (!ismaster
1219 				    && (grp->sg_threadcount
1220 					> grp->sg_minthreads)
1221 					&& !st->st_xprt)
1222 					break;
1223 			} else if (error != 0) {
1224 				KASSERT(error == EINTR || error == ERESTART,
1225 				    ("non-signal error %d", error));
1226 				mtx_unlock(&grp->sg_lock);
1227 				p = curproc;
1228 				PROC_LOCK(p);
1229 				if (P_SHOULDSTOP(p) ||
1230 				    (p->p_flag & P_TOTAL_STOP) != 0) {
1231 					thread_suspend_check(0);
1232 					PROC_UNLOCK(p);
1233 					mtx_lock(&grp->sg_lock);
1234 				} else {
1235 					PROC_UNLOCK(p);
1236 					svc_exit(pool);
1237 					mtx_lock(&grp->sg_lock);
1238 					break;
1239 				}
1240 			}
1241 			continue;
1242 		}
1243 		mtx_unlock(&grp->sg_lock);
1244 
1245 		/*
1246 		 * Drain the transport socket and queue up any RPCs.
1247 		 */
1248 		xprt->xp_lastactive = time_uptime;
1249 		do {
1250 			if (!svc_request_space_available(pool))
1251 				break;
1252 			rqstp = NULL;
1253 			stat = svc_getreq(xprt, &rqstp);
1254 			if (rqstp) {
1255 				svc_change_space_used(pool, rqstp->rq_size);
1256 				/*
1257 				 * See if the application has a preference
1258 				 * for some other thread.
1259 				 */
1260 				if (pool->sp_assign) {
1261 					stpref = pool->sp_assign(st, rqstp);
1262 					rqstp->rq_thread = stpref;
1263 					STAILQ_INSERT_TAIL(&stpref->st_reqs,
1264 					    rqstp, rq_link);
1265 					mtx_unlock(&stpref->st_lock);
1266 					if (stpref != st)
1267 						rqstp = NULL;
1268 				} else {
1269 					rqstp->rq_thread = st;
1270 					STAILQ_INSERT_TAIL(&st->st_reqs,
1271 					    rqstp, rq_link);
1272 				}
1273 			}
1274 		} while (rqstp == NULL && stat == XPRT_MOREREQS
1275 		    && grp->sg_state != SVCPOOL_CLOSING);
1276 
1277 		/*
1278 		 * Move this transport to the end of the active list to
1279 		 * ensure fairness when multiple transports are active.
1280 		 * If this was the last queued request, svc_getreq will end
1281 		 * up calling xprt_inactive to remove from the active list.
1282 		 */
1283 		mtx_lock(&grp->sg_lock);
1284 		xprt->xp_thread = NULL;
1285 		st->st_xprt = NULL;
1286 		if (xprt->xp_active) {
1287 			if (!svc_request_space_available(pool) ||
1288 			    !xprt_assignthread(xprt))
1289 				TAILQ_INSERT_TAIL(&grp->sg_active,
1290 				    xprt, xp_alink);
1291 		}
1292 		mtx_unlock(&grp->sg_lock);
1293 		SVC_RELEASE(xprt);
1294 
1295 		/*
1296 		 * Execute what we have queued.
1297 		 */
1298 		mtx_lock(&st->st_lock);
1299 		while ((rqstp = STAILQ_FIRST(&st->st_reqs)) != NULL) {
1300 			STAILQ_REMOVE_HEAD(&st->st_reqs, rq_link);
1301 			mtx_unlock(&st->st_lock);
1302 			sz = (long)rqstp->rq_size;
1303 			svc_executereq(rqstp);
1304 			svc_change_space_used(pool, -sz);
1305 			mtx_lock(&st->st_lock);
1306 		}
1307 		mtx_unlock(&st->st_lock);
1308 		mtx_lock(&grp->sg_lock);
1309 	}
1310 
1311 	if (st->st_xprt) {
1312 		xprt = st->st_xprt;
1313 		st->st_xprt = NULL;
1314 		SVC_RELEASE(xprt);
1315 	}
1316 	KASSERT(STAILQ_EMPTY(&st->st_reqs), ("stray reqs on exit"));
1317 	mtx_destroy(&st->st_lock);
1318 	cv_destroy(&st->st_cond);
1319 	mem_free(st, sizeof(*st));
1320 
1321 	grp->sg_threadcount--;
1322 	if (!ismaster)
1323 		wakeup(grp);
1324 	mtx_unlock(&grp->sg_lock);
1325 }
1326 
1327 static void
1328 svc_thread_start(void *arg)
1329 {
1330 
1331 	svc_run_internal((SVCGROUP *) arg, FALSE);
1332 	kthread_exit();
1333 }
1334 
1335 static void
1336 svc_new_thread(SVCGROUP *grp)
1337 {
1338 	SVCPOOL *pool = grp->sg_pool;
1339 	struct thread *td;
1340 
1341 	mtx_lock(&grp->sg_lock);
1342 	grp->sg_threadcount++;
1343 	mtx_unlock(&grp->sg_lock);
1344 	kthread_add(svc_thread_start, grp, pool->sp_proc, &td, 0, 0,
1345 	    "%s: service", pool->sp_name);
1346 }
1347 
1348 void
1349 svc_run(SVCPOOL *pool)
1350 {
1351 	int g, i;
1352 	struct proc *p;
1353 	struct thread *td;
1354 	SVCGROUP *grp;
1355 
1356 	p = curproc;
1357 	td = curthread;
1358 	snprintf(td->td_name, sizeof(td->td_name),
1359 	    "%s: master", pool->sp_name);
1360 	pool->sp_state = SVCPOOL_ACTIVE;
1361 	pool->sp_proc = p;
1362 
1363 	/* Choose group count based on number of threads and CPUs. */
1364 	pool->sp_groupcount = max(1, min(SVC_MAXGROUPS,
1365 	    min(pool->sp_maxthreads / 2, mp_ncpus) / 6));
1366 	for (g = 0; g < pool->sp_groupcount; g++) {
1367 		grp = &pool->sp_groups[g];
1368 		grp->sg_minthreads = max(1,
1369 		    pool->sp_minthreads / pool->sp_groupcount);
1370 		grp->sg_maxthreads = max(1,
1371 		    pool->sp_maxthreads / pool->sp_groupcount);
1372 		grp->sg_lastcreatetime = time_uptime;
1373 	}
1374 
1375 	/* Starting threads */
1376 	pool->sp_groups[0].sg_threadcount++;
1377 	for (g = 0; g < pool->sp_groupcount; g++) {
1378 		grp = &pool->sp_groups[g];
1379 		for (i = ((g == 0) ? 1 : 0); i < grp->sg_minthreads; i++)
1380 			svc_new_thread(grp);
1381 	}
1382 	svc_run_internal(&pool->sp_groups[0], TRUE);
1383 
1384 	/* Waiting for threads to stop. */
1385 	for (g = 0; g < pool->sp_groupcount; g++) {
1386 		grp = &pool->sp_groups[g];
1387 		mtx_lock(&grp->sg_lock);
1388 		while (grp->sg_threadcount > 0)
1389 			msleep(grp, &grp->sg_lock, 0, "svcexit", 0);
1390 		mtx_unlock(&grp->sg_lock);
1391 	}
1392 }
1393 
1394 void
1395 svc_exit(SVCPOOL *pool)
1396 {
1397 	SVCGROUP *grp;
1398 	SVCTHREAD *st;
1399 	int g;
1400 
1401 	pool->sp_state = SVCPOOL_CLOSING;
1402 	for (g = 0; g < pool->sp_groupcount; g++) {
1403 		grp = &pool->sp_groups[g];
1404 		mtx_lock(&grp->sg_lock);
1405 		if (grp->sg_state != SVCPOOL_CLOSING) {
1406 			grp->sg_state = SVCPOOL_CLOSING;
1407 			LIST_FOREACH(st, &grp->sg_idlethreads, st_ilink)
1408 				cv_signal(&st->st_cond);
1409 		}
1410 		mtx_unlock(&grp->sg_lock);
1411 	}
1412 }
1413 
1414 bool_t
1415 svc_getargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
1416 {
1417 	struct mbuf *m;
1418 	XDR xdrs;
1419 	bool_t stat;
1420 
1421 	m = rqstp->rq_args;
1422 	rqstp->rq_args = NULL;
1423 
1424 	xdrmbuf_create(&xdrs, m, XDR_DECODE);
1425 	stat = xargs(&xdrs, args);
1426 	XDR_DESTROY(&xdrs);
1427 
1428 	return (stat);
1429 }
1430 
1431 bool_t
1432 svc_freeargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
1433 {
1434 	XDR xdrs;
1435 
1436 	if (rqstp->rq_addr) {
1437 		free(rqstp->rq_addr, M_SONAME);
1438 		rqstp->rq_addr = NULL;
1439 	}
1440 
1441 	xdrs.x_op = XDR_FREE;
1442 	return (xargs(&xdrs, args));
1443 }
1444 
1445 void
1446 svc_freereq(struct svc_req *rqstp)
1447 {
1448 	SVCTHREAD *st;
1449 	SVCPOOL *pool;
1450 
1451 	st = rqstp->rq_thread;
1452 	if (st) {
1453 		pool = st->st_pool;
1454 		if (pool->sp_done)
1455 			pool->sp_done(st, rqstp);
1456 	}
1457 
1458 	if (rqstp->rq_auth.svc_ah_ops)
1459 		SVCAUTH_RELEASE(&rqstp->rq_auth);
1460 
1461 	if (rqstp->rq_xprt) {
1462 		SVC_RELEASE(rqstp->rq_xprt);
1463 	}
1464 
1465 	if (rqstp->rq_addr)
1466 		free(rqstp->rq_addr, M_SONAME);
1467 
1468 	if (rqstp->rq_args)
1469 		m_freem(rqstp->rq_args);
1470 
1471 	free(rqstp, M_RPC);
1472 }
1473